/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.materializedview;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.HadoopIndexTask;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
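
/**
 * Tests for {@link MaterializedViewSupervisor}: segment/interval discovery via checkSegments(),
 * running-task bookkeeping via checkSegmentsAndSubmitTasks(), and the suspended no-op behaviour.
 */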
public class MaterializedViewSupervisorTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private TaskStorage taskStorage;
private TaskMaster taskMaster;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private MetadataSupervisorManager metadataSupervisorManager;
private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private TaskQueue taskQueue;
private MaterializedViewSupervisor supervisor;
private String derivativeDatasourceName;
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
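
  // Builds a MaterializedViewSupervisor for derivative views of the "base" datasource,
  // backed by the Derby metadata store provided by the @Rule above.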
@Before
public void setUp()
{
TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createSegmentTable();
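    // Task-management components are mocked; segment metadata goes through a real
    // IndexerSQLMetadataStorageCoordinator backed by the Derby connector above.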
taskStorage = EasyMock.createMock(TaskStorage.class);
taskMaster = EasyMock.createMock(TaskMaster.class);
indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector
);
metadataSupervisorManager = EasyMock.createMock(MetadataSupervisorManager.class);
sqlSegmentsMetadataManager = EasyMock.createMock(SqlSegmentsMetadataManager.class);
taskQueue = EasyMock.createMock(TaskQueue.class);
taskQueue.start();
objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
taskMaster,
taskStorage,
metadataSupervisorManager,
sqlSegmentsMetadataManager,
indexerMetadataStorageCoordinator,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
EasyMock.createMock(ChatHandlerProvider.class),
new SupervisorStateManagerConfig()
);
derivativeDatasourceName = spec.getDataSourceName();
supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
}
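
  // checkSegments() compares base and derivative segments in the metadata store and reports
  // which intervals of the base datasource should have derivative data (re)built.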
@Test
public void testCheckSegments() throws IOException
{
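    // Three daily segments of the base datasource covering 2015-01-01 to 2015-01-04.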
Set<DataSegment> baseSegments = Sets.newHashSet(
new DataSegment(
"base",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"2015-01-02",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
new DataSegment(
"base",
Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
"2015-01-04",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
);
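    // Derivative segments exist for the first two days only; the second carries a
    // far-future version ("3015-01-01").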
Set<DataSegment> derivativeSegments = Sets.newHashSet(
new DataSegment(
derivativeDatasourceName,
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"2015-01-02",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
),
new DataSegment(
derivativeDatasourceName,
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"3015-01-01",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
);
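    // Persist both segment sets in the Derby-backed metadata store and stub out task management.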
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
indexerMetadataStorageCoordinator.announceHistoricalSegments(derivativeSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
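    // lhs maps each interval to the version selected for building; rhs maps intervals to the base segments considered.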
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildInterval = supervisor.checkSegments();
Set<Interval> expectedToBuildInterval = Sets.newHashSet(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"));
Map<Interval, List<DataSegment>> expectedSegments = new HashMap<>();
expectedSegments.put(
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
Collections.singletonList(
new DataSegment(
"base",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"2015-01-02",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
)
);
expectedSegments.put(
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
Collections.singletonList(
new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
)
);
Assert.assertEquals(expectedToBuildInterval, toBuildInterval.lhs.keySet());
Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
}
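
  // Once a previously submitted task is reported as failed, checkSegmentsAndSubmitTasks() should
  // drop it from the supervisor's running-task and running-version maps, keeping the still-running one.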
@Test
public void testCheckSegmentsAndSubmitTasks() throws IOException
{
Set<DataSegment> baseSegments = Sets.newHashSet(
new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
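    // test_task1 is reported as failed, test_task2 as still running.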
EasyMock.expect(taskStorage.getStatus("test_task1"))
.andReturn(Optional.of(TaskStatus.failure("test_task1")))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("test_task2"))
.andReturn(Optional.of(TaskStatus.running("test_task2")))
.anyTimes();
EasyMock.replay(taskStorage);
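    // getRunningTasks() exposes the supervisor's internal running-task and running-version maps.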
Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> runningTasksPair = supervisor.getRunningTasks();
Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
Map<Interval, String> runningVersion = runningTasksPair.rhs;
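    // Two minimal HadoopIndexTasks standing in for previously submitted derivative-build tasks.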
DataSchema dataSchema = new DataSchema(
"test_datasource",
null,
null,
null,
TransformSpec.NONE,
objectMapper
);
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
HadoopIndexTask task1 = new HadoopIndexTask(
"test_task1",
spec,
null,
null,
null,
objectMapper,
null,
null,
null
);
runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1);
runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), "test_version1");
HadoopIndexTask task2 = new HadoopIndexTask(
"test_task2",
spec,
null,
null,
null,
objectMapper,
null,
null,
null
);
runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
supervisor.checkSegmentsAndSubmitTasks();
Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>();
Map<Interval, String> expectedRunningVersion = new HashMap<>();
expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");
Assert.assertEquals(expectedRunningTasks, runningTasks);
Assert.assertEquals(expectedRunningVersion, runningVersion);
}
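
  // A suspended supervisor's run() must be a no-op; the mocked storage coordinator below fails
  // the test if retrieveDataSourceMetadata() is ever invoked.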
@Test
public void testSuspendedDoesntRun()
{
MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
true,
objectMapper,
taskMaster,
taskStorage,
metadataSupervisorManager,
sqlSegmentsMetadataManager,
indexerMetadataStorageCoordinator,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
EasyMock.createMock(ChatHandlerProvider.class),
new SupervisorStateManagerConfig()
);
MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor();
    // Mock IndexerSQLMetadataStorageCoordinator so that any call to retrieveDataSourceMetadata fails the test.
    // A truly suspended supervisor never makes that call, since it is the first operation of run() otherwise.
IndexerSQLMetadataStorageCoordinator mock = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
EasyMock.expect(mock.retrieveDataSourceMetadata(suspended.getDataSourceName()))
.andAnswer(() -> {
Assert.fail();
return null;
})
.anyTimes();
EasyMock.replay(mock);
supervisor.run();
}
}