KYLIN-6037 migrate upgrade metadata to system metadata

Co-authored-by: xuchen.xia <xuchen.xia@kyligence.io>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 4cfbc62..a11e44d 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -81,6 +81,11 @@
     public static final String METASTORE_IMAGE_META_KEY_TAG = "_image";
     public static final String METASTORE_IMAGE = mergeKeyWithType(METASTORE_IMAGE_META_KEY_TAG, MetadataType.SYSTEM);
     public static final String METASTORE_UUID_META_KEY_TAG = "UUID";
+    // if KE ever upgraded from 4.1,
+    // there is a record with data-permission-separate in backup core-meta/_global/upgrade dir.
+    public static final String UPGRADE_META_KEY_PREFIX = "/_global/upgrade/";
+    // for data permission separate
+    public static final String UPGRADE_META_KEY_TAG = "acl_version";
     public static final String METASTORE_UUID_TAG = mergeKeyWithType(METASTORE_UUID_META_KEY_TAG, MetadataType.SYSTEM);
     public static final String METASTORE_TRASH_RECORD_KEY = "trash_record";
     public static final String METASTORE_TRASH_RECORD = mergeKeyWithType(METASTORE_TRASH_RECORD_KEY,
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataTool.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataTool.java
index 81b087e..b519528 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataTool.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataTool.java
@@ -20,6 +20,8 @@
 import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_IMAGE_META_KEY_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_META_KEY_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.REC_FILE;
+import static org.apache.kylin.common.persistence.ResourceStore.UPGRADE_META_KEY_PREFIX;
+import static org.apache.kylin.common.persistence.ResourceStore.UPGRADE_META_KEY_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE_META_KEY_TAG;
 import static org.apache.kylin.common.persistence.metadata.FileSystemMetadataStore.Type.DIR;
 
@@ -104,6 +106,7 @@
 
     public static final String ZIP_SUFFIX = ".zip";
     public static final String PROJECT_KEY = "project";
+    public static final String SEPRATOR = "/";
     public final Map<String, Map<String, String>> modelUuidMap = new ConcurrentHashMap<>();
     private final Map<String, Map<String, String>> segUuidMap = new ConcurrentHashMap<>();
 
@@ -238,7 +241,7 @@
         }
     }
 
