compose-viz/compose_viz/parser.py

279 lines
12 KiB
Python
Raw Normal View History

import re
2022-12-15 11:45:49 +01:00
from typing import Any, Dict, List, Optional, Union
2022-05-07 18:42:14 +02:00
2024-04-27 10:55:24 +02:00
from pydantic_yaml import parse_yaml_raw_as
2022-05-27 13:56:30 +02:00
import compose_viz.spec.compose_spec as spec
2022-05-27 08:17:04 +02:00
from compose_viz.models.compose import Compose, Service
2022-06-06 18:42:30 +02:00
from compose_viz.models.device import Device
2022-05-27 08:17:04 +02:00
from compose_viz.models.extends import Extends
2024-04-27 18:42:34 +02:00
from compose_viz.models.port import AppProtocol, Port, Protocol
2022-05-27 08:17:04 +02:00
from compose_viz.models.volume import Volume, VolumeType
2022-05-21 11:47:35 +02:00
2022-05-21 16:32:59 +02:00
2022-05-07 18:42:14 +02:00
class Parser:
    """Convert a Docker Compose YAML file into compose-viz ``Compose`` models.

    ``parse`` is the entry point. The static helpers each translate one
    section of a service definition (image/build, ports, volumes, ...) from
    the validated ``spec`` models into the simplified model classes.

    Validation failures are reported as ``AssertionError`` raised explicitly
    (rather than through ``assert`` statements) so that the checks still run
    when Python is started with ``-O``.
    """

    # Short port syntax: an optional host IP (dotted quad or ``${VAR}``) that is
    # captured together with its trailing colon, an optional host port or
    # range, the container port or range, and an optional ``/protocol`` suffix.
    _SHORT_PORT_PATTERN = re.compile(
        r"((?P<host_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:|(\$\{([^}]+)\}):)|:|)?"
        r"((?P<host_port>\d+(\-\d+)?):)?"
        r"((?P<container_port>\d+(\-\d+)?))?"
        r"(/(?P<protocol>\w+))?"
    )

    def __init__(self):
        """Create a parser; no instance state is set up."""
        pass

    @staticmethod
    def _unwrap_depends_on(data_depends_on: Union[spec.ListOfStrings, Dict[Any, spec.DependsOn], None]) -> List[str]:
        """Return the service names referenced by a ``depends_on`` value.

        Args:
            data_depends_on: The list form (``spec.ListOfStrings``), the
                mapping form keyed by service name, or ``None``.

        Returns:
            A new list with the referenced names as strings. ``None`` and any
            unrecognised value yield an empty list.
        """
        if data_depends_on is None:
            return []
        if isinstance(data_depends_on, dict):
            return [str(name) for name in data_depends_on]
        if isinstance(data_depends_on, spec.ListOfStrings):
            return list(data_depends_on.root)
        return []

    @staticmethod
    def compile_dependencies(service_name: str, services: Dict[Any, spec.Service], file_path: str) -> List[str]:
        """Collect the transitive ``depends_on`` closure of a service.

        Args:
            service_name: Name of the service whose dependencies are wanted.
            services: Mapping of service name to service definition.
            file_path: Path of the compose file; only used in error messages.

        Returns:
            Dependency names in depth-first discovery order. Each name appears
            once and ``service_name`` itself is never included.

        Raises:
            AssertionError: If ``service_name`` or any dependency reached from
                it is not a key of ``services``.
        """
        dependencies: List[str] = []
        # Seeding with the root keeps a cycle back to it from re-entering, and
        # the set as a whole guarantees termination on circular depends_on.
        seen = {service_name}

        def visit(name: str) -> None:
            if name not in services:
                raise AssertionError(f"Service '{name}' not found in given compose file: '{file_path}'")
            for dependency in Parser._unwrap_depends_on(services[name].depends_on):
                if dependency and dependency not in seen:
                    seen.add(dependency)
                    dependencies.append(dependency)
                    visit(dependency)

        visit(service_name)
        return dependencies

    @staticmethod
    def _describe_image(service_data: spec.Service) -> Optional[str]:
        """Build the image label from the ``build`` and ``image`` keys.

        Returns:
            ``None`` when neither key yields a description; otherwise the
            build description, the image name, or both joined by ", image: ".
        """
        build = service_data.build
        description: Optional[str] = None
        if isinstance(build, str):
            description = f"build from '{build}'"
        elif isinstance(build, spec.Build):
            if build.context is not None and build.dockerfile is not None:
                description = f"build from '{build.context}' using '{build.dockerfile}'"
            elif build.context is not None:
                description = f"build from '{build.context}'"

        if service_data.image is None:
            return description
        if description is None:
            return service_data.image
        return f"{description}, image: {service_data.image}"

    @staticmethod
    def _parse_port(port_data: Union[float, str, spec.Ports]) -> Port:
        """Translate one ``ports`` entry into a ``Port`` model.

        Args:
            port_data: A bare number (container port), a short-syntax string,
                or a long-syntax ``spec.Ports`` object.

        Returns:
            A ``Port`` whose ``host_port`` is always prefixed with a host
            address (``0.0.0.0`` when none was given).

        Raises:
            AssertionError: If no container port can be determined.
        """
        host_ip: Optional[str] = None
        host_port: Optional[str] = None
        container_port: Optional[str] = None
        protocol: Optional[str] = None
        app_protocol: Optional[str] = None

        if isinstance(port_data, (int, float)):
            container_port = str(int(port_data))
            host_port = f"0.0.0.0:{container_port}"
        elif isinstance(port_data, str):
            match = Parser._SHORT_PORT_PATTERN.match(port_data)
            if match:
                host_ip = match.group("host_ip")
                host_port = match.group("host_port")
                container_port = match.group("container_port")
                protocol = match.group("protocol")

                if not container_port:
                    raise AssertionError("Invalid port format, aborting.")

                if host_port is None:
                    host_port = container_port

                # The regex captures the host IP together with its trailing
                # colon, so no separator is inserted here.
                if host_ip is not None:
                    host_port = f"{host_ip}{host_port}"
                else:
                    host_port = f"0.0.0.0:{host_port}"
        elif isinstance(port_data, spec.Ports):
            if port_data.target is None:
                raise AssertionError("Invalid port format, aborting.")

            if isinstance(port_data.published, (str, int)):
                host_port = str(port_data.published)
            if isinstance(port_data.target, (str, int)):
                container_port = str(port_data.target)

            host_ip = port_data.host_ip
            protocol = port_data.protocol
            app_protocol = port_data.app_protocol

            if container_port is not None and host_port is None:
                host_port = container_port

            if host_ip is not None:
                host_port = f"{host_ip}:{host_port}"
            else:
                host_port = f"0.0.0.0:{host_port}"

        if host_port is None or container_port is None:
            raise AssertionError("Error while parsing port, aborting.")

        return Port(
            host_port=host_port,
            container_port=container_port,
            protocol=Protocol["any" if protocol is None else protocol],
            app_protocol=AppProtocol["na" if app_protocol is None else app_protocol],
        )

    @staticmethod
    def _parse_volumes(service_data: spec.Service) -> List[Volume]:
        """Translate the ``volumes`` entries of a service into ``Volume`` models.

        Raises:
            AssertionError: If a short-syntax entry has no ``:`` separator or
                a long-syntax entry has no target.
        """
        volumes: List[Volume] = []
        for volume_data in service_data.volumes or []:
            if isinstance(volume_data, str):
                if ":" not in volume_data:
                    raise AssertionError("Invalid volume input, aborting.")
                parts = volume_data.split(":")
                # Entries that split into anything other than two or three
                # fields are skipped without an error.
                if len(parts) == 2:
                    volumes.append(Volume(source=parts[0], target=parts[1]))
                elif len(parts) == 3:
                    volumes.append(Volume(source=parts[0], target=parts[1], access_mode=parts[2]))
            elif isinstance(volume_data, spec.Volumes):
                if volume_data.target is None:
                    raise AssertionError("Invalid volume input, aborting.")
                # https://github.com/compose-spec/compose-spec/blob/master/spec.md#long-syntax-4
                # `source` is not applicable for a tmpfs mount, so the target
                # stands in for it. A local is used so the parsed spec model
                # is left unmodified.
                source = volume_data.target if volume_data.source is None else volume_data.source
                volumes.append(
                    Volume(
                        source=source,
                        target=volume_data.target,
                        type=VolumeType[volume_data.type],
                    )
                )
        return volumes

    @staticmethod
    def _parse_env_files(service_data: spec.Service) -> List[str]:
        """Return the env file paths listed under ``env_file``.

        A root value that is neither a string nor a list is reported on
        standard output and produces an empty result. List items that are
        neither strings nor ``spec.EnvFilePath`` objects are skipped.
        """
        if service_data.env_file is None:
            return []

        root = service_data.env_file.root
        if isinstance(root, str):
            return [root]

        env_files: List[str] = []
        if isinstance(root, list):
            for entry in root:
                if isinstance(entry, str):
                    env_files.append(entry)
                elif isinstance(entry, spec.EnvFilePath):
                    env_files.append(entry.path)
        else:
            print(f"Invalid env_file data: {service_data.env_file.root}")
        return env_files

    @staticmethod
    def _parse_devices(service_data: spec.Service) -> List[Device]:
        """Translate short-syntax ``devices`` strings into ``Device`` models.

        Non-string entries are ignored.

        Raises:
            AssertionError: If a string entry has no ``:`` separator.
        """
        devices: List[Device] = []
        for device_data in service_data.devices or []:
            if not isinstance(device_data, str):
                continue
            if ":" not in device_data:
                raise AssertionError("Invalid device input, aborting.")
            parts = device_data.split(":")
            # As with volumes, only two- and three-field entries are kept.
            if len(parts) == 2:
                devices.append(Device(host_path=parts[0], container_path=parts[1]))
            elif len(parts) == 3:
                devices.append(
                    Device(
                        host_path=parts[0],
                        container_path=parts[1],
                        cgroup_permissions=parts[2],
                    )
                )
        return devices

    def parse(self, file_path: str, root_service: Optional[str] = None) -> Compose:
        """Parse a compose file into a ``Compose`` model.

        Args:
            file_path: Path of the compose YAML file to read.
            root_service: When given, only this service and the services it
                transitively depends on are included in the result.

        Returns:
            A ``Compose`` holding one ``Service`` per selected service.

        Raises:
            RuntimeError: If the file cannot be read or does not validate
                against ``spec.ComposeSpecification``; the original exception
                is attached as ``__cause__``.
            AssertionError: If the file defines no services, ``root_service``
                (or one of its dependencies) is unknown, or a port, volume,
                device or ``extends`` entry is malformed.
        """
        compose_data: spec.ComposeSpecification
        try:
            with open(file_path, "r") as file:
                file_content = file.read()
                compose_data = parse_yaml_raw_as(spec.ComposeSpecification, file_content)
        except Exception as e:
            raise RuntimeError(f"Error parsing file '{file_path}': {e}") from e

        if compose_data.services is None:
            raise AssertionError("No services found, aborting.")

        # Names to keep when a root service restricts the output.
        selected_names = set()
        if root_service:
            selected_names.update(Parser.compile_dependencies(root_service, compose_data.services, file_path))
            selected_names.add(root_service)

        services: List[Service] = []
        for raw_name, service_data in compose_data.services.items():
            service_name = str(raw_name)
            if root_service and service_name not in selected_names:
                continue

            service_networks: List[str] = []
            if isinstance(service_data.networks, spec.ListOfStrings):
                service_networks = service_data.networks.root
            elif isinstance(service_data.networks, dict):
                service_networks = list(service_data.networks.keys())

            service_extends: Optional[Extends] = None
            if service_data.extends is not None:
                # https://github.com/compose-spec/compose-spec/blob/master/spec.md#extends
                # The value of the extends key MUST be a dictionary.
                if not isinstance(service_data.extends, spec.Extends):
                    raise AssertionError("Invalid extends input, aborting.")
                service_extends = Extends(
                    service_name=service_data.extends.service, from_file=service_data.extends.file
                )

            service_ports: List[Port] = [Parser._parse_port(port_data) for port_data in service_data.ports or []]

            service_links: List[str] = []
            if service_data.links is not None:
                service_links = service_data.links

            expose: List[str] = [str(port) for port in service_data.expose or []]

            profiles: List[str] = []
            if isinstance(service_data.profiles, spec.ListOfStrings):
                profiles = service_data.profiles.root

            services.append(
                Service(
                    name=service_name,
                    image=Parser._describe_image(service_data),
                    networks=service_networks,
                    extends=service_extends,
                    ports=service_ports,
                    depends_on=Parser._unwrap_depends_on(service_data.depends_on),
                    volumes=Parser._parse_volumes(service_data),
                    links=service_links,
                    cgroup_parent=service_data.cgroup_parent,
                    container_name=service_data.container_name,
                    env_file=Parser._parse_env_files(service_data),
                    expose=expose,
                    profiles=profiles,
                    devices=Parser._parse_devices(service_data),
                )
            )

        return Compose(services=services)