ports.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import ipaddress
  2. from typing import TYPE_CHECKING
  3. if TYPE_CHECKING:
  4. from render import Render
  5. try:
  6. from .error import RenderError
  7. from .validations import (
  8. valid_ip_or_raise,
  9. valid_port_mode_or_raise,
  10. valid_port_or_raise,
  11. valid_port_protocol_or_raise,
  12. )
  13. except ImportError:
  14. from error import RenderError
  15. from validations import (
  16. valid_ip_or_raise,
  17. valid_port_mode_or_raise,
  18. valid_port_or_raise,
  19. valid_port_protocol_or_raise,
  20. )
  21. class Ports:
  22. def __init__(self, render_instance: "Render"):
  23. self._render_instance = render_instance
  24. self._ports: dict[str, dict] = {}
  25. def _gen_port_key(self, host_port: int, host_ip: str, proto: str, ip_family: int) -> str:
  26. return f"{host_port}_{host_ip}_{proto}_{ip_family}"
  27. def _is_wildcard_ip(self, ip: str) -> bool:
  28. return ip in ["0.0.0.0", "::"]
  29. def _get_opposite_wildcard(self, ip: str) -> str:
  30. return "0.0.0.0" if ip == "::" else "::"
  31. def _get_sort_key(self, p: dict) -> str:
  32. return f"{p['published']}_{p['target']}_{p['protocol']}_{p.get('host_ip', '_')}"
  33. def _is_ports_same(self, port1: dict, port2: dict) -> bool:
  34. return (
  35. port1["published"] == port2["published"]
  36. and port1["target"] == port2["target"]
  37. and port1["protocol"] == port2["protocol"]
  38. and port1.get("host_ip", "_") == port2.get("host_ip", "_")
  39. )
  40. def _has_opposite_family_port(self, port_config: dict, wildcard_ports: dict) -> bool:
  41. comparison_port = port_config.copy()
  42. comparison_port["host_ip"] = self._get_opposite_wildcard(port_config["host_ip"])
  43. for p in wildcard_ports.values():
  44. if self._is_ports_same(comparison_port, p):
  45. return True
  46. return False
  47. def _check_port_conflicts(self, port_config: dict, ip_family: int) -> None:
  48. host_port = port_config["published"]
  49. host_ip = port_config["host_ip"]
  50. proto = port_config["protocol"]
  51. key = self._gen_port_key(host_port, host_ip, proto, ip_family)
  52. if key in self._ports.keys():
  53. raise RenderError(f"Port [{host_port}/{proto}/ipv{ip_family}] already added for [{host_ip}]")
  54. wildcard_ip = "0.0.0.0" if ip_family == 4 else "::"
  55. if host_ip != wildcard_ip:
  56. # Check if there is a port with same details but with wildcard IP of the same family
  57. wildcard_key = self._gen_port_key(host_port, wildcard_ip, proto, ip_family)
  58. if wildcard_key in self._ports.keys():
  59. raise RenderError(
  60. f"Cannot bind port [{host_port}/{proto}/ipv{ip_family}] to [{host_ip}], "
  61. f"already bound to [{wildcard_ip}]"
  62. )
  63. else:
  64. # We are adding a port with wildcard IP
  65. # Check if there is a port with same details but with specific IP of the same family
  66. for p in self._ports.values():
  67. # Skip if the port is not for the same family
  68. if ip_family != ipaddress.ip_address(p["host_ip"]).version:
  69. continue
  70. # Make a copy of the port config
  71. search_port = p.copy()
  72. # Replace the host IP with wildcard IP
  73. search_port["host_ip"] = wildcard_ip
  74. # If the ports match, means that a port for specific IP is already added
  75. # and we are trying to add it again with wildcard IP. Raise an error
  76. if self._is_ports_same(search_port, port_config):
  77. raise RenderError(
  78. f"Cannot bind port [{host_port}/{proto}/ipv{ip_family}] to [{host_ip}], "
  79. f"already bound to [{p['host_ip']}]"
  80. )
  81. def _add_port(self, host_port: int, container_port: int, config: dict | None = None):
  82. config = config or {}
  83. host_port = valid_port_or_raise(host_port)
  84. container_port = valid_port_or_raise(container_port)
  85. proto = valid_port_protocol_or_raise(config.get("protocol", "tcp"))
  86. mode = valid_port_mode_or_raise(config.get("mode", "ingress"))
  87. host_ip = valid_ip_or_raise(config.get("host_ip", ""))
  88. ip = ipaddress.ip_address(host_ip)
  89. port_config = {
  90. "published": host_port,
  91. "target": container_port,
  92. "protocol": proto,
  93. "mode": mode,
  94. "host_ip": host_ip,
  95. }
  96. self._check_port_conflicts(port_config, ip.version)
  97. key = self._gen_port_key(host_port, host_ip, proto, ip.version)
  98. self._ports[key] = port_config
  99. # After all the local validations, lets validate the port with the TrueNAS API
  100. self._render_instance.client.validate_ip_port_combo(host_ip, host_port)
  101. def has_ports(self):
  102. return len(self._ports) > 0
  103. def render(self):
  104. specific_ports = []
  105. wildcard_ports = {}
  106. for port_config in self._ports.values():
  107. if self._is_wildcard_ip(port_config["host_ip"]):
  108. wildcard_ports[id(port_config)] = port_config.copy()
  109. else:
  110. specific_ports.append(port_config.copy())
  111. processed_ports = specific_ports.copy()
  112. for wild_port in wildcard_ports.values():
  113. processed_port = wild_port.copy()
  114. # Check if there's a matching wildcard port for the opposite IP family
  115. has_opposite_family = self._has_opposite_family_port(wild_port, wildcard_ports)
  116. if has_opposite_family:
  117. processed_port.pop("host_ip")
  118. if processed_port not in processed_ports:
  119. processed_ports.append(processed_port)
  120. return sorted(processed_ports, key=self._get_sort_key)