| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.common.task; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.Files; |
| import nl.jqno.equalsverifier.EqualsVerifier; |
| import org.apache.druid.data.input.InputFormat; |
| import org.apache.druid.data.input.impl.CSVParseSpec; |
| import org.apache.druid.data.input.impl.CsvInputFormat; |
| import org.apache.druid.data.input.impl.DimensionsSpec; |
| import org.apache.druid.data.input.impl.FloatDimensionSchema; |
| import org.apache.druid.data.input.impl.JSONParseSpec; |
| import org.apache.druid.data.input.impl.JsonInputFormat; |
| import org.apache.druid.data.input.impl.LocalInputSource; |
| import org.apache.druid.data.input.impl.LongDimensionSchema; |
| import org.apache.druid.data.input.impl.ParseSpec; |
| import org.apache.druid.data.input.impl.StringDimensionSchema; |
| import org.apache.druid.data.input.impl.StringInputRowParser; |
| import org.apache.druid.data.input.impl.TimestampSpec; |
| import org.apache.druid.indexer.TaskState; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; |
| import org.apache.druid.indexer.partitions.HashedPartitionsSpec; |
| import org.apache.druid.indexer.partitions.PartitionsSpec; |
| import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; |
| import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; |
| import org.apache.druid.indexing.common.LockGranularity; |
| import org.apache.druid.indexing.common.TaskReport; |
| import org.apache.druid.indexing.common.actions.SegmentAllocateAction; |
| import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; |
| import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; |
| import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.guava.Sequence; |
| import org.apache.druid.math.expr.ExprMacroTable; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.query.aggregation.LongSumAggregatorFactory; |
| import org.apache.druid.query.dimension.DefaultDimensionSpec; |
| import org.apache.druid.query.filter.SelectorDimFilter; |
| import org.apache.druid.segment.Cursor; |
| import org.apache.druid.segment.DimensionSelector; |
| import org.apache.druid.segment.IndexIO; |
| import org.apache.druid.segment.IndexSpec; |
| import org.apache.druid.segment.QueryableIndexStorageAdapter; |
| import org.apache.druid.segment.VirtualColumns; |
| import org.apache.druid.segment.incremental.RowIngestionMeters; |
| import org.apache.druid.segment.incremental.RowIngestionMetersFactory; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; |
| import org.apache.druid.segment.indexing.granularity.GranularitySpec; |
| import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; |
| import org.apache.druid.segment.loading.SegmentLoader; |
| import org.apache.druid.segment.loading.SegmentLoaderConfig; |
| import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; |
| import org.apache.druid.segment.loading.StorageLocationConfig; |
| import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; |
| import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory; |
| import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; |
| import org.apache.druid.segment.transform.ExpressionTransform; |
| import org.apache.druid.segment.transform.TransformSpec; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; |
| import org.apache.druid.timeline.partition.HashPartitionFunction; |
| import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec; |
| import org.apache.druid.timeline.partition.NumberedShardSpec; |
| import org.apache.druid.timeline.partition.PartitionIds; |
| import org.hamcrest.CoreMatchers; |
| import org.joda.time.Interval; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import javax.annotation.Nullable; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
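| /** |
| * Unit tests for {@link IndexTask}. Each test is parameterized over {@link LockGranularity} and over |
| * whether the newer InputFormat-based ingestion API or the legacy parser-based API is used; most tests |
| * ingest a small CSV or JSON file and assert on the published segments or the ingestion task report. |
| */ |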
| @RunWith(Parameterized.class) |
| public class IndexTaskTest extends IngestionTestBase |
| { |
| @Rule |
| public TemporaryFolder temporaryFolder = new TemporaryFolder(); |
| |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
| private static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null); |
| private static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec( |
| DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")) |
| ); |
| private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( |
| DEFAULT_TIMESTAMP_SPEC, |
| DEFAULT_DIMENSIONS_SPEC, |
| null, |
| Arrays.asList("ts", "dim", "val"), |
| false, |
| 0 |
| ); |
| private static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat( |
| Arrays.asList("ts", "dim", "val"), |
| null, |
| null, |
| false, |
| 0 |
| ); |
| |
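| // Every test runs under three combinations: time-chunk locking with either API, and segment |
| // locking with the InputFormat API only. |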
| @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}") |
| public static Iterable<Object[]> constructorFeeder() |
| { |
| return ImmutableList.of( |
| new Object[]{LockGranularity.TIME_CHUNK, false}, |
| new Object[]{LockGranularity.TIME_CHUNK, true}, |
| new Object[]{LockGranularity.SEGMENT, true} |
| ); |
| } |
| |
| private static final IndexSpec INDEX_SPEC = new IndexSpec(); |
| private final ObjectMapper jsonMapper; |
| private final IndexIO indexIO; |
| private final RowIngestionMetersFactory rowIngestionMetersFactory; |
| private final LockGranularity lockGranularity; |
| private final boolean useInputFormatApi; |
| |
| private AppenderatorsManager appenderatorsManager; |
| private SegmentLoader segmentLoader; |
| private TestTaskRunner taskRunner; |
| |
| public IndexTaskTest(LockGranularity lockGranularity, boolean useInputFormatApi) |
| { |
| this.jsonMapper = getObjectMapper(); |
| this.indexIO = getIndexIO(); |
| this.rowIngestionMetersFactory = getRowIngestionMetersFactory(); |
| this.lockGranularity = lockGranularity; |
| this.useInputFormatApi = useInputFormatApi; |
| } |
| |
| @Before |
| public void setup() throws IOException |
| { |
| appenderatorsManager = new TestAppenderatorsManager(); |
| |
| final File cacheDir = temporaryFolder.newFolder(); |
| segmentLoader = new SegmentLoaderLocalCacheManager( |
| indexIO, |
| new SegmentLoaderConfig() |
| { |
| @Override |
| public List<StorageLocationConfig> getLocations() |
| { |
| return Collections.singletonList( |
| new StorageLocationConfig(cacheDir, null, null) |
| ); |
| } |
| }, |
| jsonMapper |
| ); |
| taskRunner = new TestTaskRunner(); |
| } |
| |
| @Test |
| public void testDeterminePartitions() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
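| // With maxRowsPerSegment = 2 and forceGuaranteedRollup = true, the three rows should be |
| // hash-partitioned into two core segments. |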
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithMaxRowsPerSegment(2, true), |
| false |
| ), |
| null |
| ); |
| |
| Assert.assertFalse(indexTask.supportsQueries()); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(2, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| Assert.assertEquals(2, segments.get(0).getShardSpec().getNumCorePartitions()); |
| Assert.assertEquals( |
| HashPartitionFunction.MURMUR3_32_ABS, |
| ((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction() |
| ); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); |
| Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); |
| Assert.assertEquals(2, segments.get(1).getShardSpec().getNumCorePartitions()); |
| Assert.assertEquals( |
| HashPartitionFunction.MURMUR3_32_ABS, |
| ((HashBasedNumberedShardSpec) segments.get(1).getShardSpec()).getPartitionFunction() |
| ); |
| } |
| |
| @Test |
| public void testTransformSpec() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,an|array,1|2|3,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,another|array,3|4,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,and|another,0|1,1\n"); |
| } |
| |
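| // The '|' list delimiter makes dim_array and dim_num_array multi-valued; the transform filter |
| // below keeps only the row with dim = 'b'. |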
| final DimensionsSpec dimensionsSpec = new DimensionsSpec( |
| DimensionsSpec.getDefaultSchemas( |
| Arrays.asList( |
| "ts", |
| "dim", |
| "dim_array", |
| "dim_num_array", |
| "dimt", |
| "dimtarray1", |
| "dimtarray2", |
| "dimtnum_array" |
| ) |
| ) |
| ); |
| final List<String> columns = Arrays.asList("ts", "dim", "dim_array", "dim_num_array", "val"); |
| final String listDelimiter = "|"; |
| final TransformSpec transformSpec = new TransformSpec( |
| new SelectorDimFilter("dim", "b", null), |
| ImmutableList.of( |
| new ExpressionTransform("dimt", "concat(dim,dim)", ExprMacroTable.nil()), |
| new ExpressionTransform("dimtarray1", "array(dim, dim)", ExprMacroTable.nil()), |
| new ExpressionTransform( |
| "dimtarray2", |
| "map(d -> concat(d, 'foo'), dim_array)", |
| ExprMacroTable.nil() |
| ), |
| new ExpressionTransform("dimtnum_array", "map(d -> d + 3, dim_num_array)", ExprMacroTable.nil()) |
| ) |
| ); |
| final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, false); |
| final IndexIngestionSpec indexIngestionSpec; |
| if (useInputFormatApi) { |
| indexIngestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| DEFAULT_TIMESTAMP_SPEC, |
| dimensionsSpec, |
| new CsvInputFormat(columns, listDelimiter, null, false, 0), |
| transformSpec, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| indexIngestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, dimensionsSpec, listDelimiter, columns, false, 0), |
| transformSpec, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| indexIngestionSpec, |
| null |
| ); |
| |
| Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| DataSegment segment = segments.get(0); |
| final File segmentFile = segmentLoader.getSegmentFiles(segment); |
| |
| final WindowedStorageAdapter adapter = new WindowedStorageAdapter( |
| new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)), |
| segment.getInterval() |
| ); |
| final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors( |
| null, |
| segment.getInterval(), |
| VirtualColumns.EMPTY, |
| Granularities.ALL, |
| false, |
| null |
| ); |
| final List<Map<String, Object>> transforms = cursorSequence |
| .map(cursor -> { |
| final DimensionSelector selector1 = cursor.getColumnSelectorFactory() |
| .makeDimensionSelector(new DefaultDimensionSpec("dimt", "dimt")); |
| final DimensionSelector selector2 = cursor.getColumnSelectorFactory() |
| .makeDimensionSelector(new DefaultDimensionSpec( |
| "dimtarray1", |
| "dimtarray1" |
| )); |
| final DimensionSelector selector3 = cursor.getColumnSelectorFactory() |
| .makeDimensionSelector(new DefaultDimensionSpec( |
| "dimtarray2", |
| "dimtarray2" |
| )); |
| final DimensionSelector selector4 = cursor.getColumnSelectorFactory() |
| .makeDimensionSelector(new DefaultDimensionSpec( |
| "dimtnum_array", |
| "dimtnum_array" |
| )); |
| |
| Map<String, Object> row = new HashMap<>(); |
| row.put("dimt", selector1.defaultGetObject()); |
| row.put("dimtarray1", selector2.defaultGetObject()); |
| row.put("dimtarray2", selector3.defaultGetObject()); |
| row.put("dimtnum_array", selector4.defaultGetObject()); |
| cursor.advance(); |
| return row; |
| }) |
| .toList(); |
| Assert.assertEquals(1, transforms.size()); |
| Assert.assertEquals("bb", transforms.get(0).get("dimt")); |
| Assert.assertEquals(ImmutableList.of("b", "b"), transforms.get(0).get("dimtarray1")); |
| Assert.assertEquals(ImmutableList.of("anotherfoo", "arrayfoo"), transforms.get(0).get("dimtarray2")); |
| Assert.assertEquals(ImmutableList.of("6.0", "7.0"), transforms.get(0).get("dimtnum_array")); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testWithArbitraryGranularity() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new ArbitraryGranularitySpec( |
| Granularities.MINUTE, |
| Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) |
| ), |
| null, |
| createTuningConfigWithMaxRowsPerSegment(10, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| } |
| |
| @Test |
| public void testIntervalBucketing() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T07:59:59.977Z,a,1\n"); |
| writer.write("2014-01-01T08:00:00.000Z,b,1\n"); |
| } |
| |
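| // Only the second row falls inside the configured 08:00-09:00 interval; the first row should be thrown away. |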
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.HOUR, |
| Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z")) |
| ), |
| null, |
| createTuningConfigWithMaxRowsPerSegment(50, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| } |
| |
| @Test |
| public void testNumShardsProvided() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
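| // A HashedPartitionsSpec with numShards = 1 should yield exactly one hash-based shard covering the whole day. |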
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 1, null), true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| Assert.assertEquals( |
| HashPartitionFunction.MURMUR3_32_ABS, |
| ((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction() |
| ); |
| } |
| |
| @Test |
| public void testNumShardsAndHashPartitionFunctionProvided() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithPartitionsSpec( |
| new HashedPartitionsSpec(null, 1, null, HashPartitionFunction.MURMUR3_32_ABS), true |
| ), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| Assert.assertEquals( |
| HashPartitionFunction.MURMUR3_32_ABS, |
| ((HashBasedNumberedShardSpec) segments.get(0).getShardSpec()).getPartitionFunction() |
| ); |
| } |
| |
| @Test |
| public void testNumShardsAndPartitionDimensionsProvided() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| final IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithPartitionsSpec(new HashedPartitionsSpec(null, 2, ImmutableList.of("dim")), true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(2, segments.size()); |
| |
| for (DataSegment segment : segments) { |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); |
| final HashBasedNumberedShardSpec hashBasedNumberedShardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec(); |
| Assert.assertEquals(HashPartitionFunction.MURMUR3_32_ABS, hashBasedNumberedShardSpec.getPartitionFunction()); |
| |
| final File segmentFile = segmentLoader.getSegmentFiles(segment); |
| |
| final WindowedStorageAdapter adapter = new WindowedStorageAdapter( |
| new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)), |
| segment.getInterval() |
| ); |
| |
| final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors( |
| null, |
| segment.getInterval(), |
| VirtualColumns.EMPTY, |
| Granularities.ALL, |
| false, |
| null |
| ); |
| final List<Integer> hashes = cursorSequence |
| .map(cursor -> { |
| final DimensionSelector selector = cursor.getColumnSelectorFactory() |
| .makeDimensionSelector(new DefaultDimensionSpec("dim", "dim")); |
| final int hash = HashPartitionFunction.MURMUR3_32_ABS.hash( |
| HashBasedNumberedShardSpec.serializeGroupKey( |
| jsonMapper, |
| Collections.singletonList(selector.getObject()) |
| ), |
| hashBasedNumberedShardSpec.getNumBuckets() |
| ); |
| cursor.advance(); |
| return hash; |
| }) |
| .toList(); |
| |
| Assert.assertTrue(hashes.stream().allMatch(h -> h.intValue() == hashes.get(0))); |
| } |
| } |
| |
| @Test |
| public void testWriteNewSegmentsWithAppendToExistingWithLinearPartitioningSuccessfullyAppend() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithMaxRowsPerSegment(2, false), |
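| // The final argument enables appendToExisting (per the test name), so segments are allocated |
| // via SegmentAllocateAction and appended with numbered shard specs. |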
| true |
| ), |
| null |
| ); |
| |
| Assert.assertEquals("index_append_test", indexTask.getGroupId()); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(2, taskRunner.getTaskActionClient().getActionCount(SegmentAllocateAction.class)); |
| Assert.assertEquals(2, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); |
| Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testIntervalNotSpecified() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,1\n"); |
| writer.write("2014-01-01T02:00:30Z,c,1\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.MINUTE, |
| null |
| ), |
| null, |
| createTuningConfigWithMaxRowsPerSegment(2, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(3, segments.size()); |
| |
| Assert.assertEquals("test", segments.get(0).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T00/PT1H"), segments.get(0).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(1).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T01/PT1H"), segments.get(1).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum()); |
| |
| Assert.assertEquals("test", segments.get(2).getDataSource()); |
| Assert.assertEquals(Intervals.of("2014-01-01T02/PT1H"), segments.get(2).getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(2).getShardSpec().getClass()); |
| Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); |
| } |
| |
| @Test |
| public void testCSVFileWithHeader() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(null, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals(Collections.singletonList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Collections.singletonList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testCSVFileWithHeaderColumnOverride() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final List<String> columns = Arrays.asList("time", "dim", "val"); |
| final IndexTuningConfig tuningConfig = createTuningConfigWithMaxRowsPerSegment(2, true); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(1, segments.size()); |
| |
| Assert.assertEquals(Collections.singletonList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Collections.singletonList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testWithSmallMaxTotalRows() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,b,2\n"); |
| writer.write("2014-01-01T00:00:10Z,c,3\n"); |
| writer.write("2014-01-01T01:00:20Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,b,2\n"); |
| writer.write("2014-01-01T01:00:20Z,c,3\n"); |
| writer.write("2014-01-01T02:00:30Z,a,1\n"); |
| writer.write("2014-01-01T02:00:30Z,b,2\n"); |
| writer.write("2014-01-01T02:00:30Z,c,3\n"); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.HOUR, |
| Granularities.MINUTE, |
| null |
| ), |
| null, |
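| // The 2L argument is presumably maxTotalRows (per the test name), forcing a push after every two rows. |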
| createTuningConfig(2, 2, null, 2L, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(6, segments.size()); |
| |
| for (int i = 0; i < 6; i++) { |
| final DataSegment segment = segments.get(i); |
| final Interval expectedInterval = Intervals.of(StringUtils.format("2014-01-01T0%d/PT1H", (i / 2))); |
| final int expectedPartitionNum = i % 2; |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
| @Test |
| public void testPerfectRollup() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| null, |
| createTuningConfig(3, 2, null, 2L, null, true, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(3, segments.size()); |
| |
| for (int i = 0; i < 3; i++) { |
| final DataSegment segment = segments.get(i); |
| final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
| @Test |
| public void testBestEffortRollup() throws Exception |
| { |
| File tmpDir = temporaryFolder.newFolder(); |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| null, |
| createTuningConfig(3, 2, null, 2L, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(5, segments.size()); |
| |
| final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); |
| for (int i = 0; i < 5; i++) { |
| final DataSegment segment = segments.get(i); |
| |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(i, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| |
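| // Writes nine rows spanning three hours with three distinct 'dim' values; shared by the rollup |
| // and overwrite tests. |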
| private static void populateRollupTestData(File tmpFile) throws IOException |
| { |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| writer.write("2014-01-01T01:00:20Z,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,b,2\n"); |
| writer.write("2014-01-01T00:00:10Z,c,3\n"); |
| writer.write("2014-01-01T01:00:20Z,b,2\n"); |
| writer.write("2014-01-01T02:00:30Z,a,1\n"); |
| writer.write("2014-01-01T02:00:30Z,b,2\n"); |
| writer.write("2014-01-01T01:00:20Z,c,3\n"); |
| writer.write("2014-01-01T02:00:30Z,c,3\n"); |
| } |
| } |
| |
| @Test |
| public void testIgnoreParseException() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("unparseable,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final List<String> columns = Arrays.asList("time", "dim", "val"); |
| // ignore parse exception |
| final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, false); |
| |
| // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in |
| // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments() |
| final IndexIngestionSpec parseExceptionIgnoreSpec; |
| if (useInputFormatApi) { |
| parseExceptionIgnoreSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| parseExceptionIgnoreSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| parseExceptionIgnoreSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(Collections.singletonList("d"), segments.get(0).getDimensions()); |
| Assert.assertEquals(Collections.singletonList("val"), segments.get(0).getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval()); |
| } |
| |
| @Test |
| public void testReportParseException() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,d,val\n"); |
| writer.write("unparseable,a,1\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final List<String> columns = Arrays.asList("time", "dim", "val"); |
| // report parse exception |
| final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); |
| final IndexIngestionSpec indexIngestionSpec; |
| if (useInputFormatApi) { |
| indexIngestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| indexIngestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, DimensionsSpec.EMPTY, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| indexIngestionSpec, |
| null |
| ); |
| |
| TaskStatus status = runTask(indexTask).lhs; |
| Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); |
| checkTaskStatusErrorMsgForParseExceptionsExceeded(status); |
| |
| Map<String, Object> expectedUnparseables = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| new ArrayList<>(), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| Collections.singletonList("Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a, val=1}") |
| ); |
| IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); |
| Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); |
| } |
| |
| @Test |
| public void testMultipleParseExceptionsSuccess() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("{\"time\":\"unparseable\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // unparseable time |
| writer.write("{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // valid row |
| writer.write( |
| "{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"b\",\"dimLong\":\"notnumber\",\"dimFloat\":3.0,\"val\":1}\n"); // row with invalid long dimension |
| writer.write( |
| "{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"b\",\"dimLong\":2,\"dimFloat\":\"notnumber\",\"val\":1}\n"); // row with invalid float dimension |
| writer.write( |
| "{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"b\",\"dimLong\":2,\"dimFloat\":4.0,\"val\":\"notnumber\"}\n"); // row with invalid metric |
| writer.write("{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // invalid JSON |
| writer.write( |
| "{\"time\":\"3014-03-01T00:00:10Z\",\"dim\":\"outsideofinterval\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // thrown away |
| writer.write("{\"time\":\"99999999999-01-01T00:00:10Z\",\"dim\":\"b\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"); // unparseable time |
| writer.write("this is not JSON\n"); // invalid JSON |
| } |
| |
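| // The trailing (7, 7) arguments appear to be maxParseExceptions and maxSavedParseExceptions: the |
| // bad rows above stay within the limit, so the task should still succeed and merely report them. |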
| final IndexTuningConfig tuningConfig = new IndexTuningConfig( |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| new HashedPartitionsSpec(2, null, null), |
| INDEX_SPEC, |
| null, |
| null, |
| true, |
| false, |
| null, |
| null, |
| null, |
| true, |
| 7, |
| 7 |
| ); |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final DimensionsSpec dimensionsSpec = new DimensionsSpec( |
| Arrays.asList( |
| new StringDimensionSchema("dim"), |
| new LongDimensionSchema("dimLong"), |
| new FloatDimensionSchema("dimFloat") |
| ) |
| ); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| dimensionsSpec, |
| new JsonInputFormat(null, null, null), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new JSONParseSpec(timestampSpec, dimensionsSpec, null, null, null), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| TaskStatus status = runTask(indexTask).lhs; |
| Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); |
| Assert.assertNull(status.getErrorMsg()); |
| |
| IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); |
| |
| Map<String, Object> expectedMetrics = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 0, |
| RowIngestionMeters.PROCESSED, 4, |
| RowIngestionMeters.UNPARSEABLE, 4, |
| RowIngestionMeters.THROWN_AWAY, 1 |
| ), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 3, |
| RowIngestionMeters.PROCESSED, 1, |
| RowIngestionMeters.UNPARSEABLE, 4, |
| RowIngestionMeters.THROWN_AWAY, 1 |
| ) |
| ); |
| Assert.assertEquals(expectedMetrics, reportData.getRowStats()); |
| |
| Map<String, Object> expectedUnparseables = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| Arrays.asList( |
| "Unable to parse row [this is not JSON]", |
| "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", |
| "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]", |
| "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" |
| ), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| Arrays.asList( |
| "Unable to parse row [this is not JSON]", |
| "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", |
| "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]", |
| "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0, val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to parse value[notnumber] for field[val],]", |
| "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to float,]", |
| "Found unparseable columns in row: [MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z, event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,]", |
| "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" |
| ) |
| ); |
| |
| Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); |
| } |
| |
| @Test |
| public void testMultipleParseExceptionsFailure() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,dim,dimLong,dimFloat,val\n"); |
| writer.write("unparseable,a,2,3.0,1\n"); // unparseable |
| writer.write("2014-01-01T00:00:10Z,a,2,3.0,1\n"); // valid row |
| writer.write("9.0,a,2,3.0,1\n"); // unparseable |
| writer.write("3014-03-01T00:00:10Z,outsideofinterval,2,3.0,1\n"); // thrown away |
| writer.write("99999999999-01-01T00:00:10Z,b,2,3.0,1\n"); // unparseable |
| } |
| |
| // Fail once more than 2 parse exceptions occur, and save up to 5 of them for the task report (the trailing 2, 5 arguments) |
| final IndexTuningConfig tuningConfig = new IndexTuningConfig( |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| new DynamicPartitionsSpec(2, null), |
| INDEX_SPEC, |
| null, |
| null, |
| false, |
| false, |
| null, |
| null, |
| null, |
| true, |
| 2, |
| 5 |
| ); |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final DimensionsSpec dimensionsSpec = new DimensionsSpec( |
| Arrays.asList( |
| new StringDimensionSchema("dim"), |
| new LongDimensionSchema("dimLong"), |
| new FloatDimensionSchema("dimFloat") |
| ) |
| ); |
| final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| dimensionsSpec, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| TaskStatus status = runTask(indexTask).lhs; |
| Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); |
| checkTaskStatusErrorMsgForParseExceptionsExceeded(status); |
| |
| IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); |
| |
| Map<String, Object> expectedMetrics = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 0, |
| RowIngestionMeters.PROCESSED, 0, |
| RowIngestionMeters.UNPARSEABLE, 0, |
| RowIngestionMeters.THROWN_AWAY, 0 |
| ), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 0, |
| RowIngestionMeters.PROCESSED, 1, |
| RowIngestionMeters.UNPARSEABLE, 3, |
| RowIngestionMeters.THROWN_AWAY, useInputFormatApi ? 1 : 2 |
| ) |
| ); |
| |
| Assert.assertEquals(expectedMetrics, reportData.getRowStats()); |
| |
| Map<String, Object> expectedUnparseables = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| new ArrayList<>(), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| Arrays.asList( |
| "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", |
| "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}", |
| "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" |
| ) |
| ); |
| |
| Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); |
| } |
| |
| @Test |
| public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("time,dim,dimLong,dimFloat,val\n"); |
| writer.write("unparseable,a,2,3.0,1\n"); // unparseable |
| writer.write("2014-01-01T00:00:10Z,a,2,3.0,1\n"); // valid row |
| writer.write("9.0,a,2,3.0,1\n"); // unparseable |
| writer.write("3014-03-01T00:00:10Z,outsideofinterval,2,3.0,1\n"); // thrown away |
| writer.write("99999999999-01-01T00:00:10Z,b,2,3.0,1\n"); // unparseable |
| } |
| |
| // Fail once more than 2 parse exceptions occur, and save up to 5 of them for the task report (the trailing 2, 5 arguments) |
| final IndexTuningConfig tuningConfig = new IndexTuningConfig( |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| new HashedPartitionsSpec(2, null, null), |
| INDEX_SPEC, |
| null, |
| null, |
| true, |
| false, |
| null, |
| null, |
| null, |
| true, |
| 2, |
| 5 |
| ); |
| |
| final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); |
| final DimensionsSpec dimensionsSpec = new DimensionsSpec( |
| Arrays.asList( |
| new StringDimensionSchema("dim"), |
| new LongDimensionSchema("dimLong"), |
| new FloatDimensionSchema("dimFloat") |
| ) |
| ); |
| final List<String> columns = Arrays.asList("time", "dim", "dimLong", "dimFloat", "val"); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| timestampSpec, |
| dimensionsSpec, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(timestampSpec, dimensionsSpec, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| TaskStatus status = runTask(indexTask).lhs; |
| Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); |
| checkTaskStatusErrorMsgForParseExceptionsExceeded(status); |
| |
| IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); |
| |
| Map<String, Object> expectedMetrics = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 0, |
| RowIngestionMeters.PROCESSED, 1, |
| RowIngestionMeters.UNPARSEABLE, 3, |
| RowIngestionMeters.THROWN_AWAY, useInputFormatApi ? 1 : 2 |
| ), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| ImmutableMap.of( |
| RowIngestionMeters.PROCESSED_WITH_ERROR, 0, |
| RowIngestionMeters.PROCESSED, 0, |
| RowIngestionMeters.UNPARSEABLE, 0, |
| RowIngestionMeters.THROWN_AWAY, 0 |
| ) |
| ); |
| |
| Assert.assertEquals(expectedMetrics, reportData.getRowStats()); |
| |
| Map<String, Object> expectedUnparseables = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| Arrays.asList( |
| "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", |
| "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}", |
| "Timestamp[unparseable] is unparseable! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" |
| ), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| new ArrayList<>() |
| ); |
| |
| Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); |
| } |
| |
| @Test |
| public void testCsvWithHeaderOfEmptyColumns() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
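| // Three files are written, each leaving different header columns unnamed; empty header names |
| // should be auto-generated as column_<N>. |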
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("ts,,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("ts,dim,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write("ts,,val\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
| // report parse exception |
| final IndexTuningConfig tuningConfig = createTuningConfig(2, 1, null, null, null, true, true); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| DEFAULT_TIMESTAMP_SPEC, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(null, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| // The order of the result segments can change because a hash shardSpec is used, so the |
| // assertions below accept the expected dimension sets in either order. |
| Assert.assertEquals(2, segments.size()); |
| Assert.assertNotEquals(segments.get(0), segments.get(1)); |
| |
| for (int i = 0; i < 2; i++) { |
| final DataSegment segment = segments.get(i); |
| final Set<String> dimensions = new HashSet<>(segment.getDimensions()); |
| |
| Assert.assertTrue( |
| StringUtils.format("Actual dimensions: %s", dimensions), |
| dimensions.equals(Sets.newHashSet("column_2")) || |
| dimensions.equals(Sets.newHashSet("dim", "column_2", "column_3")) |
| ); |
| |
| Assert.assertEquals(Collections.singletonList("val"), segment.getMetrics()); |
| Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval()); |
| } |
| } |
| |
| @Test |
| public void testCsvWithHeaderOfEmptyTimestamp() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { |
| writer.write(",,\n"); |
| writer.write("2014-01-01T00:00:10Z,a,1\n"); |
| } |
| |
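| // The header row is entirely empty, so every column gets an auto-generated name and the 'ts' |
| // column expected by the timestampSpec is missing, leaving a null timestamp. |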
| final List<String> columns = Arrays.asList("ts", "", ""); |
| // report parse exception |
| final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null, null, null, false, true); |
| final IndexIngestionSpec ingestionSpec; |
| if (useInputFormatApi) { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| DEFAULT_TIMESTAMP_SPEC, |
| DimensionsSpec.EMPTY, |
| new CsvInputFormat(columns, null, null, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } else { |
| ingestionSpec = createIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new CSVParseSpec(DEFAULT_TIMESTAMP_SPEC, DimensionsSpec.EMPTY, null, columns, true, 0), |
| null, |
| null, |
| tuningConfig, |
| false |
| ); |
| } |
| |
| IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| ingestionSpec, |
| null |
| ); |
| |
| TaskStatus status = runTask(indexTask).lhs; |
| Assert.assertEquals(TaskState.FAILED, status.getStatusCode()); |
| |
| checkTaskStatusErrorMsgForParseExceptionsExceeded(status); |
| |
| IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); |
| |
| Map<String, Object> expectedUnparseables = ImmutableMap.of( |
| RowIngestionMeters.DETERMINE_PARTITIONS, |
| new ArrayList<>(), |
| RowIngestionMeters.BUILD_SEGMENTS, |
| Collections.singletonList( |
| "Timestamp[null] is unparseable! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}") |
| ); |
| Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); |
| } |
| |
| @Test |
| public void testOverwriteWithSameSegmentGranularity() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
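| // Run the identical task twice: the first run creates root-generation segments, and with segment |
| // locking the second run overwrites them using NumberedOverwriteShardSpecs. |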
| for (int i = 0; i < 2; i++) { |
| final IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| null, |
| createTuningConfig(3, 2, null, 2L, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(5, segments.size()); |
| |
| final Interval expectedInterval = Intervals.of("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z"); |
| for (int j = 0; j < 5; j++) { |
| final DataSegment segment = segments.get(j); |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| if (i == 0) { |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(j, segment.getShardSpec().getPartitionNum()); |
| } else { |
| if (lockGranularity == LockGranularity.SEGMENT) { |
| Assert.assertEquals(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass()); |
| final NumberedOverwriteShardSpec numberedOverwriteShardSpec = |
| (NumberedOverwriteShardSpec) segment.getShardSpec(); |
| Assert.assertEquals( |
| j + PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, |
| numberedOverwriteShardSpec.getPartitionNum() |
| ); |
| Assert.assertEquals(1, numberedOverwriteShardSpec.getMinorVersion()); |
| Assert.assertEquals(5, numberedOverwriteShardSpec.getAtomicUpdateGroupSize()); |
| Assert.assertEquals(0, numberedOverwriteShardSpec.getStartRootPartitionId()); |
| Assert.assertEquals(5, numberedOverwriteShardSpec.getEndRootPartitionId()); |
| } else { |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| final NumberedShardSpec numberedShardSpec = (NumberedShardSpec) segment.getShardSpec(); |
| Assert.assertEquals(j, numberedShardSpec.getPartitionNum()); |
| } |
| } |
| } |
| } |
| } |
| |
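| // Same as above, except the second run switches segment granularity from DAY to MONTH, so the |
| // overwrite always allocates a new root generation of NumberedShardSpecs regardless of the |
| // lock granularity. |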
| @Test |
| public void testOverwriteWithDifferentSegmentGranularity() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| final File tmpFile = File.createTempFile("druid", "index", tmpDir); |
| |
| populateRollupTestData(tmpFile); |
| |
| for (int i = 0; i < 2; i++) { |
| final Granularity segmentGranularity = i == 0 ? Granularities.DAY : Granularities.MONTH; |
| final IndexTask indexTask = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| new UniformGranularitySpec( |
| segmentGranularity, |
| Granularities.DAY, |
| true, |
| null |
| ), |
| null, |
| createTuningConfig(3, 2, null, 2L, null, false, true), |
| false |
| ), |
| null |
| ); |
| |
| final List<DataSegment> segments = runTask(indexTask).rhs; |
| |
| Assert.assertEquals(5, segments.size()); |
| |
| final Interval expectedInterval = i == 0 |
| ? Intervals.of("2014-01-01/2014-01-02") |
| : Intervals.of("2014-01-01/2014-02-01"); |
| for (int j = 0; j < 5; j++) { |
| final DataSegment segment = segments.get(j); |
| Assert.assertEquals("test", segment.getDataSource()); |
| Assert.assertEquals(expectedInterval, segment.getInterval()); |
| Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass()); |
| Assert.assertEquals(j, segment.getShardSpec().getPartitionNum()); |
| } |
| } |
| } |
| |
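| // IndexTask does not support range (single_dim) partitioning, so isReady() should fail fast |
| // with an UnsupportedOperationException instead of letting the task run. |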
| @Test |
| public void testIndexTaskWithSingleDimPartitionsSpecThrowingException() throws Exception |
| { |
| final File tmpDir = temporaryFolder.newFolder(); |
| final IndexTask task = new IndexTask( |
| null, |
| null, |
| createDefaultIngestionSpec( |
| jsonMapper, |
| tmpDir, |
| null, |
| null, |
| createTuningConfigWithPartitionsSpec(new SingleDimensionPartitionsSpec(null, 1, null, false), true), |
| false |
| ), |
| null |
| ); |
| expectedException.expect(UnsupportedOperationException.class); |
| expectedException.expectMessage( |
| "partitionsSpec[org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec] is not supported" |
| ); |
| task.isReady(createActionClient(task)); |
| } |
| |
| public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status) |
| { |
| // The full stacktrace would be too long and would make the test brittle (e.g. if a line number changes), so just match the main message. |
| Assert.assertThat( |
| status.getErrorMsg(), |
| CoreMatchers.containsString("Max parse exceptions") |
| ); |
| } |
| |
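| // Runs the task under the parameterized lock granularity and returns its final status together |
| // with the segments it published. |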
| private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask task) throws Exception |
| { |
| task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); |
| final TaskStatus status = taskRunner.run(task).get(); |
| final List<DataSegment> segments = taskRunner.getPublishedSegments(); |
| return Pair.of(status, segments); |
| } |
| |
| private static IndexTuningConfig createTuningConfigWithMaxRowsPerSegment( |
| int maxRowsPerSegment, |
| boolean forceGuaranteedRollup |
| ) |
| { |
| return createTuningConfig( |
| maxRowsPerSegment, |
| 1, |
| null, |
| null, |
| null, |
| forceGuaranteedRollup, |
| true |
| ); |
| } |
| |
| private static IndexTuningConfig createTuningConfigWithPartitionsSpec( |
| PartitionsSpec partitionsSpec, |
| boolean forceGuaranteedRollup |
| ) |
| { |
| return createTuningConfig( |
| null, |
| 1, |
| null, |
| null, |
| partitionsSpec, |
| forceGuaranteedRollup, |
| true |
| ); |
| } |
| |
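| // Thin wrapper around the full IndexTuningConfig constructor; arguments passed as null fall |
| // back to the config's defaults. |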
| static IndexTuningConfig createTuningConfig( |
| @Nullable Integer maxRowsPerSegment, |
| @Nullable Integer maxRowsInMemory, |
| @Nullable Long maxBytesInMemory, |
| @Nullable Long maxTotalRows, |
| @Nullable PartitionsSpec partitionsSpec, |
| boolean forceGuaranteedRollup, |
| boolean reportParseException |
| ) |
| { |
| return new IndexTuningConfig( |
| null, |
| maxRowsPerSegment, |
| null, |
| maxRowsInMemory, |
| maxBytesInMemory, |
| maxTotalRows, |
| null, |
| null, |
| null, |
| partitionsSpec, |
| INDEX_SPEC, |
| null, |
| null, |
| forceGuaranteedRollup, |
| reportParseException, |
| null, |
| null, |
| null, |
| null, |
| null, |
| 1 |
| ); |
| } |
| |
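| // Deserializes the task report file written by the test task runner and extracts the |
| // ingestion-stats-and-errors payload from it. |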
| private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException |
| { |
| Map<String, TaskReport> taskReports = jsonMapper.readValue( |
| taskRunner.getTaskReportsFile(), |
| new TypeReference<Map<String, TaskReport>>() |
| { |
| } |
| ); |
| return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports( |
| taskReports |
| ); |
| } |
| |
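| // Builds an ingestion spec from the test defaults, using either the InputFormat-based path or |
| // the legacy ParseSpec-based path depending on the parameterized useInputFormatApi flag. |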
| private IndexIngestionSpec createDefaultIngestionSpec( |
| ObjectMapper objectMapper, |
| File baseDir, |
| @Nullable GranularitySpec granularitySpec, |
| @Nullable TransformSpec transformSpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| if (useInputFormatApi) { |
| return createIngestionSpec( |
| objectMapper, |
| baseDir, |
| DEFAULT_TIMESTAMP_SPEC, |
| DEFAULT_DIMENSIONS_SPEC, |
| DEFAULT_INPUT_FORMAT, |
| transformSpec, |
| granularitySpec, |
| tuningConfig, |
| appendToExisting |
| ); |
| } else { |
| return createIngestionSpec( |
| objectMapper, |
| baseDir, |
| DEFAULT_PARSE_SPEC, |
| transformSpec, |
| granularitySpec, |
| tuningConfig, |
| appendToExisting |
| ); |
| } |
| } |
| |
| static IndexIngestionSpec createIngestionSpec( |
| ObjectMapper objectMapper, |
| File baseDir, |
| @Nullable ParseSpec parseSpec, |
| @Nullable TransformSpec transformSpec, |
| @Nullable GranularitySpec granularitySpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| return createIngestionSpec( |
| objectMapper, |
| baseDir, |
| parseSpec, |
| null, |
| null, |
| null, |
| transformSpec, |
| granularitySpec, |
| tuningConfig, |
| appendToExisting |
| ); |
| } |
| |
| static IndexIngestionSpec createIngestionSpec( |
| ObjectMapper objectMapper, |
| File baseDir, |
| TimestampSpec timestampSpec, |
| DimensionsSpec dimensionsSpec, |
| InputFormat inputFormat, |
| @Nullable TransformSpec transformSpec, |
| @Nullable GranularitySpec granularitySpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| return createIngestionSpec( |
| objectMapper, |
| baseDir, |
| null, |
| timestampSpec, |
| dimensionsSpec, |
| inputFormat, |
| transformSpec, |
| granularitySpec, |
| tuningConfig, |
| appendToExisting |
| ); |
| } |
| |
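| // Central factory for both ingestion APIs: when an InputFormat is given, the spec is built on |
| // the InputSource/InputFormat path; otherwise the ParseSpec is wrapped in a legacy |
| // StringInputRowParser and read through a LocalFirehoseFactory. |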
| private static IndexIngestionSpec createIngestionSpec( |
| ObjectMapper objectMapper, |
| File baseDir, |
| @Nullable ParseSpec parseSpec, |
| @Nullable TimestampSpec timestampSpec, |
| @Nullable DimensionsSpec dimensionsSpec, |
| @Nullable InputFormat inputFormat, |
| @Nullable TransformSpec transformSpec, |
| @Nullable GranularitySpec granularitySpec, |
| IndexTuningConfig tuningConfig, |
| boolean appendToExisting |
| ) |
| { |
| if (inputFormat != null) { |
| Preconditions.checkArgument(parseSpec == null, "Can't use parseSpec when inputFormat is specified"); |
| return new IndexIngestionSpec( |
| new DataSchema( |
| "test", |
| Preconditions.checkNotNull(timestampSpec, "timestampSpec"), |
| Preconditions.checkNotNull(dimensionsSpec, "dimensionsSpec"), |
| new AggregatorFactory[]{ |
| new LongSumAggregatorFactory("val", "val") |
| }, |
| granularitySpec != null ? granularitySpec : new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.MINUTE, |
| Collections.singletonList(Intervals.of("2014/2015")) |
| ), |
| transformSpec |
| ), |
| new IndexIOConfig( |
| null, |
| new LocalInputSource(baseDir, "druid*"), |
| inputFormat, |
| appendToExisting |
| ), |
| tuningConfig |
| ); |
| } else { |
| return new IndexIngestionSpec( |
| new DataSchema( |
| "test", |
| objectMapper.convertValue( |
| new StringInputRowParser( |
| parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, |
| null |
| ), |
| Map.class |
| ), |
| new AggregatorFactory[]{ |
| new LongSumAggregatorFactory("val", "val") |
| }, |
| granularitySpec != null ? granularitySpec : new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.MINUTE, |
| Collections.singletonList(Intervals.of("2014/2015")) |
| ), |
| transformSpec, |
| objectMapper |
| ), |
| new IndexIOConfig( |
| new LocalFirehoseFactory( |
| baseDir, |
| "druid*", |
| null |
| ), |
| appendToExisting |
| ), |
| tuningConfig |
| ); |
| } |
| } |
| |
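| // Sanity-checks that IndexTuningConfig fulfills the equals/hashCode contract. |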
| @Test |
| public void testEqualsAndHashCode() |
| { |
| EqualsVerifier.forClass(IndexTuningConfig.class) |
| .usingGetClass() |
| .verify(); |
| } |
| } |