blob: ab9b0d72b69542dd2501b363fd49ec2b752424e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
// Scratch directory for the test; TestTaskRunner.run() places the task report file and the task working
// directory inside it.
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
// Provides the metadata connector and the tables-config supplier used in setUpIngestionTestBase().
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
// Source of the ObjectMapper, IndexIO, IndexMergerV9 and RowIngestionMetersFactory handed out by the getters.
private final TestUtils testUtils = new TestUtils();
private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
// The remaining fields are assigned in setUpIngestionTestBase(), i.e. once per test method.
private SegmentCacheManagerFactory segmentCacheManagerFactory;
private TaskStorage taskStorage;
private IndexerSQLMetadataStorageCoordinator storageCoordinator;
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
/**
 * Builds the per-test fixtures: registers a no-op emitter with {@link EmittingLogger}, calls
 * {@code createTaskTables()} and {@code createSegmentTable()} on the Derby connector, and instantiates the
 * task storage, metadata storage coordinator, segments metadata manager, lockbox and segment cache manager
 * factory.
 *
 * <p>The temporary folder is deliberately not created here. {@link #temporaryFolder} is a JUnit
 * {@code @Rule}, so JUnit creates it before any {@code @Before} method runs. Calling
 * {@code temporaryFolder.create()} again would repoint the rule at a second directory and leave the first one
 * behind on disk, because the rule only deletes the directory it currently references.
 *
 * @throws IOException not thrown by the current body; kept in the signature so that existing callers and
 *                     subclasses keep compiling
 */
@Before
public void setUpIngestionTestBase() throws IOException
{
  EmittingLogger.registerEmitter(new NoopServiceEmitter());

  final SQLMetadataConnector connector = derbyConnectorRule.getConnector();
  connector.createTaskTables();
  connector.createSegmentTable();

  taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
  storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
      objectMapper,
      derbyConnectorRule.metadataTablesConfigSupplier().get(),
      derbyConnectorRule.getConnector()
  );
  segmentsMetadataManager = new SqlSegmentsMetadataManager(
      objectMapper,
      SegmentsMetadataManagerConfig::new,
      derbyConnectorRule.metadataTablesConfigSupplier(),
      derbyConnectorRule.getConnector()
  );
  lockbox = new TaskLockbox(taskStorage, storageCoordinator);
  segmentCacheManagerFactory = new SegmentCacheManagerFactory(getObjectMapper());
}
/**
 * Deletes the per-test temporary folder after each test method.
 */
@After
public void tearDownIngestionTestBase()
{
  this.temporaryFolder.delete();
}
/**
 * Creates a new factory that produces {@link TestLocalTaskActionClient}s bound to this test's fixtures.
 *
 * @return a fresh {@link TestLocalTaskActionClientFactory}
 */
public TestLocalTaskActionClientFactory createActionClientFactory()
{
  final TestLocalTaskActionClientFactory clientFactory = new TestLocalTaskActionClientFactory();
  return clientFactory;
}
/**
 * Creates a new action client for the given task.
 *
 * @param task the task the client will submit actions for
 * @return a fresh {@link TestLocalTaskActionClient}
 */
public TestLocalTaskActionClient createActionClient(Task task)
{
  final TestLocalTaskActionClient clientForTask = new TestLocalTaskActionClient(task);
  return clientForTask;
}
/**
 * Adds the task to the lockbox and then inserts it into task storage with a "running" status.
 *
 * @param task the task to register
 * @throws EntryExistsException declared by the underlying task storage insert
 */
public void prepareTaskForLocking(Task task) throws EntryExistsException
{
  this.lockbox.add(task);
  final TaskStatus runningStatus = TaskStatus.running(task.getId());
  this.taskStorage.insert(task, runningStatus);
}
/**
 * Removes the task from the lockbox.
 *
 * @param task the task to remove
 */
public void shutdownTask(Task task)
{
  this.lockbox.remove(task);
}
/**
 * Asks the segment cache manager factory for a {@link SegmentCacheManager} rooted at the given directory.
 *
 * @param storageDir directory passed to the factory
 * @return the manager produced by the factory
 */
public SegmentCacheManager newSegmentLoader(File storageDir)
{
  final SegmentCacheManager cacheManager = this.segmentCacheManagerFactory.manufacturate(storageDir);
  return cacheManager;
}
/**
 * @return the test {@link ObjectMapper} obtained from {@link TestUtils}
 */
public ObjectMapper getObjectMapper()
{
  return this.objectMapper;
}
/**
 * @return the task storage assigned during test setup
 */
