123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import json
- import pathlib
- from typing import TYPE_CHECKING
- if TYPE_CHECKING:
- from render import Render
- from storage import IxStorage
- try:
- from .error import RenderError
- from .validations import valid_octal_mode_or_raise, valid_fs_path_or_raise
- except ImportError:
- from error import RenderError
- from validations import valid_octal_mode_or_raise, valid_fs_path_or_raise
- class PermsContainer:
- def __init__(self, render_instance: "Render", name: str):
- self._render_instance = render_instance
- self._name = name
- self.actions: set[str] = set()
- self.parsed_configs: list[dict] = []
- def add_or_skip_action(self, identifier: str, volume_config: "IxStorage", action_config: dict):
- identifier = self.normalize_identifier_for_path(identifier)
- if identifier in self.actions:
- raise RenderError(f"Action with id [{identifier}] already used for another permission action")
- parsed_action = self.parse_action(identifier, volume_config, action_config)
- if parsed_action:
- self.parsed_configs.append(parsed_action)
- self.actions.add(identifier)
- def parse_action(self, identifier: str, volume_config: "IxStorage", action_config: dict):
- valid_modes = [
- "always", # Always set permissions, without checking.
- "check", # Checks if permissions are correct, and set them if not.
- ]
- mode = action_config.get("mode", "check")
- uid = action_config.get("uid", None)
- gid = action_config.get("gid", None)
- chmod = action_config.get("chmod", None)
- recursive = action_config.get("recursive", False)
- mount_path = pathlib.Path("/mnt/permission", identifier).as_posix()
- read_only = volume_config.get("read_only", False)
- is_temporary = False
- vol_type = volume_config.get("type", "")
- match vol_type:
- case "temporary":
- # If it is a temporary volume, we force auto permissions
- # and set is_temporary to True, so it will be cleaned up
- is_temporary = True
- recursive = True
- case "volume":
- if not volume_config.get("volume_config", {}).get("auto_permissions", False):
- return None
- case "host_path":
- host_path_config = volume_config.get("host_path_config", {})
- # Skip when ACL enabled
- if host_path_config.get("acl_enable", False):
- return None
- if not host_path_config.get("auto_permissions", False):
- return None
- case "ix_volume":
- ix_vol_config = volume_config.get("ix_volume_config", {})
- # Skip when ACL enabled
- if ix_vol_config.get("acl_enable", False):
- return None
- # For ix_volumes, we default to auto_permissions = True
- if not ix_vol_config.get("auto_permissions", True):
- return None
- case _:
- # Skip for other types
- return None
- if mode not in valid_modes:
- raise RenderError(f"Expected [mode] to be one of [{', '.join(valid_modes)}], got [{mode}]")
- if not isinstance(uid, int) or not isinstance(gid, int):
- raise RenderError("Expected [uid] and [gid] to be set when [auto_permissions] is enabled")
- if chmod is not None:
- chmod = valid_octal_mode_or_raise(chmod)
- mount_path = valid_fs_path_or_raise(mount_path)
- return {
- "mount_path": mount_path,
- "volume_config": volume_config,
- "action_data": {
- "read_only": read_only,
- "mount_path": mount_path,
- "is_temporary": is_temporary,
- "identifier": identifier,
- "recursive": recursive,
- "mode": mode,
- "uid": uid,
- "gid": gid,
- "chmod": chmod,
- },
- }
- def normalize_identifier_for_path(self, identifier: str):
- return identifier.rstrip("/").lstrip("/").lower().replace("/", "_").replace(".", "-").replace(" ", "-")
- def has_actions(self):
- return bool(self.actions)
- def activate(self):
- if len(self.parsed_configs) != len(self.actions):
- raise RenderError("Number of actions and parsed configs does not match")
- if not self.has_actions():
- raise RenderError("No actions added. Check if there are actions before activating")
- # Add the container and set it up
- c = self._render_instance.add_container(self._name, "python_permissions_image")
- c.set_user(0, 0)
- c.add_caps(["CHOWN", "FOWNER", "DAC_OVERRIDE"])
- c.set_network_mode("none")
- # Don't attach any devices
- c.remove_devices()
- c.deploy.resources.set_profile("medium")
- c.restart.set_policy("on-failure", maximum_retry_count=1)
- c.healthcheck.disable()
- c.set_entrypoint(["python3", "/script/run.py"])
- script = "#!/usr/bin/env python3\n"
- script += get_script()
- c.configs.add("permissions_run_script", script, "/script/run.py", "0700")
- actions_data: list[dict] = []
- for parsed in self.parsed_configs:
- if not parsed["action_data"]["read_only"]:
- c.add_storage(parsed["mount_path"], parsed["volume_config"])
- actions_data.append(parsed["action_data"])
- actions_data_json = json.dumps(actions_data)
- c.configs.add("permissions_actions_data", actions_data_json, "/script/actions.json", "0500")
- def get_script():
- return """
- import os
- import json
- import time
- import shutil
- with open("/script/actions.json", "r") as f:
- actions_data = json.load(f)
- if not actions_data:
- # If this script is called, there should be actions data
- raise ValueError("No actions data found")
- def fix_perms(path, chmod, recursive=False):
- print(f"Changing permissions{' recursively ' if recursive else ' '}to {chmod} on: [{path}]")
- os.chmod(path, int(chmod, 8))
- if recursive:
- for root, dirs, files in os.walk(path):
- for f in files:
- os.chmod(os.path.join(root, f), int(chmod, 8))
- print("Permissions after changes:")
- print_chmod_stat()
- def fix_owner(path, uid, gid, recursive=False):
- print(f"Changing ownership{' recursively ' if recursive else ' '}to {uid}:{gid} on: [{path}]")
- os.chown(path, uid, gid)
- if recursive:
- for root, dirs, files in os.walk(path):
- for f in files:
- os.chown(os.path.join(root, f), uid, gid)
- print("Ownership after changes:")
- print_chown_stat()
- def print_chown_stat():
- curr_stat = os.stat(action["mount_path"])
- print(f"Ownership: [{curr_stat.st_uid}:{curr_stat.st_gid}]")
- def print_chmod_stat():
- curr_stat = os.stat(action["mount_path"])
- print(f"Permissions: [{oct(curr_stat.st_mode)[3:]}]")
- def print_chown_diff(curr_stat, uid, gid):
- print(f"Ownership: wanted [{uid}:{gid}], got [{curr_stat.st_uid}:{curr_stat.st_gid}].")
- def print_chmod_diff(curr_stat, mode):
- print(f"Permissions: wanted [{mode}], got [{oct(curr_stat.st_mode)[3:]}].")
- def perform_action(action):
- if action["read_only"]:
- print(f"Path for action [{action['identifier']}] is read-only, skipping...")
- return
- start_time = time.time()
- print(f"=== Applying configuration on volume with identifier [{action['identifier']}] ===")
- if not os.path.isdir(action["mount_path"]):
- print(f"Path [{action['mount_path']}] is not a directory, skipping...")
- return
- if action["is_temporary"]:
- print(f"Path [{action['mount_path']}] is a temporary directory, ensuring it is empty...")
- for item in os.listdir(action["mount_path"]):
- item_path = os.path.join(action["mount_path"], item)
- # Exclude the safe directory, where we can use to mount files temporarily
- if os.path.basename(item_path) == "ix-safe":
- continue
- if os.path.isdir(item_path):
- shutil.rmtree(item_path)
- else:
- os.remove(item_path)
- if not action["is_temporary"] and os.listdir(action["mount_path"]):
- print(f"Path [{action['mount_path']}] is not empty, skipping...")
- return
- print(f"Current Ownership and Permissions on [{action['mount_path']}]:")
- curr_stat = os.stat(action["mount_path"])
- print_chown_diff(curr_stat, action["uid"], action["gid"])
- print_chmod_diff(curr_stat, action["chmod"])
- print("---")
- if action["mode"] == "always":
- fix_owner(action["mount_path"], action["uid"], action["gid"], action["recursive"])
- if not action["chmod"]:
- print("Skipping permissions check, chmod is falsy")
- else:
- fix_perms(action["mount_path"], action["chmod"], action["recursive"])
- return
- elif action["mode"] == "check":
- if curr_stat.st_uid != action["uid"] or curr_stat.st_gid != action["gid"]:
- print("Ownership is incorrect. Fixing...")
- fix_owner(action["mount_path"], action["uid"], action["gid"], action["recursive"])
- else:
- print("Ownership is correct. Skipping...")
- if not action["chmod"]:
- print("Skipping permissions check, chmod is falsy")
- else:
- if oct(curr_stat.st_mode)[3:] != action["chmod"]:
- print("Permissions are incorrect. Fixing...")
- fix_perms(action["mount_path"], action["chmod"], action["recursive"])
- else:
- print("Permissions are correct. Skipping...")
- print(f"Time taken: {(time.time() - start_time) * 1000:.2f}ms")
- print(f"=== Finished applying configuration on volume with identifier [{action['identifier']}] ==")
- print()
- if __name__ == "__main__":
- start_time = time.time()
- for action in actions_data:
- perform_action(action)
- print(f"Total time taken: {(time.time() - start_time) * 1000:.2f}ms")
- """
|