// blob: 27fb31e8d451b67285f784da77f2b84878219b9e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
/**
 * ZooKeeper-driven task monitor for a worker node.
 *
 * <p>The monitor watches the worker's task path in ZK for new children. Once started, a listener reads every child
 * znode that gets added, deserializes it into a {@link Task} and hands it to {@code assignTask} right away. Task
 * status changes are published back through the {@link WorkerCuratorCoordinator}.
 *
 * @deprecated required only to support {@link org.apache.druid.indexing.overlord.RemoteTaskRunner};
 * {@link org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner} should be used instead.
 */
@Deprecated
public class WorkerTaskMonitor extends WorkerTaskManager
{
  private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);

  private final ObjectMapper jsonMapper;
  private final PathChildrenCache pathChildrenCache;
  private final CuratorFramework cf;
  private final WorkerCuratorCoordinator workerCuratorCoordinator;

  // Guards start()/stop(); distinct from the inherited "lock" used around task bookkeeping.
  private final Object lifecycleLock = new Object();
  private volatile boolean started = false;

  /**
   * Wires the monitor and builds (but does not start) the ZK children cache for this worker's task path.
   *
   * @param jsonMapper               mapper used to deserialize task payloads; also passed to the superclass
   * @param taskRunner               passed to the superclass
   * @param taskConfig               passed to the superclass
   * @param cf                       Curator client used for the children cache and for reading task znodes
   * @param workerCuratorCoordinator supplies the task path to watch and publishes task announcements
   * @param overlordClient           passed to the superclass
   */
  @Inject
  public WorkerTaskMonitor(
      ObjectMapper jsonMapper,
      TaskRunner taskRunner,
      TaskConfig taskConfig,
      CuratorFramework cf,
      WorkerCuratorCoordinator workerCuratorCoordinator,
      @IndexingService DruidLeaderClient overlordClient
  )
  {
    super(jsonMapper, taskRunner, taskConfig, overlordClient);
    this.jsonMapper = jsonMapper;
    this.cf = cf;
    this.workerCuratorCoordinator = workerCuratorCoordinator;

    // Boolean arguments follow Curator's PathChildrenCache constructor: cacheData = false, dataIsCompressed = true.
    final String watchedPath = workerCuratorCoordinator.getTaskPathForWorker();
    this.pathChildrenCache = new PathChildrenCache(
        cf,
        watchedPath,
        false,
        true,
        Execs.makeThreadFactory("TaskMonitorCache-%s")
    );
  }

  /**
   * Starts the superclass, then cleans up stale announcements, registers the listener for new task znodes and starts
   * the children cache.
   *
   * @throws IllegalStateException if the monitor has already been started
   * @throws Exception             anything thrown during start-up; an alert is emitted first unless it is an
   *                               {@link InterruptedException}
   */
  @LifecycleStart
  @Override
  public void start() throws Exception
  {
    super.start();

    synchronized (lifecycleLock) {
      Preconditions.checkState(!started, "already started");
      // Raised before the steps below run, so the flag stays set even when one of them throws.
      started = true;

      try {
        cleanupStaleAnnouncements();
        registerRunListener();
        pathChildrenCache.start();
        log.debug("Started WorkerTaskMonitor.");
      }
      catch (Exception e) {
        // Interrupts are propagated silently; every other failure raises an alert before being rethrown.
        if (!(e instanceof InterruptedException)) {
          log.makeAlert(e, "Exception starting WorkerTaskMonitor")
             .emit();
        }
        throw e;
      }
    }
  }

  /**
   * Rewrites announcements that still report a runnable status but no longer match a task this worker is running:
   * they get the recorded completion status if one exists, otherwise a failure status.
   */
  private void cleanupStaleAnnouncements() throws Exception
  {
    synchronized (lock) {
      // cleanup any old running task announcements which are invalid after restart
      for (TaskAnnouncement stale : workerCuratorCoordinator.getAnnouncements()) {
        final TaskStatus announcedStatus = stale.getTaskStatus();
        if (!announcedStatus.isRunnable()) {
          continue;
        }

        final TaskStatus replacement = resolveCompletionStatus(stale);
        if (replacement == null) {
          continue;
        }

        log.info(
            "Cleaning up stale announcement for task [%s]. New status is [%s].",
            announcedStatus.getId(),
            replacement.getStatusCode()
        );

        final TaskAnnouncement corrected = TaskAnnouncement.create(
            announcedStatus.getId(),
            stale.getTaskType(),
            stale.getTaskResource(),
            replacement,
            TaskLocation.unknown(),
            stale.getTaskDataSource()
        );
        workerCuratorCoordinator.updateTaskStatusAnnouncement(corrected);
      }
    }
  }

  /**
   * Picks the status a stale, still-runnable announcement should be replaced with. Called from inside the
   * {@code synchronized (lock)} section of {@link #cleanupStaleAnnouncements()}.
   *
   * @return the status of the matching entry in {@code completedTasks} if there is one; otherwise {@code null} when
   * the task is present in {@code runningTasks} (nothing to clean up); otherwise a failure status
   */
  private TaskStatus resolveCompletionStatus(TaskAnnouncement stale)
  {
    final TaskAnnouncement finished = completedTasks.get(stale.getTaskId());
    if (finished != null) {
      return finished.getTaskStatus();
    }
    if (runningTasks.containsKey(stale.getTaskStatus().getId())) {
      return null;
    }
    return TaskStatus.failure(
        stale.getTaskStatus().getId(),
        "Canceled as unknown task. See middleManager or indexer logs for more details."
    );
  }

  /**
   * Attaches the listener that turns every child-added event on the watched path into a call to {@code assignTask}.
   */
  private void registerRunListener()
  {
    final PathChildrenCacheListener newTaskListener = new PathChildrenCacheListener()
    {
      @Override
      public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
          throws Exception
      {
        if (!CuratorUtils.isChildAdded(event)) {
          return;
        }

        // The payload is fetched from ZK directly rather than taken from the event; presumably because the
        // cache is built with cacheData = false.
        final byte[] payload = cf.getData().forPath(event.getData().getPath());
        assignTask(jsonMapper.readValue(payload, Task.class));
      }
    };

    pathChildrenCache.getListenable().addListener(newTaskListener);
  }

  /**
   * Stops the superclass and closes the children cache. A failure while closing is reported as an alert and is not
   * rethrown.
   *
   * @throws IllegalStateException if the monitor is not started
   */
  @LifecycleStop
  @Override
  public void stop() throws Exception
  {
    super.stop();

    synchronized (lifecycleLock) {
      Preconditions.checkState(started, "not started");
      started = false;

      try {
        pathChildrenCache.close();
        log.debug("Stopped WorkerTaskMonitor.");
      }
      catch (Exception e) {
        log.makeAlert(e, "Exception stopping WorkerTaskMonitor")
           .emit();
      }
    }
  }

  /**
   * Removes the run znode of the given task. Best effort: a failure is logged and otherwise ignored.
   */
  @Override
  protected void taskStarted(String taskId)
  {
    try {
      workerCuratorCoordinator.removeTaskRunZnode(taskId);
    }
    catch (Exception e) {
      log.error(e, "Unknown exception while deleting task[%s] znode.", taskId);
    }
  }

  /**
   * Publishes the changed announcement through the coordinator. A failure raises an alert carrying the task id and
   * is not rethrown.
   */
  @Override
  protected void taskAnnouncementChanged(TaskAnnouncement announcement)
  {
    try {
      workerCuratorCoordinator.updateTaskStatusAnnouncement(announcement);
    }
    catch (Exception e) {
      log.makeAlert(e, "Failed to update task announcement")
         .addData("task", announcement.getTaskId())
         .emit();
    }
  }
}