SAMZA-1128 : Remove dependency of debounce timer from the CoordinationUtils
Apart from fixing the main bug (the barrier implementation used a different scheduler than the JobCoordinator's main thread), this patch does the following:
* Removes CoordinationUtils#getBarrier and the BarrierForVersionUpgrade interface
* Reworks ZkBarrierForVersionUpgrade (it no longer implements BarrierForVersionUpgrade) and introduces a listener, ZkBarrierListener
* Simplifies the ZkBarrierForVersionUpgrade class and its integration test
Author: Navina Ramesh <navina@apache.org>
Reviewers: Boris Shkolnik <boryas@apache.org>, Bharath Kumarasubramanian <bkumarasubramanian@linkedin.com>
Closes #195 from navina/SAMZA-1128
diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
index 34d2542..21bfe76 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
@@ -29,7 +29,7 @@
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
public static final int DEFAULT_CONSENSUS_TIMEOUT_MS = 40000;
-
+
public ZkConfig(Config config) {
super(config);
}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
deleted file mode 100644
index 664cef8..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/BarrierForVersionUpgrade.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator;
-
-import java.util.List;
-
-
-/**
- * Interface for a barrier - to allow synchronization between different processors to switch to a newly published
- * JobModel.
- */
-public interface BarrierForVersionUpgrade {
- /**
- * Barrier is usually started by the leader. Creates the Barrier paths in ZK
- *
- * @param version - String, representing the version of the JobModel for which the barrier is created
- * @param participants - {@link List} of participants that need to join for barrier to complete
- */
- void start(String version, List<String> participants);
-
- /**
- * Called by the processor.
- * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
- * joined.
- * The call is async. The callback will be invoked when the barrier is reached.
- * @param version - for which the barrier waits
- * @param thisProcessorsName as it appears in the list of processors.
- * @param callback will be invoked, when barrier is reached.
- */
- void waitForBarrier(String version, String thisProcessorsName, Runnable callback);
-}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 952aa51..150b3d4 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -27,7 +27,6 @@
* This service provide three primitives:
* - LeaderElection
* - Latch
- * - barrier for version upgrades
*/
@InterfaceStability.Evolving
public interface CoordinationUtils {
@@ -42,6 +41,4 @@
LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
Latch getLatch(int size, String latchId);
-
- BarrierForVersionUpgrade getBarrier(String barrierId);
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index c7bfc1d..581387d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,161 +19,164 @@
package org.apache.samza.zk;
-import java.util.Arrays;
-import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
/**
- * This class creates a barrier for version upgrade.
- * Barrier is started by the participant responsible for the upgrade. (start())
- * Each participant will mark its readiness and register for a notification when the barrier is reached. (waitFor())
- * If a timer (started in start()) goes off before the barrier is reached, all the participants will unsubscribe
- * from the notification and the barrier becomes invalid.
+ * ZkBarrierForVersionUpgrade is an implementation of a distributed barrier, which guarantees that the expected barrier
+ * size and barrier participants match before marking the barrier as complete.
+ * It also allows the caller to expire the barrier.
+ *
+ * This implementation is specifically tailored towards barrier support during JobModel version upgrades. The participant
+ * responsible for the version upgrade starts the barrier by invoking {@link #create(String, List)}.
+ * Each participant in the list then joins the new barrier. When all listed participants {@link #join(String, String)}
+ * the barrier, the creator marks the barrier as {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#DONE}
+ * which signals the end of the barrier.
+ * The creator of the barrier can expire the barrier by invoking {@link #expire(String)}. This will mark the barrier
+ * with value {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#TIMED_OUT} and indicates to everyone that it
+ * is no longer valid.
+ *
+ * The caller can listen to events associated with the barrier by registering a {@link ZkBarrierListener}.
+ *
+ * Zk Tree Reference:
+ * /barrierRoot/
+ * |
+ * |- barrier_{version1}/
+ * | |- barrier_state/
+ * | | ([DONE|TIMED_OUT])
+ * | |- barrier_participants/
+ * | | |- {id1}
+ * | | |- {id2}
+ * | | |- ...
*/
-public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
- private final ZkUtils zkUtils;
- private final static String BARRIER_DONE = "done";
- private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
+public class ZkBarrierForVersionUpgrade {
private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+ private final ZkUtils zkUtils;
+ private final BarrierKeyBuilder keyBuilder;
+ private final Optional<ZkBarrierListener> barrierListenerOptional;
- private final ScheduleAfterDebounceTime debounceTimer;
+ public enum State {
+ TIMED_OUT, DONE
+ }
- private final String barrierPrefix;
- private String barrierDonePath;
- private String barrierProcessors;
- private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout";
- private final long barrierTimeoutMS;
- public ZkBarrierForVersionUpgrade(String barrierId, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, long barrierTimeoutMS) {
+ public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, ZkBarrierListener barrierListener) {
if (zkUtils == null) {
throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade without ZkUtils.");
}
this.zkUtils = zkUtils;
- barrierPrefix = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(barrierId);
- this.debounceTimer = debounceTimer;
- this.barrierTimeoutMS = barrierTimeoutMS;
- }
-
- protected long getBarrierTimeOutMs() {
- return barrierTimeoutMS;
- }
-
- private void timerOff(final String version, final Stat currentStatOfBarrierDone) {
- try {
- // write a new value "TIMED_OUT", if the value was changed since previous value, make sure it was changed to "DONE"
- zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion());
- } catch (ZkBadVersionException e) {
- // Expected. failed to write, make sure the value is "DONE"
- String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
- LOG.info("Barrier timeout expired, but done=" + done);
- if (!done.equals(BARRIER_DONE)) {
- throw new SamzaException("Failed to write to the barrier_done, version=" + version, e);
- }
- }
- }
-
- private void setPaths(String version) {
- String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
- barrierDonePath = String.format("%s/barrier_done", barrierPath);
- barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-
- zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
- }
-
- @Override
- public void start(String version, List<String> participants) {
- setPaths(version);
-
- // subscribe for processor's list changes
- LOG.info("Subscribing for child changes at " + barrierProcessors);
- zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(participants));
-
- // create a timer for time-out
- Stat currentStatOfBarrierDone = new Stat();
- zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
-
- debounceTimer.scheduleAfterDebounceTime(
- VERSION_UPGRADE_TIMEOUT_TIMER,
- getBarrierTimeOutMs(),
- () -> timerOff(version, currentStatOfBarrierDone));
- }
-
- @Override
- public void waitForBarrier(String version, String participantName, Runnable callback) {
- setPaths(version);
- final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, participantName);
-
- // now subscribe for the barrier
- zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback));
-
- // update the barrier for this processor
- LOG.info("Creating a child for barrier at " + barrierProcessorThis);
- zkUtils.getZkClient().createPersistent(barrierProcessorThis);
+ this.keyBuilder = new BarrierKeyBuilder(barrierRoot);
+ this.barrierListenerOptional = Optional.ofNullable(barrierListener);
}
/**
- * Listener for the subscription for the list of participants.
- * This method will identify when all the participants have joined.
+ * Creates the shared barrier sub-tree in ZK for the given version and subscribes to changes of its participants
+ *
+ * @param version Version associated with the Barrier
+ * @param participants List of expected participants for this barrier to complete
+ */
+ public void create(final String version, List<String> participants) {
+ String barrierRoot = keyBuilder.getBarrierRoot();
+ String barrierParticipantsPath = keyBuilder.getBarrierParticipantsPath(version);
+ zkUtils.makeSurePersistentPathsExists(new String[]{
+ barrierRoot,
+ keyBuilder.getBarrierPath(version),
+ barrierParticipantsPath,
+ keyBuilder.getBarrierStatePath(version)});
+
+ // subscribe for participant's list changes
+ LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
+ zkUtils.getZkClient().subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
+ // notify the listener, if one was supplied, that the barrier for this version now exists
+ barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version));
+ }
+
+ /**
+ * Joins a shared barrier: subscribes to the barrier state node, then registers this participant under the barrier sub-tree in ZK
+ *
+ * @param version Version associated with the Barrier
+ * @param participantId Identifier of the participant
+ */
+ public void join(String version, String participantId) {
+ String barrierDonePath = keyBuilder.getBarrierStatePath(version);
+ zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
+ // NOTE(review): subscribing before registering presumably ensures a state change triggered by this join is not missed
+ // TODO: Handle ZkNodeExistsException - SAMZA-1304
+ zkUtils.getZkClient().createPersistent(
+ String.format("%s/%s", keyBuilder.getBarrierParticipantsPath(version), participantId));
+ }
+
+ /**
+ * Expires the barrier version by writing TIMED_OUT to its barrier state node
+ *
+ * @param version Version associated with the Barrier
+ */
+ public void expire(String version) {
+ zkUtils.getZkClient().writeData(
+ keyBuilder.getBarrierStatePath(version),
+ State.TIMED_OUT);
+ // NOTE(review): unlike the removed timerOff(), this write is not version-conditional, so it may overwrite DONE
+ }
+ /**
+ * Listener for changes to the list of participants. It is meant to be subscribed only by the creator of the barrier
+ * node. Once all expected participants have joined, it writes DONE to the barrier state and unsubscribes itself.
*/
class ZkBarrierChangeHandler implements IZkChildListener {
+ private final String barrierVersion;
private final List<String> names;
- public ZkBarrierChangeHandler(List<String> names) {
+ public ZkBarrierChangeHandler(String barrierVersion, List<String> names) {
+ this.barrierVersion = barrierVersion;
this.names = names;
}
@Override
- public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
-
+ public void handleChildChange(String parentPath, List<String> currentChildren) {
if (currentChildren == null) {
- LOG.info("Got handleChildChange with null currentChildren");
+ LOG.info("Got ZkBarrierChangeHandler handleChildChange with null currentChildren");
return;
}
LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray()));
LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray()));
- // check if all the names are in
- if (CollectionUtils.containsAll(names, currentChildren)) {
- LOG.info("ALl nodes reached the barrier");
+ // check if all the expected participants are in
+ if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) {
+ String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion);
LOG.info("Writing BARRIER DONE to " + barrierDonePath);
- zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); // this will trigger notifications
+ zkUtils.getZkClient().writeData(barrierDonePath, State.DONE); // this will trigger notifications
+ zkUtils.getZkClient().unsubscribeChildChanges(keyBuilder.getBarrierParticipantsPath(barrierVersion), this);
}
}
}
+ /**
+ * Listener for changes to the Barrier state. It is subscribed by all participants of the barrier, including the
+ * participant that creates the barrier.
+ * Barrier state values are either DONE or TIMED_OUT. It only registers to receive on valid state change notification.
+ * Once a valid state change notification is received, it will un-subscribe from further notifications.
+ */
class ZkBarrierReachedHandler implements IZkDataListener {
- private final ScheduleAfterDebounceTime debounceTimer;
- private final String barrierPathDone;
- private final Runnable callback;
+ private final String barrierStatePath;
+ private final String barrierVersion;
- public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) {
- this.barrierPathDone = barrierPathDone;
- this.callback = callback;
- this.debounceTimer = debounceTimer;
+ public ZkBarrierReachedHandler(String barrierStatePath, String version) {
+ this.barrierStatePath = barrierStatePath;
+ this.barrierVersion = version;
}
@Override
- public void handleDataChange(String dataPath, Object data)
- throws Exception {
- String done = (String) data;
- LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done);
- if (done.equals(BARRIER_DONE)) {
- debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
- } else if (done.equals(BARRIER_TIMED_OUT)) {
- // timed out
- LOG.warn("Barrier for " + dataPath + " timed out");
- }
- // in any case we unsubscribe
- zkUtils.unsubscribeDataChanges(barrierPathDone, this);
+ public void handleDataChange(String dataPath, Object data) {
+ LOG.info("got notification about barrier " + barrierStatePath + "; done=" + data);
+ zkUtils.unsubscribeDataChanges(barrierStatePath, this);
+ barrierListenerOptional.ifPresent(
+ zkBarrierListener -> zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data));
}
@Override
@@ -182,4 +185,33 @@
LOG.warn("barrier done got deleted at " + dataPath);
}
}
+ /** Builds the ZK paths of a barrier: {barrierRoot}/barrier_{version}/(barrier_participants|barrier_state). */
+ class BarrierKeyBuilder {
+ private static final String BARRIER_PARTICIPANTS = "/barrier_participants";
+ private static final String BARRIER_STATE = "/barrier_state";
+ private final String barrierRoot;
+ BarrierKeyBuilder(String barrierRoot) {
+ if (barrierRoot == null || barrierRoot.trim().isEmpty() || !barrierRoot.startsWith("/")) {
+ throw new IllegalArgumentException("Barrier root path cannot be null or empty and the path has to start with '/'");
+ }
+ this.barrierRoot = barrierRoot;
+ }
+ // Root path under which every per-version barrier node is created
+ String getBarrierRoot() {
+ return barrierRoot;
+ }
+ // {barrierRoot}/barrier_{version}
+ String getBarrierPath(String version) {
+ return String.format("%s/barrier_%s", barrierRoot, version);
+ }
+ // {barrierRoot}/barrier_{version}/barrier_participants: one child node per joined participant
+ String getBarrierParticipantsPath(String version) {
+ return getBarrierPath(version) + BARRIER_PARTICIPANTS;
+ }
+ // {barrierRoot}/barrier_{version}/barrier_state: holds the State value (DONE or TIMED_OUT)
+ String getBarrierStatePath(String version) {
+ return getBarrierPath(version) + BARRIER_STATE;
+ }
+ }
+
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java
new file mode 100644
index 0000000..ecf77b3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+/**
+ * Callback interface for events raised by {@link ZkBarrierForVersionUpgrade}
+ */
+public interface ZkBarrierListener {
+ /**
+ * Invoked after the barrier sub-tree for the given version has been created in ZK
+ *
+ * @param version Version associated with the Barrier
+ */
+ void onBarrierCreated(String version);
+
+ /**
+ * Invoked when the barrier state for the given version is written (DONE or TIMED_OUT)
+ *
+ * @param version Version associated with the Barrier
+ * @param state {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State} value
+ */
+ void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state);
+
+ /**
+ * Invoked when the barrier encounters an error (NOTE(review): no caller of this method is visible in this change)
+ *
+ * @param version Version associated with the Barrier
+ * @param t Throwable describing the cause of the barrier error
+ */
+ void onBarrierError(String version, Throwable t);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 44d2e37..a8317a8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -28,7 +28,6 @@
import java.util.List;
-
public class ZkControllerImpl implements ZkController {
private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
@@ -115,10 +114,8 @@
class ZkJobModelVersionChangeHandler implements IZkDataListener {
/**
- * called when job model version gets updated
- * @param dataPath
- * @param data
- * @throws Exception
+ * Called when the data at the JobModel version path changes.
+ * To subscribers, this signals that a new version of the JobModel is available.
*/
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 661650d..b3a2a6f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -45,7 +45,7 @@
ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
- return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime());
+ return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 965b32a..df0a527 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -19,7 +19,6 @@
package org.apache.samza.zk;
import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
@@ -29,14 +28,11 @@
public final ZkConfig zkConfig;
public final ZkUtils zkUtils;
public final String processorIdStr;
- public final ScheduleAfterDebounceTime debounceTimer;
- public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils,
- ScheduleAfterDebounceTime debounceTimer) {
+ public ZkCoordinationUtils(String processorId, ZkConfig zkConfig, ZkUtils zkUtils) {
this.zkConfig = zkConfig;
this.zkUtils = zkUtils;
this.processorIdStr = processorId;
- this.debounceTimer = debounceTimer;
}
@Override
@@ -54,11 +50,6 @@
return new ZkProcessorLatch(size, latchId, processorIdStr, zkUtils);
}
- @Override
- public BarrierForVersionUpgrade getBarrier(String barrierId) {
- return new ZkBarrierForVersionUpgrade(barrierId, zkUtils, debounceTimer, zkConfig.getZkBarrierTimeoutMs());
- }
-
// TODO - SAMZA-1128 CoordinationService should directly depend on ZkUtils and DebounceTimer
public ZkUtils getZkUtils() {
return zkUtils;
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 6ad10d2..64395ac 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -21,8 +21,7 @@
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
@@ -36,7 +35,6 @@
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -45,7 +43,6 @@
*/
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
- private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
// TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
// with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
private static final int METADATA_CACHE_TTL_MS = 5000;
@@ -55,7 +52,7 @@
private final ZkController zkController;
private final Config config;
- private final CoordinationUtils coordinationUtils;
+ private final ZkBarrierForVersionUpgrade barrier;
private StreamMetadataCache streamMetadataCache = null;
private ScheduleAfterDebounceTime debounceTimer = null;
@@ -64,13 +61,24 @@
public ZkJobCoordinator(Config config) {
this.config = config;
+ ZkConfig zkConfig = new ZkConfig(config);
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getAppId());
+ this.zkUtils = new ZkUtils(
+ keyBuilder,
+ ZkCoordinationServiceFactory.createZkClient(
+ zkConfig.getZkConnect(),
+ zkConfig.getZkSessionTimeoutMs(),
+ zkConfig.getZkConnectionTimeoutMs()),
+ zkConfig.getZkConnectionTimeoutMs());
+ // NOTE(review): ZK paths are keyed by getAppId() here; the removed code used getGlobalAppId() - confirm intended
this.processorId = createProcessorId(config);
- this.coordinationUtils = new ZkCoordinationServiceFactory()
- .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
- this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
+ this.barrier = new ZkBarrierForVersionUpgrade(
+ keyBuilder.getJobModelVersionBarrierPrefix(),
+ zkUtils,
+ new ZkBarrierListenerImpl());
}
@Override
@@ -121,12 +129,34 @@
ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
}
- public void doOnProcessorChange(List<String> processors) {
+ void doOnProcessorChange(List<String> processors) {
// if list of processors is empty - it means we are called from 'onBecomeLeader'
- generateNewJobModel(processors);
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
+ // TODO: Handle empty currentProcessorIds or duplicate processorIds in the list
+ List<String> currentProcessorIds = getActualProcessorIds(processors);
+
+ // Generate the JobModel
+ JobModel jobModel = generateNewJobModel(currentProcessorIds);
+
+ // Assign the next version of JobModel
+ String currentJMVersion = zkUtils.getJobModelVersion();
+ String nextJMVersion;
+ if (currentJMVersion == null) {
+ nextJMVersion = "1";
+ } else {
+ nextJMVersion = Integer.toString(Integer.parseInt(currentJMVersion) + 1);
}
+ LOG.info("pid=" + processorId + ": Generated new Job Model. Version = " + nextJMVersion);
+
+ // Publish the new job model
+ zkUtils.publishJobModel(nextJMVersion, jobModel);
+
+ // Start the barrier for the job model update
+ barrier.create(nextJMVersion, currentProcessorIds);
+
+ // Notify all processors about the new JobModel by updating JobModel Version number
+ zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
+
+ LOG.info("pid=" + processorId + ": Published new Job Model. Version = " + nextJMVersion);
}
@Override
@@ -138,16 +168,13 @@
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
- LOG.info("pid=" + processorId + "new JobModel available.Container stopped.");
- // get the new job model
+ // get the new job model from ZK
newJobModel = zkUtils.getJobModel(version);
LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
// update ZK and wait for all the processors to get this new version
- ZkBarrierForVersionUpgrade barrier =
- (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_UPGRADE_BARRIER);
- barrier.waitForBarrier(version, processorId, () -> onNewJobModelConfirmed(version));
+ barrier.join(version, processorId);
});
}
@@ -179,53 +206,23 @@
}
}
- /**
- * Generate new JobModel when becoming a leader or the list of processor changed.
- */
- private void generateNewJobModel(List<String> processors) {
- List<String> currentProcessorsIds;
+ private List<String> getActualProcessorIds(List<String> processors) {
if (processors.size() > 0) {
// we should use this list
// but it needs to be converted into PIDs, which is part of the data
- currentProcessorsIds = zkUtils.getActiveProcessorsIDs(processors);
+ return zkUtils.getActiveProcessorsIDs(processors);
} else {
// get the current list of processors
- currentProcessorsIds = zkUtils.getSortedActiveProcessorsIDs();
+ return zkUtils.getSortedActiveProcessorsIDs();
}
+ }
- // get the current version
- String currentJMVersion = zkUtils.getJobModelVersion();
- String nextJMVersion;
- if (currentJMVersion == null) {
- LOG.info("pid=" + processorId + "generating first version of the model");
- nextJMVersion = "1";
- } else {
- nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
- }
- LOG.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
-
- List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
- for (String processorPid : currentProcessorsIds) {
- containerIds.add(processorPid);
- }
- LOG.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
-
- JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
- containerIds);
-
- LOG.info("pid=" + processorId + "Generated jobModel: " + jobModel);
-
- // publish the new job model first
- zkUtils.publishJobModel(nextJMVersion, jobModel);
-
- // start the barrier for the job model update
- BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
- JOB_MODEL_UPGRADE_BARRIER);
- barrier.start(nextJMVersion, currentProcessorsIds);
-
- // publish new JobModel version
- zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
- LOG.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+ /**
+ * Generates a new JobModel for the given processor IDs (used when becoming leader or when the processor list changes).
+ */
+ private JobModel generateNewJobModel(List<String> processors) {
+ return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+ processors);
}
class LeaderElectorListenerImpl implements LeaderElectorListener {
@@ -241,4 +238,40 @@
});
}
}
+
+ class ZkBarrierListenerImpl implements ZkBarrierListener {
+ private final String barrierAction = "BarrierAction";
+ @Override
+ public void onBarrierCreated(String version) {
+ // expire the barrier if it does not complete within the configured barrier timeout
+ debounceTimer.scheduleAfterDebounceTime(
+ barrierAction,
+ (new ZkConfig(config)).getZkBarrierTimeoutMs(),
+ () -> barrier.expire(version)
+ );
+ }
+
+ @Override
+ public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
+ if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
+ LOG.info("JobModel version " + version + " obtained consensus successfully!");
+ debounceTimer.scheduleAfterDebounceTime(
+ barrierAction,
+ 0,
+ () -> onNewJobModelConfirmed(version));
+ } else if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+ // no-op
+ // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
+ // participants failed to join. That means, they should have de-registered from "processors" list
+ // and that would have triggered onProcessorChange action -> a new round of consensus.
+ LOG.info("Barrier for version " + version + " timed out.");
+ }
+ }
+
+ @Override
+ public void onBarrierError(String version, Throwable t) {
+ LOG.error("Encountered error while attaining consensus on JobModel version " + version, t);
+ stop();
+ }
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 7e4e0d6..37bff6d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -50,6 +50,7 @@
static final String PROCESSORS_PATH = "processors";
static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
+ static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier"; // fixed barrier-id path segment used by getJobModelVersionBarrierPrefix()
public ZkKeyBuilder(String pathPrefix) {
if (pathPrefix != null && !pathPrefix.trim().isEmpty()) {
@@ -94,7 +95,7 @@
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
}
- public String getJobModelVersionBarrierPrefix(String barrierId) {
- return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, barrierId);
+ public String getJobModelVersionBarrierPrefix() { // single fixed barrier: <getRootPath()>/JobModelGeneration/jobModelUpgradeBarrier/versionBarriers
+ return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER);
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index c547901..677ce54 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -114,10 +114,6 @@
}
}
- public synchronized String getEphemeralPath() {
- return ephemeralPath;
- }
-
/**
* Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
*
@@ -259,7 +255,7 @@
if (currentVersion != null && !currentVersion.equals(oldVersion)) {
throw new SamzaException(
- "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
+ "Someone changed JobModelVersion while the leader was generating one: expected " + oldVersion + ", got " + currentVersion);
}
// data version is the ZK version of the data from the ZK.
int dataVersion = stat.getVersion();
@@ -268,7 +264,7 @@
} catch (Exception e) {
String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
LOG.error(msg, e);
- throw new SamzaException(msg);
+ throw new SamzaException(msg, e);
}
LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
"(actual data version after update = " + stat.getVersion() + ")");
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 9c91fd3..63d6663 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -18,137 +18,205 @@
*/
package org.apache.samza.zk;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
+import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationServiceFactory;
-import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// TODO: Rename this such that it is clear that it is an integration test and NOT unit test
public class TestZkBarrierForVersionUpgrade {
private static EmbeddedZookeeper zkServer = null;
private static String testZkConnectionString = null;
- private static CoordinationUtils coordinationUtils;
+ // One ZkUtils per simulated processor, each backed by its own ZkClient under the same "group1" key prefix.
+ private ZkUtils zkUtils;
+ private ZkUtils zkUtils1;
- private static AtomicInteger counter = new AtomicInteger(1);
-
-
- @Before
- public void testSetup() {
-
+ /** Starts a single embedded ZooKeeper server shared by all tests in this class. */
+ @BeforeClass
+ public static void test() {
zkServer = new EmbeddedZookeeper();
zkServer.setup();
testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
+ }
- String groupId = "group" + counter.getAndAdd(1);
- String processorId = "p1";
- Map<String, String> map = new HashMap<>();
- map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
- map.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, "200");
- Config config = new MapConfig(map);
-
- CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();
- coordinationUtils = serviceFactory.getCoordinationService(groupId, processorId, config);
+ /** Creates two fresh ZK clients before each test. */
+ @Before
+ public void testSetup() {
+ ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ /** Closes both ZK clients after each test. */
@After
public void testTearDown() {
- coordinationUtils.reset();
+ zkUtils.close();
+ zkUtils1.close();
+ }
+
+ /** Shuts down the shared embedded ZooKeeper server. */
+ @AfterClass
+ public static void teardown() {
zkServer.teardown();
}
+ /** Both participants join a two-member barrier; each must be notified with State.DONE. */
@Test
public void testZkBarrierForVersionUpgrade() {
- String barrierId = "b1";
+ String barrierId = zkUtils.getKeyBuilder().getRootPath() + "/b1";
String ver = "1";
List<String> processors = new ArrayList<>();
processors.add("p1");
processors.add("p2");
+ final CountDownLatch latch = new CountDownLatch(2);
+ final AtomicInteger stateChangedCalled = new AtomicInteger(0);
- BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
+ ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, new ZkBarrierListener() {
+ @Override
+ public void onBarrierCreated(String version) {
+ }
- class Status {
- boolean p1 = false;
- boolean p2 = false;
+ @Override
+ public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+ if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
+ latch.countDown();
+ stateChangedCalled.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onBarrierError(String version, Throwable t) {
+
+ }
+ });
+
+ processor1Barrier.create(ver, processors);
+ processor1Barrier.join(ver, "p1");
+
+ ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, new ZkBarrierListener() {
+ @Override
+ public void onBarrierCreated(String version) {
+ }
+
+ @Override
+ public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+ if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
+ latch.countDown();
+ stateChangedCalled.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onBarrierError(String version, Throwable t) {
+
+ }
+ });
+ processor2Barrier.join(ver, "p2");
+
+ boolean result = false;
+ try {
+ result = latch.await(10000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Assert.fail("Interrupted while waiting for the barrier to complete.");
}
- final Status s = new Status();
+ Assert.assertTrue("Barrier failed to complete within test timeout.", result);
- barrier.start(ver, processors);
-
- barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
- barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
- Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100));
- }
-
- @Test
- public void testNegativeZkBarrierForVersionUpgrade() {
- String barrierId = "negativeZkBarrierForVersionUpgrade";
- String ver = "1";
- List<String> processors = new ArrayList<>();
- processors.add("p1");
- processors.add("p2");
- processors.add("p3");
-
- BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
-
- class Status {
- boolean p1 = false;
- boolean p2 = false;
- boolean p3 = false;
+ try {
+ List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants");
+ Assert.assertNotNull(children);
+ Assert.assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
+ // ZooKeeper does not guarantee child ordering, so check membership instead of list equality.
+ Assert.assertTrue("Unexpected barrier state. Didn't find the expected members.", children.containsAll(processors));
+ } catch (Exception e) {
+ // NOTE(review): a failed ZK lookup silently skips the membership checks above; confirm the participants
+ // path matches ZkBarrierForVersionUpgrade's layout before turning this into a hard failure.
}
- final Status s = new Status();
-
- barrier.start(ver, processors);
-
- barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
- barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+ Assert.assertEquals(2, stateChangedCalled.get());
}
+ /** p3 never joins, so after an explicit expire() both joined participants must be notified with State.TIMED_OUT. */
@Test
public void testZkBarrierForVersionUpgradeWithTimeOut() {
- String barrierId = "barrierTimeout";
+ String barrierId = zkUtils1.getKeyBuilder().getRootPath() + "/barrierTimeout";
String ver = "1";
List<String> processors = new ArrayList<>();
processors.add("p1");
processors.add("p2");
- processors.add("p3");
+ processors.add("p3"); // p3 never joins, so this barrier cannot complete and has to time out
- BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(barrierId);
+ final AtomicInteger timeoutStateChangeCalled = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(2);
+ final ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(
+ barrierId,
+ zkUtils,
+ new ZkBarrierListener() {
+ @Override
+ public void onBarrierCreated(String version) {
+ }
- class Status {
- boolean p1 = false;
- boolean p2 = false;
- boolean p3 = false;
+ @Override
+ public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+ if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+ timeoutStateChangeCalled.incrementAndGet();
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onBarrierError(String version, Throwable t) {
+
+ }
+
+ });
+ processor1Barrier.create(ver, processors);
+ processor1Barrier.join(ver, "p1");
+
+ final ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(
+ barrierId,
+ zkUtils1,
+ new ZkBarrierListener() {
+ @Override
+ public void onBarrierCreated(String version) {
+ }
+
+ @Override
+ public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) {
+ if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
+ timeoutStateChangeCalled.incrementAndGet();
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onBarrierError(String version, Throwable t) {
+
+ }
+
+ });
+
+ processor2Barrier.join(ver, "p2");
+
+ processor1Barrier.expire(ver);
+ boolean result = false;
+ try {
+ result = latch.await(10000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Assert.fail("Interrupted while waiting for the barrier to time out.");
}
- final Status s = new Status();
+ Assert.assertTrue("Barrier Timeout test failed to complete within test timeout.", result);
- barrier.start(ver, processors);
-
- barrier.waitForBarrier(ver, "p1", () -> s.p1 = true);
-
- barrier.waitForBarrier(ver, "p2", () -> s.p2 = true);
-
- // this node will join "too late"
- barrier.waitForBarrier(ver, "p3", () -> {
- TestZkUtils.sleepMs(300);
- s.p3 = true;
- });
- Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
+ try {
+ List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants");
+ Assert.assertNotNull(children);
+ Assert.assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size());
+ // Only p1 and p2 joined (p3 never does); ZooKeeper child order is unspecified, so check membership.
+ Assert.assertTrue("Unexpected barrier state. Didn't find the expected members.",
+ children.containsAll(processors.subList(0, 2)));
+ } catch (Exception e) {
+ // NOTE(review): a failed ZK lookup silently skips the membership checks above; confirm the participants
+ // path matches ZkBarrierForVersionUpgrade's layout before turning this into a hard failure.
+ }
+ Assert.assertEquals(2, timeoutStateChangeCalled.get());
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index 6d0bc0b..8ddd688 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -56,7 +56,6 @@
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
String version = "2";
Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
- Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix(
- "testBarrier"));
+ Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER + "/versionBarriers", builder.getJobModelVersionBarrierPrefix());
}
}