-    private RawResource loadByStream(String resourcePath, long ts, MetadataStore.MemoryMetaData data, DataInputStream in)
+    RawResource loadByStream(String resourcePath, long ts, MetadataStore.MemoryMetaData data, DataInputStream in)
             throws IOException {
         if (in.available() == 0) {
             return null;
@@ -251,9 +254,11 @@
             return factory.createSystemRawResource(VERSION_FILE_META_KEY_TAG, ts, in);
         } else if (resourcePath.endsWith(METASTORE_IMAGE_META_KEY_TAG)) {
             return factory.createSystemRawResource(METASTORE_IMAGE_META_KEY_TAG, ts, in);
+        } else if (resourcePath.startsWith(UPGRADE_META_KEY_PREFIX)) {
+            return factory.createSystemRawResource(UPGRADE_META_KEY_TAG, ts, in);
         } else if (resourcePath.contains(ResourceStore.METASTORE_TRASH_RECORD_KEY)) {
             return null;
-        } else if (resourcePath.contains("/" + REC_FILE + "/")) {
+        } else if (resourcePath.contains(SEPRATOR + REC_FILE + SEPRATOR)) {
             return factory.createRecResource(resourcePath, ts, in);
         } else {
             byte[] byteArray = IOUtils.toByteArray(in);
@@ -307,26 +312,26 @@
      * @return The tuple of project, type and metaKey
      */
     public static Tuple3<String, MetadataType, String> splitFilePath(String resourcePath) {
-        if ("/".equals(resourcePath)) {
+        if (SEPRATOR.equals(resourcePath)) {
             return new Tuple3<>(resourcePath, MetadataType.ALL, null);
-        } else if (resourcePath.startsWith("/") && resourcePath.length() > 1) {
+        } else if (resourcePath.startsWith(SEPRATOR) && resourcePath.length() > 1) {
             resourcePath = resourcePath.substring(1);
         }
 
         if (resourcePath.contains("_global/sys_acl/user")) {
-            String[] split = resourcePath.split("/");
+            String[] split = resourcePath.split(SEPRATOR);
             return new Tuple3<>(split[0], MetadataType.USER_GLOBAL_ACL, split[3]);
         } else if (resourcePath.contains("_global/acl")) {
-            String[] split = resourcePath.split("/");
+            String[] split = resourcePath.split(SEPRATOR);
             return new Tuple3<>(split[0], MetadataType.OBJECT_ACL, split[2]);
         } else if (resourcePath.contains("/acl/user")) {
-            String[] split = resourcePath.split("/");
+            String[] split = resourcePath.split(SEPRATOR);
             return new Tuple3<>(split[0], MetadataType.ACL, split[0] + "." + "u" + "." + split[3]);
         } else if (resourcePath.contains("/acl/group")) {
-            String[] split = resourcePath.split("/");
+            String[] split = resourcePath.split(SEPRATOR);
             return new Tuple3<>(split[0], MetadataType.ACL, split[0] + "." + "g" + "." + split[3]);
         }
-        String[] split = resourcePath.split("/", 3);
+        String[] split = resourcePath.split(SEPRATOR, 3);
         if (split.length < 3) {
             throw new KylinRuntimeException("resourcePath is invalid: " + resourcePath);
         }
@@ -403,7 +408,7 @@
         KylinConfig inputConfig = KylinConfig.getInstanceFromEnv();
         KapConfig kapConf = KapConfig.wrap(inputConfig);
         if (inputPath != null) {
-            inputPath = StringUtils.appendIfMissing(inputPath, "/");
+            inputPath = StringUtils.appendIfMissing(inputPath, SEPRATOR);
         } else {
             inputPath = inputConfig.getMetadataUrl().getIdentifier();
         }
@@ -490,7 +495,7 @@
         if (outputPath.endsWith(".zip")) {
             // metnauadata_url doesn't support zip file
             outputZipFile = outputPath;
-            outputPath = outputPath.substring(0, outputPath.lastIndexOf("/"));
+            outputPath = outputPath.substring(0, outputPath.lastIndexOf(SEPRATOR));
         }
 
         Path metadataPath = new Path(outputPath);
@@ -690,6 +695,13 @@
                 StringEntity versionEntity = new StringEntity(VERSION_FILE_META_KEY_TAG, version);
                 content = JsonUtil.writeValueAsIndentBytes(versionEntity);
                 break;
+            case UPGRADE_META_KEY_TAG:
+                Map<String, String> aclVersionMap = JsonUtil
+                        .readValueAsMap(IOUtils.toString(in, StandardCharsets.UTF_8).trim());
+                // add a name field to compatible with system table
+                aclVersionMap.put("name", tagName);
+                content = JsonUtil.writeValueAsIndentBytes(aclVersionMap);
+                break;
             default:
                 break;
             }
@@ -703,10 +715,10 @@
         }
 
         public RawResource createRecResource(String resPath, long ts, DataInputStream in) throws IOException {
-            if (resPath.startsWith("/")) {
+            if (resPath.startsWith(SEPRATOR)) {
                 resPath = resPath.substring(1);
             }
-            String[] split = resPath.split("/");
+            String[] split = resPath.split(SEPRATOR);
             String project = split[0];
             String uuid = split[2].replace(FileSystemMetadataStore.JSON_SUFFIX, "");
             byte[] byteArray = IOUtils.toByteArray(in);
@@ -817,8 +829,8 @@
         private RawResource createLayoutResource(Tuple3<String, MetadataType, String> resPathPair, byte[] byteArray)
                 throws IOException {
             String proj = resPathPair._1();
-            String dataflowUuid = resPathPair._3().split("/")[0];
-            String layoutUuid = resPathPair._3().split("/")[1].replace(".json", "");
+            String dataflowUuid = resPathPair._3().split(SEPRATOR)[0];
+            String layoutUuid = resPathPair._3().split(SEPRATOR)[1].replace(".json", "");
             if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
                 String uniqueModelUuid = getUniqueUuid(modelUuidMap, proj, dataflowUuid);
                 String uniqueSegUuid = getUniqueUuid(segUuidMap, uniqueModelUuid, layoutUuid);
@@ -900,7 +912,7 @@
     }
 
     public static boolean verifyNonMetadataFile(String resourcePath) {
-        if (resourcePath.startsWith("/")) {
+        if (resourcePath.startsWith(SEPRATOR)) {
             resourcePath = resourcePath.substring(1);
         }
         String[] nonMetadata = new String[] { "kylin.properties", ".DS_Store" };
@@ -917,12 +929,12 @@
             }
         }
         for (String s : metadataShouldBeIgnored) {
-            if (s.startsWith("/") ? resourcePath.contains(s) : resourcePath.startsWith(s)) {
+            if (s.startsWith(SEPRATOR) ? resourcePath.contains(s) : resourcePath.startsWith(s)) {
                 return true;
             }
         }
         for (String s : metadataWithoutJsonPostfix) {
-            if (s.startsWith("/") ? resourcePath.contains(s) : resourcePath.startsWith(s)) {
+            if (s.startsWith(SEPRATOR) ? resourcePath.contains(s) : resourcePath.startsWith(s)) {
                 return false;
             }
         }
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataToolTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataToolTest.java
new file mode 100644
index 0000000..a9f9393
--- /dev/null
+++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/MigrateKEMetadataToolTest.java
@@ -0,0 +1,61 @@
+package org.apache.kylin.common.persistence.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.resources.SystemRawResource;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.junit.annotation.JdbcMetadataInfo;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+@MetadataInfo
+@JdbcMetadataInfo
+class MigrateKEMetadataToolTest {
+
+    @Test
+    void testUpgradeMetadata2System() throws IOException {
+        final Charset ENCODING = StandardCharsets.UTF_8;
+
+        // UPGRADE Metadata which refer to GlobalAclVersion
+        Map<String, String> originMap = new HashMap<>();
+        originMap.put("uuid", "e6fca42f-032e-7b75-5f66-21971a842cda");
+        originMap.put("last_modified", "0");
+        originMap.put("create_time", "1732269833897");
+        originMap.put("version", "4.0.0.0");
+        originMap.put("acl_version", "data-permission-separate");
+        String upgradeMetadataString = JsonUtil.writeValueAsString(originMap);
+        DataInputStream dataInputStream = new DataInputStream(
+                new ByteArrayInputStream(upgradeMetadataString.getBytes(ENCODING)));
+        MigrateKEMetadataTool migrateKEMetadataTool = new MigrateKEMetadataTool();
+        MetadataStore.MemoryMetaData data = Mockito.mock(MetadataStore.MemoryMetaData.class);
+        // test loadByStream to convert UPGRADE metadata to SYSTEM metadata
+        RawResource rawResource = migrateKEMetadataTool.loadByStream("/_global/upgrade/acl_version.json", 0, data,
+                dataInputStream);
+        Assertions.assertInstanceOf(SystemRawResource.class, rawResource);
+        Map<String, String> aclVersionMap = JsonUtil
+                .readValueAsMap(new String(rawResource.getContent(), ENCODING).trim());
+        // check the name to fit SYSTEM metadata
+        Assertions.assertEquals(ResourceStore.UPGRADE_META_KEY_TAG, aclVersionMap.get("name"));
+        for (String key : originMap.keySet()) {
+            Assertions.assertEquals(originMap.get(key), aclVersionMap.get(key));
+        }
+        // check metadata could be put into store to fit db schema
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            ResourceStore resourceStore = ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv());
+            resourceStore.checkAndPutResource("SYSTEM/acl_version", rawResource.getByteSource(), -1);
+            return null;
+        }, "restore");
+    }
+}
\ No newline at end of file
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/upgrade/GlobalAclVersion.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/upgrade/GlobalAclVersion.java
index 7624274..3b28355 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/upgrade/GlobalAclVersion.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/upgrade/GlobalAclVersion.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.metadata.upgrade;
 
 import org.apache.kylin.common.persistence.MetadataType;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,11 +34,15 @@
 @NoArgsConstructor
 public class GlobalAclVersion extends RootPersistentEntity {
     public static final String DATA_PERMISSION_SEPARATE = "data-permission-separate";
-    public static final String VERSION_KEY_NAME = "acl_version";
+
+    public static final String VERSION_KEY_NAME = ResourceStore.UPGRADE_META_KEY_TAG;
 
     @JsonProperty("acl_version")
     private String aclVersion;
 
+    @JsonProperty("name")
+    private String name = VERSION_KEY_NAME;
+
     @Override
     public String resourceName() {
         return VERSION_KEY_NAME;
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/upgrade/GlobalAclVersionManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/upgrade/GlobalAclVersionManagerTest.java
index f532482..eef9a72 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/upgrade/GlobalAclVersionManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/upgrade/GlobalAclVersionManagerTest.java
@@ -17,36 +17,48 @@
  */
 package org.apache.kylin.metadata.upgrade;
 
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
-import org.junit.After;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.TestUtils;
+import org.apache.kylin.junit.annotation.JdbcMetadataInfo;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.rest.aspect.Transaction;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-public class GlobalAclVersionManagerTest extends NLocalFileMetadataTestCase {
-
-    @Before
-    public void setup() {
-        createTestMetadata();
-    }
-
-    @After
-    public void tearDown() {
-        cleanupTestMetadata();
-    }
+@MetadataInfo(onlyProps = true)
+@JdbcMetadataInfo
+public class GlobalAclVersionManagerTest {
 
     @Test
+    @Transaction
     public void testBasic() {
         GlobalAclVersion globalAclVersion = new GlobalAclVersion();
         globalAclVersion.setAclVersion(GlobalAclVersion.DATA_PERMISSION_SEPARATE);
         Assert.assertEquals("SYSTEM/acl_version", globalAclVersion.getResourcePath());
-        GlobalAclVersionManager manager = new GlobalAclVersionManager(getTestConfig());
-        manager.delete();
+        GlobalAclVersionManager manager = new GlobalAclVersionManager(TestUtils.getTestConfig());
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            GlobalAclVersionManager m = new GlobalAclVersionManager(TestUtils.getTestConfig());
+            m.delete();
+            return null;
+        }, "delete");
         Assert.assertFalse(manager.exists());
-        manager.save(globalAclVersion);
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            GlobalAclVersionManager m = new GlobalAclVersionManager(TestUtils.getTestConfig());
+            m.save(globalAclVersion);
+            return null;
+        }, "save");
         Assert.assertTrue(manager.exists());
-        manager.save(globalAclVersion);
-        manager.delete();
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            GlobalAclVersionManager m = new GlobalAclVersionManager(TestUtils.getTestConfig());
+            m.save(globalAclVersion);
+            return null;
+        }, "save");
+        Assert.assertTrue(manager.exists());
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            GlobalAclVersionManager m = new GlobalAclVersionManager(TestUtils.getTestConfig());
+            m.delete();
+            return null;
+        }, "delete");
         Assert.assertFalse(manager.exists());
     }
 }