blob: 243faaca46b39573d41628dbd5e46202c4181436 [file] [log] [blame]
package org.apache.s4.comm.topology;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
@Singleton
public class AssignmentFromHelix implements Assignment {
private static final Logger logger = LoggerFactory.getLogger(AssignmentFromHelix.class);
private String clusterName;
private final String zookeeperAddress;
private String machineId;
private HelixManager zkHelixManager;
private HelixDataAccessor helixDataAccessor;
AtomicReference<ClusterNode> clusterNodeRef;
private final Lock lock;
private final AtomicBoolean currentlyOwningTask;
private final Condition taskAcquired;
private final StateModelFactory<? extends StateModel> taskStateModelFactory;
// private final StateModelFactory<? extends StateModel> appStateModelFactory;
@Inject
public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
@Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
throws Exception {
this.taskStateModelFactory = new TaskStateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
this.clusterName = clusterName;
this.zookeeperAddress = zookeeperAddress;
machineId = "localhost";
lock = new ReentrantLock();
ZkClient zkClient = new ZkClient(zookeeperAddress);
zkClient.setZkSerializer(new ZNRecordSerializer());
zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
clusterNodeRef = new AtomicReference<ClusterNode>();
taskAcquired = lock.newCondition();
currentlyOwningTask = new AtomicBoolean(true);
try {
machineId = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
logger.warn("Unable to get hostname", e);
machineId = "UNKNOWN";
}
ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
}
@Inject
public void init() {
// joinCluster();
}
@Override
public ClusterNode assignClusterNode() {
lock.lock();
try {
while (!currentlyOwningTask.get()) {
taskAcquired.awaitUninterruptibly();
}
} catch (Exception e) {
logger.error("Exception while waiting to join the cluster");
return null;
} finally {
lock.unlock();
}
return clusterNodeRef.get();
}
public void joinClusterOld() {
lock.lock();
try {
Builder keyBuilder = helixDataAccessor.keyBuilder();
do {
List<InstanceConfig> instances = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
for (InstanceConfig instanceConfig : instances) {
String instanceName = instanceConfig.getInstanceName();
if (!liveInstances.contains(instanceName)) {
zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
InstanceType.PARTICIPANT, zookeeperAddress);
zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
taskStateModelFactory);
zkHelixManager.connect();
ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
taskAcquired.signalAll();
break;
}
}
if (instances.size() == liveInstances.size()) {
System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
Thread.sleep(100000);
}
} while (!currentlyOwningTask.get());
System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}