[ASTERIXDB-3196][*DB] Cluster state for compute-storage separation

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Implement the changes required to drive the cluster state based on
  the compute-storage partitions map.
- Persist index checkpoints to cloud storage.
- Remove eager caching from NC startup tasks.
- Fix static data partitioning issues.

Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f60ed63..7f0b529 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -204,8 +204,7 @@
                 storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
         lsmIOScheduler = createIoScheduler(storageProperties);
         metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
-        // TODO do we want to write checkpoints for cloud?
-        indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
+        indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(persistenceIOManager);
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(persistenceIOManager, indexCheckpointManagerProvider,
                         persistedResourceRegistry);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 4f9613d..449dd27 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,7 +35,6 @@
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -52,7 +51,6 @@
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -222,11 +220,6 @@
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
         int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
         tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-
-        if (((ICcApplicationContext) (serviceContext.getControllerService()).getApplicationContext())
-                .isCloudDeployment()) {
-            tasks.add(new CloudToLocalStorageCachingTask(activePartitions));
-        }
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 3d0fee8..5f714f4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -286,4 +286,10 @@
      * @return the count of storage partitions
      */
     int getStoragePartitionsCount();
+
+    /**
+     * Sets the compute-storage partitions map.
+     * @param map the {@link StorageComputePartitionsMap} to set
+     */
+    void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 48b2ea1..6561d05 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -23,6 +23,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class StorageComputePartitionsMap {
 
@@ -53,9 +55,10 @@
             }
         }
         int[][] computerToStoArray = new int[computeToStoragePartitions.size()][];
+        int partitionIdx = 0;
         for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) {
-            computerToStoArray[integerListEntry.getKey()] =
-                    integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+            computerToStoArray[partitionIdx] = integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+            partitionIdx++;
         }
         return computerToStoArray;
     }
@@ -94,4 +97,8 @@
         }
         return newMap;
     }
+
+    public Set<String> getComputeNodes() {
+        return stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b07a03e..ea1b9e6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -891,13 +890,7 @@
         } else {
             numElementsHint = Long.parseLong(numElementsHintString);
         }
-        int numPartitions = 0;
-        List<String> nodeGroup =
-                MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        for (String nd : nodeGroup) {
-            numPartitions += csm.getNodePartitionsCount(nd);
-        }
+        int numPartitions = getPartitioningProperties(dataset).getNumberOfPartitions();
         return numElementsHint / numPartitions;
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 92ea173..0128478 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,6 +30,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -155,7 +156,16 @@
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
-            activateNodePartitions(nodeId, activePartitions);
+            if (appCtx.isCloudDeployment()) {
+                // node compute partitions never change
+                ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
+                activePartitions =
+                        Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                activateNodePartitions(nodeId, activePartitions);
+            } else {
+                activateNodePartitions(nodeId, activePartitions);
+            }
+
         } else {
             participantNodes.remove(nodeId);
             deactivateNodePartitions(nodeId);
@@ -183,16 +193,7 @@
             return;
         }
         resetClusterPartitionConstraint();
-        // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
-        if (clusterPartitions.isEmpty()
-                || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) {
-            LOGGER.info("Cluster does not have any registered partitions");
-            setState(ClusterState.UNUSABLE);
-            return;
-        }
-
-        // exclude partitions that are pending activation
-        if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+        if (isClusterUnusable()) {
             setState(ClusterState.UNUSABLE);
             return;
         }
@@ -310,9 +311,7 @@
         clusterActiveLocations.removeAll(pendingRemoval);
         clusterPartitionConstraint =
                 new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
-        if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) {
-            storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
-        }
+        resetStorageComputeMap();
     }
 
     @Override
@@ -512,6 +511,11 @@
         return storageComputePartitionsMap;
     }
 
+    @Override
+    public synchronized void setComputeStoragePartitionsMap(StorageComputePartitionsMap map) {
+        this.storageComputePartitionsMap = map;
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
         final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
@@ -543,6 +547,36 @@
                         false));
     }
 
+    private synchronized boolean isClusterUnusable() {
+        // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
+        if (clusterPartitions.isEmpty()
+                || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) {
+            LOGGER.info("Cluster does not have any registered partitions");
+            return true;
+        }
+        if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null) {
+            Set<String> computeNodes = storageComputePartitionsMap.getComputeNodes();
+            if (!participantNodes.containsAll(computeNodes)) {
+                LOGGER.info("Cluster missing compute nodes; required {}, current {}", computeNodes, participantNodes);
+                return true;
+            }
+        } else {
+            // exclude partitions that are pending activation
+            if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private synchronized void resetStorageComputeMap() {
+        if (storageComputePartitionsMap == null
+                && appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC
+                && !isClusterUnusable()) {
+            storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
+        }
+    }
+
     private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
         final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
         if (ncConfig == null) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 080204e..25b9610 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -646,8 +646,8 @@
     public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
         List<FileReference> onDiskPartitions = new ArrayList<>();
         for (FileReference root : storageRoots) {
-            Collection<FileReference> partitions = ioManager.list(root,
-                    (dir, name) -> dir.isDirectory() && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
+            Collection<FileReference> partitions = ioManager.list(root, (dir, name) -> dir != null && dir.isDirectory()
+                    && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
             if (partitions != null) {
                 onDiskPartitions.addAll(partitions);
             }