blob: 365fb1751eacfb08b03ce20f162493578b73777d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.materializedview;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.concurrent.Callable;
public class MaterializedViewSupervisorSpecTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@Before
public void setup()
{
objectMapper.registerSubtypes(new NamedType(MaterializedViewSupervisorSpec.class, "derivativeDataSource"));
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(TaskMaster.class, null)
.addValue(TaskStorage.class, null)
.addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class, objectMapper)
.addValue(MetadataSupervisorManager.class, null)
.addValue(SqlSegmentsMetadataManager.class, null)
.addValue(IndexerMetadataStorageCoordinator.class, null)
.addValue(MaterializedViewTaskConfig.class, new MaterializedViewTaskConfig())
.addValue(AuthorizerMapper.class, EasyMock.createMock(AuthorizerMapper.class))
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
.addValue(SupervisorStateManagerConfig.class, new SupervisorStateManagerConfig())
);
}
@Test
public void testSupervisorSerialization() throws IOException
{
String supervisorStr = "{\n" +
" \"type\" : \"derivativeDataSource\",\n" +
" \"baseDataSource\": \"wikiticker\",\n" +
" \"dimensionsSpec\":{\n" +
" \"dimensions\" : [\n" +
" \"isUnpatrolled\",\n" +
" \"metroCode\",\n" +
" \"namespace\",\n" +
" \"page\",\n" +
" \"regionIsoCode\",\n" +
" \"regionName\",\n" +
" \"user\"\n" +
" ]\n" +
" },\n" +
" \"metricsSpec\" : [\n" +
" {\n" +
" \"name\" : \"count\",\n" +
" \"type\" : \"count\"\n" +
" },\n" +
" {\n" +
" \"name\" : \"added\",\n" +
" \"type\" : \"longSum\",\n" +
" \"fieldName\" : \"added\"\n" +
" }\n" +
" ],\n" +
" \"tuningConfig\": {\n" +
" \"type\" : \"hadoop\"\n" +
" }\n" +
"}";
MaterializedViewSupervisorSpec expected = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
)
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
Assert.assertEquals(expected.getBaseDataSource(), spec.getBaseDataSource());
Assert.assertEquals(expected.getId(), spec.getId());
Assert.assertEquals(expected.getDataSourceName(), spec.getDataSourceName());
Assert.assertEquals(expected.getDimensions(), spec.getDimensions());
Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
}
@Test
public void testMaterializedViewSupervisorSpecCreated()
{
Exception ex = null;
try {
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
)
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
Supervisor supervisor = spec.createSupervisor();
Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.computeLagStats());
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount());
Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes());
Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
{
return -1;
}
};
}
catch (Exception e) {
ex = e;
}
Assert.assertNull(ex);
}
@Test
public void testSuspendResuume() throws IOException
{
String supervisorStr = "{\n" +
" \"type\" : \"derivativeDataSource\",\n" +
" \"baseDataSource\": \"wikiticker\",\n" +
" \"dimensionsSpec\":{\n" +
" \"dimensions\" : [\n" +
" \"isUnpatrolled\",\n" +
" \"metroCode\",\n" +
" \"namespace\",\n" +
" \"page\",\n" +
" \"regionIsoCode\",\n" +
" \"regionName\",\n" +
" \"user\"\n" +
" ]\n" +
" },\n" +
" \"metricsSpec\" : [\n" +
" {\n" +
" \"name\" : \"count\",\n" +
" \"type\" : \"count\"\n" +
" },\n" +
" {\n" +
" \"name\" : \"added\",\n" +
" \"type\" : \"longSum\",\n" +
" \"fieldName\" : \"added\"\n" +
" }\n" +
" ],\n" +
" \"tuningConfig\": {\n" +
" \"type\" : \"hadoop\"\n" +
" }\n" +
"}";
MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
Assert.assertFalse(spec.isSuspended());
String suspendedSerialized = objectMapper.writeValueAsString(spec.createSuspendedSpec());
MaterializedViewSupervisorSpec suspendedSpec = objectMapper.readValue(
suspendedSerialized,
MaterializedViewSupervisorSpec.class
);
Assert.assertTrue(suspendedSpec.isSuspended());
String runningSerialized = objectMapper.writeValueAsString(spec.createRunningSpec());
MaterializedViewSupervisorSpec runningSpec = objectMapper.readValue(
runningSerialized,
MaterializedViewSupervisorSpec.class
);
Assert.assertFalse(runningSpec.isSuspended());
}
@Test
public void testEmptyBaseDataSource()
{
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
expectedException.expectMessage(
"baseDataSource cannot be null or empty. Please provide a baseDataSource."
);
//noinspection ResultOfObjectAllocationIgnored (this method call will trigger the expected exception)
new MaterializedViewSupervisorSpec(
"",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
)
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
}
@Test
public void testNullBaseDataSource()
{
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
expectedException.expectMessage(
"baseDataSource cannot be null or empty. Please provide a baseDataSource."
);
//noinspection ResultOfObjectAllocationIgnored (this method call will trigger the expected exception)
new MaterializedViewSupervisorSpec(
null,
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
)
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
}
}