TAJO-602: WorkerResourceManager should be broke down into 3 parts. (missed files)
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java
new file mode 100644
index 0000000..0dc7f24
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+public class ProtoBufUtil {
+ public static final BoolProto TRUE = BoolProto.newBuilder().setValue(true).build();
+ public static final BoolProto FALSE = BoolProto.newBuilder().setValue(true).build();
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
new file mode 100644
index 0000000..a995058
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.QueryId;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+
+/**
+ * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager.
+ */
+public class TajoRMContext {
+
+ final Dispatcher rmDispatcher;
+
+ /** map between workerIds and running workers */
+ private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
+
+ /** map between workerIds and inactive workers */
+ private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>();
+
+ /** map between queryIds and query master ContainerId */
+ private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
+
+ private final Set<String> liveQueryMasterWorkerResources =
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+ public TajoRMContext(Dispatcher dispatcher) {
+ this.rmDispatcher = dispatcher;
+ }
+
+ public Dispatcher getDispatcher() {
+ return rmDispatcher;
+ }
+
+ /**
+ * @return The Map for active workers
+ */
+ public ConcurrentMap<String, Worker> getWorkers() {
+ return workers;
+ }
+
+ /**
+ * @return The Map for inactive workers
+ */
+ public ConcurrentMap<String, Worker> getInactiveWorkers() {
+ return inactiveWorkers;
+ }
+
+ /**
+ *
+ * @return The Map for query master containers
+ */
+ public ConcurrentMap<QueryId, ContainerIdProto> getQueryMasterContainer() {
+ return qmContainerMap;
+ }
+
+ public Set<String> getQueryMasterWorker() {
+ return liveQueryMasterWorkerResources;
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
new file mode 100644
index 0000000..1bcf38b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoBufUtil;
+
+import java.io.IOError;
+import java.net.InetSocketAddress;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+
+/**
+ * It receives pings that workers periodically send. The ping messages contains the worker resources and their statuses.
+ * From ping messages, {@link TajoResourceTracker} tracks the recent status of all workers.
+ *
+ * In detail, it has two main roles as follows:
+ *
+ * <ul>
+ * <li>Membership management for nodes which join to a Tajo cluster</li>
+ * <ul>
+ * <li>Register - It receives the ping from a new worker. It registers the worker.</li>
+ * <li>Unregister - It unregisters a worker who does not send ping for some expiry time.</li>
+ * <ul>
+ * <li>Status Update - It updates the status of all participating workers</li>
+ * </ul>
+ */
+public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
+ /** Class logger */
+ private Log LOG = LogFactory.getLog(TajoResourceTracker.class);
+ /** the context of TajoWorkerResourceManager */
+ private final TajoRMContext rmContext;
+ /** Liveliness monitor which checks ping expiry times of workers */
+ private final WorkerLivelinessMonitor workerLivelinessMonitor;
+
+ /** RPC server for worker resource tracker */
+ private AsyncRpcServer server;
+ /** The bind address of RPC server of worker resource tracker */
+ private InetSocketAddress bindAddress;
+
+ public TajoResourceTracker(TajoRMContext rmContext, WorkerLivelinessMonitor workerLivelinessMonitor) {
+ super(TajoResourceTracker.class.getSimpleName());
+ this.rmContext = rmContext;
+ this.workerLivelinessMonitor = workerLivelinessMonitor;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance");
+ TajoConf systemConf = (TajoConf) conf;
+
+ String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+
+ try {
+ server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3);
+ } catch (Exception e) {
+ LOG.error(e);
+ throw new IOError(e);
+ }
+
+ server.start();
+ bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+ // Set actual bind address to the systemConf
+ systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+
+ LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
+ super.start();
+ }
+
+ @Override
+ public void serviceStop() {
+ // server can be null if some exception occurs before the rpc server starts up.
+ if(server != null) {
+ server.shutdown();
+ server = null;
+ }
+ super.stop();
+ }
+
+ /** The response builder */
+ private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoBufUtil.TRUE);
+
+ private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+ return new WorkerStatusEvent(
+ workerKey,
+ heartbeat.getServerStatus().getRunningTaskNum(),
+ heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
+ heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
+ heartbeat.getServerStatus().getJvmHeap().getTotalHeap());
+ }
+
+ @Override
+ public void heartbeat(
+ RpcController controller,
+ NodeHeartbeat heartbeat,
+ RpcCallback<TajoHeartbeatResponse> done) {
+
+ try {
+ // get a workerId from the heartbeat
+ String workerId = createWorkerId(heartbeat);
+
+ if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
+
+ // status update
+ rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(workerId, heartbeat));
+ // refresh ping
+ workerLivelinessMonitor.receivedPing(workerId);
+
+ } else if (rmContext.getInactiveWorkers().containsKey(workerId)) { // worker was inactive
+
+ // remove the inactive worker from the list of inactive workers.
+ Worker worker = rmContext.getInactiveWorkers().remove(workerId);
+ workerLivelinessMonitor.unregister(worker.getWorkerId());
+
+ // create new worker instance
+ Worker newWorker = createWorkerResource(heartbeat);
+ String newWorkerId = newWorker.getWorkerId();
+ // add the new worker to the list of active workers
+ rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
+
+ // Transit the worker to RUNNING
+ rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(newWorkerId, WorkerEventType.STARTED));
+ // register the worker to the liveliness monitor
+ workerLivelinessMonitor.register(newWorkerId);
+
+ } else { // if new worker pings firstly
+
+ // create new worker instance
+ Worker newWorker = createWorkerResource(heartbeat);
+ Worker oldWorker = rmContext.getWorkers().putIfAbsent(workerId, newWorker);
+
+ if (oldWorker == null) {
+ // Transit the worker to RUNNING
+ rmContext.rmDispatcher.getEventHandler().handle(new WorkerEvent(workerId, WorkerEventType.STARTED));
+ } else {
+ LOG.info("Reconnect from the node at: " + workerId);
+ workerLivelinessMonitor.unregister(workerId);
+ rmContext.getDispatcher().getEventHandler().handle(new WorkerReconnectEvent(workerId, newWorker));
+ }
+
+ workerLivelinessMonitor.register(workerId);
+ }
+
+ } finally {
+ builder.setClusterResourceSummary(getClusterResourceSummary());
+ done.run(builder.build());
+ }
+ }
+
+ private static final String createWorkerId(NodeHeartbeat heartbeat) {
+ return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
+ }
+
+ private Worker createWorkerResource(NodeHeartbeat request) {
+ boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+ boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+ WorkerResource workerResource = new WorkerResource();
+ workerResource.setQueryMasterMode(queryMasterMode);
+ workerResource.setTaskRunnerMode(taskRunnerMode);
+
+ if(request.getServerStatus() != null) {
+ workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
+ workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
+ workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+ workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+ workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+ workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+ workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
+ } else {
+ workerResource.setMemoryMB(4096);
+ workerResource.setDiskSlots(4);
+ workerResource.setCpuCoreSlots(4);
+ }
+
+ Worker worker = new Worker(rmContext, workerResource);
+ worker.setHostName(request.getTajoWorkerHost());
+ worker.setHttpPort(request.getTajoWorkerHttpPort());
+ worker.setPeerRpcPort(request.getPeerRpcPort());
+ worker.setQueryMasterPort(request.getTajoQueryMasterPort());
+ worker.setClientPort(request.getTajoWorkerClientPort());
+ worker.setPullServerPort(request.getTajoWorkerPullServerPort());
+ return worker;
+ }
+
+ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ int totalDiskSlots = 0;
+ int totalCpuCoreSlots = 0;
+ int totalMemoryMB = 0;
+
+ int totalAvailableDiskSlots = 0;
+ int totalAvailableCpuCoreSlots = 0;
+ int totalAvailableMemoryMB = 0;
+
+ synchronized(rmContext) {
+ for(String eachWorker: rmContext.getWorkers().keySet()) {
+ Worker worker = rmContext.getWorkers().get(eachWorker);
+ WorkerResource resource = worker.getResource();
+ if(worker != null) {
+ totalMemoryMB += resource.getMemoryMB();
+ totalAvailableMemoryMB += resource.getAvailableMemoryMB();
+
+ totalDiskSlots += resource.getDiskSlots();
+ totalAvailableDiskSlots += resource.getAvailableDiskSlots();
+
+ totalCpuCoreSlots += resource.getCpuCoreSlots();
+ totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
+ }
+ }
+ }
+
+ return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ .setNumWorkers(rmContext.getWorkers().size())
+ .setTotalCpuCoreSlots(totalCpuCoreSlots)
+ .setTotalDiskSlots(totalDiskSlots)
+ .setTotalMemoryMB(totalMemoryMB)
+ .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+ .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+ .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+ .build();
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java
new file mode 100644
index 0000000..0d6b5ee
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * It contains resource and various information for a worker.
+ */
+public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
+ /** class logger */
+ private static final Log LOG = LogFactory.getLog(Worker.class);
+
+ private final ReentrantReadWriteLock.ReadLock readLock;
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+
+ /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
+ private final TajoRMContext rmContext;
+
+ /** Hostname */
+ private String hostName;
+ /** QueryMaster rpc port */
+ private int qmRpcPort;
+ /** Peer rpc port */
+ private int peerRpcPort;
+ /** http info port */
+ private int httpInfoPort;
+ /** the port of QueryMaster client rpc which provides an client API */
+ private int qmClientPort;
+ /** pull server port */
+ private int pullServerPort;
+ /** last heartbeat time */
+ private long lastHeartbeatTime;
+
+ /** Resource capability */
+ private WorkerResource resource;
+
+ private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
+ private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
+
+ private static final StateMachineFactory<Worker,
+ WorkerState,
+ WorkerEventType,
+ WorkerEvent> stateMachineFactory
+ = new StateMachineFactory<Worker,
+ WorkerState,
+ WorkerEventType,
+ WorkerEvent>(WorkerState.NEW)
+
+ // Transition from NEW
+ .addTransition(WorkerState.NEW, WorkerState.RUNNING,
+ WorkerEventType.STARTED,
+ new AddNodeTransition())
+
+ // Transition from RUNNING
+ .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+ WorkerEventType.STATE_UPDATE,
+ STATUS_UPDATE_TRANSITION)
+ .addTransition(WorkerState.RUNNING, WorkerState.LOST,
+ WorkerEventType.EXPIRE,
+ new DeactivateNodeTransition(WorkerState.LOST))
+ .addTransition(WorkerState.RUNNING, WorkerState.RUNNING,
+ WorkerEventType.RECONNECTED,
+ RECONNECT_NODE_TRANSITION)
+
+ // Transitions from UNHEALTHY state
+ .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+ WorkerEventType.STATE_UPDATE,
+ STATUS_UPDATE_TRANSITION)
+ .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST,
+ WorkerEventType.EXPIRE,
+ new DeactivateNodeTransition(WorkerState.LOST))
+ .addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY,
+ WorkerEventType.RECONNECTED,
+ RECONNECT_NODE_TRANSITION);
+
+ private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
+ stateMachineFactory.make(this, WorkerState.NEW);
+
+ public Worker(TajoRMContext rmContext, WorkerResource resource) {
+ this.rmContext = rmContext;
+
+ this.lastHeartbeatTime = System.currentTimeMillis();
+ this.resource = resource;
+
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ this.readLock = lock.readLock();
+ this.writeLock = lock.writeLock();
+ }
+
+ public String getWorkerId() {
+ return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String allocatedHost) {
+ this.hostName = allocatedHost;
+ }
+
+ public int getPeerRpcPort() {
+ return peerRpcPort;
+ }
+
+ public void setPeerRpcPort(int peerRpcPort) {
+ this.peerRpcPort = peerRpcPort;
+ }
+
+ public int getQueryMasterPort() {
+ return qmRpcPort;
+ }
+
+ public void setQueryMasterPort(int queryMasterPort) {
+ this.qmRpcPort = queryMasterPort;
+ }
+
+ public int getClientPort() {
+ return qmClientPort;
+ }
+
+ public void setClientPort(int clientPort) {
+ this.qmClientPort = clientPort;
+ }
+
+ public int getPullServerPort() {
+ return pullServerPort;
+ }
+
+ public void setPullServerPort(int pullServerPort) {
+ this.pullServerPort = pullServerPort;
+ }
+
+ public int getHttpPort() {
+ return httpInfoPort;
+ }
+
+ public void setHttpPort(int port) {
+ this.httpInfoPort = port;
+ }
+
+ public void setLastHeartbeatTime(long lastheartbeatReportTime) {
+ this.writeLock.lock();
+
+ try {
+ this.lastHeartbeatTime = lastheartbeatReportTime;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public long getLastHeartbeatTime() {
+ this.readLock.lock();
+
+ try {
+ return this.lastHeartbeatTime;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ *
+ * @return the current state of worker
+ */
+ public WorkerState getState() {
+ this.readLock.lock();
+
+ try {
+ return this.stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ *
+ * @return the current resource capability of worker
+ */
+ public WorkerResource getResource() {
+ return this.resource;
+ }
+
+ @Override
+ public int compareTo(Worker o) {
+ if(o == null) {
+ return 1;
+ }
+ return getWorkerId().compareTo(o.getWorkerId());
+ }
+
+ public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+ @Override
+ public void transition(Worker worker, WorkerEvent workerEvent) {
+
+ if(worker.getResource().isQueryMasterMode()) {
+ worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
+ }
+ LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster");
+ }
+ }
+
+ public static class StatusUpdateTransition implements
+ MultipleArcTransition<Worker, WorkerEvent, WorkerState> {
+
+ @Override
+ public WorkerState transition(Worker worker, WorkerEvent event) {
+ WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
+
+ // TODO - the synchronization scope using rmContext is too coarsen.
+ synchronized (worker.rmContext) {
+ worker.setLastHeartbeatTime(System.currentTimeMillis());
+ worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum());
+ worker.getResource().setMaxHeap(statusEvent.maxHeap());
+ worker.getResource().setFreeHeap(statusEvent.getFreeHeap());
+ worker.getResource().setTotalHeap(statusEvent.getTotalHeap());
+ }
+
+ return WorkerState.RUNNING;
+ }
+ }
+
+ public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+ private final WorkerState finalState;
+
+ public DeactivateNodeTransition(WorkerState finalState) {
+ this.finalState = finalState;
+ }
+
+ @Override
+ public void transition(Worker worker, WorkerEvent workerEvent) {
+
+ worker.rmContext.getWorkers().remove(worker.getWorkerId());
+ LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " + finalState);
+ worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), worker);
+ }
+ }
+
+ public static class ReconnectNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+
+ @Override
+ public void transition(Worker worker, WorkerEvent workerEvent) {
+ WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent;
+
+ Worker newWorker = castedEvent.getWorker();
+ worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker);
+ worker.rmContext.getDispatcher().getEventHandler().handle(
+ new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED));
+ }
+ }
+
+ @Override
+ public void handle(WorkerEvent event) {
+ LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType());
+ try {
+ writeLock.lock();
+ WorkerState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ LOG.error("Invalid event " + event.getType() + " on Worker " + getWorkerId());
+ }
+ if (oldState != getState()) {
+ LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + getState());
+ }
+ }
+
+ finally {
+ writeLock.unlock();
+ }
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
new file mode 100644
index 0000000..389c3be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * WorkerEvent describes all kinds of events which sent to {@link Worker}.
+ */
+public class WorkerEvent extends AbstractEvent<WorkerEventType> {
+ private final String workerId;
+
+ public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+ super(workerEventType);
+ this.workerId = workerId;
+ }
+
+ public String getWorkerId() {
+ return workerId;
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
new file mode 100644
index 0000000..0c97654
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+public enum WorkerEventType {
+
+ /** Source : {@link TajoResourceTracker}, Destination: {@link Worker} */
+ STARTED,
+ STATE_UPDATE,
+ RECONNECTED,
+
+ /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker} */
+ EXPIRE
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
new file mode 100644
index 0000000..e3524d6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * It periodically checks the latest heartbeat time of {@link Worker}.
+ * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
+ */
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+ private EventHandler dispatcher;
+
+ public WorkerLivelinessMonitor(Dispatcher d) {
+ super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock());
+ this.dispatcher = d.getEventHandler();
+ }
+
+ public void serviceInit(Configuration conf) throws Exception {
+ Preconditions.checkArgument(conf instanceof TajoConf);
+ TajoConf systemConf = (TajoConf) conf;
+ // milliseconds
+ int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT);
+ setExpireInterval(expireIntvl);
+ setMonitorInterval(expireIntvl/3);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void expire(String id) {
+ dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
new file mode 100644
index 0000000..46f286d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and it's destination is {@link Worker}.
+ * This event occurs only when an inactive worker sends a ping again.
+ */
+public class WorkerReconnectEvent extends WorkerEvent {
+ private final Worker worker;
+ public WorkerReconnectEvent(String workerId, Worker worker) {
+ super(workerId, WorkerEventType.RECONNECTED);
+ this.worker = worker;
+ }
+
+ public Worker getWorker() {
+ return worker;
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java
new file mode 100644
index 0000000..a941008
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * It presents the states of {@link Worker}.
+ */
+public enum WorkerState {
+ /** New worker */
+ NEW,
+
+ /** Running worker */
+ RUNNING,
+
+ /** Worker is unhealthy */
+ UNHEALTHY,
+
+ /** worker is out of service */
+ DECOMMISSIONED,
+
+ /** worker has not sent a heartbeat for some configured time threshold */
+ LOST;
+
+ @SuppressWarnings("unused")
+ public boolean isUnusable() {
+ return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
new file mode 100644
index 0000000..8c3d7c1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is
+ * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}.
+ */
+public class WorkerStatusEvent extends WorkerEvent {
+ private final int runningTaskNum;
+ private final long maxHeap;
+ private final long freeHeap;
+ private final long totalHeap;
+
+ public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+ super(workerId, WorkerEventType.STATE_UPDATE);
+ this.runningTaskNum = runningTaskNum;
+ this.maxHeap = maxHeap;
+ this.freeHeap = freeHeap;
+ this.totalHeap = totalHeap;
+ }
+
+ public int getRunningTaskNum() {
+ return runningTaskNum;
+ }
+
+ public long maxHeap() {
+ return maxHeap;
+ }
+
+ public long getFreeHeap() {
+ return freeHeap;
+ }
+
+ public long getTotalHeap() {
+ return totalHeap;
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
new file mode 100644
index 0000000..3843b8f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+
+/**
+ * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ */
+public class WorkerHeartbeatService extends AbstractService {
+ /** class logger */
+ private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
+
+ private final TajoWorker.WorkerContext context;
+ private TajoConf systemConf;
+ private RpcConnectionPool connectionPool;
+ private WorkerHeartbeatThread thread;
+
+ public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
+ super(WorkerHeartbeatService.class.getSimpleName());
+ this.context = context;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
+ this.systemConf = (TajoConf) conf;
+
+ connectionPool = RpcConnectionPool.getPool(systemConf);
+ thread = new WorkerHeartbeatThread();
+ thread.start();
+ super.init(conf);
+ }
+
+ @Override
+ public void serviceStop() {
+ thread.stopped.set(true);
+ synchronized (thread) {
+ thread.notifyAll();
+ }
+ super.stop();
+ }
+
+ class WorkerHeartbeatThread extends Thread {
+ private volatile AtomicBoolean stopped = new AtomicBoolean(false);
+ TajoMasterProtocol.ServerStatusProto.System systemInfo;
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+ new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ float workerDiskSlots;
+ int workerMemoryMB;
+ List<DiskDeviceInfo> diskDeviceInfos;
+
+ public WorkerHeartbeatThread() {
+ int workerCpuCoreNum;
+
+ boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
+ int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+
+ try {
+ diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ if(dedicatedResource) {
+ float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
+ int totalMemory = getTotalMemoryMB();
+ workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
+ workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
+
+ if(diskDeviceInfos == null) {
+ workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+ } else {
+ workerDiskSlots = diskDeviceInfos.size();
+ }
+ } else {
+ workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+ workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+ workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+ }
+
+ systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+ .setAvailableProcessors(workerCpuCoreNum)
+ .setFreeMemoryMB(0)
+ .setMaxMemoryMB(0)
+ .setTotalMemoryMB(getTotalMemoryMB())
+ .build();
+ }
+
+ public void run() {
+ LOG.info("Worker Resource Heartbeat Thread start.");
+ int sendDiskInfoCount = 0;
+ int pullServerPort = 0;
+ if(context.getPullService()!= null) {
+ long startTime = System.currentTimeMillis();
+ while(true) {
+ pullServerPort = context.getPullService().getPort();
+ if(pullServerPort > 0) {
+ break;
+ }
+ //waiting while pull server init
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ if(System.currentTimeMillis() - startTime > 30 * 1000) {
+ LOG.fatal("Too long push server init.");
+ System.exit(0);
+ }
+ }
+ }
+
+ String hostName = null;
+ int peerRpcPort = 0;
+ int queryMasterPort = 0;
+ int clientPort = 0;
+
+ if(context.getTajoWorkerManagerService() != null) {
+ hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
+ peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
+ }
+ if(context.getQueryMasterManagerService() != null) {
+ hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
+ queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
+ }
+ if(context.getTajoWorkerClientService() != null) {
+ clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
+ }
+ if (context.getPullService() != null) {
+ pullServerPort = context.getPullService().getPort();
+ }
+
+ while(!stopped.get()) {
+ if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+ getDiskUsageInfos();
+ }
+ TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+ TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ .setMaxHeap(Runtime.getRuntime().maxMemory())
+ .setFreeHeap(Runtime.getRuntime().freeMemory())
+ .setTotalHeap(Runtime.getRuntime().totalMemory())
+ .build();
+
+ TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+ .addAllDisk(diskInfos)
+ .setRunningTaskNum(
+ context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
+ .setSystem(systemInfo)
+ .setDiskSlots(workerDiskSlots)
+ .setMemoryResourceMB(workerMemoryMB)
+ .setJvmHeap(jvmHeap)
+ .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
+ .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
+ .build();
+
+ NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
+ .setTajoWorkerHost(hostName)
+ .setTajoQueryMasterPort(queryMasterPort)
+ .setPeerRpcPort(peerRpcPort)
+ .setTajoWorkerClientPort(clientPort)
+ .setTajoWorkerHttpPort(context.getHttpPort())
+ .setTajoWorkerPullServerPort(pullServerPort)
+ .setServerStatus(serverStatus)
+ .build();
+
+ NettyClientBase rmClient = null;
+ try {
+ CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+ new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+
+ rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
+ TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
+ resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
+
+ TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+ if(response != null) {
+ TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+ if(clusterResourceSummary.getNumWorkers() > 0) {
+ context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
+ }
+ context.setClusterResource(clusterResourceSummary);
+ } else {
+ if(callBack.getController().failed()) {
+ throw new ServiceException(callBack.getController().errorText());
+ }
+ }
+ } catch (InterruptedException e) {
+ break;
+ } catch (TimeoutException te) {
+ LOG.warn("Heartbeat response is being delayed.");
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ connectionPool.releaseConnection(rmClient);
+ }
+
+ try {
+ synchronized (WorkerHeartbeatThread.this){
+ wait(10 * 1000);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ sendDiskInfoCount++;
+
+ if(sendDiskInfoCount > 10) {
+ sendDiskInfoCount = 0;
+ }
+ }
+
+ LOG.info("Worker Resource Heartbeat Thread stopped.");
+ }
+
+ private void getDiskUsageInfos() {
+ diskInfos.clear();
+ for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
+ List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
+ if(mountInfos != null) {
+ for(DiskMountInfo eachMount: mountInfos) {
+ File eachFile = new File(eachMount.getMountPath());
+ diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+ .setAbsolutePath(eachFile.getAbsolutePath())
+ .setTotalSpace(eachFile.getTotalSpace())
+ .setFreeSpace(eachFile.getFreeSpace())
+ .setUsableSpace(eachFile.getUsableSpace())
+ .build());
+ }
+ }
+ }
+ }
+ }
+
+ public static int getTotalMemoryMB() {
+ com.sun.management.OperatingSystemMXBean bean =
+ (com.sun.management.OperatingSystemMXBean)
+ java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+ long max = bean.getTotalPhysicalMemorySize();
+ return ((int) (max / (1024 * 1024)));
+ }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto
new file mode 100644
index 0000000..d46d09a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoResourceTrackerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "TajoMasterProtocol.proto";
+
+message NodeHeartbeat {
+ required string tajoWorkerHost = 1;
+ required int32 peerRpcPort = 2;
+ required int32 tajoQueryMasterPort = 3;
+ optional ServerStatusProto serverStatus = 4;
+ optional int32 tajoWorkerClientPort = 5;
+ optional string statusMessage = 6;
+ optional int32 tajoWorkerPullServerPort = 7;
+ optional int32 tajoWorkerHttpPort = 8;
+}
+
+service TajoResourceTrackerProtocolService {
+ rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+}
\ No newline at end of file
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
new file mode 100644
index 0000000..af48fa6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestTajoResourceManager {
+ private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+ private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+ TajoConf tajoConf;
+
+ long queryIdTime = System.currentTimeMillis();
+ int numWorkers = 5;
+ float workerDiskSlots = 5.0f;
+ int workerMemoryMB = 512 * 10;
+ WorkerResourceAllocationResponse response;
+
+ private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
+ tajoConf = new org.apache.tajo.conf.TajoConf();
+
+ tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
+ tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
+ tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
+ TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
+ tajoWorkerResourceManager.init(tajoConf);
+ tajoWorkerResourceManager.start();
+
+ for(int i = 0; i < numWorkers; i++) {
+ ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
+ .setAvailableProcessors(1)
+ .setFreeMemoryMB(workerMemoryMB)
+ .setMaxMemoryMB(workerMemoryMB)
+ .setTotalMemoryMB(workerMemoryMB)
+ .build();
+
+ ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder()
+ .setFreeHeap(workerMemoryMB)
+ .setMaxHeap(workerMemoryMB)
+ .setTotalHeap(workerMemoryMB)
+ .build();
+
+ ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
+ .setAbsolutePath("/")
+ .setFreeSpace(0)
+ .setTotalSpace(0)
+ .setUsableSpace(0)
+ .build();
+
+ List<ServerStatusProto.Disk> disks = new ArrayList<ServerStatusProto.Disk>();
+
+ disks.add(disk);
+
+ ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
+ .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
+ .setTaskRunnerMode(BOOL_TRUE)
+ .setDiskSlots(workerDiskSlots)
+ .setMemoryResourceMB(workerMemoryMB)
+ .setJvmHeap(jvmHeap)
+ .setSystem(system)
+ .addAllDisk(disks)
+ .setRunningTaskNum(0)
+ .build();
+
+ NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
+ .setTajoWorkerHost("host" + (i + 1))
+ .setTajoQueryMasterPort(21000)
+ .setTajoWorkerHttpPort(28080 + i)
+ .setPeerRpcPort(12345)
+ .setServerStatus(serverStatus)
+ .build();
+
+ tajoWorkerResourceManager.getResourceTracker().heartbeat(null, tajoHeartbeat, NullCallback.get());
+ }
+
+ return tajoWorkerResourceManager;
+ }
+
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ TajoWorkerResourceManager tajoWorkerResourceManager = null;
+ try {
+ tajoWorkerResourceManager = initResourceManager(false);
+ assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ }
+ } finally {
+ if (tajoWorkerResourceManager != null) {
+ tajoWorkerResourceManager.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testMemoryResource() throws Exception {
+ TajoWorkerResourceManager tajoWorkerResourceManager = null;
+ try {
+ tajoWorkerResourceManager = initResourceManager(false);
+
+ final int minMemory = 256;
+ final int maxMemory = 512;
+ float diskSlots = 1.0f;
+
+ QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
+
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+ .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+ .setNumContainers(60)
+ .setQueryId(queryId.getProto())
+ .setMaxDiskSlotPerContainer(diskSlots)
+ .setMinDiskSlotPerContainer(diskSlots)
+ .setMinMemoryMBPerContainer(minMemory)
+ .setMaxMemoryMBPerContainer(maxMemory)
+ .build();
+
+ final Object monitor = new Object();
+ final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+ RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+ @Override
+ public void run(WorkerResourceAllocationResponse response) {
+ TestTajoResourceManager.this.response = response;
+ synchronized(monitor) {
+ monitor.notifyAll();
+ }
+ }
+ };
+
+ tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+ synchronized(monitor) {
+ monitor.wait();
+ }
+
+
+ // assert after callback
+ int totalUsedMemory = 0;
+ int totalUsedDisks = 0;
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(0, resource.getAvailableMemoryMB());
+ assertEquals(0, resource.getAvailableDiskSlots(), 0);
+ assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
+
+ totalUsedMemory += resource.getUsedMemoryMB();
+ totalUsedDisks += resource.getUsedDiskSlots();
+ }
+
+ assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
+ assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+ assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size());
+
+ for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+ assertTrue(
+ eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory);
+ containerIds.add(eachResource.getContainerId());
+ }
+
+ for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+ }
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+ assertEquals(0, resource.getUsedMemoryMB());
+
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+ }
+ } finally {
+ if (tajoWorkerResourceManager != null) {
+ tajoWorkerResourceManager.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testMemoryNotCommensurable() throws Exception {
+ TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+ try {
+ tajoWorkerResourceManager = initResourceManager(false);
+
+ final int minMemory = 200;
+ final int maxMemory = 500;
+ float diskSlots = 1.0f;
+
+ QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);
+
+ int requiredContainers = 60;
+
+ int numAllocatedContainers = 0;
+
+ int loopCount = 0;
+ while(true) {
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+ .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+ .setNumContainers(requiredContainers - numAllocatedContainers)
+ .setQueryId(queryId.getProto())
+ .setMaxDiskSlotPerContainer(diskSlots)
+ .setMinDiskSlotPerContainer(diskSlots)
+ .setMinMemoryMBPerContainer(minMemory)
+ .setMaxMemoryMBPerContainer(maxMemory)
+ .build();
+
+ final Object monitor = new Object();
+
+ RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+ @Override
+ public void run(WorkerResourceAllocationResponse response) {
+ TestTajoResourceManager.this.response = response;
+ synchronized(monitor) {
+ monitor.notifyAll();
+ }
+ }
+ };
+
+ tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+ synchronized(monitor) {
+ monitor.wait();
+ }
+
+ numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size();
+
+ //release resource
+ for(WorkerAllocatedResource eachResource:
+ TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) {
+ assertTrue(
+ eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory);
+ tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId());
+ }
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(0, resource.getUsedMemoryMB());
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+
+ assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ }
+
+ loopCount++;
+
+ if(loopCount == 2) {
+ assertEquals(requiredContainers, numAllocatedContainers);
+ break;
+ }
+ }
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(0, resource.getUsedMemoryMB());
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+
+ assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ }
+ } finally {
+ if (tajoWorkerResourceManager != null) {
+ tajoWorkerResourceManager.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testDiskResource() throws Exception {
+ TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+ try {
+ tajoWorkerResourceManager = initResourceManager(false);
+
+ final float minDiskSlots = 1.0f;
+ final float maxDiskSlots = 2.0f;
+ int memoryMB = 256;
+
+ QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
+
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+ .setResourceRequestPriority(ResourceRequestPriority.DISK)
+ .setNumContainers(60)
+ .setQueryId(queryId.getProto())
+ .setMaxDiskSlotPerContainer(maxDiskSlots)
+ .setMinDiskSlotPerContainer(minDiskSlots)
+ .setMinMemoryMBPerContainer(memoryMB)
+ .setMaxMemoryMBPerContainer(memoryMB)
+ .build();
+
+ final Object monitor = new Object();
+ final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+ RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+ @Override
+ public void run(WorkerResourceAllocationResponse response) {
+ TestTajoResourceManager.this.response = response;
+ synchronized(monitor) {
+ monitor.notifyAll();
+ }
+ }
+ };
+
+ tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+ synchronized(monitor) {
+ monitor.wait();
+ }
+ for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+ assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
+ eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
+ eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
+ containerIds.add(eachResource.getContainerId());
+ }
+
+ // assert after callback
+ int totalUsedDisks = 0;
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
+ assertEquals(0, resource.getAvailableDiskSlots(), 0);
+ assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
+ assertEquals(256 * 3, resource.getUsedMemoryMB());
+
+ totalUsedDisks += resource.getUsedDiskSlots();
+ }
+
+ assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+ assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
+
+ for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+ }
+
+ for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+ WorkerResource resource = worker.getResource();
+ assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+ assertEquals(0, resource.getUsedMemoryMB());
+
+ assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+ assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+ }
+ } finally {
+ if (tajoWorkerResourceManager != null) {
+ tajoWorkerResourceManager.stop();
+ }
+ }
+ }
+}