| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.worker; |
| |
| import com.codahale.metrics.Gauge; |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.shell.PathData; |
| import org.apache.hadoop.service.CompositeService; |
| import org.apache.hadoop.yarn.event.AsyncDispatcher; |
| import org.apache.hadoop.yarn.util.RackResolver; |
| import org.apache.tajo.TajoConstants; |
| import org.apache.tajo.catalog.CatalogClient; |
| import org.apache.tajo.catalog.CatalogService; |
| import org.apache.tajo.catalog.FunctionDesc; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.engine.function.FunctionLoader; |
| import org.apache.tajo.function.FunctionSignature; |
| import org.apache.tajo.master.cluster.WorkerConnectionInfo; |
| import org.apache.tajo.metrics.Node; |
| import org.apache.tajo.plan.function.python.PythonScriptEngine; |
| import org.apache.tajo.pullserver.TajoPullServerService; |
| import org.apache.tajo.querymaster.QueryMaster; |
| import org.apache.tajo.querymaster.QueryMasterManagerService; |
| import org.apache.tajo.rpc.RpcClientManager; |
| import org.apache.tajo.rpc.RpcConstants; |
| import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; |
| import org.apache.tajo.rule.EvaluationContext; |
| import org.apache.tajo.rule.EvaluationFailedException; |
| import org.apache.tajo.rule.SelfDiagnosisRuleEngine; |
| import org.apache.tajo.rule.SelfDiagnosisRuleSession; |
| import org.apache.tajo.service.ServiceTracker; |
| import org.apache.tajo.service.ServiceTrackerFactory; |
| import org.apache.tajo.service.TajoMasterInfo; |
| import org.apache.tajo.storage.HashShuffleAppenderManager; |
| import org.apache.tajo.storage.OldStorageManager; |
| import org.apache.tajo.util.*; |
| import org.apache.tajo.util.history.HistoryReader; |
| import org.apache.tajo.util.history.HistoryWriter; |
| import org.apache.tajo.util.metrics.TajoSystemMetrics; |
| import org.apache.tajo.webapp.StaticHttpServer; |
| |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.Writer; |
| import java.lang.management.ManagementFactory; |
| import java.lang.management.ThreadInfo; |
| import java.lang.management.ThreadMXBean; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.tajo.conf.TajoConf.ConfVars; |
| |
| public class TajoWorker extends CompositeService { |
| /** Shared protobuf message wrapping the boolean value {@code true}. */ |
| public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); |
| /** Shared protobuf message wrapping the boolean value {@code false}. */ |
| public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); |
| /** Shared empty {@code NullProto} message. */ |
| public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build(); |
| /** Priority passed to the shutdown hook manager when this worker registers its hook. */ |
| public static final int SHUTDOWN_HOOK_PRIORITY = 30; |
| |
| private static final Log LOG = LogFactory.getLog(TajoWorker.class); |
| |
| /** Worker configuration; set by startWorker and again, type-checked, by serviceInit. */ |
| private TajoConf systemConf; |
| |
| /** Worker info HTTP server; only created outside test mode, otherwise null. */ |
| private StaticHttpServer webServer; |
| |
| private TajoWorkerClientService tajoWorkerClientService; |
| |
| private QueryMasterManagerService queryMasterManagerService; |
| |
| private TajoWorkerManagerService tajoWorkerManagerService; |
| |
| /** Master addresses resolved in serviceStart. */ |
| private TajoMasterInfo tajoMasterInfo; |
| |
| /** Catalog client created in serviceStart; stays null when the connection attempt fails. */ |
| private CatalogClient catalogClient; |
| |
| private WorkerContext workerContext; |
| |
| private TaskManager taskManager; |
| |
| private TaskExecutor taskExecutor; |
| |
| /** Embedded pull server; null when the pull server runs stand-alone. */ |
| private TajoPullServerService pullService; |
| |
| private ServiceTracker serviceTracker; |
| |
| private NodeResourceManager nodeResourceManager; |
| |
| private NodeStatusUpdater nodeStatusUpdater; |
| |
| /** Set once serviceStop has begun, so that the stop sequence runs at most once. */ |
| private final AtomicBoolean stopped = new AtomicBoolean(false); |
| |
| private WorkerConnectionInfo connectionInfo; |
| |
| /** JVM thread bean used by dumpThread. */ |
| private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); |
| |
| /** Command-line arguments handed to startWorker; stored but not read inside this class. */ |
| private String[] cmdArgs; |
| |
| /** Deletes temporal files; only created outside test mode, otherwise null. */ |
| private DeletionService deletionService; |
| |
| private TajoSystemMetrics workerSystemMetrics; |
| |
| private HashShuffleAppenderManager hashShuffleAppenderManager; |
| |
| private AsyncDispatcher dispatcher; |
| |
| /** Allocator bound to the worker temporal directory configuration key. */ |
| private LocalDirAllocator lDirAllocator; |
| |
| private JvmPauseMonitor pauseMonitor; |
| |
| private HistoryWriter taskHistoryWriter; |
| |
| private HistoryReader historyReader; |
| |
| /** |
| * Creates a worker service named after this class. Nothing is initialized or started here; |
| * that happens in {@link #startWorker(TajoConf, String[])}. |
| * |
| * @throws Exception declared for callers; the visible body only invokes the superclass constructor |
| */ |
| public TajoWorker() throws Exception { |
|   super(TajoWorker.class.getName()); |
| } |
| |
| /** |
| * Remembers the configuration and command-line arguments, then initializes and starts |
| * this service. |
| * |
| * @param systemConf configuration used to initialize the worker |
| * @param args command-line arguments; only stored |
| */ |
| public void startWorker(TajoConf systemConf, String[] args) { |
|   this.cmdArgs = args; |
|   this.systemConf = systemConf; |
| |
|   init(systemConf); |
|   start(); |
| } |
| |
| |
| /** |
| * Builds and registers all child services of the worker, initializes them through the |
| * superclass, and then derives the worker's connection info from the bound addresses. |
| * <p> |
| * The statement order matters: the connection info is only available after |
| * {@code super.serviceInit(conf)}, and the history writer/reader need the worker name that |
| * is derived from it. |
| * |
| * @param conf configuration; must be a {@link TajoConf} |
| * @throws Exception if any initialization step or the self diagnosis fails |
| */ |
| @Override |
| public void serviceInit(Configuration conf) throws Exception { |
|   ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY); |
| |
|   systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); |
|   RackResolver.init(systemConf); |
|   serviceTracker = ServiceTrackerFactory.get(systemConf); |
| |
|   workerContext = new TajoWorkerContext(); |
|   lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); |
| |
|   final int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort(); |
|   final int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort(); |
|   final int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort(); |
| |
|   // Child services are registered in the order in which they must be initialized and started. |
|   dispatcher = new AsyncDispatcher(); |
|   addIfService(dispatcher); |
| |
|   tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); |
|   addIfService(tajoWorkerManagerService); |
| |
|   // querymaster worker |
|   tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort); |
|   addIfService(tajoWorkerClientService); |
| |
|   queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort); |
|   addIfService(queryMasterManagerService); |
| |
|   taskManager = new TaskManager(dispatcher, workerContext); |
|   addService(taskManager); |
| |
|   taskExecutor = new TaskExecutor(workerContext); |
|   addService(taskExecutor); |
| |
|   final AsyncDispatcher rmDispatcher = new AsyncDispatcher(); |
|   addService(rmDispatcher); |
|   nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext); |
|   addService(nodeResourceManager); |
| |
|   nodeStatusUpdater = new NodeStatusUpdater(workerContext); |
|   addService(nodeStatusUpdater); |
| |
|   // An embedded pull server is only created when no stand-alone one is used. |
|   if (!TajoPullServerService.isStandalone()) { |
|     pullService = new TajoPullServerService(); |
|     addIfService(pullService); |
|   } |
| |
|   // The info web server is skipped in test mode; the reported HTTP port is then 0. |
|   final int httpPort = systemConf.getBoolVar(ConfVars.$TEST_MODE) ? 0 : initWebServer(); |
| |
|   super.serviceInit(conf); |
| |
|   final int pullServerPort = (pullService != null) |
|       ? pullService.getPort() |
|       : getStandAlonePullServerPort(); |
| |
|   connectionInfo = new WorkerConnectionInfo( |
|       tajoWorkerManagerService.getBindAddr().getHostName(), |
|       tajoWorkerManagerService.getBindAddr().getPort(), |
|       pullServerPort, |
|       tajoWorkerClientService.getBindAddr().getPort(), |
|       queryMasterManagerService.getBindAddr().getPort(), |
|       httpPort); |
| |
|   LOG.info("Tajo Worker is initialized." + " connection :" + connectionInfo.toString()); |
| |
|   try { |
|     hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf); |
|   } catch (IOException ioe) { |
|     // Without the shuffle appender manager the worker cannot run; terminate the JVM. |
|     LOG.fatal(ioe.getMessage(), ioe); |
|     System.exit(-1); |
|   } |
| |
|   // Registered after super.serviceInit(), hence initialized explicitly here. |
|   taskHistoryWriter = new HistoryWriter(workerContext.getWorkerName(), false); |
|   addIfService(taskHistoryWriter); |
|   taskHistoryWriter.init(conf); |
| |
|   historyReader = new HistoryReader(workerContext.getWorkerName(), systemConf); |
| |
|   FunctionLoader.loadUserDefinedFunctions(systemConf, new HashMap<FunctionSignature, FunctionDesc>()); |
| |
|   PythonScriptEngine.initPythonScriptEngineFiles(); |
| |
|   diagnoseTajoWorker(); |
| } |
| |
| private void initWorkerMetrics() { |
| workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName()); |
| workerSystemMetrics.start(); |
| |
| workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, new Gauge<Integer>() { |
| @Override |
| public Integer getValue() { |
| if(queryMasterManagerService != null) { |
| return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size(); |
| } else { |
| return 0; |
| } |
| } |
| }); |
| |
| workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, new Gauge<Integer>() { |
| @Override |
| public Integer getValue() { |
| if(taskExecutor != null) { |
| return taskExecutor.getRunningTasks(); |
| } else { |
| return 0; |
| } |
| } |
| }); |
| } |
| |
| private int initWebServer() { |
| int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); |
| try { |
| webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort, |
| true, null, systemConf, null); |
| webServer.start(); |
| httpPort = webServer.getPort(); |
| LOG.info("Worker info server started:" + httpPort); |
| } catch (IOException e) { |
| LOG.error(e.getMessage(), e); |
| } |
| return httpPort; |
| } |
| |
| private void initCleanupService() throws IOException { |
| String[] localDirs = systemConf.getVar(ConfVars.WORKER_TEMPORAL_DIR).trim().split("\\s*,\\s*"); |
| deletionService = new DeletionService(localDirs.length, 0); |
| if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) { |
| getWorkerContext().cleanupTemporalDirectories(); |
| } |
| } |
| |
| private void diagnoseTajoWorker() throws EvaluationFailedException { |
| SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); |
| SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); |
| EvaluationContext context = new EvaluationContext(); |
| |
| context.addParameter(TajoConf.class.getName(), systemConf); |
| |
| ruleSession.withCategoryNames("base", "worker").fireRules(context); |
| } |
| |
| /** Creates the JVM pause monitor from the worker configuration and starts it. */ |
| private void startJvmPauseMonitor() { |
|   this.pauseMonitor = new JvmPauseMonitor(this.systemConf); |
|   this.pauseMonitor.start(); |
| } |
| |
| /** |
| * @return the context created in serviceInit, or null before initialization |
| */ |
| public WorkerContext getWorkerContext() { |
|   return this.workerContext; |
| } |
| |
| /** |
| * Starts the pause monitor, resolves the master addresses (from the service tracker when |
| * master HA is enabled, otherwise from the configuration), connects to the catalog, sets up |
| * cleanup (outside test mode) and metrics, and finally starts all child services. |
| * |
| * @throws Exception if any start step fails |
| */ |
| @Override |
| public void serviceStart() throws Exception { |
|   startJvmPauseMonitor(); |
| |
|   tajoMasterInfo = new TajoMasterInfo(); |
| |
|   final boolean haEnabled = systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE); |
|   if (!haEnabled) { |
|     final String umbilicalAddr = systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); |
|     tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(umbilicalAddr)); |
| |
|     final String trackerAddr = systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS); |
|     tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(trackerAddr)); |
|   } else { |
|     tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress()); |
|     tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress()); |
|   } |
| |
|   connectToCatalog(); |
| |
|   final boolean testMode = systemConf.getBoolVar(ConfVars.$TEST_MODE); |
|   if (!testMode) { |
|     initCleanupService(); |
|   } |
| |
|   initWorkerMetrics(); |
|   super.serviceStart(); |
|   LOG.info("Tajo Worker is started"); |
| } |
| |
| @Override |
| public void serviceStop() throws Exception { |
| if(stopped.getAndSet(true)) { |
| return; |
| } |
| |
| if(webServer != null) { |
| try { |
| webServer.stop(); |
| } catch (Throwable e) { |
| LOG.error(e.getMessage(), e); |
| } |
| } |
| |
| if (catalogClient != null) { |
| catalogClient.close(); |
| } |
| |
| if(webServer != null && webServer.isAlive()) { |
| try { |
| webServer.stop(); |
| } catch (Throwable e) { |
| } |
| } |
| |
| try { |
| OldStorageManager.shutdown(); |
| } catch (IOException ie) { |
| LOG.error(ie.getMessage(), ie); |
| } |
| |
| if(workerSystemMetrics != null) { |
| workerSystemMetrics.stop(); |
| } |
| |
| if(deletionService != null) deletionService.stop(); |
| |
| if(pauseMonitor != null) pauseMonitor.stop(); |
| super.serviceStop(); |
| LOG.info("TajoWorker main thread exiting"); |
| } |
| |
| /** |
| * View of the worker that is handed to its child services: accessors for the shared |
| * services and settings, plus cleanup operations for the worker's temporal files. |
| */ |
| public interface WorkerContext { |
|   /** @return the query master; the implementation in this file returns null when there is no manager service */ |
|   QueryMaster getQueryMaster(); |
| |
|   /** @return the worker configuration */ |
|   TajoConf getConf(); |
| |
|   /** @return the service tracker */ |
|   ServiceTracker getServiceTracker(); |
| |
|   /** @return the query master manager service */ |
|   QueryMasterManagerService getQueryMasterManagerService(); |
| |
|   /** @return the task manager */ |
|   TaskManager getTaskManager(); |
| |
|   /** |
|    * The method name is misspelled ("Execuor"); it is kept as is because it is part of |
|    * the interface. |
|    * |
|    * @return the task executor |
|    */ |
|   TaskExecutor getTaskExecuor(); |
| |
|   /** @return the node resource manager */ |
|   NodeResourceManager getNodeResourceManager(); |
| |
|   /** @return the catalog service */ |
|   CatalogService getCatalog(); |
| |
|   /** @return the connection info of this worker */ |
|   WorkerConnectionInfo getConnectionInfo(); |
| |
|   /** @return the name of this worker */ |
|   String getWorkerName(); |
| |
|   /** @return the local directory allocator */ |
|   LocalDirAllocator getLocalDirAllocator(); |
| |
|   /** @return the worker metrics system */ |
|   TajoSystemMetrics getMetrics(); |
| |
|   /** @return the hash shuffle appender manager */ |
|   HashShuffleAppenderManager getHashShuffleAppenderManager(); |
| |
|   /** @return the writer for task history */ |
|   HistoryWriter getTaskHistoryWriter(); |
| |
|   /** @return the history reader */ |
|   HistoryReader getHistoryReader(); |
| |
|   /** |
|    * Cleans up the given path. |
|    * |
|    * @param strPath path to clean up; the implementation in this file resolves it against |
|    *                the worker temporal directories |
|    */ |
|   void cleanup(String strPath); |
| |
|   /** Cleans up the worker's temporal directories. */ |
|   void cleanupTemporalDirectories(); |
| } |
| |
| /** |
| * {@link WorkerContext} backed directly by the fields of the enclosing {@link TajoWorker}. |
| * Every accessor returns the current value of the corresponding field, so it may return |
| * null until the worker has been initialized and started. |
| */ |
| class TajoWorkerContext implements WorkerContext { |
|   /** @return the query master, or null when the manager service has not been created */ |
|   @Override |
|   public QueryMaster getQueryMaster() { |
|     if (queryMasterManagerService == null) { |
|       return null; |
|     } |
|     return queryMasterManagerService.getQueryMaster(); |
|   } |
| |
|   @Override |
|   public TajoConf getConf() { |
|     return systemConf; |
|   } |
| |
|   @Override |
|   public ServiceTracker getServiceTracker() { |
|     return serviceTracker; |
|   } |
| |
|   @Override |
|   public QueryMasterManagerService getQueryMasterManagerService() { |
|     return queryMasterManagerService; |
|   } |
| |
|   @Override |
|   public TaskManager getTaskManager() { |
|     return taskManager; |
|   } |
| |
|   @Override |
|   public TaskExecutor getTaskExecuor() { |
|     return taskExecutor; |
|   } |
| |
|   @Override |
|   public NodeResourceManager getNodeResourceManager() { |
|     return nodeResourceManager; |
|   } |
| |
|   @Override |
|   public CatalogService getCatalog() { |
|     return catalogClient; |
|   } |
| |
|   /** @return the embedded pull server, or null when none was created */ |
|   public TajoPullServerService getPullService() { |
|     return pullService; |
|   } |
| |
|   @Override |
|   public WorkerConnectionInfo getConnectionInfo() { |
|     return connectionInfo; |
|   } |
| |
|   /** @return host and peer RPC port of this worker, taken from the connection info */ |
|   @Override |
|   public String getWorkerName() { |
|     return connectionInfo.getHostAndPeerRpcPort(); |
|   } |
| |
|   @Override |
|   public LocalDirAllocator getLocalDirAllocator() { |
|     return lDirAllocator; |
|   } |
| |
|   /** |
|    * Hands every local occurrence of {@code strPath} under the worker temporal directories |
|    * to the deletion service. Does nothing when no deletion service exists; an |
|    * {@link IOException} is logged and otherwise ignored. |
|    * |
|    * @param strPath path to look up in the temporal directories |
|    */ |
|   @Override |
|   public void cleanup(String strPath) { |
|     if (deletionService == null) { |
|       return; |
|     } |
| |
|     try { |
|       // The worker's allocator is bound to the same configuration key a fresh one would use. |
|       Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(strPath, systemConf); |
|       FileSystem localFS = FileSystem.getLocal(systemConf); |
|       for (Path path : iter) { |
|         deletionService.delete(localFS.makeQualified(path)); |
|       } |
|     } catch (IOException e) { |
|       LOG.error(e.getMessage(), e); |
|     } |
|   } |
| |
|   /** |
|    * Hands all direct children of every worker temporal directory to the deletion service. |
|    * Does nothing when no deletion service exists; an {@link IOException} is logged and |
|    * otherwise ignored. |
|    */ |
|   @Override |
|   public void cleanupTemporalDirectories() { |
|     if (deletionService == null) { |
|       return; |
|     } |
| |
|     try { |
|       Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(".", systemConf); |
|       FileSystem localFS = FileSystem.getLocal(systemConf); |
|       for (Path path : iter) { |
|         // Expand "<dir>/*" so that the directory itself is kept and only its content goes. |
|         PathData[] items = PathData.expandAsGlob(localFS.makeQualified(new Path(path, "*")).toString(), systemConf); |
| |
|         ArrayList<Path> paths = new ArrayList<Path>(items.length); |
|         for (PathData pd : items) { |
|           paths.add(pd.path); |
|         } |
|         if (paths.isEmpty()) { |
|           continue; |
|         } |
| |
|         deletionService.delete(null, paths.toArray(new Path[paths.size()])); |
|       } |
|     } catch (IOException e) { |
|       LOG.error(e.getMessage(), e); |
|     } |
|   } |
| |
|   @Override |
|   public TajoSystemMetrics getMetrics() { |
|     return workerSystemMetrics; |
|   } |
| |
|   @Override |
|   public HashShuffleAppenderManager getHashShuffleAppenderManager() { |
|     return hashShuffleAppenderManager; |
|   } |
| |
|   @Override |
|   public HistoryWriter getTaskHistoryWriter() { |
|     return taskHistoryWriter; |
|   } |
| |
|   @Override |
|   public HistoryReader getHistoryReader() { |
|     return historyReader; |
|   } |
| } |
| |
| private int getStandAlonePullServerPort() { |
| long startTime = System.currentTimeMillis(); |
| int pullServerPort; |
| |
| //wait for pull server bring up |
| while (true) { |
| pullServerPort = TajoPullServerService.readPullServerPort(); |
| if (pullServerPort > 0) { |
| break; |
| } |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| } |
| if (System.currentTimeMillis() - startTime > 30 * 1000) { |
| LOG.fatal("TajoWorker stopped cause can't get PullServer port."); |
| System.exit(-1); |
| } |
| } |
| return pullServerPort; |
| } |
| |
| /** Stops this worker service; exposed so that tests can shut a worker down. */ |
| @VisibleForTesting |
| public void stopWorkerForce() { |
|   this.stop(); |
| } |
| |
| private void connectToCatalog() { |
| try { |
| catalogClient = new CatalogClient(systemConf); |
| } catch (IOException e) { |
| LOG.error(e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Hook registered in serviceInit. Unless the worker has already been stopped, it logs a |
| * banner, stops the worker and shuts the RPC client manager down. |
| */ |
| private class ShutdownHook implements Runnable { |
|   @Override |
|   public void run() { |
|     if (stopped.get()) { |
|       // A regular stop already happened; nothing is left to do. |
|       return; |
|     } |
| |
|     LOG.info("============================================"); |
|     LOG.info("TajoWorker received SIGINT Signal"); |
|     LOG.info("============================================"); |
|     stop(); |
|     RpcClientManager.shutdown(); |
|   } |
| } |
| |
| /** |
| * Formats a thread label for the thread dump. |
| * |
| * @param id thread id |
| * @param name thread name, may be null |
| * @return {@code "<id> (<name>)"}, or just the id when {@code name} is null |
| */ |
| String getThreadTaskName(long id, String name) { |
|   return (name == null) ? Long.toString(id) : id + " (" + name + ")"; |
| } |
| |
| /** |
| * Writes a dump of all JVM threads to {@code writer}: state, blocked/waited counts, the |
| * lock being waited on or blocked on, and up to 20 stack frames per thread. Blocked and |
| * waited times are included only when thread contention monitoring is enabled. |
| * <p> |
| * The writer is neither flushed nor closed here. |
| * |
| * @param writer destination of the dump |
| */ |
| public void dumpThread(Writer writer) { |
|   final PrintWriter out = new PrintWriter(writer); |
|   final int maxStackDepth = 20; |
|   final boolean contentionMonitored = threadBean.isThreadContentionMonitoringEnabled(); |
|   final long[] allThreadIds = threadBean.getAllThreadIds(); |
| |
|   out.println("Process Thread Dump: Tajo Worker"); |
|   out.println(allThreadIds.length + " active threads"); |
| |
|   for (int i = 0; i < allThreadIds.length; i++) { |
|     final ThreadInfo info = threadBean.getThreadInfo(allThreadIds[i], maxStackDepth); |
|     if (info == null) { |
|       // No thread info is available for this id. |
|       out.println(" Inactive"); |
|       continue; |
|     } |
| |
|     final String threadLabel = getThreadTaskName(info.getThreadId(), info.getThreadName()); |
|     out.println("Thread " + threadLabel + ":"); |
| |
|     final Thread.State state = info.getThreadState(); |
|     out.println(" State: " + state + ", Blocked count: " + info.getBlockedCount() + |
|         ", Waited count: " + info.getWaitedCount()); |
| |
|     if (contentionMonitored) { |
|       out.println(" Blocked time: " + info.getBlockedTime() + ", Waited time: " + info.getWaitedTime()); |
|     } |
| |
|     if (state == Thread.State.BLOCKED) { |
|       final String lockName = info.getLockName(); |
|       final String ownerLabel = getThreadTaskName(info.getLockOwnerId(), info.getLockOwnerName()); |
|       out.println(" Blocked on " + lockName + ", Blocked by " + ownerLabel); |
|     } else if (state == Thread.State.WAITING) { |
|       out.println(" Waiting on " + info.getLockName()); |
|     } |
| |
|     out.println(" Stack:"); |
|     final StackTraceElement[] frames = info.getStackTrace(); |
|     for (int f = 0; f < frames.length; f++) { |
|       out.println(" " + frames[f].toString()); |
|     } |
|     out.println(""); |
|   } |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler()); |
| StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG); |
| |
| TajoConf tajoConf = new TajoConf(); |
| tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME)); |
| |
| try { |
| TajoWorker tajoWorker = new TajoWorker(); |
| tajoWorker.startWorker(tajoConf, args); |
| } catch (Throwable t) { |
| LOG.fatal("Error starting TajoWorker", t); |
| System.exit(-1); |
| } |
| } |
| } |