feat: imeplement parser using compose-spec schema

This commit is contained in:
Xyphuz 2022-05-27 17:29:07 +08:00
parent b2b5cd49ba
commit d4a9c99f79

View file

@ -1,8 +1,7 @@
import re
from typing import Dict, List, Optional
from ruamel.yaml import YAML
from typing import List, Optional
import compose_viz.spec.compose_spec as spec
from compose_viz.models.compose import Compose, Service
from compose_viz.models.extends import Extends
from compose_viz.models.port import Port, Protocol
@ -14,127 +13,64 @@ class Parser:
pass
def parse(self, file_path: str) -> Compose:
# load the yaml file
with open(file_path, "r") as f:
try:
yaml = YAML(typ="safe", pure=True)
yaml_data = yaml.load(f)
except Exception as e:
raise RuntimeError(f"Error parsing file '{file_path}': {e}")
# validate the yaml file
if not yaml_data:
raise RuntimeError("Empty yaml file, aborting.")
if not yaml_data.get("services"):
raise RuntimeError("No services found, aborting.")
# parse services data into Service objects
services = self.parse_service_data(yaml_data["services"])
# create Compose object
compose = Compose(services)
return compose
def parse_service_data(self, services_yaml_data: Dict[str, dict]) -> List[Service]:
compose_data: spec.ComposeSpecification = spec.ComposeSpecification.parse_file(file_path)
services: List[Service] = []
for service, service_name in zip(services_yaml_data.values(), services_yaml_data.keys()):
assert compose_data.services is not None, "No services found, aborting."
for service_name, service_data in compose_data.services.items():
service_name = str(service_name)
service_image: Optional[str] = None
if service.get("build"):
if type(service["build"]) is str:
service_image = f"build from '{service['build']}'"
elif type(service["build"]) is dict:
if service["build"].get("context") and service["build"].get("dockerfile"):
if service_data.build is not None:
if type(service_data.build) is str:
service_image = f"build from '{service_data.build}'"
elif type(service_data.build) is spec.BuildItem:
if service_data.build.context is not None and service_data.build.dockerfile is not None:
service_image = (
f"build from '{service['build']['context']}' using '{service['build']['dockerfile']}'"
f"build from '{service_data.build.context}' using '{service_data.build.dockerfile}'"
)
elif service["build"].get("context"):
service_image = f"build from '{service['build']['context']}'"
if service.get("image"):
if service_image:
service_image += ", image: " + service["image"]
elif service_data.build.context is not None:
service_image = f"build from '{service_data.build.context}'"
if service_data.image is not None:
if service_image is not None:
service_image += ", image: " + service_data.image
else:
service_image = service["image"]
service_image = service_data.image
service_networks: List[str] = []
if service.get("networks"):
if type(service["networks"]) is list:
service_networks = service["networks"]
elif type(service["networks"]) is dict:
service_networks = list(service["networks"].keys())
if service_data.networks is not None:
if type(service_data.networks) is spec.ListOfStrings:
service_networks = service_data.networks.__root__
elif type(service_data.networks) is dict:
service_networks = list(service_data.networks.keys())
service_extends: Optional[Extends] = None
if service.get("extends"):
assert type(service["extends"]) is dict, "Invalid extends format, aborting."
assert service["extends"]["service"], "Missing extends service, aborting."
extend_service_name = str(service["extends"]["service"])
extend_from_file: Optional[str] = None
if service["extends"].get("file"):
assert service["extends"]["file"], "Missing extends file, aborting."
extend_from_file = str(service["extends"]["file"])
service_extends = Extends(service_name=extend_service_name, from_file=extend_from_file)
if service_data.extends is not None:
if type(service_data.extends) is str:
service_extends = Extends(service_name=service_data.extends)
elif type(service_data.extends) is spec.Extend:
service_extends = Extends(
service_name=service_data.extends.service, from_file=service_data.extends.file
)
service_ports: List[Port] = []
if service.get("ports"):
assert type(service["ports"]) is list
for port_data in service["ports"]:
if type(port_data) is dict:
# define a nested function to limit variable scope
def long_syntax():
assert type(port_data) is dict
assert port_data["target"]
if service_data.ports is not None:
for port_data in service_data.ports:
host_ip: Optional[str] = None
host_port: Optional[str] = None
container_port: Optional[str] = None
protocol: Optional[str] = None
container_port: str = str(port_data["target"])
host_port: str = ""
protocol: Protocol = Protocol.any
if port_data.get("published"):
host_port = str(port_data["published"])
else:
host_port = container_port
if port_data.get("host_ip"):
host_ip = str(port_data["host_ip"])
host_port = f"{host_ip}:{host_port}"
else:
host_port = f"0.0.0.0:{host_port}"
if port_data.get("protocol"):
protocol = Protocol[str(port_data["protocol"])]
assert host_port, "Error parsing port, aborting."
service_ports.append(
Port(
host_port=host_port,
container_port=container_port,
protocol=protocol,
)
)
long_syntax()
elif type(port_data) is str:
# ports that needs to parse using regex:
# - "3000"
# - "3000-3005"
# - "8000:8000"
# - "9090-9091:8080-8081"
# - "49100:22"
# - "127.0.0.1:8001:8001"
# - "127.0.0.1:5000-5010:5000-5010"
# - "6060:6060/udp"
def short_syntax():
assert type(port_data) is str
if type(port_data) is str:
regex = r"(?P<host_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:)?((?P<host_port>\d+(\-\d+)?):)?((?P<container_port>\d+(\-\d+)?))?(/(?P<protocol>\w+))?" # noqa: E501
match = re.match(regex, port_data)
if match:
host_ip: Optional[str] = match.group("host_ip")
host_port: Optional[str] = match.group("host_port")
container_port: Optional[str] = match.group("container_port")
protocol: Optional[str] = match.group("protocol")
host_ip = match.group("host_ip")
host_port = match.group("host_port")
container_port = match.group("container_port")
protocol = match.group("protocol")
assert container_port, "Invalid port format, aborting."
@ -145,10 +81,36 @@ class Parser:
host_port = f"{host_ip}{host_port}"
else:
host_port = f"0.0.0.0:{host_port}"
elif type(port_data) is spec.Port:
assert port_data.target is not None, "Invalid port format, aborting."
assert host_port, "Error while parsing port, aborting."
if type(port_data.published) is int:
host_port = str(port_data.published)
elif type(port_data.published) is str:
host_port = port_data.published
if type(port_data.target) is int:
container_port = str(port_data.target)
elif type(port_data.target) is str:
container_port = port_data.target
host_ip = port_data.host_ip
protocol = port_data.protocol
if container_port and not host_port:
host_port = container_port
if host_ip:
host_port = f"{host_ip}{host_port}"
else:
host_port = f"0.0.0.0:{host_port}"
assert host_port is not None, "Error while parsing port, aborting."
assert container_port is not None, "Error while parsing port, aborting."
if protocol is None:
protocol = "any"
if protocol:
service_ports.append(
Port(
host_port=host_port,
@ -156,40 +118,19 @@ class Parser:
protocol=Protocol[protocol],
)
)
else:
service_ports.append(
Port(
host_port=host_port,
container_port=container_port,
)
)
short_syntax()
service_depends_on: List[str] = []
if service.get("depends_on"):
if type(service["depends_on"]) is list:
for depends_on in service["depends_on"]:
if service_data.depends_on is not None:
if type(service_data.depends_on) is spec.ListOfStrings:
service_depends_on = service_data.depends_on.__root__
elif type(service_data.depends_on) is dict:
for depends_on in service_data.depends_on.keys():
service_depends_on.append(str(depends_on))
elif type(service["depends_on"]) is dict:
service_depends_on = list(service["depends_on"].keys())
service_volumes: List[Volume] = []
if service.get("volumes"):
assert type(service["volumes"]) is list
for volume_data in service["volumes"]:
if type(volume_data) is dict:
assert volume_data["source"] and volume_data["target"], "Invalid volume input, aborting."
volume_source: str = str(volume_data["source"])
volume_target: str = str(volume_data["target"])
volume_type: VolumeType = VolumeType.volume
if volume_data.get("type"):
volume_type = VolumeType[str(volume_data["type"])]
service_volumes.append(Volume(source=volume_source, target=volume_target, type=volume_type))
elif type(volume_data) is str:
if service_data.volumes is not None:
for volume_data in service_data.volumes:
if type(volume_data) is str:
assert ":" in volume_data, "Invalid volume input, aborting."
spilt_data = volume_data.split(":")
@ -197,12 +138,31 @@ class Parser:
service_volumes.append(Volume(source=spilt_data[0], target=spilt_data[1]))
elif len(spilt_data) == 3:
service_volumes.append(
Volume(source=spilt_data[0], target=spilt_data[1], access_mode=spilt_data[2])
Volume(
source=spilt_data[0],
target=spilt_data[1],
access_mode=spilt_data[2],
)
)
elif type(volume_data) is spec.ServiceVolume:
assert volume_data.target is not None, "Invalid volume input, aborting."
if volume_data.source is None:
volume_data.source = volume_data.target
assert volume_data.source is not None
service_volumes.append(
Volume(
source=volume_data.source,
target=volume_data.target,
type=VolumeType[volume_data.type],
)
)
service_links: List[str] = []
if service.get("links"):
service_links = service["links"]
if service_data.links is not None:
service_links = service_data.links
services.append(
Service(
@ -216,14 +176,5 @@ class Parser:
links=service_links,
)
)
# Service print debug
# print("--------------------")
# print("Service name: {}".format(service_name))
# print("image: {}".format(service_image))
# print("networks: {}".format(service_networks))
# print("image: {}".format(service_image))
# print("extends: {}".format(service_extends))
# print("ports: {}".format(service_ports))
# print("depends: {}".format(service_depends_on))
return services
return Compose(services=services)