/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexer;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.hadoop.DatasourceIngestionSpec;
import org.apache.druid.indexer.hadoop.WindowedDataSegment;
import org.apache.druid.indexer.path.DatasourcePathSpec;
import org.apache.druid.indexer.path.MultiplePathSpec;
import org.apache.druid.indexer.path.PathSpec;
import org.apache.druid.indexer.path.StaticPathSpec;
import org.apache.druid.indexer.path.UsedSegmentsRetriever;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
 * Tests that {@link HadoopIngestionSpec#updateSegmentListIfDatasourcePathSpecIsUsed} populates the segment list of
 * any {@link DatasourcePathSpec} found in the ingestion spec using the given {@link UsedSegmentsRetriever}.
 */
public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
{
  private static final String TEST_DATA_SOURCE = "test";
  private static final String TEST_DATA_SOURCE2 = "test2";
  private static final Interval TEST_DATA_SOURCE_INTERVAL = Intervals.of("1970/3000");
  private static final Interval TEST_DATA_SOURCE_INTERVAL2 = Intervals.of("2000/2001");
  private static final Interval TEST_DATA_SOURCE_INTERVAL_PARTIAL = Intervals.of("2050/3000");

  private final ObjectMapper jsonMapper;

  public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest()
  {
    jsonMapper = new DefaultObjectMapper();
    jsonMapper.setInjectableValues(
        new InjectableValues.Std()
            .addValue(ObjectMapper.class, jsonMapper)
            .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
    );
  }
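
  // Segments that the mocked UsedSegmentsRetriever (see the helper at the bottom) reports as used
  // for TEST_DATA_SOURCE and TEST_DATA_SOURCE2 respectively.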
  private static final DataSegment SEGMENT = new DataSegment(
      TEST_DATA_SOURCE,
      Intervals.of("2000/3000"),
      "ver",
      ImmutableMap.of(
          "type", "local",
          "path", "/tmp/index1.zip"
      ),
      ImmutableList.of("host"),
      ImmutableList.of("visited_sum", "unique_hosts"),
      NoneShardSpec.instance(),
      9,
      2
  );

  private static final DataSegment SEGMENT2 = new DataSegment(
      TEST_DATA_SOURCE2,
      Intervals.of("2000/3000"),
      "ver2",
      ImmutableMap.of(
          "type", "local",
          "path", "/tmp/index2.zip"
      ),
      ImmutableList.of("host2"),
      ImmutableList.of("visited_sum", "unique_hosts"),
      NoneShardSpec.instance(),
      9,
      2
  );
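
  // A static path spec contains no datasource to resolve, so the path spec should pass through unchanged.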
  @Test
  public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception
  {
    PathSpec pathSpec = new StaticPathSpec("/xyz", null);
    HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec, null);
    Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec);
  }
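
  // A datasource path spec with no explicit segment list should have it filled in from the retriever.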
  @Test
  public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpec() throws Exception
  {
    PathSpec pathSpec = new DatasourcePathSpec(
        null,
        new DatasourceIngestionSpec(TEST_DATA_SOURCE, TEST_DATA_SOURCE_INTERVAL, null, null, null, null, null, false, null),
        null,
        false
    );
    HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
        pathSpec,
        TEST_DATA_SOURCE_INTERVAL
    );
    Assert.assertEquals(
        ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
        ((DatasourcePathSpec) config.getPathSpec()).getSegments()
    );
  }
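
  // User-supplied segments that agree with the retriever's view are accepted as-is.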
  @Test
  public void testUpdateSegmentListIfDatasourcePathSpecWithMatchingUserSegments() throws Exception
  {
    PathSpec pathSpec = new DatasourcePathSpec(
        null,
        new DatasourceIngestionSpec(
            TEST_DATA_SOURCE,
            TEST_DATA_SOURCE_INTERVAL,
            null,
            ImmutableList.of(SEGMENT),
            null,
            null,
            null,
            false,
            null
        ),
        null,
        false
    );
    HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
        pathSpec,
        TEST_DATA_SOURCE_INTERVAL
    );
    Assert.assertEquals(
        ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
        ((DatasourcePathSpec) config.getPathSpec()).getSegments()
    );
  }
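
  // User-supplied segments that differ from the retriever's view (here, by version) must fail with an IOException.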
  @Test(expected = IOException.class)
  public void testUpdateSegmentListThrowsExceptionWithUserSegmentsMismatch() throws Exception
  {
    PathSpec pathSpec = new DatasourcePathSpec(
        null,
        new DatasourceIngestionSpec(
            TEST_DATA_SOURCE,
            TEST_DATA_SOURCE_INTERVAL,
            null,
            ImmutableList.of(SEGMENT.withVersion("v2")),
            null,
            null,
            null,
            false,
            null
        ),
        null,
        false
    );
    testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
        pathSpec,
        TEST_DATA_SOURCE_INTERVAL
    );
  }
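
  // When the ingestion interval covers only part of the segment, the windowed segment is clipped to that interval.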
  @Test
  public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpecAndPartialInterval()
      throws Exception
  {
    PathSpec pathSpec = new DatasourcePathSpec(
        null,
        new DatasourceIngestionSpec(
            TEST_DATA_SOURCE,
            TEST_DATA_SOURCE_INTERVAL_PARTIAL,
            null,
            null,
            null,
            null,
            null,
            false,
            null
        ),
        null,
        false
    );
    HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
        pathSpec,
        TEST_DATA_SOURCE_INTERVAL_PARTIAL
    );
    Assert.assertEquals(
        ImmutableList.of(new WindowedDataSegment(SEGMENT, TEST_DATA_SOURCE_INTERVAL_PARTIAL)),
        ((DatasourcePathSpec) config.getPathSpec()).getSegments()
    );
  }
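
  // Each datasource child of a multiple path spec is updated independently; the static child is left untouched.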
  @Test
  public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec() throws Exception
  {
    PathSpec pathSpec = new MultiplePathSpec(
        ImmutableList.of(
            new StaticPathSpec("/xyz", null),
            new DatasourcePathSpec(
                null,
                new DatasourceIngestionSpec(
                    TEST_DATA_SOURCE,
                    TEST_DATA_SOURCE_INTERVAL,
                    null,
                    null,
                    null,
                    null,
                    null,
                    false,
                    null
                ),
                null,
                false
            ),
            new DatasourcePathSpec(
                null,
                new DatasourceIngestionSpec(
                    TEST_DATA_SOURCE2,
                    TEST_DATA_SOURCE_INTERVAL2,
                    null,
                    null,
                    null,
                    null,
                    null,
                    false,
                    null
                ),
                null,
                false
            )
        )
    );
    HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
        pathSpec,
        TEST_DATA_SOURCE_INTERVAL
    );
    Assert.assertEquals(
        ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
        ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments()
    );
    Assert.assertEquals(
        ImmutableList.of(new WindowedDataSegment(SEGMENT2, TEST_DATA_SOURCE_INTERVAL2)),
        ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(2)).getSegments()
    );
  }
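
  /**
   * Round-trips the given path spec through JSON into a {@link HadoopIngestionSpec}, runs
   * {@link HadoopIngestionSpec#updateSegmentListIfDatasourcePathSpecIsUsed} against a mocked
   * {@link UsedSegmentsRetriever} that serves {@link #SEGMENT} and {@link #SEGMENT2}, and returns the
   * resulting {@link HadoopDruidIndexerConfig}.
   */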
  private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
      PathSpec datasourcePathSpec,
      @Nullable Interval jobInterval
  ) throws Exception
  {
    HadoopIngestionSpec spec = new HadoopIngestionSpec(
        new DataSchema(
            "foo",
            null,
            new AggregatorFactory[0],
            new UniformGranularitySpec(
                Granularities.DAY,
                null,
                ImmutableList.of(Intervals.of("2010-01-01/P1D"))
            ),
            null,
            jsonMapper
        ),
        new HadoopIOConfig(
            jsonMapper.convertValue(datasourcePathSpec, Map.class),
            null,
            null
        ),
        null
    );

    spec = jsonMapper.readValue(
        jsonMapper.writeValueAsString(spec),
        HadoopIngestionSpec.class
    );

    UsedSegmentsRetriever segmentsRetriever = EasyMock.createMock(UsedSegmentsRetriever.class);
    EasyMock
        .expect(
            segmentsRetriever.retrieveUsedSegmentsForIntervals(
                TEST_DATA_SOURCE,
                Collections.singletonList(jobInterval != null ? jobInterval.overlap(TEST_DATA_SOURCE_INTERVAL) : null),
                Segments.ONLY_VISIBLE
            )
        )
        .andReturn(ImmutableList.of(SEGMENT));
    EasyMock
        .expect(
            segmentsRetriever.retrieveUsedSegmentsForIntervals(
                TEST_DATA_SOURCE2,
                Collections.singletonList(jobInterval != null ? jobInterval.overlap(TEST_DATA_SOURCE_INTERVAL2) : null),
                Segments.ONLY_VISIBLE
            )
        )
        .andReturn(ImmutableList.of(SEGMENT2));
    EasyMock.replay(segmentsRetriever);

    HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentsRetriever);
    return HadoopDruidIndexerConfig.fromString(jsonMapper.writeValueAsString(spec));
  }
}