/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Tests for {@link IndexGeneratorJob.IndexGeneratorCombiner}, which rolls up serialized rows that share a
 * grouping key before they are handed to the reducer.
 */
public class IndexGeneratorCombinerTest
{
private AggregatorFactory[] aggregators;
private IndexGeneratorJob.IndexGeneratorCombiner combiner;
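
  /**
   * Builds a HadoopDruidIndexerConfig for a "website" datasource with "visited_sum" and "unique_hosts"
   * aggregators, serializes it into a Hadoop Configuration, and initializes the combiner through a mocked
   * Reducer.Context that only needs to return that configuration.
   */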
@Before
public void setUp() throws Exception
{
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "keywords")), null, null)
),
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.NONE,
ImmutableList.of(Intervals.of("2010/2011"))
),
null,
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(
ImmutableMap.of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
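    // Serialize the indexer config into the Hadoop Configuration; the combiner's setup() reads it back
    // through the mocked context below.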
Configuration hadoopConfig = new Configuration();
hadoopConfig.set(
HadoopDruidIndexerConfig.CONFIG_PROPERTY,
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(config)
);
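    // setup() only needs getConfiguration() from the context, so a single expectation covers it.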
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
EasyMock.expect(context.getConfiguration()).andReturn(hadoopConfig);
EasyMock.replay(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiner = new IndexGeneratorJob.IndexGeneratorCombiner();
combiner.setup(context);
}
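
  /**
   * A key with a single value has nothing to combine, so the combiner should write the key and value through
   * unchanged. The value is deliberately not a valid serialized row, which shows it is never deserialized.
   */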
@Test
public void testSingleRowNoMergePassThrough() throws Exception
{
Reducer.Context context = EasyMock.createMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
BytesWritable key = new BytesWritable(StringUtils.toUtf8("dummy_key"));
BytesWritable val = new BytesWritable(StringUtils.toUtf8("dummy_row"));
combiner.reduce(key, Collections.singletonList(val), context);
    Assert.assertSame(key, captureKey.getValue());
    Assert.assertSame(val, captureVal.getValue());
}
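
  /**
   * Both rows share the same key and declare only "keywords" as a dimension, with identical keyword values,
   * so the combiner should roll them up into a single row: visited_sum becomes 10 + 5 = 15 and both hosts
   * land in the "unique_hosts" HyperUnique sketch.
   */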
@Test
public void testMultipleRowsMerged() throws Exception
{
long timestamp = System.currentTimeMillis();
Bucket bucket = new Bucket(0, DateTimes.utc(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("host"),
new StringDimensionSchema("keywords")
),
null,
null
);
Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec);
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.of("keywords"),
ImmutableMap.of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.of("keywords"),
ImmutableMap.of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators).getSerializedRow()),
new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators).getSerializedRow())
);
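    // Only one write is expected: the two serialized rows should collapse into a single combined row.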
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey = Capture.newInstance();
Capture<BytesWritable> captureVal = Capture.newInstance();
context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal));
EasyMock.replay(context);
combiner.reduce(
key,
rows,
context
);
EasyMock.verify(context);
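    // The deserialized row still lists both schema dimensions, but "host" is empty because the input rows
    // declared only "keywords"; the metrics reflect the merge of both rows.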
    Assert.assertSame(key, captureKey.getValue());
InputRow capturedRow = InputRowSerde.fromBytes(typeHelperMap, captureVal.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow.getDimensions());
Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords"));
Assert.assertEquals(15, capturedRow.getMetric("visited_sum").longValue());
Assert.assertEquals(
2.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(
capturedRow.getRaw("unique_hosts"),
false
),
0.001
);
}
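
  /**
   * Here "host" is a real dimension and the two rows carry different host values, so they cannot roll up.
   * The combiner should emit each row separately with its own metrics.
   */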
@Test
public void testMultipleRowsNotMerged() throws Exception
{
long timestamp = System.currentTimeMillis();
Bucket bucket = new Bucket(0, DateTimes.utc(timestamp), 0);
SortableBytes keySortableBytes = new SortableBytes(
bucket.toGroupKey(),
new byte[0]
);
BytesWritable key = keySortableBytes.toBytesWritable();
InputRow row1 = new MapBasedInputRow(
timestamp,
ImmutableList.of("host", "keywords"),
ImmutableMap.of(
"host", "host1",
"keywords", Arrays.asList("foo", "bar"),
"visited", 10
)
);
InputRow row2 = new MapBasedInputRow(
timestamp,
ImmutableList.of("host", "keywords"),
ImmutableMap.of(
"host", "host2",
"keywords", Arrays.asList("foo", "bar"),
"visited", 5
)
);
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("host"),
new StringDimensionSchema("keywords")
),
null,
null
);
Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators).getSerializedRow()),
new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators).getSerializedRow())
);
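    // Two writes are expected, one per distinct row.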
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);
Capture<BytesWritable> captureKey1 = Capture.newInstance();
Capture<BytesWritable> captureVal1 = Capture.newInstance();
Capture<BytesWritable> captureKey2 = Capture.newInstance();
Capture<BytesWritable> captureVal2 = Capture.newInstance();
context.write(EasyMock.capture(captureKey1), EasyMock.capture(captureVal1));
context.write(EasyMock.capture(captureKey2), EasyMock.capture(captureVal2));
EasyMock.replay(context);
combiner.reduce(
key,
rows,
context
);
EasyMock.verify(context);
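    // Each emitted row keeps its own host, its own visit count, and a sketch covering just that host.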
    Assert.assertSame(key, captureKey1.getValue());
    Assert.assertSame(key, captureKey2.getValue());
InputRow capturedRow1 = InputRowSerde.fromBytes(typeHelperMap, captureVal1.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow1.getDimensions());
Assert.assertEquals(Collections.singletonList("host1"), capturedRow1.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords"));
Assert.assertEquals(10, capturedRow1.getMetric("visited_sum").longValue());
Assert.assertEquals(
1.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts"), false),
0.001
);
InputRow capturedRow2 = InputRowSerde.fromBytes(typeHelperMap, captureVal2.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions());
Assert.assertEquals(Collections.singletonList("host2"), capturedRow2.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords"));
Assert.assertEquals(5, capturedRow2.getMetric("visited_sum").longValue());
Assert.assertEquals(
1.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts"), false),
0.001
);
}
}