blob: 677ce543c3094ca2501ce1fd0da354ab26680369 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.zk;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.samza.SamzaException;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Util class to help manage Zk connection and ZkClient.
* It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
*
* <p>
* <b>Note on ZkClient:</b>
* {@link ZkClient} consists of two threads - I/O thread and Event thread.
* I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods
* in Zookeeper API.
* Event thread typically receives all the Watcher events and delivers to registered listeners. It, also, handles
* responses to asynchronous methods in Zookeeper API.
* </p>
*
* <p>
* <b>Note on Session Management:</b>
* Session management, if needed, should be handled by the caller. This can be done by implementing
* {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
* callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
* processing in the callbacks.
* </p>
*/
public class ZkUtils {
private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
private final ZkClient zkClient;
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
this.keyBuilder = zkKeyBuilder;
this.connectionTimeoutMs = connectionTimeoutMs;
this.zkClient = zkClient;
}
public void connect() throws ZkInterruptedException {
boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
if (!isConnected) {
throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
}
}
public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) {
return new ZkConnection(zkConnectString, sessionTimeoutMs);
}
ZkClient getZkClient() {
return zkClient;
}
public ZkKeyBuilder getKeyBuilder() {
return keyBuilder;
}
/**
* Returns a ZK generated identifier for this client.
* If the current client is registering for the first time, it creates an ephemeral sequential node in the ZK tree
* If the current client has already registered and is still within the same session, it returns the already existing
* value for the ephemeralPath
*
* @param data Object that should be written as data in the registered ephemeral ZK node
* @return String representing the absolute ephemeralPath of this client in the current session
*/
public synchronized String registerProcessorAndGetId(final ProcessorData data) {
if (ephemeralPath == null) {
ephemeralPath =
zkClient.createEphemeralSequential(
keyBuilder.getProcessorsPath() + "/", data.toString());
LOG.info("newly generated path for " + data + " is " + ephemeralPath);
return ephemeralPath;
} else {
LOG.info("existing path for " + data + " is " + ephemeralPath);
return ephemeralPath;
}
}
/**
* Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
*
* @return List of absolute ZK node paths
*/
public List<String> getSortedActiveProcessorsZnodes() {
List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
if (znodeIds.size() > 0) {
Collections.sort(znodeIds);
LOG.info("Found these children - " + znodeIds);
}
return znodeIds;
}
/**
* Method is used to read processor's data from the znode
* @param fullPath absolute path to the znode
* @return processor's data
*/
String readProcessorData(String fullPath) {
String data = zkClient.<String>readData(fullPath, true);
if (data == null) {
throw new SamzaException(String.format("Cannot read ZK node:", fullPath));
}
return data;
}
/**
* Method is used to get the list of currently active/registered processor ids
* @return List of processorIds
*/
public List<String> getSortedActiveProcessorsIDs() {
return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
}
/**
* Method is used to get the <i>sorted</i> list of processors ids for a given list of znodes
* @param znodeIds - list of relative paths of the children's znodes
* @return List of processor ids for a given list of znodes
*/
public List<String> getActiveProcessorsIDs(List<String> znodeIds) {
String processorPath = keyBuilder.getProcessorsPath();
List<String> processorIds = new ArrayList<>(znodeIds.size());
if (znodeIds.size() > 0) {
for (String child : znodeIds) {
String fullPath = String.format("%s/%s", processorPath, child);
processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
}
LOG.info("Found these children - " + znodeIds);
LOG.info("Found these processorIds - " + processorIds);
}
return processorIds;
}
/* Wrapper for standard I0Itec methods */
public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
zkClient.unsubscribeDataChanges(path, dataListener);
}
public void subscribeDataChanges(String path, IZkDataListener dataListener) {
zkClient.subscribeDataChanges(path, dataListener);
}
public boolean exists(String path) {
return zkClient.exists(path);
}
public void close() throws ZkInterruptedException {
zkClient.close();
}
/**
* subscribe for changes of JobModel version
* @param dataListener describe this
*/
public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
}
/**
* Publishes new job model into ZK.
* This call should FAIL if the node already exists.
* @param jobModelVersion version of the jobModeL to publish
* @param jobModel jobModel to publish
*
*/
public void publishJobModel(String jobModelVersion, JobModel jobModel) {
try {
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
LOG.info("jobModelAsString=" + jobModelStr);
zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
} catch (Exception e) {
LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
throw new SamzaException(e);
}
}
/**
* get the job model from ZK by version
* @param jobModelVersion jobModel version to get
* @return job model for this version
*/
public JobModel getJobModel(String jobModelVersion) {
LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
JobModel jm;
try {
jm = mmapper.readValue((String) data, JobModel.class);
} catch (IOException e) {
throw new SamzaException("failed to read JobModel from ZK", e);
}
return jm;
}
/**
* read the jobmodel version from ZK
* @return jobmodel version as a string
*/
public String getJobModelVersion() {
return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
}
/**
* publish the version number of the next JobModel
* @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
* @param newVersion - new version.
*/
public void publishJobModelVersion(String oldVersion, String newVersion) {
Stat stat = new Stat();
String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
.getVersion() + ")");
if (currentVersion != null && !currentVersion.equals(oldVersion)) {
throw new SamzaException(
"Someone changed JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion);
}
// data version is the ZK version of the data from the ZK.
int dataVersion = stat.getVersion();
try {
stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
} catch (Exception e) {
String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
"(actual data version after update = " + stat.getVersion() + ")");
}
/**
* verify that given paths exist in ZK
* @param paths - paths to verify or create
*/
public void makeSurePersistentPathsExists(String[] paths) {
for (String path : paths) {
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
}
}
}
/**
* subscribe to the changes in the list of processors in ZK
* @param listener - will be called when a processor is added or removed.
*/
public void subscribeToProcessorChange(IZkChildListener listener) {
LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
}
}