public TaskStorage getTaskStorage()
{
  return this.taskStorage;
}
/**
 * @return the segment cache manager factory assigned during test setup
 */
public SegmentCacheManagerFactory getSegmentCacheManagerFactory()
{
  return this.segmentCacheManagerFactory;
}
/**
 * @return the storage coordinator assigned during test setup, exposed through the
 *         {@link IndexerMetadataStorageCoordinator} interface
 */
public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
{
  return this.storageCoordinator;
}
/**
 * @return the segments metadata manager assigned during test setup
 */
public SegmentsMetadataManager getSegmentsMetadataManager()
{
  return this.segmentsMetadataManager;
}
/**
 * @return the task lockbox assigned during test setup
 */
public TaskLockbox getLockbox()
{
  return this.lockbox;
}
/**
 * @return the storage coordinator assigned during test setup, exposed as its concrete
 *         {@link IndexerSQLMetadataStorageCoordinator} type
 */
public IndexerSQLMetadataStorageCoordinator getStorageCoordinator()
{
  return this.storageCoordinator;
}
/**
 * @return the {@link RowIngestionMetersFactory} supplied by {@link TestUtils}
 */
public RowIngestionMetersFactory getRowIngestionMetersFactory()
{
  final RowIngestionMetersFactory metersFactory = this.testUtils.getRowIngestionMetersFactory();
  return metersFactory;
}
/**
 * Starts the metadata storage coordinator and wraps the shared fixtures in a new {@link TaskActionToolbox}.
 *
 * <p>{@code storageCoordinator.start()} is invoked on every call, before the toolbox is constructed.
 *
 * @return a new toolbox built from the lockbox, task storage, storage coordinator and a no-op emitter
 */
public TaskActionToolbox createTaskActionToolbox()
{
  this.storageCoordinator.start();
  final NoopServiceEmitter emitter = new NoopServiceEmitter();
  // The last constructor argument is not supplied by this fixture.
  return new TaskActionToolbox(this.lockbox, this.taskStorage, this.storageCoordinator, emitter, null);
}
/**
 * @return the test {@link IndexIO} supplied by {@link TestUtils}
 */
public IndexIO getIndexIO()
{
  final IndexIO testIndexIO = this.testUtils.getTestIndexIO();
  return testIndexIO;
}
/**
 * @return the test {@link IndexMergerV9} supplied by {@link TestUtils}
 */
public IndexMergerV9 getIndexMerger()
{
  final IndexMergerV9 testIndexMerger = this.testUtils.getTestIndexMergerV9();
  return testIndexMerger;
}
/**
 * {@link TaskActionClientFactory} that hands out a new {@link TestLocalTaskActionClient} for each task.
 */
public class TestLocalTaskActionClientFactory implements TaskActionClientFactory
{
  /**
   * @param task the task the client will act on behalf of
   * @return a newly constructed {@link TestLocalTaskActionClient}
   */
  @Override
  public TaskActionClient create(Task task)
  {
    final TestLocalTaskActionClient clientForTask = new TestLocalTaskActionClient(task);
    return clientForTask;
  }
}
/**
 * Action client that, in addition to the behaviour of {@link CountingLocalTaskActionClientForTest}, collects
 * the segments carried by every {@link SegmentTransactionalInsertAction} and {@link SegmentInsertAction}
 * submitted through it.
 */
public class TestLocalTaskActionClient extends CountingLocalTaskActionClientForTest
{
  private final Set<DataSegment> publishedSegments = new HashSet<>();

  private TestLocalTaskActionClient(Task task)
  {
    super(task, taskStorage, createTaskActionToolbox());
  }

  /**
   * Delegates to the parent client and, once that call has returned, records the segments of the action if
   * it is one of the two insert action types.
   *
   * @param taskAction the action to submit
   * @return whatever the parent client returned for the action
   */
  @Override
  public <RetType> RetType submit(TaskAction<RetType> taskAction)
  {
    // Delegate first: if the parent throws, nothing gets recorded.
    final RetType outcome = super.submit(taskAction);
    rememberPublishedSegments(taskAction);
    return outcome;
  }

  /**
   * Adds the segments of an insert action to {@link #publishedSegments}; other action types are ignored.
   */
  private void rememberPublishedSegments(TaskAction<?> taskAction)
  {
    if (taskAction instanceof SegmentTransactionalInsertAction) {
      final SegmentTransactionalInsertAction transactionalInsert = (SegmentTransactionalInsertAction) taskAction;
      publishedSegments.addAll(transactionalInsert.getSegments());
      return;
    }
    if (taskAction instanceof SegmentInsertAction) {
      final SegmentInsertAction plainInsert = (SegmentInsertAction) taskAction;
      publishedSegments.addAll(plainInsert.getSegments());
    }
  }

