| /* |
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. Metamarkets licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package io.druid.indexing.common.task; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.Files; |
| import io.druid.data.input.impl.CSVParseSpec; |
| import io.druid.data.input.impl.DimensionsSpec; |
| import io.druid.data.input.impl.ParseSpec; |
| import io.druid.data.input.impl.SpatialDimensionSchema; |
| import io.druid.data.input.impl.StringInputRowParser; |
| import io.druid.data.input.impl.TimestampSpec; |
| import io.druid.indexing.common.TaskLock; |
| import io.druid.indexing.common.TaskLockType; |
| import io.druid.indexing.common.TaskToolbox; |
| import io.druid.indexing.common.TestUtils; |
| import io.druid.indexing.common.actions.LockAcquireAction; |
| import io.druid.indexing.common.actions.LockListAction; |
| import io.druid.indexing.common.actions.LockTryAcquireAction; |
| import io.druid.indexing.common.actions.SegmentAllocateAction; |
| import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; |
| import io.druid.indexing.common.actions.TaskAction; |
| import io.druid.indexing.common.actions.TaskActionClient; |
| import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; |
| import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; |
| import io.druid.indexing.overlord.SegmentPublishResult; |
| import io.druid.java.util.common.DateTimes; |
| import io.druid.java.util.common.Intervals; |
| import io.druid.java.util.common.StringUtils; |
| import io.druid.java.util.common.granularity.Granularities; |
| import io.druid.java.util.common.parsers.ParseException; |
| import io.druid.math.expr.ExprMacroTable; |
| import io.druid.query.aggregation.AggregatorFactory; |
| import io.druid.query.aggregation.LongSumAggregatorFactory; |
| import io.druid.query.filter.SelectorDimFilter; |
| import io.druid.segment.IndexIO; |
| import io.druid.segment.IndexMergerV9; |
| import io.druid.segment.IndexSpec; |
| import io.druid.segment.indexing.DataSchema; |
| import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; |
| import io.druid.segment.indexing.granularity.GranularitySpec; |
| import io.druid.segment.indexing.granularity.UniformGranularitySpec; |
| import io.druid.segment.loading.DataSegmentPusher; |
| import io.druid.segment.realtime.appenderator.SegmentIdentifier; |
| import io.druid.segment.realtime.firehose.LocalFirehoseFactory; |
| import io.druid.segment.transform.ExpressionTransform; |
| import io.druid.segment.transform.TransformSpec; |
| import io.druid.timeline.DataSegment; |
| import io.druid.timeline.partition.HashBasedNumberedShardSpec; |
| import io.druid.timeline.partition.NoneShardSpec; |
| import io.druid.timeline.partition.NumberedShardSpec; |
| import io.druid.timeline.partition.ShardSpec; |
| import org.joda.time.Interval; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| public class IndexTaskTest |
| { |
| @Rule |
| public TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
| private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( |
| new TimestampSpec( |
| "ts", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), |
| Lists.newArrayList(), |
| Lists.newArrayList() |
| ), |
| null, |
| Arrays.asList("ts", "dim", "val"), |
| false, |
| 0 |
| ); |
| |
| private static final IndexSpec indexSpec = new IndexSpec(); |
| private final ObjectMapper jsonMapper; |
| private IndexMergerV9 indexMergerV9; |
| private IndexIO indexIO; |
| private volatile int segmentAllocatePartitionCounter; |
| |
| public IndexTaskTest() |
| { |
| TestUtils testUtils = new TestUtils(); |
| jsonMapper = testUtils.getTestObjectMapper(); |
| indexMergerV9 = testUtils.getTestIndexMergerV9(); |
| indexIO = testUtils.getTestIndexIO(); |
| } |
| |
| @Test |
| public void testDeterminePartitions() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| null, |
| createTuningConfig(2, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(2, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); |
| Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); |
| Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions()); |
| } |
| |
| @Test |
| public void testForceExtendableShardSpecs() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| null, |
| createTuningConfig(2, null, true, false), |
| false |
| ), |
| null |
| ); |
| |
| Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(2, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); |
| Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testTransformSpec() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new TransformSpec( |
| new SelectorDimFilter("dim", "b", null), |
| ImmutableList.of( |
| new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()) |
| ) |
| ), |
| null, |
| createTuningConfig(2, null, true, false), |
| false |
| ), |
| null |
| ); |
| |
| Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testWithArbitraryGranularity() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new ArbitraryGranularitySpec( |
| Granularities.MINUTE, |
| Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) |
| ), |
| createTuningConfig(10, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| } |
| |
| @Test |
| public void testIntervalBucketing() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T07:59:59.977Z,a,1\n"); |
| writer.write("2014-01-01T08:00:00.000Z,b,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.HOUR, |
| Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z")) |
| ), |
| createTuningConfig(50, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| } |
| |
| @Test |
| public void testNumShardsProvided() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| null, |
| createTuningConfig(null, 1, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class)); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testAppendToExisting() throws Exception |
| { |
| segmentAllocatePartitionCounter = 0; |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| null, |
| createTuningConfig(2, null, false, false), |
| true |
| ), |
| null |
| ); |
| |
| Assert.assertEquals("index_append_test", indexTask.getGroupId()); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(2, segmentAllocatePartitionCounter); |
| Assert.assertEquals(2, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class)); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); |
| Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class)); |
| Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testIntervalNotSpecified() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.MINUTE, |
| null |
| ), |
| createTuningConfig(2, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(3, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T00/PT1H"), segments.get(0).getInterval()); |
| Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class)); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T01/PT1H"), segments.get(1).getInterval()); |
| Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NoneShardSpec.class)); |
| Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(2).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T02/PT1H"), segments.get(2).getInterval()); |
| Assert.assertTrue(segments.get(2).getShardSpec().getClass().equals(NoneShardSpec.class)); |
| Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testCSVFileWithHeader() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| Lists.<String>newArrayList(), |
| Lists.<SpatialDimensionSchema>newArrayList() |
| ), |
| null, |
| null, |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testCSVFileWithHeaderColumnOverride() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| Lists.<String>newArrayList(), |
| Lists.<SpatialDimensionSchema>newArrayList() |
| ), |
| null, |
| Arrays.asList("time", "dim", "val"), |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testWithSmallMaxTotalRows() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,b,2\n"); |
| writer.write("2014-01-01T00:00:10Z,c,3\n"); |
| writer.write("2014-01-01T01:00:20Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,2\n"); |
| writer.write("2014-01-01T01:00:20Z,c,3\n"); |
| writer.write("2014-01-01T02:00:30Z,a,1\n"); |
| writer.write("2014-01-01T02:00:30Z,b,2\n"); |
| writer.write("2014-01-01T02:00:30Z,c,3\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.MINUTE, |
| null |
| ), |
| createTuningConfig(2, 2, 2L, null, false, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(6, segments.size()); |
| |
| for (int i = 0; i < 6; i++) { |
| final DataSegment segment = segments.get(i); |
| final Interval expectedInterval = Intervals.of(StringUtils.format("2014-01-01T0%d/PT1H", (i / 2))); |
| final int expectedPartitionNum = i % 2; |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
| @Test |
| public void testPerfectRollup() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| createTuningConfig(3, 2, 2L, null, false, true, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(3, segments.size()); |
| |
| for (int i = 0; i < 3; i++) { |
| final DataSegment segment = segments.get(i); |
| final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class)); |
| Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
| @Test |
| public void testBestEffortRollup() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createIngestionSpec( |
| tmpDir, |
| null, |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| createTuningConfig(3, 2, 2L, null, false, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(5, segments.size()); |
| |
| for (int i = 0; i < 5; i++) { |
| final DataSegment segment = segments.get(i); |
| final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
| private static void populateRollupTestData(File tmpFile) throws IOException |
| { |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,b,2\n"); |
| writer.write("2014-01-01T00:00:10Z,c,3\n"); |
| writer.write("2014-01-01T01:00:20Z,b,2\n"); |
| writer.write("2014-01-01T02:00:30Z,a,1\n"); |
| writer.write("2014-01-01T02:00:30Z,b,2\n"); |
| writer.write("2014-01-01T01:00:20Z,c,3\n"); |
| writer.write("2014-01-01T02:00:30Z,c,3\n"); |
| } |
| } |
| |
| @Test |
| public void testIgnoreParseException() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("unparseable,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in |
| // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments() |
| final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| Lists.<String>newArrayList(), |
| Lists.<SpatialDimensionSchema>newArrayList() |
| ), |
| null, |
| Arrays.asList("time", "dim", "val"), |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception, |
| false |
| ); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| parseExceptionIgnoreSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| |
| Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testReportParseException() throws Exception |
| { |
| expectedException.expect(ParseException.class); |
| expectedException.expectMessage("Unparseable timestamp found!"); |
| |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("unparseable,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| Lists.<String>newArrayList(), |
| Lists.<SpatialDimensionSchema>newArrayList() |
| ), |
| null, |
| Arrays.asList("time", "dim", "val"), |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, null, null, null, false, false, true), // report parse exception |
| false |
| ); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| parseExceptionIgnoreSpec, |
| null |
| ); |
| |
| runTask(indexTask); |
| } |
| |
| @Test |
| public void testCsvWithHeaderOfEmptyColumns() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,dim,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| null, |
| null |
| ), |
| null, |
| null, |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, 1, null, null, false, true, true), // report parse exception |
| false |
| ); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| parseExceptionIgnoreSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask); |
| // the order of result segments can be changed because hash shardSpec is used. |
| // the below loop is to make this test deterministic. |
| Assert.assertEquals(2, segments.size()); |
| Assert.assertNotEquals(segments.get(0), segments.get(1)); |
| |
| for (DataSegment segment : segments) { |
| System.out.println(segment.getDimensions()); |
| } |
| |
| for (int i = 0; i < 2; i++) { |
| final DataSegment segment = segments.get(i); |
| final Set<String> dimensions = new HashSet<>(segment.getDimensions()); |
| |
| Assert.assertTrue( |
| StringUtils.format("Actual dimensions: %s", dimensions), |
| dimensions.equals(Sets.newHashSet("dim", "column_3")) || |
| dimensions.equals(Sets.newHashSet("column_2", "column_3")) |
| ); |
| |
| Assert.assertEquals(Arrays.asList("val"), segment.getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval()); |
| } |
| } |
| |
| @Test |
| public void testCsvWithHeaderOfEmptyTimestamp() throws Exception |
| { |
| expectedException.expect(ParseException.class); |
| expectedException.expectMessage("Unparseable timestamp found!"); |
| |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write(",,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( |
| tmpDir, |
| new CSVParseSpec( |
| new TimestampSpec( |
| "time", |
| "auto", |
| null |
| ), |
| new DimensionsSpec( |
| null, |
| Lists.<String>newArrayList(), |
| Lists.<SpatialDimensionSchema>newArrayList() |
| ), |
| null, |
| Arrays.asList("time", "", ""), |
| true, |
| 0 |
| ), |
| null, |
| createTuningConfig(2, null, null, null, false, false, true), // report parse exception |
| false |
| ); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| parseExceptionIgnoreSpec, |
| null |
| ); |
| |
| runTask(indexTask); |
| } |
| |
| private List<DataSegment> runTask(IndexTask indexTask) throws Exception |
| { |
| final List<DataSegment> segments = Lists.newArrayList(); |
| |
| final TaskActionClient actionClient = new TaskActionClient() |
| { |
| @Override |
| public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException |
| { |
| if (taskAction instanceof LockListAction) { |
| return (RetType) Collections.singletonList( |
| new TaskLock( |
| TaskLockType.EXCLUSIVE, |
| "", |
| "", |
| Intervals.of("2014/P1Y"), DateTimes.nowUtc().toString(), |
| Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY |
| ) |
| ); |
| } |
| |
| if (taskAction instanceof LockAcquireAction) { |
| return (RetType) new TaskLock( |
| TaskLockType.EXCLUSIVE, "groupId", |
| "test", |
| ((LockAcquireAction) taskAction).getInterval(), |
| DateTimes.nowUtc().toString(), |
| Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY |
| ); |
| } |
| |
| if (taskAction instanceof LockTryAcquireAction) { |
| return (RetType) new TaskLock( |
| TaskLockType.EXCLUSIVE, |
| "groupId", |
| "test", |
| ((LockTryAcquireAction) taskAction).getInterval(), |
| DateTimes.nowUtc().toString(), |
| Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY |
| ); |
| } |
| |
| if (taskAction instanceof SegmentTransactionalInsertAction) { |
| return (RetType) new SegmentPublishResult( |
| ((SegmentTransactionalInsertAction) taskAction).getSegments(), |
| true |
| ); |
| } |
| |
| if (taskAction instanceof SegmentAllocateAction) { |
| SegmentAllocateAction action = (SegmentAllocateAction) taskAction; |
| Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp()); |
| ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0); |
| return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec); |
| } |
| |
| return null; |
| } |
| }; |
| |
| final DataSegmentPusher pusher = new DataSegmentPusher() |
| { |
| @Deprecated |
| @Override |
| public String getPathForHadoop(String dataSource) |
| { |
| return getPathForHadoop(); |
| } |
| |
| @Override |
| public String getPathForHadoop() |
| { |
| return null; |
| } |
| |
| @Override |
| public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException |
| { |
| segments.add(segment); |
| return segment; |
| } |
| |
| @Override |
| public Map<String, Object> makeLoadSpec(URI uri) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| }; |
| |
| final TaskToolbox box = new TaskToolbox( |
| null, |
| actionClient, |
| null, |
| pusher, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| jsonMapper, |
| temporaryFolder.newFolder(), |
| indexIO, |
| null, |
| null, |
| indexMergerV9, |
| null, |
| null, |
| null, |
| null |
| ); |
| |
| indexTask.isReady(box.getTaskActionClient()); |
| indexTask.run(box); |
| |
| Collections.sort(segments); |
| |
| return segments; |
| } |
| |
| private IndexTask.IndexIngestionSpec createIngestionSpec( |
| File baseDir, |
| ParseSpec parseSpec, |
| GranularitySpec granularitySpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| return createIngestionSpec(baseDir, parseSpec, TransformSpec.NONE, granularitySpec, tuningConfig, appendToExisting); |
| } |
| |
| private IndexTask.IndexIngestionSpec createIngestionSpec( |
| File baseDir, |
| ParseSpec parseSpec, |
| TransformSpec transformSpec, |
| GranularitySpec granularitySpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| return new IndexTask.IndexIngestionSpec( |
| new DataSchema( |
| "test", |
| jsonMapper.convertValue( |
| new StringInputRowParser( |
| parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, |
| null |
| ), |
| Map.class |
| ), |
| new AggregatorFactory[]{ |
| new LongSumAggregatorFactory("val", "val") |
| }, |
| granularitySpec != null ? granularitySpec : new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.MINUTE, |
| Arrays.asList(Intervals.of("2014/2015")) |
| ), |
| transformSpec, |
| jsonMapper |
| ), |
| new IndexTask.IndexIOConfig( |
| new LocalFirehoseFactory( |
| baseDir, |
| "druid*", |
| null |
| ), |
| appendToExisting |
| ), |
| tuningConfig |
| ); |
| } |
| |
| private static IndexTuningConfig createTuningConfig( |
| Integer targetPartitionSize, |
| Integer numShards, |
| boolean forceExtendableShardSpecs, |
| boolean forceGuaranteedRollup |
| ) |
| { |
| return createTuningConfig( |
| targetPartitionSize, |
| 1, |
| null, |
| numShards, |
| forceExtendableShardSpecs, |
| forceGuaranteedRollup, |
| true |
| ); |
| } |
| |
| private static IndexTuningConfig createTuningConfig( |
| Integer targetPartitionSize, |
| Integer maxRowsInMemory, |
| Long maxTotalRows, |
| Integer numShards, |
| boolean forceExtendableShardSpecs, |
| boolean forceGuaranteedRollup, |
| boolean reportParseException |
| ) |
| { |
| return new IndexTask.IndexTuningConfig( |
| targetPartitionSize, |
| maxRowsInMemory, |
| maxTotalRows, |
| null, |
| numShards, |
| indexSpec, |
| null, |
| true, |
| forceExtendableShardSpecs, |
| forceGuaranteedRollup, |
| reportParseException, |
| null, |
| null, |
| null |
| ); |
| } |
| } |