From d4a9c99f79066f9d6c361dc8f2c1682bff57bfc0 Mon Sep 17 00:00:00 2001 From: Xyphuz Date: Fri, 27 May 2022 17:29:07 +0800 Subject: [PATCH] feat: imeplement parser using compose-spec schema --- compose_viz/parser.py | 273 +++++++++++++++++------------------------- 1 file changed, 112 insertions(+), 161 deletions(-) diff --git a/compose_viz/parser.py b/compose_viz/parser.py index 96e2667..50627f3 100644 --- a/compose_viz/parser.py +++ b/compose_viz/parser.py @@ -1,8 +1,7 @@ import re -from typing import Dict, List, Optional - -from ruamel.yaml import YAML +from typing import List, Optional +import compose_viz.spec.compose_spec as spec from compose_viz.models.compose import Compose, Service from compose_viz.models.extends import Extends from compose_viz.models.port import Port, Protocol @@ -14,182 +13,124 @@ class Parser: pass def parse(self, file_path: str) -> Compose: - # load the yaml file - with open(file_path, "r") as f: - try: - yaml = YAML(typ="safe", pure=True) - yaml_data = yaml.load(f) - except Exception as e: - raise RuntimeError(f"Error parsing file '{file_path}': {e}") - # validate the yaml file - if not yaml_data: - raise RuntimeError("Empty yaml file, aborting.") - if not yaml_data.get("services"): - raise RuntimeError("No services found, aborting.") - - # parse services data into Service objects - services = self.parse_service_data(yaml_data["services"]) - - # create Compose object - compose = Compose(services) - - return compose - - def parse_service_data(self, services_yaml_data: Dict[str, dict]) -> List[Service]: + compose_data: spec.ComposeSpecification = spec.ComposeSpecification.parse_file(file_path) services: List[Service] = [] - for service, service_name in zip(services_yaml_data.values(), services_yaml_data.keys()): + + assert compose_data.services is not None, "No services found, aborting." + + for service_name, service_data in compose_data.services.items(): + service_name = str(service_name) service_image: Optional[str] = None - if service.get("build"): - if type(service["build"]) is str: - service_image = f"build from '{service['build']}'" - elif type(service["build"]) is dict: - if service["build"].get("context") and service["build"].get("dockerfile"): + if service_data.build is not None: + if type(service_data.build) is str: + service_image = f"build from '{service_data.build}'" + elif type(service_data.build) is spec.BuildItem: + if service_data.build.context is not None and service_data.build.dockerfile is not None: service_image = ( - f"build from '{service['build']['context']}' using '{service['build']['dockerfile']}'" + f"build from '{service_data.build.context}' using '{service_data.build.dockerfile}'" ) - elif service["build"].get("context"): - service_image = f"build from '{service['build']['context']}'" - if service.get("image"): - if service_image: - service_image += ", image: " + service["image"] + elif service_data.build.context is not None: + service_image = f"build from '{service_data.build.context}'" + if service_data.image is not None: + if service_image is not None: + service_image += ", image: " + service_data.image else: - service_image = service["image"] + service_image = service_data.image service_networks: List[str] = [] - if service.get("networks"): - if type(service["networks"]) is list: - service_networks = service["networks"] - elif type(service["networks"]) is dict: - service_networks = list(service["networks"].keys()) + if service_data.networks is not None: + if type(service_data.networks) is spec.ListOfStrings: + service_networks = service_data.networks.__root__ + elif type(service_data.networks) is dict: + service_networks = list(service_data.networks.keys()) service_extends: Optional[Extends] = None - if service.get("extends"): - assert type(service["extends"]) is dict, "Invalid extends format, aborting." - assert service["extends"]["service"], "Missing extends service, aborting." - extend_service_name = str(service["extends"]["service"]) - - extend_from_file: Optional[str] = None - if service["extends"].get("file"): - assert service["extends"]["file"], "Missing extends file, aborting." - extend_from_file = str(service["extends"]["file"]) - - service_extends = Extends(service_name=extend_service_name, from_file=extend_from_file) + if service_data.extends is not None: + if type(service_data.extends) is str: + service_extends = Extends(service_name=service_data.extends) + elif type(service_data.extends) is spec.Extend: + service_extends = Extends( + service_name=service_data.extends.service, from_file=service_data.extends.file + ) service_ports: List[Port] = [] - if service.get("ports"): - assert type(service["ports"]) is list - for port_data in service["ports"]: - if type(port_data) is dict: - # define a nested function to limit variable scope - def long_syntax(): - assert type(port_data) is dict - assert port_data["target"] + if service_data.ports is not None: + for port_data in service_data.ports: + host_ip: Optional[str] = None + host_port: Optional[str] = None + container_port: Optional[str] = None + protocol: Optional[str] = None - container_port: str = str(port_data["target"]) - host_port: str = "" - protocol: Protocol = Protocol.any + if type(port_data) is str: + regex = r"(?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:)?((?P\d+(\-\d+)?):)?((?P\d+(\-\d+)?))?(/(?P\w+))?" # noqa: E501 + match = re.match(regex, port_data) - if port_data.get("published"): - host_port = str(port_data["published"]) - else: + if match: + host_ip = match.group("host_ip") + host_port = match.group("host_port") + container_port = match.group("container_port") + protocol = match.group("protocol") + + assert container_port, "Invalid port format, aborting." + + if container_port and not host_port: host_port = container_port - if port_data.get("host_ip"): - host_ip = str(port_data["host_ip"]) - host_port = f"{host_ip}:{host_port}" + if host_ip: + host_port = f"{host_ip}{host_port}" else: host_port = f"0.0.0.0:{host_port}" + elif type(port_data) is spec.Port: + assert port_data.target is not None, "Invalid port format, aborting." - if port_data.get("protocol"): - protocol = Protocol[str(port_data["protocol"])] + if type(port_data.published) is int: + host_port = str(port_data.published) + elif type(port_data.published) is str: + host_port = port_data.published - assert host_port, "Error parsing port, aborting." + if type(port_data.target) is int: + container_port = str(port_data.target) + elif type(port_data.target) is str: + container_port = port_data.target - service_ports.append( - Port( - host_port=host_port, - container_port=container_port, - protocol=protocol, - ) - ) + host_ip = port_data.host_ip + protocol = port_data.protocol - long_syntax() - elif type(port_data) is str: - # ports that needs to parse using regex: - # - "3000" - # - "3000-3005" - # - "8000:8000" - # - "9090-9091:8080-8081" - # - "49100:22" - # - "127.0.0.1:8001:8001" - # - "127.0.0.1:5000-5010:5000-5010" - # - "6060:6060/udp" + if container_port and not host_port: + host_port = container_port - def short_syntax(): - assert type(port_data) is str - regex = r"(?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:)?((?P\d+(\-\d+)?):)?((?P\d+(\-\d+)?))?(/(?P\w+))?" # noqa: E501 - match = re.match(regex, port_data) - if match: - host_ip: Optional[str] = match.group("host_ip") - host_port: Optional[str] = match.group("host_port") - container_port: Optional[str] = match.group("container_port") - protocol: Optional[str] = match.group("protocol") + if host_ip: + host_port = f"{host_ip}{host_port}" + else: + host_port = f"0.0.0.0:{host_port}" - assert container_port, "Invalid port format, aborting." + assert host_port is not None, "Error while parsing port, aborting." + assert container_port is not None, "Error while parsing port, aborting." - if container_port and not host_port: - host_port = container_port + if protocol is None: + protocol = "any" - if host_ip: - host_port = f"{host_ip}{host_port}" - else: - host_port = f"0.0.0.0:{host_port}" - - assert host_port, "Error while parsing port, aborting." - - if protocol: - service_ports.append( - Port( - host_port=host_port, - container_port=container_port, - protocol=Protocol[protocol], - ) - ) - else: - service_ports.append( - Port( - host_port=host_port, - container_port=container_port, - ) - ) - - short_syntax() + service_ports.append( + Port( + host_port=host_port, + container_port=container_port, + protocol=Protocol[protocol], + ) + ) service_depends_on: List[str] = [] - if service.get("depends_on"): - if type(service["depends_on"]) is list: - for depends_on in service["depends_on"]: + if service_data.depends_on is not None: + if type(service_data.depends_on) is spec.ListOfStrings: + service_depends_on = service_data.depends_on.__root__ + elif type(service_data.depends_on) is dict: + for depends_on in service_data.depends_on.keys(): service_depends_on.append(str(depends_on)) - elif type(service["depends_on"]) is dict: - service_depends_on = list(service["depends_on"].keys()) service_volumes: List[Volume] = [] - if service.get("volumes"): - assert type(service["volumes"]) is list - for volume_data in service["volumes"]: - if type(volume_data) is dict: - assert volume_data["source"] and volume_data["target"], "Invalid volume input, aborting." - - volume_source: str = str(volume_data["source"]) - volume_target: str = str(volume_data["target"]) - volume_type: VolumeType = VolumeType.volume - - if volume_data.get("type"): - volume_type = VolumeType[str(volume_data["type"])] - - service_volumes.append(Volume(source=volume_source, target=volume_target, type=volume_type)) - elif type(volume_data) is str: + if service_data.volumes is not None: + for volume_data in service_data.volumes: + if type(volume_data) is str: assert ":" in volume_data, "Invalid volume input, aborting." spilt_data = volume_data.split(":") @@ -197,12 +138,31 @@ class Parser: service_volumes.append(Volume(source=spilt_data[0], target=spilt_data[1])) elif len(spilt_data) == 3: service_volumes.append( - Volume(source=spilt_data[0], target=spilt_data[1], access_mode=spilt_data[2]) + Volume( + source=spilt_data[0], + target=spilt_data[1], + access_mode=spilt_data[2], + ) ) + elif type(volume_data) is spec.ServiceVolume: + assert volume_data.target is not None, "Invalid volume input, aborting." + + if volume_data.source is None: + volume_data.source = volume_data.target + + assert volume_data.source is not None + + service_volumes.append( + Volume( + source=volume_data.source, + target=volume_data.target, + type=VolumeType[volume_data.type], + ) + ) service_links: List[str] = [] - if service.get("links"): - service_links = service["links"] + if service_data.links is not None: + service_links = service_data.links services.append( Service( @@ -216,14 +176,5 @@ class Parser: links=service_links, ) ) - # Service print debug - # print("--------------------") - # print("Service name: {}".format(service_name)) - # print("image: {}".format(service_image)) - # print("networks: {}".format(service_networks)) - # print("image: {}".format(service_image)) - # print("extends: {}".format(service_extends)) - # print("ports: {}".format(service_ports)) - # print("depends: {}".format(service_depends_on)) - return services + return Compose(services=services)