blob: 7e0b4acae2b8cfc9762e8c85986d94e3e3ed9f34 [file] [log] [blame]
package org.apache.s4.comm.topology;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
@Singleton
public class AssignmentFromHelix implements Assignment
{
private static final Logger logger = LoggerFactory
.getLogger(AssignmentFromHelix.class);
private String clusterName;
private final String zookeeperAddress;
private String machineId;
private HelixManager zkHelixManager;
private HelixDataAccessor helixDataAccessor;
AtomicReference<ClusterNode> clusterNodeRef;
private final Lock lock;
private final AtomicBoolean currentlyOwningTask;
private final Condition taskAcquired;
private final StateModelFactory<? extends StateModel> taskStateModelFactory;
//private final StateModelFactory<? extends StateModel> appStateModelFactory;
@Inject
public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
@Named("s4.instance.name") String instanceName,
@Named("s4.cluster.zk_address") String zookeeperAddress
) throws Exception
{
this.taskStateModelFactory = new TaskStateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
this.clusterName = clusterName;
this.zookeeperAddress = zookeeperAddress;
machineId = "localhost";
lock = new ReentrantLock();
ZkClient zkClient = new ZkClient(zookeeperAddress);
zkClient.setZkSerializer(new ZNRecordSerializer());
zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(
zkClient);
helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
clusterNodeRef = new AtomicReference<ClusterNode>();
taskAcquired = lock.newCondition();
currentlyOwningTask = new AtomicBoolean(true);
try
{
machineId = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e)
{
logger.warn("Unable to get hostname", e);
machineId = "UNKNOWN";
}
ClusterNode node = new ClusterNode(-1,
Integer.parseInt(instanceName.split("_")[1]), machineId,
instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
}
@Inject
public void init()
{
//joinCluster();
}
@Override
public ClusterNode assignClusterNode()
{
lock.lock();
try
{
while (!currentlyOwningTask.get())
{
taskAcquired.awaitUninterruptibly();
}
} catch (Exception e)
{
logger.error("Exception while waiting to join the cluster");
return null;
} finally
{
lock.unlock();
}
return clusterNodeRef.get();
}
public void joinClusterOld()
{
lock.lock();
try
{
Builder keyBuilder = helixDataAccessor.keyBuilder();
do
{
List<InstanceConfig> instances = helixDataAccessor
.getChildValues(keyBuilder.instanceConfigs());
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
.liveInstances());
for (InstanceConfig instanceConfig : instances)
{
String instanceName = instanceConfig.getInstanceName();
if (!liveInstances.contains(instanceName))
{
zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
zkHelixManager.getStateMachineEngine().registerStateModelFactory(
"LeaderStandby", taskStateModelFactory);
zkHelixManager.connect();
ClusterNode node = new ClusterNode(-1,
Integer.parseInt(instanceConfig.getPort()), machineId,
instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
taskAcquired.signalAll();
break;
}
}
if (instances.size() == liveInstances.size())
{
System.out
.println("No more nodes can join the cluster. Will wait for some node to die.");
Thread.sleep(100000);
}
} while (!currentlyOwningTask.get());
System.out.println("Joined the cluster:"+ clusterName +" as "+ clusterNodeRef.get().getTaskId());
} catch (Exception e)
{
e.printStackTrace();
} finally
{
lock.unlock();
}
}
}