functions.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import re
  2. import copy
  3. import bcrypt
  4. import secrets
  5. import urllib.parse
  6. from base64 import b64encode
  7. from typing import TYPE_CHECKING
  8. if TYPE_CHECKING:
  9. from render import Render
  10. try:
  11. from .error import RenderError
  12. from .volume_sources import HostPathSource, IxVolumeSource
  13. except ImportError:
  14. from error import RenderError
  15. from volume_sources import HostPathSource, IxVolumeSource
  16. class Functions:
  17. def __init__(self, render_instance: "Render"):
  18. self._render_instance = render_instance
  19. def _bcrypt_hash(self, password):
  20. hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
  21. return hashed
  22. def _htpasswd(self, username, password):
  23. hashed = self._bcrypt_hash(password)
  24. return username + ":" + hashed
  25. def _secure_string(self, length):
  26. return secrets.token_urlsafe(length)[:length]
  27. def _basic_auth(self, username, password):
  28. return b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
  29. def _basic_auth_header(self, username, password):
  30. return f"Basic {self._basic_auth(username, password)}"
  31. def _fail(self, message):
  32. raise RenderError(message)
  33. def _camel_case(self, string):
  34. return string.title()
  35. def _auto_cast(self, value):
  36. lower_str_value = str(value).lower()
  37. if lower_str_value in ["true", "false"]:
  38. return lower_str_value == "true"
  39. try:
  40. float_value = float(value)
  41. if float_value.is_integer():
  42. return int(float_value)
  43. else:
  44. return float(value)
  45. except ValueError:
  46. pass
  47. return value
  48. def _match_regex(self, value, regex):
  49. if not re.match(regex, value):
  50. return False
  51. return True
  52. def _must_match_regex(self, value, regex):
  53. if not self._match_regex(value, regex):
  54. raise RenderError(f"Expected [{value}] to match [{regex}]")
  55. return value
  56. def _is_boolean(self, string):
  57. return string.lower() in ["true", "false"]
  58. def _is_number(self, string):
  59. try:
  60. float(string)
  61. return True
  62. except ValueError:
  63. return False
  64. def _copy_dict(self, dict):
  65. return copy.deepcopy(dict)
  66. def _merge_dicts(self, *dicts):
  67. merged_dict = {}
  68. for dictionary in dicts:
  69. merged_dict.update(dictionary)
  70. return merged_dict
  71. def _disallow_chars(self, string: str, chars: list[str], key: str):
  72. for char in chars:
  73. if char in string:
  74. raise RenderError(f"Disallowed character [{char}] in [{key}]")
  75. return string
  76. def _or_default(self, value, default):
  77. if not value:
  78. return default
  79. return value
  80. def _url_to_dict(self, url: str, v6_brackets: bool = False):
  81. try:
  82. # Try parsing as-is first
  83. parsed = urllib.parse.urlparse(url)
  84. # If we didn't get a hostname, try with http:// prefix
  85. if not parsed.hostname:
  86. parsed = urllib.parse.urlparse(f"http://{url}")
  87. # Final check that we have a valid result
  88. if not parsed.hostname:
  89. raise RenderError(
  90. f"Failed to parse URL [{url}]. Ensure it is a valid URL with a hostname and optional port."
  91. )
  92. result = {
  93. "host": parsed.hostname,
  94. "port": parsed.port,
  95. }
  96. if v6_brackets and parsed.hostname and ":" in parsed.hostname:
  97. result["host"] = f"[{parsed.hostname}]"
  98. result["host_no_brackets"] = parsed.hostname
  99. return result
  100. except Exception:
  101. raise RenderError(
  102. f"Failed to parse URL [{url}]. Ensure it is a valid URL with a hostname and optional port."
  103. )
  104. def _require_unique(self, values, key, split_char=""):
  105. new_values = []
  106. for value in values:
  107. new_values.append(value.split(split_char)[0] if split_char else value)
  108. if len(new_values) != len(set(new_values)):
  109. raise RenderError(f"Expected values in [{key}] to be unique, but got [{', '.join(values)}]")
  110. def _require_no_reserved(self, values, key, reserved, split_char="", starts_with=False):
  111. new_values = []
  112. for value in values:
  113. new_values.append(value.split(split_char)[0] if split_char else value)
  114. if starts_with:
  115. for arg in new_values:
  116. for reserved_value in reserved:
  117. if arg.startswith(reserved_value):
  118. raise RenderError(f"Value [{reserved_value}] is reserved and cannot be set in [{key}]")
  119. return
  120. for reserved_value in reserved:
  121. if reserved_value in new_values:
  122. raise RenderError(f"Value [{reserved_value}] is reserved and cannot be set in [{key}]")
  123. def _url_encode(self, string):
  124. return urllib.parse.quote_plus(string)
  125. def _temp_config(self, name):
  126. if not name:
  127. raise RenderError("Expected [name] to be set when calling [temp_config].")
  128. return {"type": "temporary", "volume_config": {"volume_name": name}}
  129. def _get_host_path(self, storage):
  130. source_type = storage.get("type", "")
  131. if not source_type:
  132. raise RenderError("Expected [type] to be set for volume mounts.")
  133. match source_type:
  134. case "host_path":
  135. mount_config = storage.get("host_path_config")
  136. if mount_config is None:
  137. raise RenderError("Expected [host_path_config] to be set for [host_path] type.")
  138. host_source = HostPathSource(self._render_instance, mount_config).get()
  139. return host_source
  140. case "ix_volume":
  141. mount_config = storage.get("ix_volume_config")
  142. if mount_config is None:
  143. raise RenderError("Expected [ix_volume_config] to be set for [ix_volume] type.")
  144. ix_source = IxVolumeSource(self._render_instance, mount_config).get()
  145. return ix_source
  146. case _:
  147. raise RenderError(f"Storage type [{source_type}] does not support host path.")
  148. def func_map(self):
  149. return {
  150. "auto_cast": self._auto_cast,
  151. "basic_auth_header": self._basic_auth_header,
  152. "basic_auth": self._basic_auth,
  153. "bcrypt_hash": self._bcrypt_hash,
  154. "camel_case": self._camel_case,
  155. "copy_dict": self._copy_dict,
  156. "fail": self._fail,
  157. "htpasswd": self._htpasswd,
  158. "is_boolean": self._is_boolean,
  159. "is_number": self._is_number,
  160. "match_regex": self._match_regex,
  161. "merge_dicts": self._merge_dicts,
  162. "must_match_regex": self._must_match_regex,
  163. "secure_string": self._secure_string,
  164. "disallow_chars": self._disallow_chars,
  165. "get_host_path": self._get_host_path,
  166. "or_default": self._or_default,
  167. "temp_config": self._temp_config,
  168. "require_unique": self._require_unique,
  169. "require_no_reserved": self._require_no_reserved,
  170. "url_encode": self._url_encode,
  171. "url_to_dict": self._url_to_dict,
  172. }