HELIX-705: create interfaces and interactions among HelixViewAggregator components
RB=1205300
BUG=HELIX-705
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang,jxue
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
new file mode 100644
index 0000000..ff24211
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
@@ -0,0 +1,124 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.PropertyType;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Represents source physical cluster information for view cluster
+ */
+public class ViewClusterSourceConfig {
+
+ private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(Arrays
+ .asList(new PropertyType[] { PropertyType.INSTANCES, PropertyType.EXTERNALVIEW,
+ PropertyType.LIVEINSTANCES
+ }));
+
+ private static ObjectMapper _objectMapper = new ObjectMapper();
+
+ @JsonProperty("name")
+ private String _name;
+
+ @JsonProperty("zkAddress")
+ String _zkAddress;
+
+ @JsonProperty("properties")
+ private List<PropertyType> _properties;
+
+ private ViewClusterSourceConfig() {
+ }
+
+ public ViewClusterSourceConfig(String name, String zkAddress, List<PropertyType> properties) {
+ _name = name;
+ _zkAddress = zkAddress;
+ _properties = properties;
+ }
+
+ public ViewClusterSourceConfig(ViewClusterSourceConfig config) {
+ this(config.getName(), config.getZkAddress(), new ArrayList<>(config.getProperties()));
+ }
+
+ public void setName(String name) {
+ _name = name;
+ }
+
+ public void setZkAddress(String zkAddress) {
+ _zkAddress = zkAddress;
+ }
+
+ public void setProperties(List<PropertyType> properties) {
+ for (PropertyType p : properties) {
+ if (!_validPropertyTypes.contains(p)) {
+ throw new IllegalArgumentException(
+ String.format("Property %s is not support in ViewCluster yet.", p));
+ }
+ }
+ _properties = properties;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public String getZkAddress() {
+ return _zkAddress;
+ }
+
+ public List<PropertyType> getProperties() {
+ return _properties;
+ }
+
+ public String toJson() throws IOException {
+ return new ObjectMapper().writeValueAsString(this);
+ }
+
+ public String toString() {
+ return String.format("name=%s; zkAddr=%s; properties=%s", _name, _zkAddress, _properties);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof ViewClusterSourceConfig)) {
+ return false;
+ }
+ ViewClusterSourceConfig otherConfig = (ViewClusterSourceConfig) other;
+
+ return _name.equals(otherConfig.getName()) && _zkAddress.equals(otherConfig.getZkAddress())
+ && _properties.containsAll(otherConfig.getProperties()) && otherConfig.getProperties()
+ .containsAll(_properties);
+
+ }
+
+ public static ViewClusterSourceConfig fromJson(String jsonString) {
+ try {
+ return _objectMapper.readValue(jsonString, ViewClusterSourceConfig.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Invalid Json: %s, Exception: %s", jsonString, e.toString()));
+ }
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 65f6bb4..12dc758 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -39,6 +39,7 @@
OnDemandRebalance,
ControllerChange,
RetryRebalance,
+ ViewClusterPeriodicRefresh,
StateVerifier,
Unknown
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 72bc5dc..0456458 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -36,6 +36,7 @@
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
/**
* Cluster configurations
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 7de552e..e60b7ce 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -31,6 +31,7 @@
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskPartitionState;
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
new file mode 100644
index 0000000..b361ea4
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -0,0 +1,237 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.view.common.ViewAggregatorEventAttributes;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.dataprovider.ViewClusterConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main logic for Helix view aggregator
+ */
+public class HelixViewAggregator {
+ private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
+ private final String _viewClusterName;
+ private final HelixManager _viewClusterManager;
+ private ViewAggregationWorker _aggregationWorker;
+ private ViewConfigWorker _viewConfigWorker;
+ private long _lastViewClusterRefreshTimestampMs;
+ private Map<String, SourceClusterDataProvider> _dataProviderMap;
+ private ViewClusterConfigProvider _viewClusterConfigProvider;
+ private List<ViewClusterSourceConfig> _sourceConfigs;
+ private long _refreshPeriodMs;
+ private Timer _viewClusterRefreshTimer;
+ private ViewClusterRefresher _viewClusterRefresher;
+
+ public HelixViewAggregator(String viewClusterName, String zkAddr) {
+ _viewClusterName = viewClusterName;
+ _lastViewClusterRefreshTimestampMs = 0L;
+ _refreshPeriodMs = -1L;
+ _sourceConfigs = new ArrayList<>();
+ _dataProviderMap = new HashMap<>();
+ _viewClusterManager = HelixManagerFactory
+ .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
+ InstanceType.SPECTATOR, zkAddr);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ }));
+ }
+
+ /**
+ * Start controller main logic
+ * @throws Exception
+ */
+ public void start() throws Exception {
+ try {
+ _viewClusterManager.connect();
+ } catch (Exception e) {
+ throw new HelixException("Failed to connect view cluster helix manager", e);
+ }
+
+ // set up view cluster refresher
+ _viewClusterRefresher =
+ new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor(),
+ _dataProviderMap);
+
+ // Start workers
+ _aggregationWorker = new ViewAggregationWorker();
+ _aggregationWorker.start();
+ _viewConfigWorker = new ViewConfigWorker();
+ _viewConfigWorker.start();
+
+ // Start cluster config provider
+ _viewClusterConfigProvider =
+ new ViewClusterConfigProvider(_viewClusterName, _viewClusterManager, _viewConfigWorker);
+ _viewClusterConfigProvider.setup();
+ }
+
+ /**
+ * Process view cluster config change events
+ */
+ private class ViewConfigWorker extends ClusterEventProcessor {
+ public ViewConfigWorker() {
+ super(_viewClusterName, "ViewConfigWorker");
+ }
+ @Override
+ public void handleEvent(ClusterEvent event) {
+ logger.info("Processing event " + event.getEventType().name());
+ switch (event.getEventType()) {
+ case ClusterConfigChange:
+ processViewClusterConfigUpdate();
+ break;
+ default:
+ logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+ }
+ }
+ }
+
+ /**
+ * Process source cluster data change events and view cluster periodic refresh events
+ */
+ private class ViewAggregationWorker extends ClusterEventProcessor {
+ private boolean _shouldRefresh;
+
+ public ViewAggregationWorker() {
+ super(_viewClusterName, "ViewAggregationWorker");
+ _shouldRefresh = false;
+ }
+
+ @Override
+ public void handleEvent(ClusterEvent event) {
+ logger.info("Processing event " + event.getEventType().name());
+
+ switch (event.getEventType()) {
+ case LiveInstanceChange:
+ case InstanceConfigChange:
+ case ExternalViewChange:
+ _shouldRefresh = true;
+ break;
+ case ViewClusterPeriodicRefresh:
+ Boolean forceRefresh =
+ event.getAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name());
+ if (!forceRefresh && !_shouldRefresh) {
+ logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
+ return;
+ }
+
+ // mark source cluster as changed to trigger next refresh as we failed to refresh at
+ // least some of the elements in view cluster
+ logger.info("Refreshing cluster based on event " + event.getEventType().name());
+ _shouldRefresh = refreshViewCluster();
+ break;
+ default:
+ logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+ }
+ }
+ }
+
+ private class RefreshViewClusterTask extends TimerTask {
+ @Override
+ public void run() {
+ triggerViewClusterRefresh(false);
+ }
+ }
+
+ public void shutdown() {
+ if (_viewClusterManager != null) {
+ logger.info("Shutting down view cluster helix manager");
+ _viewClusterManager.disconnect();
+ }
+
+ if (_viewClusterRefreshTimer != null) {
+ logger.info("Shutting down view cluster refresh timer");
+ _viewClusterRefreshTimer.cancel();
+ }
+
+ for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+ logger
+ .info(String.format("Shutting data provider for source cluster %s", provider.getName()));
+ provider.shutdown();
+ }
+ logger.info("HelixViewAggregator shutdown cleanly");
+ }
+
+ /**
+ * Recreate timer that triggers RefreshViewClusterTask
+ */
+ private void resetTimer() {
+ // TODO: implement
+ }
+
+ /**
+ * Use ViewClusterConfigProvider (assume its up-to-date) to compute
+ * SourceClusterConfigChangeAction, based on _sourceConfigs. Use the action object to
+ * reset timer (RefreshViewClusterTask), create/delete/update SourceClusterDataProvider in
+ * data provider map and populate new _sourceConfigs
+ */
+ private synchronized void processViewClusterConfigUpdate() {
+ // TODO: implement
+ }
+
+ /**
+ * push event to worker queue to trigger refresh. Worker might not refresh view cluster
+ * if there is no event happened since last refresh
+ * @param forceRefresh
+ */
+ private void triggerViewClusterRefresh(boolean forceRefresh) {
+ ClusterEvent event = new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh);
+ event.addAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name(),
+ Boolean.valueOf(forceRefresh));
+ _aggregationWorker.queueEvent(event);
+ logger.info("Triggering view cluster refresh, forceRefresh=" + forceRefresh);
+ }
+
+ /**
+ * Use ViewClusterRefresher to refresh ViewCluster.
+ * @return true if needs retry, else false
+ */
+ private synchronized boolean refreshViewCluster() {
+ // TODO: Implement refresh logic
+ return false;
+ }
+
+ private static String generateHelixManagerInstanceName(String viewClusterName) {
+ return String.format("HelixViewAggregator-%s", viewClusterName);
+ }
+
+ private static String generateSourceClusterDataProviderMapKey(ViewClusterSourceConfig config) {
+ return String.format("%s-%s", config.getName(), config.getZkAddress());
+ }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
new file mode 100644
index 0000000..7f28441
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
@@ -0,0 +1,34 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HelixViewAggregatorMain {
+ private static Logger logger = LoggerFactory.getLogger(HelixViewAggregatorMain.class);
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ // TODO: implement this function as launching HelixViewAggregator using CLI
+ }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
new file mode 100644
index 0000000..e770302
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -0,0 +1,107 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains logics to refresh view cluster based on information from source cluster data providers
+ */
+public class ViewClusterRefresher {
+ private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
+ private String _viewClusterName;
+ private HelixDataAccessor _viewClusterDataAccessor;
+ private PropertyKey.Builder _viewClusterKeyBuilder;
+ private Map<String, SourceClusterDataProvider> _dataProviderMap;
+
+ // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
+ // thus we don't need to read from view cluster everytime we refresh it.
+ private Map<String, LiveInstance> _viewClusterLiveInstanceCache;
+ private Map<String, InstanceConfig> _viewClusterInstanceConfigCache;
+ private Map<String, ExternalView> _viewClusterExternalViewCache;
+
+ public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
+ Map<String, SourceClusterDataProvider> dataProviderMap) {
+ _viewClusterName = viewClusterName;
+ _viewClusterDataAccessor = viewClusterDataAccessor;
+ _dataProviderMap = dataProviderMap;
+ _viewClusterLiveInstanceCache = new HashMap<>();
+ _viewClusterInstanceConfigCache = new HashMap<>();
+ _viewClusterExternalViewCache = new HashMap<>();
+ }
+
+ /**
+ * Create / update current live instances; remove dead live instances.
+ * This function assumes that all data providers have been refreshed.
+ */
+ public void refreshLiveInstancesInViewCluster() {
+ // TODO: implement it
+ }
+
+ /**
+ * Create / update current instance configs; remove dead instance configs
+ * This function assumes that all data providers have been refreshed.
+ */
+ public void refreshInstanceConfigsInViewCluster() {
+ // TODO: implement it
+ }
+
+ /**
+ * Create / update currently existing external views; delete outdated external views.
+ * This function assumes that all data providers have been refreshed.
+ */
+ public void refreshExtrenalViewsInViewCluster() {
+ // TODO: implement it
+ }
+
+ /**
+ * Merge external view "toMerge" into external view "source":
+ * - if partition in toMerge does not exist in source, we add it into source
+ * - if partition exist in both external views, we add all map fields from toMerge to source
+ * @param source
+ * @param toMerge
+ */
+ public static void mergeExternalViews(ExternalView source, ExternalView toMerge)
+ throws IllegalArgumentException {
+ if (!source.getId().equals(toMerge.getId())) {
+ throw new IllegalArgumentException(String
+ .format("Cannot merge ExternalViews with different ID. SourceID: %s; ToMergeID: %s",
+ source.getId(), toMerge.getId()));
+ }
+ for (String partitionName : toMerge.getPartitionSet()) {
+ if (!source.getPartitionSet().contains(partitionName)) {
+ source.setStateMap(partitionName, toMerge.getStateMap(partitionName));
+ } else {
+ Map<String, String> mergedPartitionState = source.getStateMap(partitionName);
+ mergedPartitionState.putAll(toMerge.getStateMap(partitionName));
+ source.setStateMap(partitionName, mergedPartitionState);
+ }
+ }
+ }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
new file mode 100644
index 0000000..ca9b164
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
@@ -0,0 +1,24 @@
+package org.apache.helix.view.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public enum ViewAggregatorEventAttributes {
+ ViewClusterForceRefresh,
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
new file mode 100644
index 0000000..add2688
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -0,0 +1,241 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.common.BasicClusterDataCache;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * SourceClusterDataProvider listens to changes in 1 source cluster, notifies cluster data cache,
+ * generates event for event processor and provide methods to read data cache
+ */
+public class SourceClusterDataProvider extends BasicClusterDataCache
+ implements InstanceConfigChangeListener, LiveInstanceChangeListener,
+ ExternalViewChangeListener {
+ private HelixManager _helixManager;
+ private HelixDataAccessor _dataAccessor;
+ private PropertyKey.Builder _propertyKeyBuilder;
+ private ViewClusterSourceConfig _sourceClusterConfig;
+ private ClusterEventProcessor _eventProcessor;
+
+ public SourceClusterDataProvider(ViewClusterSourceConfig config,
+ ClusterEventProcessor eventProcessor) {
+ super(config.getName());
+ _eventProcessor = eventProcessor;
+ _sourceClusterConfig = config;
+ _helixManager = HelixManagerFactory
+ .getZKHelixManager(config.getName(), generateHelixManagerInstanceName(config.getName()),
+ InstanceType.SPECTATOR, config.getZkAddress());
+ requireFullRefreshBasedOnConfig();
+ }
+
+ public String getName() {
+ return _helixManager.getInstanceName();
+ }
+
+ public ViewClusterSourceConfig getSourceClusterConfig() {
+ return _sourceClusterConfig;
+ }
+
+ /**
+ * Set up ClusterDataProvider. After setting up, the class should start listening
+ * on change events and perform corresponding reactions
+ * @throws Exception
+ */
+ public void setup() throws Exception {
+ try {
+ LOG.info(String.format("%s setting up ...", _helixManager.getInstanceName()));
+ _helixManager.connect();
+ _helixManager.addInstanceConfigChangeListener(this);
+ _helixManager.addLiveInstanceChangeListener(this);
+ _helixManager.addExternalViewChangeListener(this);
+ _dataAccessor = _helixManager.getHelixDataAccessor();
+ _propertyKeyBuilder = _dataAccessor.keyBuilder();
+ LOG.info(String.format("%s started.", _helixManager.getInstanceName()));
+ } catch(Exception e) {
+ shutdown();
+ throw e;
+ }
+ }
+
+ public void shutdown() {
+ // Clear cache to make sure memory is released
+ for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) {
+ clearCache(changeType);
+ }
+ if (_helixManager != null && _helixManager.isConnected()) {
+ _helixManager.disconnect();
+ }
+ }
+
+ public void refreshCache() {
+ refresh(_dataAccessor);
+ }
+
+ /**
+ * Re-list current instance config names. ListName is a more reliable way to find
+ * current instance config names. This is needed for ViewClusterRefresher when
+ * it is generating diffs to push to ViewCluster
+ * @return
+ */
+ public List<String> listInstanceConfigNames() {
+ return _dataAccessor.getChildNames(_propertyKeyBuilder.instanceConfigs());
+ }
+
+ /**
+ * Re-list current live instance names.
+ * @return
+ */
+ public List<String> listLiveInstanceNames() {
+ return _dataAccessor.getChildNames(_propertyKeyBuilder.liveInstances());
+ }
+
+ /**
+ * re-list external view names
+ * @return
+ */
+ public List<String> listExternalViewNames() {
+ return _dataAccessor.getChildNames(_propertyKeyBuilder.externalViews());
+ }
+
+ /**
+ * Based on current ViewClusterSourceConfig, decide whether caller should
+ * read instance config. Used by ViewClusterRefresher
+ * @return
+ */
+ public boolean shouldReadInstanceConfigs() {
+ // TODO: implement logic
+ return false;
+ }
+
+ /**
+ * Based on current ViewClusterSourceConfig, decide whether caller should
+ * read live instances. Used by ViewClusterRefresher
+ * @return
+ */
+ public boolean shouldReadLiveInstances() {
+ // TODO: implement logic
+ return false;
+ }
+
+ /**
+ * Based on current ViewClusterSourceConfig, decide whether caller should
+ * read external view. Used by ViewClusterRefresher
+ * @return
+ */
+ public boolean shouldReadExternalViews() {
+ // TODO: implement logic
+ return false;
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+ NotificationContext context) {
+ queueEventToProcessor(context, ClusterEventType.InstanceConfigChange,
+ HelixConstants.ChangeType.INSTANCE_CONFIG);
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext) {
+ queueEventToProcessor(changeContext, ClusterEventType.LiveInstanceChange,
+ HelixConstants.ChangeType.LIVE_INSTANCE);
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext) {
+ queueEventToProcessor(changeContext, ClusterEventType.ExternalViewChange,
+ HelixConstants.ChangeType.EXTERNAL_VIEW);
+ }
+
+ private void queueEventToProcessor(NotificationContext context,
+ ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
+ throws IllegalStateException {
+ if (!shouldProcessEvent(cacheChangeType)) {
+ LOG.info(String.format(
+ "Skip processing event based on ViewClusterSourceConfig: ClusterName=%s; ClusterEventType=%s, ChangeType=%s",
+ _clusterName, clusterEventType, cacheChangeType));
+ return;
+ }
+ ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType);
+ if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
+ notifyDataChange(cacheChangeType);
+ _eventProcessor.queueEvent(event);
+ } else {
+ LOG.info(String.format("SourceClusterDataProvider: skip queuing event %s", event));
+ }
+ }
+
+ /**
+ * Replace current source cluster config properties (A list of PropertyType we should aggregate)
+ * with the given ones, and update corresponding provider mechanisms
+ * @param properties
+ */
+ public synchronized void setSourceClusterConfigProperty(List<PropertyType> properties) {
+ // TODO: implement logic
+ }
+
+ /**
+ * Based on current ViewClusterSourceConfig, notify cache accordingly.
+ */
+ private void requireFullRefreshBasedOnConfig() {
+ // TODO: implement logic
+ }
+
+ /**
+ * Check source cluster config and decide whether this event should be processed
+ * @param sourceClusterChangeType
+ * @return
+ */
+ private synchronized boolean shouldProcessEvent(
+ HelixConstants.ChangeType sourceClusterChangeType) {
+ // TODO: implement
+ return false;
+ }
+
+ private static String generateHelixManagerInstanceName(String clusterName) {
+ return String.format("SourceClusterSpectatorHelixManager-%s", clusterName);
+ }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java
new file mode 100644
index 0000000..d7281c2
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java
@@ -0,0 +1,138 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.ClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class keeps an updated version of cluster config of the view cluster, notifies the event
+ * processor it is designated about view cluster config change, and provides methods to help adjust
+ * cluster settings.
+ */
+public class ViewClusterConfigProvider implements ClusterConfigChangeListener {
+ private static final Logger logger = LoggerFactory.getLogger(ViewClusterConfigProvider.class);
+ private String _viewClusterName;
+ private HelixManager _helixManager;
+ protected ClusterConfig _viewClusterConfig;
+ private ClusterEventProcessor _eventProcessor;
+
+ public ViewClusterConfigProvider(String clusterName, HelixManager manager,
+ ClusterEventProcessor eventProcessor) {
+ _viewClusterName = clusterName;
+ _helixManager = manager;
+ _eventProcessor = eventProcessor;
+ }
+
+ /**
+ * Set up ViewClusterConfigProvider. After setting up, the class should start listening
+ * on change events and perform corresponding reactions
+ */
+ public void setup() {
+ logger.info("Setting up ViewClusterConfigProvider for view cluster %s ...", _viewClusterName);
+ try {
+ _helixManager.addClusterfigChangeListener(this);
+ } catch (Exception e) {
+ throw new HelixException(
+ "ViewClusterConfigProvider: Failed to attach listeners to HelixManager!", e);
+ }
+ }
+
+ /**
+ * This class takes a existing list of source cluster configs and generate lists of source
+ * cluster configs to add / delete / modify, and tell caller if ViewClusterRefreshTimer should
+ * be reset or not
+ */
+ public class SourceClusterConfigChangeAction {
+ private List<ViewClusterSourceConfig> _oldConfigList;
+ private long _oldRefreshPeriodMs;
+
+ private List<ViewClusterSourceConfig> _toAdd;
+ private List<ViewClusterSourceConfig> _toDelete;
+ private List<ViewClusterSourceConfig> _toModify;
+ private boolean _refreshPeriodChanged;
+
+ public SourceClusterConfigChangeAction(List<ViewClusterSourceConfig> oldConfigList,
+ long oldRefreshPeriodMs) {
+ _oldConfigList = oldConfigList;
+ _oldRefreshPeriodMs = oldRefreshPeriodMs;
+ }
+
+ /**
+ * Compute actions and generate toAdd, toDelete toModify, and refreshPeriodChanged.
+ */
+ public void computeAction() {
+ // TODO: implement logic
+ }
+
+ public List<ViewClusterSourceConfig> getConfigsToAdd() {
+ return _toAdd;
+ }
+
+ public List<ViewClusterSourceConfig> getConfigsToDelete() {
+ return _toDelete;
+ }
+
+ public List<ViewClusterSourceConfig> getConfigsToModify() {
+ return _toModify;
+ }
+
+ public boolean shouldResetTimer() {
+ return _refreshPeriodChanged;
+ }
+ }
+
+ public synchronized long getViewClusterRefreshPeriodMs() {
+ return _viewClusterConfig.getViewClusterRefershPeriod() * 1000;
+ }
+
+ public synchronized SourceClusterConfigChangeAction getSourceConfigChangeAction(
+ List<ViewClusterSourceConfig> oldConfigList, long oldRefreshPeriodMs) {
+ return new SourceClusterConfigChangeAction(oldConfigList, oldRefreshPeriodMs);
+ }
+
+ @Override
+ public void onClusterConfigChange(ClusterConfig clusterConfig,
+ NotificationContext context) {
+ // TODO: we assume its a view cluster config here. Error handling if not?
+ refreshViewClusterConfig(clusterConfig);
+ // Source cluster config will not be aggregated so here ClusterConfigChange infers view cluster
+ ClusterEvent event = new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange);
+ _eventProcessor.queueEvent(event);
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("ViewClusterConfigProvider: queued event %s", event.toString()));
+ }
+ }
+
+ protected synchronized void refreshViewClusterConfig(ClusterConfig clusterConfig) {
+ _viewClusterConfig = clusterConfig;
+ }
+}