TAJO-602: WorkerResourceManager should be broke down into 3 parts. (missed files)
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java
new file mode 100644
index 0000000..0dc7f24
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ProtoBufUtil.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+public class ProtoBufUtil {
+  public static final BoolProto TRUE = BoolProto.newBuilder().setValue(true).build();
+  public static final BoolProto FALSE = BoolProto.newBuilder().setValue(true).build();
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
new file mode 100644
index 0000000..a995058
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.QueryId;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+
+/**
+ * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager.
+ */
+public class TajoRMContext {
+
+  final Dispatcher rmDispatcher;
+
+  /** map between workerIds and running workers */
+  private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
+
+  /** map between workerIds and inactive workers */
+  private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>();
+
+  /** map between queryIds and query master ContainerId */
+  private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
+
+  private final Set<String> liveQueryMasterWorkerResources =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  public TajoRMContext(Dispatcher dispatcher) {
+    this.rmDispatcher = dispatcher;
+  }
+
+  public Dispatcher getDispatcher() {
+    return rmDispatcher;
+  }
+
+  /**
+   * @return The Map for active workers
+   */
+  public ConcurrentMap<String, Worker> getWorkers() {
+    return workers;
+  }
+
+  /**
+   * @return The Map for inactive workers
+   */
+  public ConcurrentMap<String, Worker> getInactiveWorkers() {
+    return inactiveWorkers;
+  }
+
+  /**
+   *
+   * @return The Map for query master containers
+   */
+  public ConcurrentMap<QueryId, ContainerIdProto> getQueryMasterContainer() {
+    return qmContainerMap;
+  }
+
+  public Set<String> getQueryMasterWorker() {
+    return liveQueryMasterWorkerResources;
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
new file mode 100644
index 0000000..1bcf38b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoBufUtil;
+
+import java.io.IOError;
+import java.net.InetSocketAddress;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+
+/**
+ * It receives pings that workers periodically send. The ping messages contains the worker resources and their statuses.
+ * From ping messages, {@link TajoResourceTracker} tracks the recent status of all workers.
+ *
+ * In detail, it has two main roles as follows:
+ *
+ * <ul>
+ *   <li>Membership management for nodes which join to a Tajo cluster</li>
+ *   <ul>
+ *    <li>Register - It receives the ping from a new worker. It registers the worker.</li>
+ *    <li>Unregister - It unregisters a worker who does not send ping for some expiry time.</li>
+ *   <ul>
+ *   <li>Status Update - It updates the status of all participating workers</li>
+ * </ul>
+ */
+public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
+  /** Class logger */
+  private Log LOG = LogFactory.getLog(TajoResourceTracker.class);
+  /** the context of TajoWorkerResourceManager */
+  private final TajoRMContext rmContext;
+  /** Liveliness monitor which checks ping expiry times of workers */
+  private final WorkerLivelinessMonitor workerLivelinessMonitor;
+
+  /** RPC server for worker resource tracker */
+  private AsyncRpcServer server;
+  /** The bind address of RPC server of worker resource tracker */
+  private InetSocketAddress bindAddress;
+
+  public TajoResourceTracker(TajoRMContext rmContext, WorkerLivelinessMonitor workerLivelinessMonitor) {
+    super(TajoResourceTracker.class.getSimpleName());
+    this.rmContext = rmContext;
+    this.workerLivelinessMonitor = workerLivelinessMonitor;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance");
+    TajoConf systemConf = (TajoConf) conf;
+
+    String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+
+    try {
+      server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3);
+    } catch (Exception e) {
+      LOG.error(e);
+      throw new IOError(e);
+    }
+
+    server.start();
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    // Set actual bind address to the systemConf
+    systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+
+    LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
+    super.start();
+  }
+
+  @Override
+  public void serviceStop() {
+    // server can be null if some exception occurs before the rpc server starts up.
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  /** The response builder */
+  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoBufUtil.TRUE);
+
+  private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+    return new WorkerStatusEvent(
+        workerKey,
+        heartbeat.getServerStatus().getRunningTaskNum(),
+        heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
+        heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
+        heartbeat.getServerStatus().getJvmHeap().getTotalHeap());
+  }
+
+  @Override
+  public void heartbeat(
+      RpcController controller,
+      NodeHeartbeat heartbeat,
+      RpcCallback<TajoHeartbeatResponse> done) {
+
+    try {
+      // get a workerId from the heartbeat
+      String workerId = createWorkerId(heartbeat);
+
+      if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
+
+        // status update
+        rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(workerId, heartbeat));
+        // refresh ping
+        workerLivelinessMonitor.receivedPing(workerId);
+
+      } else if (rmContext.getInactiveWorkers().containsKey(workerId)) { // worker was inactive
+
+        // remove the inactive worker from the list of inactive workers.
+        Worker worker = rmContext.getInactiveWorkers().remove(workerId);
+        workerLivelinessMonitor.unregister(worker.getWorkerId());
+
+        // create new worker instance
+        Worker newWorker = createWorkerResource(heartbeat);
+        String newWorkerId = newWorker.getWorkerId();
+        // add the new worker to the list of active workers
+        rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
+
+        // Transit the worker to RUNNING
+        rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(newWorkerId, WorkerEventType.STARTED));
+        // register the worker to the liveliness monitor
+        workerLivelinessMonitor.register(newWorkerId);
+
+      } else { // if new worker pings firstly
+
+        // create new worker instance
+        Worker newWorker = createWorkerResource(heartbeat);
+        Worker oldWorker = rmContext.getWorkers().putIfAbsent(workerId, newWorker);
+
+        if (oldWorker == null) {
+          // Transit the worker to RUNNING
+          rmContext.rmDispatcher.getEventHandler().handle(new WorkerEvent(workerId, WorkerEventType.STARTED));
+        } else {
+          LOG.info("Reconnect from the node at: " + workerId);
+          workerLivelinessMonitor.unregister(workerId);
+          rmContext.getDispatcher().getEventHandler().handle(new WorkerReconnectEvent(workerId, newWorker));
+        }
+
+        workerLivelinessMonitor.register(workerId);
+      }
+
+    } finally {
+      builder.setClusterResourceSummary(getClusterResourceSummary());
+      done.run(builder.build());
+    }
+  }
+
+  private static final String createWorkerId(NodeHeartbeat heartbeat) {
+    return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
+  }
+
+  private Worker createWorkerResource(NodeHeartbeat request) {
+    boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+    boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+    WorkerResource workerResource = new WorkerResource();
+    workerResource.setQueryMasterMode(queryMasterMode);
+    workerResource.setTaskRunnerMode(taskRunnerMode);
+
+    if(request.getServerStatus() != null) {
+      workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
+      workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
+      workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+      workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+      workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+      workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+      workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
+    } else {
+      workerResource.setMemoryMB(4096);
+      workerResource.setDiskSlots(4);
+      workerResource.setCpuCoreSlots(4);
+    }
+
+    Worker worker = new Worker(rmContext, workerResource);
+    worker.setHostName(request.getTajoWorkerHost());
+    worker.setHttpPort(request.getTajoWorkerHttpPort());
+    worker.setPeerRpcPort(request.getPeerRpcPort());
+    worker.setQueryMasterPort(request.getTajoQueryMasterPort());
+    worker.setClientPort(request.getTajoWorkerClientPort());
+    worker.setPullServerPort(request.getTajoWorkerPullServerPort());
+    return worker;
+  }
+
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    int totalDiskSlots = 0;
+    int totalCpuCoreSlots = 0;
+    int totalMemoryMB = 0;
+
+    int totalAvailableDiskSlots = 0;
+    int totalAvailableCpuCoreSlots = 0;
+    int totalAvailableMemoryMB = 0;
+
+    synchronized(rmContext) {
+      for(String eachWorker: rmContext.getWorkers().keySet()) {
+        Worker worker = rmContext.getWorkers().get(eachWorker);
+        WorkerResource resource = worker.getResource();
+        if(worker != null) {
+          totalMemoryMB += resource.getMemoryMB();
+          totalAvailableMemoryMB += resource.getAvailableMemoryMB();
+
+          totalDiskSlots += resource.getDiskSlots();
+          totalAvailableDiskSlots += resource.getAvailableDiskSlots();
+
+          totalCpuCoreSlots += resource.getCpuCoreSlots();
+          totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
+        }
+      }
+    }
+
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+        .setNumWorkers(rmContext.getWorkers().size())
+        .setTotalCpuCoreSlots(totalCpuCoreSlots)
+        .setTotalDiskSlots(totalDiskSlots)
+        .setTotalMemoryMB(totalMemoryMB)
+        .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+        .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+        .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+        .build();
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java
new file mode 100644
index 0000000..0d6b5ee
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * It contains resource and various information for a worker.
+ */
+public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(Worker.class);
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
+  private final TajoRMContext rmContext;
+
+  /** Hostname */
+  private String hostName;
+  /** QueryMaster rpc port */
+  private int qmRpcPort;
+  /** Peer rpc port */
+  private int peerRpcPort;
+  /** http info port */
+  private int httpInfoPort;
+  /** the port of QueryMaster client rpc which provides an client API */
+  private int qmClientPort;
+  /** pull server port */
+  private int pullServerPort;
+  /** last heartbeat time */
+  private long lastHeartbeatTime;
+
+  /** Resource capability */
+  private WorkerResource resource;
+
+  private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
+  private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
+
+  private static final StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent> stateMachineFactory
+      = new StateMachineFactory<Worker,
+      WorkerState,
+      WorkerEventType,
+      WorkerEvent>(WorkerState.NEW)
+
+      // Transition from NEW
+      .addTransition(WorkerState.NEW, WorkerState.RUNNING,
+          WorkerEventType.STARTED,
+          new AddNodeTransition())
+
+      // Transition from RUNNING
+      .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.RUNNING, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.RUNNING, WorkerState.RUNNING,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION)
+
+      // Transitions from UNHEALTHY state
+      .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, WorkerState.UNHEALTHY),
+          WorkerEventType.STATE_UPDATE,
+          STATUS_UPDATE_TRANSITION)
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST,
+          WorkerEventType.EXPIRE,
+          new DeactivateNodeTransition(WorkerState.LOST))
+      .addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY,
+          WorkerEventType.RECONNECTED,
+          RECONNECT_NODE_TRANSITION);
+
+  private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
+      stateMachineFactory.make(this, WorkerState.NEW);
+
+  public Worker(TajoRMContext rmContext, WorkerResource resource) {
+    this.rmContext = rmContext;
+
+    this.lastHeartbeatTime = System.currentTimeMillis();
+    this.resource = resource;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+  }
+
+  public String getWorkerId() {
+    return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String allocatedHost) {
+    this.hostName = allocatedHost;
+  }
+
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public void setPeerRpcPort(int peerRpcPort) {
+    this.peerRpcPort = peerRpcPort;
+  }
+
+  public int getQueryMasterPort() {
+    return qmRpcPort;
+  }
+
+  public void setQueryMasterPort(int queryMasterPort) {
+    this.qmRpcPort = queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return qmClientPort;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.qmClientPort = clientPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public void setPullServerPort(int pullServerPort) {
+    this.pullServerPort = pullServerPort;
+  }
+
+  public int getHttpPort() {
+    return httpInfoPort;
+  }
+
+  public void setHttpPort(int port) {
+    this.httpInfoPort = port;
+  }
+
+  public void setLastHeartbeatTime(long lastheartbeatReportTime) {
+    this.writeLock.lock();
+
+    try {
+      this.lastHeartbeatTime = lastheartbeatReportTime;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public long getLastHeartbeatTime() {
+    this.readLock.lock();
+
+    try {
+      return this.lastHeartbeatTime;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current state of worker
+   */
+  public WorkerState getState() {
+    this.readLock.lock();
+
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   *
+   * @return the current resource capability of worker
+   */
+  public WorkerResource getResource() {
+    return this.resource;
+  }
+
+  @Override
+  public int compareTo(Worker o) {
+    if(o == null) {
+      return 1;
+    }
+    return getWorkerId().compareTo(o.getWorkerId());
+  }
+
+  public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      if(worker.getResource().isQueryMasterMode()) {
+        worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
+      }
+      LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster");
+    }
+  }
+
+  public static class StatusUpdateTransition implements
+      MultipleArcTransition<Worker, WorkerEvent, WorkerState> {
+
+    @Override
+    public WorkerState transition(Worker worker, WorkerEvent event) {
+      WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
+
+      // TODO - the synchronization scope using rmContext is too coarsen.
+      synchronized (worker.rmContext) {
+        worker.setLastHeartbeatTime(System.currentTimeMillis());
+        worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum());
+        worker.getResource().setMaxHeap(statusEvent.maxHeap());
+        worker.getResource().setFreeHeap(statusEvent.getFreeHeap());
+        worker.getResource().setTotalHeap(statusEvent.getTotalHeap());
+      }
+
+      return WorkerState.RUNNING;
+    }
+  }
+
+  public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+    private final WorkerState finalState;
+
+    public DeactivateNodeTransition(WorkerState finalState) {
+      this.finalState = finalState;
+    }
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+
+      worker.rmContext.getWorkers().remove(worker.getWorkerId());
+      LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " + finalState);
+      worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), worker);
+    }
+  }
+
+  public static class ReconnectNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
+
+    @Override
+    public void transition(Worker worker, WorkerEvent workerEvent) {
+      WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent;
+
+      Worker newWorker = castedEvent.getWorker();
+      worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker);
+      worker.rmContext.getDispatcher().getEventHandler().handle(
+          new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED));
+    }
+  }
+
+  @Override
+  public void handle(WorkerEvent event) {
+    LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType());
+    try {
+      writeLock.lock();
+      WorkerState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Invalid event " + event.getType() + " on Worker  " + getWorkerId());
+      }
+      if (oldState != getState()) {
+        LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + getState());
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
new file mode 100644
index 0000000..389c3be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * WorkerEvent describes all kinds of events which sent to {@link Worker}.
+ */
+public class WorkerEvent extends AbstractEvent<WorkerEventType> {
+  private final String workerId;
+
+  public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+    super(workerEventType);
+    this.workerId = workerId;
+  }
+
+  public String getWorkerId() {
+    return workerId;
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
new file mode 100644
index 0000000..0c97654
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+public enum WorkerEventType {
+
+  /** Source : {@link TajoResourceTracker}, Destination: {@link Worker} */
+  STARTED,
+  STATE_UPDATE,
+  RECONNECTED,
+
+  /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker} */
+  EXPIRE
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
new file mode 100644
index 0000000..e3524d6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+/**
+ * It periodically checks the latest heartbeat time of {@link Worker}.
+ * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
+ */
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  private EventHandler dispatcher;
+
+  public WorkerLivelinessMonitor(Dispatcher d) {
+    super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf systemConf = (TajoConf) conf;
+    // milliseconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT);
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(expireIntvl/3);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void expire(String id) {
+    dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
new file mode 100644
index 0000000..46f286d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and it's destination is {@link Worker}.
+ * This event occurs only when an inactive worker sends a ping again.
+ */
+public class WorkerReconnectEvent extends WorkerEvent {
+  private final Worker worker;
+  public WorkerReconnectEvent(String workerId, Worker worker) {
+    super(workerId, WorkerEventType.RECONNECTED);
+    this.worker = worker;
+  }
+
+  public Worker getWorker() {
+    return worker;
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java
new file mode 100644
index 0000000..a941008
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * It presents the states of {@link Worker}.
+ */
+public enum WorkerState {
+  /** New worker */
+  NEW,
+
+  /** Running worker */
+  RUNNING,
+
+  /** Worker is unhealthy */
+  UNHEALTHY,
+
+  /** worker is out of service */
+  DECOMMISSIONED,
+
+  /** worker has not sent a heartbeat for some configured time threshold */
+  LOST;
+
+  @SuppressWarnings("unused")
+  public boolean isUnusable() {
+    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
new file mode 100644
index 0000000..8c3d7c1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+/**
+ * {@link TajoResourceTracker} produces this event, and its destination is
+ * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}.
+ */
+public class WorkerStatusEvent extends WorkerEvent {
+  private final int runningTaskNum;
+  private final long maxHeap;
+  private final long freeHeap;
+  private final long totalHeap;
+
+  public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+    super(workerId, WorkerEventType.STATE_UPDATE);
+    this.runningTaskNum = runningTaskNum;
+    this.maxHeap = maxHeap;
+    this.freeHeap = freeHeap;
+    this.totalHeap = totalHeap;
+  }
+
+  public int getRunningTaskNum() {
+    return runningTaskNum;
+  }
+
+  public long maxHeap() {
+    return maxHeap;
+  }
+
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
new file mode 100644
index 0000000..3843b8f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+
+/**
+ * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ */
+public class WorkerHeartbeatService extends AbstractService {
+  /** class logger */
+  private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
+
+  private final TajoWorker.WorkerContext context;
+  private TajoConf systemConf;
+  private RpcConnectionPool connectionPool;
+  private WorkerHeartbeatThread thread;
+
+  public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
+    super(WorkerHeartbeatService.class.getSimpleName());
+    this.context = context;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
+    this.systemConf = (TajoConf) conf;
+
+    connectionPool = RpcConnectionPool.getPool(systemConf);
+    thread = new WorkerHeartbeatThread();
+    thread.start();
+    super.init(conf);
+  }
+
+  @Override
+  public void serviceStop() {
+    thread.stopped.set(true);
+    synchronized (thread) {
+      thread.notifyAll();
+    }
+    super.stop();
+  }
+
+  class WorkerHeartbeatThread extends Thread {
+    private volatile AtomicBoolean stopped = new AtomicBoolean(false);
+    TajoMasterProtocol.ServerStatusProto.System systemInfo;
+    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+    float workerDiskSlots;
+    int workerMemoryMB;
+    List<DiskDeviceInfo> diskDeviceInfos;
+
+    public WorkerHeartbeatThread() {
+      int workerCpuCoreNum;
+
+      boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
+      int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+
+      try {
+        diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+
+      if(dedicatedResource) {
+        float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
+        int totalMemory = getTotalMemoryMB();
+        workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
+        workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
+
+        if(diskDeviceInfos == null) {
+          workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+        } else {
+          workerDiskSlots = diskDeviceInfos.size();
+        }
+      } else {
+        workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+        workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+        workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+      }
+
+      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+          .setAvailableProcessors(workerCpuCoreNum)
+          .setFreeMemoryMB(0)
+          .setMaxMemoryMB(0)
+          .setTotalMemoryMB(getTotalMemoryMB())
+          .build();
+    }
+
+    public void run() {
+      LOG.info("Worker Resource Heartbeat Thread start.");
+      int sendDiskInfoCount = 0;
+      int pullServerPort = 0;
+      if(context.getPullService()!= null) {
+        long startTime = System.currentTimeMillis();
+        while(true) {
+          pullServerPort = context.getPullService().getPort();
+          if(pullServerPort > 0) {
+            break;
+          }
+          //waiting while pull server init
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+          }
+          if(System.currentTimeMillis() - startTime > 30 * 1000) {
+            LOG.fatal("Too long push server init.");
+            System.exit(0);
+          }
+        }
+      }
+
+      String hostName = null;
+      int peerRpcPort = 0;
+      int queryMasterPort = 0;
+      int clientPort = 0;
+
+      if(context.getTajoWorkerManagerService() != null) {
+        hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
+        peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
+      }
+      if(context.getQueryMasterManagerService() != null) {
+        hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
+        queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
+      }
+      if(context.getTajoWorkerClientService() != null) {
+        clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
+      }
+      if (context.getPullService() != null) {
+        pullServerPort = context.getPullService().getPort();
+      }
+
+      while(!stopped.get()) {
+        if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+          getDiskUsageInfos();
+        }
+        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+                .setMaxHeap(Runtime.getRuntime().maxMemory())
+                .setFreeHeap(Runtime.getRuntime().freeMemory())
+                .setTotalHeap(Runtime.getRuntime().totalMemory())
+                .build();
+
+        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+            .addAllDisk(diskInfos)
+            .setRunningTaskNum(
+                context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
+            .setSystem(systemInfo)
+            .setDiskSlots(workerDiskSlots)
+            .setMemoryResourceMB(workerMemoryMB)
+            .setJvmHeap(jvmHeap)
+            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
+            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
+            .build();
+
+        NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
+            .setTajoWorkerHost(hostName)
+            .setTajoQueryMasterPort(queryMasterPort)
+            .setPeerRpcPort(peerRpcPort)
+            .setTajoWorkerClientPort(clientPort)
+            .setTajoWorkerHttpPort(context.getHttpPort())
+            .setTajoWorkerPullServerPort(pullServerPort)
+            .setServerStatus(serverStatus)
+            .build();
+
+        NettyClientBase rmClient = null;
+        try {
+          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+
+          rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
+          TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
+          resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
+
+          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          if(response != null) {
+            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            if(clusterResourceSummary.getNumWorkers() > 0) {
+              context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
+            }
+            context.setClusterResource(clusterResourceSummary);
+          } else {
+            if(callBack.getController().failed()) {
+              throw new ServiceException(callBack.getController().errorText());
+            }
+          }
+        } catch (InterruptedException e) {
+          break;
+        } catch (TimeoutException te) {
+          LOG.warn("Heartbeat response is being delayed.");
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connectionPool.releaseConnection(rmClient);
+        }
+
+        try {
+          synchronized (WorkerHeartbeatThread.this){
+            wait(10 * 1000);
+          }
+        } catch (InterruptedException e) {
+          break;
+        }
+        sendDiskInfoCount++;
+
+        if(sendDiskInfoCount > 10) {
+          sendDiskInfoCount = 0;
+        }
+      }
+
+      LOG.info("Worker Resource Heartbeat Thread stopped.");
+    }
+
+    private void getDiskUsageInfos() {
+      diskInfos.clear();
+      for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
+        List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
+        if(mountInfos != null) {
+          for(DiskMountInfo eachMount: mountInfos) {
+            File eachFile = new File(eachMount.getMountPath());
+            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+                .setAbsolutePath(eachFile.getAbsolutePath())
+                .setTotalSpace(eachFile.getTotalSpace())
+                .setFreeSpace(eachFile.getFreeSpace())
+                .setUsableSpace(eachFile.getUsableSpace())
+                .build());
+          }
+        }
+      }
+    }
+  }
+
+  public static int getTotalMemoryMB() {
+    com.sun.management.OperatingSystemMXBean bean =
+        (com.sun.management.OperatingSystemMXBean)
+            java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    long max = bean.getTotalPhysicalMemorySize();
+    return ((int) (max / (1024 * 1024)));
+  }
+}
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto
new file mode 100644
index 0000000..d46d09a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/ResourceTrackerProtocol.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoResourceTrackerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "TajoMasterProtocol.proto";
+
+message NodeHeartbeat {
+  required string tajoWorkerHost = 1;
+  required int32 peerRpcPort = 2;
+  required int32 tajoQueryMasterPort = 3;
+  optional ServerStatusProto serverStatus = 4;
+  optional int32 tajoWorkerClientPort = 5;
+  optional string statusMessage = 6;
+  optional int32 tajoWorkerPullServerPort = 7;
+  optional int32 tajoWorkerHttpPort = 8;
+}
+
+service TajoResourceTrackerProtocolService {
+  rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+}
\ No newline at end of file
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
new file mode 100644
index 0000000..af48fa6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestTajoResourceManager {
+  private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  TajoConf tajoConf;
+
+  long queryIdTime = System.currentTimeMillis();
+  int numWorkers = 5;
+  float workerDiskSlots = 5.0f;
+  int workerMemoryMB = 512 * 10;
+  WorkerResourceAllocationResponse response;
+
+  private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
+    tajoConf = new org.apache.tajo.conf.TajoConf();
+
+    tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
+    tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
+    tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
+    TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf);
+    tajoWorkerResourceManager.init(tajoConf);
+    tajoWorkerResourceManager.start();
+
+    for(int i = 0; i < numWorkers; i++) {
+      ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
+          .setAvailableProcessors(1)
+          .setFreeMemoryMB(workerMemoryMB)
+          .setMaxMemoryMB(workerMemoryMB)
+          .setTotalMemoryMB(workerMemoryMB)
+          .build();
+
+      ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder()
+          .setFreeHeap(workerMemoryMB)
+          .setMaxHeap(workerMemoryMB)
+          .setTotalHeap(workerMemoryMB)
+          .build();
+
+      ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
+          .setAbsolutePath("/")
+          .setFreeSpace(0)
+          .setTotalSpace(0)
+          .setUsableSpace(0)
+          .build();
+
+      List<ServerStatusProto.Disk> disks = new ArrayList<ServerStatusProto.Disk>();
+
+      disks.add(disk);
+
+      ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
+          .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
+          .setTaskRunnerMode(BOOL_TRUE)
+          .setDiskSlots(workerDiskSlots)
+          .setMemoryResourceMB(workerMemoryMB)
+          .setJvmHeap(jvmHeap)
+          .setSystem(system)
+          .addAllDisk(disks)
+          .setRunningTaskNum(0)
+          .build();
+
+      NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
+          .setTajoWorkerHost("host" + (i + 1))
+          .setTajoQueryMasterPort(21000)
+          .setTajoWorkerHttpPort(28080 + i)
+          .setPeerRpcPort(12345)
+          .setServerStatus(serverStatus)
+          .build();
+
+      tajoWorkerResourceManager.getResourceTracker().heartbeat(null, tajoHeartbeat, NullCallback.get());
+    }
+
+    return tajoWorkerResourceManager;
+  }
+
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    TajoWorkerResourceManager tajoWorkerResourceManager = null;
+    try {
+      tajoWorkerResourceManager = initResourceManager(false);
+      assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+      }
+    } finally {
+      if (tajoWorkerResourceManager != null) {
+        tajoWorkerResourceManager.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testMemoryResource() throws Exception {
+    TajoWorkerResourceManager tajoWorkerResourceManager = null;
+    try {
+      tajoWorkerResourceManager = initResourceManager(false);
+
+      final int minMemory = 256;
+      final int maxMemory = 512;
+      float diskSlots = 1.0f;
+
+      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
+
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+          .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+          .setNumContainers(60)
+          .setQueryId(queryId.getProto())
+          .setMaxDiskSlotPerContainer(diskSlots)
+          .setMinDiskSlotPerContainer(diskSlots)
+          .setMinMemoryMBPerContainer(minMemory)
+          .setMaxMemoryMBPerContainer(maxMemory)
+          .build();
+
+      final Object monitor = new Object();
+      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+      RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+        @Override
+        public void run(WorkerResourceAllocationResponse response) {
+          TestTajoResourceManager.this.response = response;
+          synchronized(monitor) {
+            monitor.notifyAll();
+          }
+        }
+      };
+
+      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+      synchronized(monitor) {
+        monitor.wait();
+      }
+
+
+      // assert after callback
+      int totalUsedMemory = 0;
+      int totalUsedDisks = 0;
+
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(0, resource.getAvailableMemoryMB());
+        assertEquals(0, resource.getAvailableDiskSlots(), 0);
+        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
+
+        totalUsedMemory += resource.getUsedMemoryMB();
+        totalUsedDisks += resource.getUsedDiskSlots();
+      }
+
+      assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
+      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+      assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size());
+
+      for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+        assertTrue(
+            eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
+        containerIds.add(eachResource.getContainerId());
+      }
+
+      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+      }
+
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+        assertEquals(0, resource.getUsedMemoryMB());
+
+        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+      }
+    } finally {
+      if (tajoWorkerResourceManager != null) {
+        tajoWorkerResourceManager.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testMemoryNotCommensurable() throws Exception {
+    TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+    try {
+      tajoWorkerResourceManager = initResourceManager(false);
+
+      final int minMemory = 200;
+      final int maxMemory = 500;
+      float diskSlots = 1.0f;
+
+      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);
+
+      int requiredContainers = 60;
+
+      int numAllocatedContainers = 0;
+
+      int loopCount = 0;
+      while(true) {
+        WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+            .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
+            .setNumContainers(requiredContainers - numAllocatedContainers)
+            .setQueryId(queryId.getProto())
+            .setMaxDiskSlotPerContainer(diskSlots)
+            .setMinDiskSlotPerContainer(diskSlots)
+            .setMinMemoryMBPerContainer(minMemory)
+            .setMaxMemoryMBPerContainer(maxMemory)
+            .build();
+
+        final Object monitor = new Object();
+
+        RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+          @Override
+          public void run(WorkerResourceAllocationResponse response) {
+            TestTajoResourceManager.this.response = response;
+            synchronized(monitor) {
+              monitor.notifyAll();
+            }
+          }
+        };
+
+        tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+        synchronized(monitor) {
+          monitor.wait();
+        }
+
+        numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size();
+
+        //release resource
+        for(WorkerAllocatedResource eachResource:
+            TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) {
+          assertTrue(
+              eachResource.getAllocatedMemoryMB() >= minMemory &&  eachResource.getAllocatedMemoryMB() <= maxMemory);
+          tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId());
+        }
+
+        for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+          WorkerResource resource = worker.getResource();
+          assertEquals(0, resource.getUsedMemoryMB());
+          assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+
+          assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+          assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+        }
+
+        loopCount++;
+
+        if(loopCount == 2) {
+          assertEquals(requiredContainers, numAllocatedContainers);
+          break;
+        }
+      }
+
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(0, resource.getUsedMemoryMB());
+        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+
+        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+      }
+    } finally {
+      if (tajoWorkerResourceManager != null) {
+        tajoWorkerResourceManager.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testDiskResource() throws Exception {
+    TajoWorkerResourceManager tajoWorkerResourceManager = null;
+
+    try {
+      tajoWorkerResourceManager = initResourceManager(false);
+
+      final float minDiskSlots = 1.0f;
+      final float maxDiskSlots = 2.0f;
+      int memoryMB = 256;
+
+      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
+
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
+          .setResourceRequestPriority(ResourceRequestPriority.DISK)
+          .setNumContainers(60)
+          .setQueryId(queryId.getProto())
+          .setMaxDiskSlotPerContainer(maxDiskSlots)
+          .setMinDiskSlotPerContainer(minDiskSlots)
+          .setMinMemoryMBPerContainer(memoryMB)
+          .setMaxMemoryMBPerContainer(memoryMB)
+          .build();
+
+      final Object monitor = new Object();
+      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+
+
+      RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
+
+        @Override
+        public void run(WorkerResourceAllocationResponse response) {
+          TestTajoResourceManager.this.response = response;
+          synchronized(monitor) {
+            monitor.notifyAll();
+          }
+        }
+      };
+
+      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
+      synchronized(monitor) {
+        monitor.wait();
+      }
+      for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) {
+        assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
+            eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
+                eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
+        containerIds.add(eachResource.getContainerId());
+      }
+
+      // assert after callback
+      int totalUsedDisks = 0;
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
+        assertEquals(0, resource.getAvailableDiskSlots(), 0);
+        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
+        assertEquals(256 * 3, resource.getUsedMemoryMB());
+
+        totalUsedDisks += resource.getUsedDiskSlots();
+      }
+
+      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
+
+      assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
+
+      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
+      }
+
+      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
+        assertEquals(0, resource.getUsedMemoryMB());
+
+        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
+        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
+      }
+    } finally {
+      if (tajoWorkerResourceManager != null) {
+        tajoWorkerResourceManager.stop();
+      }
+    }
+  }
+}