storage.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. def migrate_storage_item(storage_item, include_read_only=False):
  2. if not storage_item:
  3. raise ValueError("Expected [storage_item] to be set")
  4. result = {}
  5. if storage_item["type"] == "ixVolume":
  6. if storage_item.get("ixVolumeConfig"):
  7. result = migrate_ix_volume_type(storage_item)
  8. elif storage_item.get("datasetName"):
  9. result = migrate_old_ix_volume_type(storage_item)
  10. else:
  11. raise ValueError(
  12. "Expected [ix_volume] to have [ixVolumeConfig] or [datasetName] set"
  13. )
  14. elif storage_item["type"] == "hostPath":
  15. if storage_item.get("hostPathConfig"):
  16. result = migrate_host_path_type(storage_item)
  17. elif storage_item.get("hostPath"):
  18. result = migrate_old_host_path_type(storage_item)
  19. else:
  20. raise ValueError(
  21. "Expected [host_path] to have [hostPathConfig] or [hostPath] set"
  22. )
  23. elif storage_item["type"] == "emptyDir":
  24. result = migrate_empty_dir_type(storage_item)
  25. elif storage_item["type"] == "smb-pv-pvc":
  26. result = migrate_smb_pv_pvc_type(storage_item)
  27. mount_path = storage_item.get("mountPath", "")
  28. if mount_path:
  29. result.update({"mount_path": mount_path})
  30. if include_read_only:
  31. result.update({"read_only": storage_item.get("readOnly", False)})
  32. return result
  33. def migrate_smb_pv_pvc_type(smb_pv_pvc):
  34. smb_config = smb_pv_pvc.get("smbConfig", {})
  35. if not smb_config:
  36. raise ValueError("Expected [smb_pv_pvc] to have [smbConfig] set")
  37. return {
  38. "type": "cifs",
  39. "cifs_config": {
  40. "server": smb_config["server"],
  41. "path": smb_config["share"],
  42. "domain": smb_config.get("domain", ""),
  43. "username": smb_config["username"],
  44. "password": smb_config["password"],
  45. },
  46. }
  47. def migrate_empty_dir_type(empty_dir):
  48. empty_dir_config = empty_dir.get("emptyDirConfig", {})
  49. if not empty_dir_config:
  50. raise ValueError("Expected [empty_dir] to have [emptyDirConfig] set")
  51. if empty_dir_config.get("medium", "") == "Memory":
  52. # Convert Gi to Mi
  53. size = empty_dir_config.get("size", 0.5) * 1024
  54. return {
  55. "type": "tmpfs",
  56. "tmpfs_config": {"size": size},
  57. }
  58. return {"type": "temporary"}
  59. def migrate_old_ix_volume_type(ix_volume):
  60. if not ix_volume.get("datasetName"):
  61. raise ValueError("Expected [ix_volume] to have [datasetName] set")
  62. return {
  63. "type": "ix_volume",
  64. "ix_volume_config": {
  65. "acl_enable": False,
  66. "dataset_name": ix_volume["datasetName"],
  67. },
  68. }
  69. def migrate_ix_volume_type(ix_volume):
  70. vol_config = ix_volume.get("ixVolumeConfig", {})
  71. if not vol_config:
  72. raise ValueError("Expected [ix_volume] to have [ixVolumeConfig] set")
  73. result = {
  74. "type": "ix_volume",
  75. "ix_volume_config": {
  76. "acl_enable": vol_config.get("aclEnable", False),
  77. "dataset_name": vol_config.get("datasetName", ""),
  78. },
  79. }
  80. if vol_config.get("aclEnable", False):
  81. result["ix_volume_config"].update(
  82. {"acl_entries": migrate_acl_entries(vol_config["aclEntries"])}
  83. )
  84. return result
  85. def migrate_old_host_path_type(host_path):
  86. if not host_path.get("hostPath"):
  87. raise ValueError("Expected [host_path] to have [hostPath] set")
  88. return {
  89. "type": "host_path",
  90. "host_path_config": {
  91. "acl_enable": False,
  92. "path": host_path["hostPath"],
  93. },
  94. }
  95. def migrate_host_path_type(host_path):
  96. path_config = host_path.get("hostPathConfig", {})
  97. if not path_config:
  98. raise ValueError("Expected [host_path] to have [hostPathConfig] set")
  99. result = {
  100. "type": "host_path",
  101. "host_path_config": {
  102. "acl_enable": path_config.get("aclEnable", False),
  103. },
  104. }
  105. if path_config.get("aclEnable", False):
  106. result["host_path_config"].update(
  107. {"acl": migrate_acl_entries(path_config.get("acl", {}))}
  108. )
  109. else:
  110. result["host_path_config"].update({"path": path_config["hostPath"]})
  111. return result
  112. def migrate_acl_entries(acl_entries: dict) -> dict:
  113. entries = []
  114. for entry in acl_entries.get("entries", []):
  115. entries.append(
  116. {
  117. "access": entry["access"],
  118. "id": entry["id"],
  119. "id_type": entry["id_type"],
  120. }
  121. )
  122. return {
  123. "entries": entries,
  124. "options": {"force": acl_entries.get("options", {}).get("force", False)},
  125. "path": acl_entries["path"],
  126. }