  /**
   * @return the live set of segments recorded so far (not a copy)
   */
  public Set<DataSegment> getPublishedSegments()
  {
    return publishedSegments;
  }
}
/**
 * Minimal in-process {@link TaskRunner} for tests. Only {@link #run(Task)} is implemented; it executes the
 * task synchronously on the calling thread. Every other {@link TaskRunner} method throws
 * {@link UnsupportedOperationException}.
 */
public class TestTaskRunner implements TaskRunner
{
  // Both fields are assigned inside run(Task) and always describe the most recent run; they are null until
  // run(Task) has got far enough to assign them.
  private TestLocalTaskActionClient taskActionClient;
  private File taskReportsFile;

  @Override
  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void start()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void registerListener(TaskRunnerListener listener, Executor executor)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void unregisterListener(String listenerId)
  {
    throw new UnsupportedOperationException();
  }

  /**
   * @return the action client created by the most recent {@link #run(Task)} call, or null if none was created
   */
  public TestLocalTaskActionClient getTaskActionClient()
  {
    return taskActionClient;
  }

  /**
   * @return the report file created by the most recent {@link #run(Task)} call, or null if none was created
   */
  public File getTaskReportsFile()
  {
    return taskReportsFile;
  }

  /**
   * Returns a sorted copy of the segments recorded by the current action client. Must be called after
   * {@link #run(Task)}, because the action client is only assigned there.
   *
   * @return a new, sorted list of the recorded segments
   */
  public List<DataSegment> getPublishedSegments()
  {
    final List<DataSegment> segments = new ArrayList<>(taskActionClient.getPublishedSegments());
    Collections.sort(segments);
    return segments;
  }

  /**
   * Registers the task with the lockbox and task storage, builds a toolbox for it, and runs it synchronously
   * if {@code task.isReady(...)} returns true. The task is removed from the lockbox afterwards, whether the
   * run succeeded or not.
   *
   * @param task the task to execute
   * @return an already-completed future holding the status returned by {@code task.run(...)}
   * @throws RuntimeException wrapping any exception raised along the way, including the {@link ISE} thrown
   *                          when the task reports that it is not ready
   */
  @Override
  public ListenableFuture<TaskStatus> run(Task task)
  {
    try {
      lockbox.add(task);
      taskStorage.insert(task, TaskStatus.running(task.getId()));
      taskActionClient = createActionClient(task);
      // A name built from the millisecond timestamp alone collides when two tasks are run within the same
      // millisecond of one test. File.createTempFile appends a unique token between prefix and suffix and
      // creates the empty file, so the timestamp stays in the name without risking an "already exists" error.
      taskReportsFile = File.createTempFile(
          StringUtils.format("ingestionTestBase-%s-", System.currentTimeMillis()),
          ".json",
          temporaryFolder.getRoot()
      );
      final TaskToolbox box = createToolbox();
      if (task.isReady(box.getTaskActionClient())) {
        return Futures.immediateFuture(task.run(box));
      } else {
        throw new ISE("task is not ready");
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      lockbox.remove(task);
    }
  }

  /**
   * Builds the toolbox handed to the task, wired to the current {@link #taskActionClient} and
   * {@link #taskReportsFile}. Collaborators this fixture does not provide are passed as null.
   */
  private TaskToolbox createToolbox() throws IOException
  {
    return new TaskToolbox(
        new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
        new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
        taskActionClient,
        null,
        new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()),
        new NoopDataSegmentKiller(),
        null,
        null,
        null,
        null,
        null,
        null,
        null,
        NoopJoinableFactory.INSTANCE,
        null,
        null,
        objectMapper,
        temporaryFolder.newFolder(),
        getIndexIO(),
        null,
        null,
        null,
        getIndexMerger(),
        null,
        null,
        null,
        null,
        new SingleFileTaskReportFileWriter(taskReportsFile),
        null,
        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
        new NoopChatHandlerProvider(),
        testUtils.getRowIngestionMetersFactory(),
        new TestAppenderatorsManager(),
        new NoopIndexingServiceClient(),
        null,
        null,
        null
    );
  }

  @Override
  public void shutdown(String taskid, String reason)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void stop()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Optional<ScalingStats> getScalingStats()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public long getTotalTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public long getIdleTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public long getUsedTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public long getLazyTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public long getBlacklistedTaskSlotCount()
  {
    throw new UnsupportedOperationException();
  }
}
}