blob: 3843b8f303b6138fa979314bc9002755e0f2082d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.worker;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.v2.DiskDeviceInfo;
import org.apache.tajo.storage.v2.DiskMountInfo;
import org.apache.tajo.storage.v2.DiskUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
/**
* It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
*/
public class WorkerHeartbeatService extends AbstractService {
/** class logger */
private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
private final TajoWorker.WorkerContext context;
private TajoConf systemConf;
private RpcConnectionPool connectionPool;
private WorkerHeartbeatThread thread;
public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
super(WorkerHeartbeatService.class.getSimpleName());
this.context = context;
}
@Override
public void serviceInit(Configuration conf) {
Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
this.systemConf = (TajoConf) conf;
connectionPool = RpcConnectionPool.getPool(systemConf);
thread = new WorkerHeartbeatThread();
thread.start();
super.init(conf);
}
@Override
public void serviceStop() {
thread.stopped.set(true);
synchronized (thread) {
thread.notifyAll();
}
super.stop();
}
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
TajoMasterProtocol.ServerStatusProto.System systemInfo;
List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
public WorkerHeartbeatThread() {
int workerCpuCoreNum;
boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
try {
diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
if(dedicatedResource) {
float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
int totalMemory = getTotalMemoryMB();
workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
if(diskDeviceInfos == null) {
workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
} else {
workerDiskSlots = diskDeviceInfos.size();
}
} else {
workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
}
systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
.setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
.setTotalMemoryMB(getTotalMemoryMB())
.build();
}
public void run() {
LOG.info("Worker Resource Heartbeat Thread start.");
int sendDiskInfoCount = 0;
int pullServerPort = 0;
if(context.getPullService()!= null) {
long startTime = System.currentTimeMillis();
while(true) {
pullServerPort = context.getPullService().getPort();
if(pullServerPort > 0) {
break;
}
//waiting while pull server init
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if(System.currentTimeMillis() - startTime > 30 * 1000) {
LOG.fatal("Too long push server init.");
System.exit(0);
}
}
}
String hostName = null;
int peerRpcPort = 0;
int queryMasterPort = 0;
int clientPort = 0;
if(context.getTajoWorkerManagerService() != null) {
hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
}
if(context.getQueryMasterManagerService() != null) {
hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
}
if(context.getTajoWorkerClientService() != null) {
clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
}
if (context.getPullService() != null) {
pullServerPort = context.getPullService().getPort();
}
while(!stopped.get()) {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
getDiskUsageInfos();
}
TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
.setMaxHeap(Runtime.getRuntime().maxMemory())
.setFreeHeap(Runtime.getRuntime().freeMemory())
.setTotalHeap(Runtime.getRuntime().totalMemory())
.build();
TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
.setRunningTaskNum(
context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
.setSystem(systemInfo)
.setDiskSlots(workerDiskSlots)
.setMemoryResourceMB(workerMemoryMB)
.setJvmHeap(jvmHeap)
.setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
.setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
.build();
NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
.setTajoWorkerHost(hostName)
.setTajoQueryMasterPort(queryMasterPort)
.setPeerRpcPort(peerRpcPort)
.setTajoWorkerClientPort(clientPort)
.setTajoWorkerHttpPort(context.getHttpPort())
.setTajoWorkerPullServerPort(pullServerPort)
.setServerStatus(serverStatus)
.build();
NettyClientBase rmClient = null;
try {
CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
if(response != null) {
TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
if(clusterResourceSummary.getNumWorkers() > 0) {
context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
}
context.setClusterResource(clusterResourceSummary);
} else {
if(callBack.getController().failed()) {
throw new ServiceException(callBack.getController().errorText());
}
}
} catch (InterruptedException e) {
break;
} catch (TimeoutException te) {
LOG.warn("Heartbeat response is being delayed.");
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
connectionPool.releaseConnection(rmClient);
}
try {
synchronized (WorkerHeartbeatThread.this){
wait(10 * 1000);
}
} catch (InterruptedException e) {
break;
}
sendDiskInfoCount++;
if(sendDiskInfoCount > 10) {
sendDiskInfoCount = 0;
}
}
LOG.info("Worker Resource Heartbeat Thread stopped.");
}
private void getDiskUsageInfos() {
diskInfos.clear();
for(DiskDeviceInfo eachDevice: diskDeviceInfos) {
List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
if(mountInfos != null) {
for(DiskMountInfo eachMount: mountInfos) {
File eachFile = new File(eachMount.getMountPath());
diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
.setAbsolutePath(eachFile.getAbsolutePath())
.setTotalSpace(eachFile.getTotalSpace())
.setFreeSpace(eachFile.getFreeSpace())
.setUsableSpace(eachFile.getUsableSpace())
.build());
}
}
}
}
}
public static int getTotalMemoryMB() {
com.sun.management.OperatingSystemMXBean bean =
(com.sun.management.OperatingSystemMXBean)
java.lang.management.ManagementFactory.getOperatingSystemMXBean();
long max = bean.getTotalPhysicalMemorySize();
return ((int) (max / (1024 * 1024)));
}
}