compose-viz/compose_viz/parser.py
2024-04-27 16:55:24 +08:00

271 lines
12 KiB
Python

import re
from typing import Any, Dict, List, Optional, Union
from pydantic import ValidationError
from pydantic_yaml import parse_yaml_raw_as
import compose_viz.spec.compose_spec as spec
from compose_viz.models.compose import Compose, Service
from compose_viz.models.device import Device
from compose_viz.models.extends import Extends
from compose_viz.models.port import Port, Protocol
from compose_viz.models.volume import Volume, VolumeType
class Parser:
def __init__(self):
pass
@staticmethod
def _unwrap_depends_on(data_depends_on: Union[spec.ListOfStrings, Dict[Any, spec.DependsOn], None]) -> List[str]:
service_depends_on = []
if type(data_depends_on) is spec.ListOfStrings:
service_depends_on = data_depends_on.root
elif type(data_depends_on) is dict:
for depends_on in data_depends_on.keys():
service_depends_on.append(str(depends_on))
return service_depends_on
@staticmethod
def compile_dependencies(service_name: str, services: Dict[Any, spec.Service], file_path: str) -> List[str]:
assert service_name in services, f"Service '{service_name}' not found in given compose file: '{file_path}'"
dependencies = []
for dependency in Parser._unwrap_depends_on(services[service_name].depends_on):
if dependency:
dependencies.append(dependency)
dependencies.extend(Parser.compile_dependencies(dependency, services, file_path))
return dependencies
def parse(self, file_path: str, root_service: Optional[str] = None) -> Compose:
compose_data: spec.ComposeSpecification
try:
with open(file_path, "r") as file:
file_content = file.read()
compose_data = parse_yaml_raw_as(spec.ComposeSpecification, file_content)
except ValidationError as e:
raise RuntimeError(f"Error parsing file '{file_path}': {e}")
services: List[Service] = []
assert compose_data.services is not None, "No services found, aborting."
root_dependencies: List[str] = []
if root_service:
root_dependencies = Parser.compile_dependencies(root_service, compose_data.services, file_path)
root_dependencies.append(root_service)
root_dependencies = list(set(root_dependencies))
for service_name, service_data in compose_data.services.items():
service_name = str(service_name)
if root_service and service_name not in root_dependencies:
continue
service_image: Optional[str] = None
if service_data.build is not None:
if type(service_data.build) is str:
service_image = f"build from '{service_data.build}'"
elif type(service_data.build) is spec.Build:
if service_data.build.context is not None and service_data.build.dockerfile is not None:
service_image = (
f"build from '{service_data.build.context}' using '{service_data.build.dockerfile}'"
)
elif service_data.build.context is not None:
service_image = f"build from '{service_data.build.context}'"
if service_data.image is not None:
if service_image is not None:
service_image += ", image: " + service_data.image
else:
service_image = service_data.image
service_networks: List[str] = []
if service_data.networks is not None:
if type(service_data.networks) is spec.ListOfStrings:
service_networks = service_data.networks.root
elif type(service_data.networks) is dict:
service_networks = list(service_data.networks.keys())
service_extends: Optional[Extends] = None
if service_data.extends is not None:
# https://github.com/compose-spec/compose-spec/blob/master/spec.md#extends
# The value of the extends key MUST be a dictionary.
assert type(service_data.extends) is spec.Extends
service_extends = Extends(
service_name=service_data.extends.service, from_file=service_data.extends.file
)
service_ports: List[Port] = []
if service_data.ports is not None:
for port_data in service_data.ports:
host_ip: Optional[str] = None
host_port: Optional[str] = None
container_port: Optional[str] = None
protocol: Optional[str] = None
if type(port_data) is float:
container_port = str(int(port_data))
host_port = f"0.0.0.0:{container_port}"
elif type(port_data) is str:
regex = r"((?P<host_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:)|:)?((?P<host_port>\d+(\-\d+)?):)?((?P<container_port>\d+(\-\d+)?))?(/(?P<protocol>\w+))?" # noqa: E501
match = re.match(regex, port_data)
if match:
host_ip = match.group("host_ip")
host_port = match.group("host_port")
container_port = match.group("container_port")
protocol = match.group("protocol")
assert container_port, "Invalid port format, aborting."
if container_port is not None and host_port is None:
host_port = container_port
if host_ip is not None:
host_port = f"{host_ip}{host_port}"
else:
host_port = f"0.0.0.0:{host_port}"
elif type(port_data) is spec.Ports:
assert port_data.target is not None, "Invalid port format, aborting."
# ruamel.yaml does not parse port as int
assert type(port_data.published) is not int
if type(port_data.published) is str:
host_port = port_data.published
if type(port_data.target) is int:
container_port = str(port_data.target)
host_ip = port_data.host_ip
protocol = port_data.protocol
if container_port is not None and host_port is None:
host_port = container_port
if host_ip is not None:
host_port = f"{host_ip}:{host_port}"
else:
host_port = f"0.0.0.0:{host_port}"
assert host_port is not None, "Error while parsing port, aborting."
assert container_port is not None, "Error while parsing port, aborting."
if protocol is None:
protocol = "any"
service_ports.append(
Port(
host_port=host_port,
container_port=container_port,
protocol=Protocol[protocol],
)
)
service_depends_on: List[str] = []
if service_data.depends_on is not None:
service_depends_on = Parser._unwrap_depends_on(service_data.depends_on)
service_volumes: List[Volume] = []
if service_data.volumes is not None:
for volume_data in service_data.volumes:
if type(volume_data) is str:
assert ":" in volume_data, "Invalid volume input, aborting."
spilt_data = volume_data.split(":")
if len(spilt_data) == 2:
service_volumes.append(Volume(source=spilt_data[0], target=spilt_data[1]))
elif len(spilt_data) == 3:
service_volumes.append(
Volume(
source=spilt_data[0],
target=spilt_data[1],
access_mode=spilt_data[2],
)
)
elif type(volume_data) is spec.Volumes:
assert volume_data.target is not None, "Invalid volume input, aborting."
# https://github.com/compose-spec/compose-spec/blob/master/spec.md#long-syntax-4
# `volume_data.source` is not applicable for a tmpfs mount.
if volume_data.source is None:
volume_data.source = volume_data.target
assert volume_data.source is not None
service_volumes.append(
Volume(
source=volume_data.source,
target=volume_data.target,
type=VolumeType[volume_data.type],
)
)
service_links: List[str] = []
if service_data.links is not None:
service_links = service_data.links
cgroup_parent: Optional[str] = None
if service_data.cgroup_parent is not None:
cgroup_parent = service_data.cgroup_parent
container_name: Optional[str] = None
if service_data.container_name is not None:
container_name = service_data.container_name
env_file: List[str] = []
if service_data.env_file is not None:
if type(service_data.env_file) is spec.StringOrList:
if type(service_data.env_file.root) is spec.ListOfStrings:
env_file = service_data.env_file.root.root
elif type(service_data.env_file.root) is str:
env_file.append(service_data.env_file.root)
expose: List[str] = []
if service_data.expose is not None:
for port in service_data.expose:
expose.append(str(port))
profiles: List[str] = []
if service_data.profiles is not None:
if type(service_data.profiles) is spec.ListOfStrings:
profiles = service_data.profiles.root
devices: List[Device] = []
if service_data.devices is not None:
for device_data in service_data.devices:
if type(device_data) is str:
assert ":" in device_data, "Invalid volume input, aborting."
spilt_data = device_data.split(":")
if len(spilt_data) == 2:
devices.append(Device(host_path=spilt_data[0], container_path=spilt_data[1]))
elif len(spilt_data) == 3:
devices.append(
Device(
host_path=spilt_data[0],
container_path=spilt_data[1],
cgroup_permissions=spilt_data[2],
)
)
services.append(
Service(
name=service_name,
image=service_image,
networks=service_networks,
extends=service_extends,
ports=service_ports,
depends_on=service_depends_on,
volumes=service_volumes,
links=service_links,
cgroup_parent=cgroup_parent,
container_name=container_name,
env_file=env_file,
expose=expose,
profiles=profiles,
devices=devices,
)
)
return Compose(services=services)