/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
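/**
 * Tests for {@link StreamAppenderator}: simple ingestion and pushing, in-memory row and byte accounting
 * under maxRowsInMemory / maxBytesInMemory, restoring state from disk, row ingestion metrics, and querying
 * open segments by interval and by segment descriptor.
 */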
public class StreamAppenderatorTest extends InitializedNullHandlingTest
{
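// Segment identifiers shared by the tests: two partitions of the 2000/2001 interval and one partition of 2001/2002.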
private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
si("2000/2001", "A", 0),
si("2000/2001", "A", 1),
si("2001/2002", "A", 0)
);
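/**
 * Adds rows to two segments, verifies per-segment row counts, pushes all segments and checks that the latest
 * commit metadata accompanies the pushed segments, then clears the appenderator.
 */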
@Test
public void testSimpleIngestion() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
// startJob
Assert.assertNull(appenderator.startJob());
// getDataSource
Assert.assertEquals(StreamAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// add
commitMetadata.put("x", "1");
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "2");
Assert.assertEquals(
2,
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier)
.getNumRowsInSegment()
);
commitMetadata.put("x", "3");
Assert.assertEquals(
1,
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 4), committerSupplier)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(IDENTIFIERS.subList(0, 2), sorted(appenderator.getSegments()));
// getRowCount
Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
thrown = false;
try {
appenderator.getRowCount(IDENTIFIERS.get(2));
}
catch (IllegalStateException e) {
thrown = true;
}
Assert.assertTrue(thrown);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
committerSupplier.get(),
false
).get();
Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata());
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
sorted(
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
new Function<DataSegment, SegmentIdWithShardSpec>()
{
@Override
public SegmentIdWithShardSpec apply(DataSegment input)
{
return SegmentIdWithShardSpec.fromDataSegment(input);
}
}
)
)
);
Assert.assertEquals(sorted(tester.getPushedSegments()), sorted(segmentsAndCommitMetadata.getSegments()));
// clear
appenderator.clear();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
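/**
 * Verifies that with the skipBytesInMemoryOverheadCheck config, getBytesInMemory reports only the estimated
 * in-memory index size per sink (182 bytes for one row, plus 1 byte under SQL-compatible null handling),
 * with no sink or hydrant overhead included.
 */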
@Test
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
100,
1024,
null,
true,
new SimpleRowIngestionMeters(),
true
)
) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(
182 + nullHandlingOverhead,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(
182 + nullHandlingOverhead,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
}
}
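/**
 * Like {@link #testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig()}, but checks the total
 * getBytesCurrentlyInMemory across two sinks: just the sum of the per-row index estimates, again with no
 * sink overhead added.
 */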
@Test
public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
100,
1024,
null,
true,
new SimpleRowIngestionMeters(),
true
)
) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(182 + nullHandlingOverhead, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(
364 + 2 * nullHandlingOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
}
}
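/**
 * Exercises maxBytesInMemory-driven persists: the total byte count includes rough per-sink overhead while
 * rows are in memory, the current hydrant drops to zero bytes after an automatic persist, and each persisted
 * index adds a rough mapped-index overhead until close() resets everything.
 */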
@Test
public void testMaxBytesInMemory() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to the same sink to cause a persist.
for (int i = 0; i < 53; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
}
sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column.
int mappedIndexSize = 1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Add a single row after the persist
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to the same sink to cause a persist.
for (int i = 0; i < 31; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
}
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column. However, we now have two indexes from the two previous
// persists.
mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
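/**
 * Expects a RuntimeException: the configured byte limit is so small that even persisting cannot free enough
 * memory to accept the row.
 */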
@Test(expected = RuntimeException.class)
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
}
}
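/**
 * With a byte limit far smaller than a single row but the skipBytesInMemoryOverheadCheck config enabled,
 * each add triggers an immediate persist rather than failing the task, leaving zero bytes in memory.
 */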
@Test
public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
{
try (
final StreamAppenderatorTester tester = new StreamAppenderatorTester(
100,
10,
null,
true,
new SimpleRowIngestionMeters(),
true
)
) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// Expected 0 since we persisted after the add
Assert.assertEquals(
0,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// Expected 0 since we persisted after the add
Assert.assertEquals(
0,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
}
}
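/**
 * Closes the appenderator while a row is still in memory (no persist has happened) and verifies that the
 * in-memory row and byte counters are both reset to zero.
 */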
@Test
public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 1 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Close with row still in memory (no persist)
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
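/**
 * Same byte-accounting checks as {@link #testMaxBytesInMemory()} but across two sinks: totals combine both
 * sinks' in-memory indexes, two sink overheads, and the mapped-index overhead of every persisted index.
 */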
@Test
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to both sinks to cause a persist.
for (int i = 0; i < 49; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
}
sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column.
int mappedIndexSize = 2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Add a single row to sink 0 after the persist
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
0,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// Now add a single row to sink 1
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1), committerSupplier);
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
Assert.assertEquals(
(2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
// We do multiple more adds to both sinks to cause a persist.
for (int i = 0; i < 34; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
}
// currHydrant size is 0 since we just persisted all indexes to disk.
currentInMemoryIndexSize = 0;
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(
currentInMemoryIndexSize,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
);
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
// 1 dimension column, 2 metric columns, 1 time column. However, we now have two indexes from the two previous
// persists.
mappedIndexSize = 2 * (2 * (1012 + (2 * StreamAppenderator.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
StreamAppenderator.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
StreamAppenderator.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
Assert.assertEquals(
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getBytesCurrentlyInMemory());
}
}
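/**
 * With maxBytesInMemory effectively disabled (-1), byte sizes are still computed and reported, but the added
 * rows stay in memory (no byte-based persist) until close().
 */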
@Test
public void testIgnoreMaxBytesInMemory() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
// We still calculate the size, even when ignoring it, to make the persist decision
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
Assert.assertEquals(
182 + nullHandlingOverhead,
((StreamAppenderator) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
);
Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
int sinkSizeOverhead = 2 * StreamAppenderator.ROUGH_OVERHEAD_PER_SINK;
Assert.assertEquals(
(364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
((StreamAppenderator) appenderator).getBytesCurrentlyInMemory()
);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
}
}
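/**
 * With maxRowsInMemory = 3, identical rows roll up in memory and reaching the limit triggers an automatic
 * incremental persist that resets the in-memory row count; persistAll() likewise drops it to zero.
 */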
@Test
public void testMaxRowsInMemory() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@Override
public Committer get()
{
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
// Do nothing
}
};
}
};
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier);
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier);
Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close();
}
}
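/**
 * Same as {@link #testMaxRowsInMemory()} but with incremental persists disallowed on each add: rows keep
 * accumulating past the row limit and are only flushed by the explicit persistAll() call.
 */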
@Test
public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
// Do nothing
}
};
};
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier, false);
Assert.assertEquals(1, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier, false);
Assert.assertEquals(2, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 1), committerSupplier, false);
Assert.assertEquals(3, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 1), committerSupplier, false);
Assert.assertEquals(4, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((StreamAppenderator) appenderator).getRowsInMemory());
appenderator.close();
}
}
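/**
 * Adds five rows with incrementing commit metadata, closes the appenderator, then reopens one over the same
 * base persist directory and verifies that startJob() restores the last committed metadata, segment list,
 * and row count from disk.
 */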
@Test
public void testRestoreFromDisk() throws Exception
{
final RealtimeTuningConfig tuningConfig;
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
tuningConfig = tester.getTuningConfig();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@Override
public Committer get()
{
final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
// Do nothing
}
};
}
};
appenderator.startJob();
eventCount.incrementAndGet();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
eventCount.incrementAndGet();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier);
eventCount.incrementAndGet();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "baz", 3), committerSupplier);
eventCount.incrementAndGet();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "qux", 4), committerSupplier);
eventCount.incrementAndGet();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
appenderator.close();
try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
2,
-1,
tuningConfig.getBasePersistDirectory(),
true
)) {
final Appenderator appenderator2 = tester2.getAppenderator();
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
Assert.assertEquals(4, appenderator2.getRowCount(IDENTIFIERS.get(0)));
}
}
}
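/**
 * Verifies that getTotalRowCount() tracks rows across adds, is unaffected by persists, decreases as segments
 * are dropped, and returns to zero after close().
 */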
@Test(timeout = 60_000L)
public void testTotalRowCount() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.startJob();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.persistAll(committerSupplier.get()).get();
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(0)).get();
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(1)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), ir("2001", "bar", 1), committerSupplier);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), ir("2001", "baz", 1), committerSupplier);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), ir("2001", "qux", 1), committerSupplier);
Assert.assertEquals(3, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), ir("2001", "bob", 1), committerSupplier);
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.persistAll(committerSupplier.get()).get();
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(2)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.close();
Assert.assertEquals(0, appenderator.getTotalRowCount());
}
}
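/**
 * Feeds one row whose metric value cannot be parsed and one valid row, then checks the RowIngestionMeters:
 * one processed, one processed-with-error, none unparseable or thrown away.
 */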
@Test
public void testVerifyRowIngestionMetrics() throws Exception
{
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Committers.nilSupplier());
Assert.assertEquals(1, rowIngestionMeters.getProcessed());
Assert.assertEquals(1, rowIngestionMeters.getProcessedWithError());
Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
}
}
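/**
 * Runs timeseries queries over in-memory data with interval specs covering one segment, multiple segments,
 * a partial segment, and disjoint intervals, verifying rolled-up counts and metric sums.
 */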
@Test
public void testQueryByIntervals() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
// Query1: 2000/2001
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
)
),
results1
);
// Query2: 2000/2002
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2002")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results2
);
// Query3: 2000/2001T01
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results3
);
// Query4: 2000/2001T01, 2001T03/2001T04
final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
ImmutableList.of(
Intervals.of("2000/2001T01"),
Intervals.of("2001T03/2001T04")
)
)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results4 =
QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results4
);
}
}
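/**
 * Runs timeseries and scan queries addressed to specific segment descriptors (including partial-interval
 * descriptors), verifying that only the requested segment slices contribute to the results.
 */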
@Test
public void testQueryBySegments() throws Exception
{
try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), Suppliers.ofInstance(Committers.nil()));
appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), Suppliers.ofInstance(Committers.nil()));
// Query1: segment #2
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
IDENTIFIERS.get(2).getInterval(),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results1 =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query1",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
)
),
results1
);
// Query2: segment #2, partial
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L))
)
),
results2
);
// Query3: segment #2, two disjoint intervals
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.build();
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query3",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L))
)
),
results3
);
final ScanQuery query4 = Druids.newScanQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.order(ScanQuery.Order.ASCENDING)
.batchSize(10)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final List<ScanResultValue> results4 =
QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
((List<Object>) ((List<Object>) results4.get(0).getEvents()).get(0)).toArray()
);
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(1).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
);
}
}
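// Shorthand for a SegmentIdWithShardSpec on the test datasource with a LinearShardSpec partition.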
private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
StreamAppenderatorTester.DATASOURCE,
Intervals.of(interval),
version,
new LinearShardSpec(partitionNum)
);
}
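// Builds a MapBasedInputRow with a single "dim" dimension and a "met" metric value at the given timestamp.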
static InputRow ir(String ts, String dim, Object met)
{
return new MapBasedInputRow(
DateTimes.of(ts).getMillis(),
ImmutableList.of("dim"),
ImmutableMap.of(
"dim",
dim,
"met",
met
)
);
}
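// Returns a Committer supplier whose metadata is an immutable snapshot of the given map, taken at get() time.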
private static Supplier<Committer> committerSupplierFromConcurrentMap(final ConcurrentMap<String, String> map)
{
return new Supplier<Committer>()
{
@Override
public Committer get()
{
final Map<String, String> mapCopy = ImmutableMap.copyOf(map);
return new Committer()
{
@Override
public Object getMetadata()
{
return mapCopy;
}
@Override
public void run()
{
// Do nothing
}
};
}
};
}
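// Returns a sorted copy of the list; supports SegmentIdWithShardSpec and DataSegment elements (compared by segment id).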
private static <T> List<T> sorted(final List<T> xs)
{
final List<T> xsSorted = Lists.newArrayList(xs);
Collections.sort(
xsSorted,
(T a, T b) -> {
if (a instanceof SegmentIdWithShardSpec && b instanceof SegmentIdWithShardSpec) {
return ((SegmentIdWithShardSpec) a).compareTo(((SegmentIdWithShardSpec) b));
} else if (a instanceof DataSegment && b instanceof DataSegment) {
return ((DataSegment) a).getId().compareTo(((DataSegment) b).getId());
} else {
throw new IllegalStateException("BAD");
}
}
);
return xsSorted;
}
}