blob: 7b12c0dc381eddc8c1c59b3c5213201d23089a51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class SingleTaskBackgroundRunnerTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private SingleTaskBackgroundRunner runner;
@Before
public void setup() throws IOException
{
final TestUtils utils = new TestUtils();
final DruidNode node = new DruidNode("testServer", "testHost", false, 1000, null, true, false);
final TaskConfig taskConfig = new TaskConfig(
temporaryFolder.newFile().toString(),
null,
null,
50000,
null,
true,
null,
null,
null,
false,
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();
EmittingLogger.registerEmitter(emitter);
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
null,
EasyMock.createMock(TaskActionClientFactory.class),
emitter,
new NoopDataSegmentPusher(),
new NoopDataSegmentKiller(),
new NoopDataSegmentMover(),
new NoopDataSegmentArchiver(),
new NoopDataSegmentAnnouncer(),
null,
null,
null,
null,
NoopJoinableFactory.INSTANCE,
null,
new SegmentCacheManagerFactory(utils.getTestObjectMapper()),
utils.getTestObjectMapper(),
utils.getTestIndexIO(),
null,
null,
null,
utils.getTestIndexMergerV9(),
null,
node,
null,
null,
new SingleFileTaskReportFileWriter(new File("fake")),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
utils.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
new NoopIndexingServiceClient(),
null,
null,
null
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,
taskConfig,
emitter,
node,
new ServerConfig()
);
}
@After
public void teardown()
{
runner.stop();
}
@Test
public void testRun() throws ExecutionException, InterruptedException
{
Assert.assertEquals(
TaskState.SUCCESS,
runner.run(new NoopTask(null, null, null, 500L, 0, null, null, null)).get().getStatusCode()
);
}
@Test
public void testGetQueryRunner() throws ExecutionException, InterruptedException
{
runner.run(new NoopTask(null, null, "foo", 500L, 0, null, null, null)).get().getStatusCode();
final QueryRunner<ScanResultValue> queryRunner =
Druids.newScanQueryBuilder()
.dataSource("foo")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.build()
.getRunner(runner);
Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class));
}
@Test
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{
final ListenableFuture<TaskStatus> future = runner.run(
new NoopTask(null, null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task
);
runner.stop();
Assert.assertEquals(
TaskState.FAILED,
future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
);
}
@Test
public void testStopWithRestorableTask() throws InterruptedException, ExecutionException, TimeoutException
{
final BooleanHolder holder = new BooleanHolder();
final ListenableFuture<TaskStatus> future = runner.run(
new RestorableTask(holder)
);
runner.stop();
Assert.assertEquals(
TaskState.SUCCESS,
future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
);
Assert.assertTrue(holder.get());
}
@Test
public void testStopRestorableTaskExceptionAfterStop()
{
// statusChanged callback can be called by multiple threads.
AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
runner.registerListener(
new TaskRunnerListener()
{
@Override
public String getListenerId()
{
return "testStopRestorableTaskExceptionAfterStop";
}
@Override
public void locationChanged(String taskId, TaskLocation newLocation)
{
// do nothing
}
@Override
public void statusChanged(String taskId, TaskStatus status)
{
statusHolder.set(status);
}
},
Execs.directExecutor()
);
runner.run(
new RestorableTask(new BooleanHolder())
{
@Override
public TaskStatus run(TaskToolbox toolbox)
{
throw new Error("task failure test");
}
}
);
runner.stop();
Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
Assert.assertEquals(
"Failed to stop gracefully with exception. See task logs for more details.",
statusHolder.get().getErrorMsg()
);
}
@Test
public void testStopNonRestorableTask() throws InterruptedException
{
// latch to wait for SingleTaskBackgroundRunnerCallable to be executed before stopping the task
// We need this latch because TaskRunnerListener is currently racy.
// See https://github.com/apache/druid/issues/11445 for more details.
CountDownLatch runLatch = new CountDownLatch(1);
// statusChanged callback can be called by multiple threads.
AtomicReference<TaskStatus> statusHolder = new AtomicReference<>();
runner.registerListener(
new TaskRunnerListener()
{
@Override
public String getListenerId()
{
return "testStopNonRestorableTask";
}
@Override
public void locationChanged(String taskId, TaskLocation newLocation)
{
// do nothing
}
@Override
public void statusChanged(String taskId, TaskStatus status)
{
if (status.getStatusCode() == TaskState.RUNNING) {
runLatch.countDown();
} else {
statusHolder.set(status);
}
}
},
Execs.directExecutor()
);
runner.run(
new NoopTask(
null,
null,
"datasource",
10000, // 10 sec
0,
null,
null,
null
)
);
Assert.assertTrue(runLatch.await(1, TimeUnit.SECONDS));
runner.stop();
Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
Assert.assertEquals(
"Canceled as task execution process stopped",
statusHolder.get().getErrorMsg()
);
}
private static class RestorableTask extends AbstractTask
{
private final BooleanHolder gracefullyStopped;
RestorableTask(BooleanHolder gracefullyStopped)
{
super("testId", "testDataSource", Collections.emptyMap());
this.gracefullyStopped = gracefullyStopped;
}
@Override
public String getType()
{
return "restorable";
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
return TaskStatus.success(getId());
}
@Override
public boolean canRestore()
{
return true;
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
gracefullyStopped.set();
}
}
private static class BooleanHolder
{
private boolean value;
void set()
{
this.value = true;
}
boolean get()
{
return value;
}
}
}