blob: 2cf78cc937f061c44178319e2083ed9d09609e12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime.appenderator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
public class BatchAppenderatorDriverTest extends EasyMockSupport
{
private static final String DATA_SOURCE = "foo";
private static final String VERSION = "abc123";
private static final int MAX_ROWS_IN_MEMORY = 100;
private static final long TIMEOUT = 1000;
private static final List<InputRow> ROWS = Arrays.asList(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "foo", "met1", 2.0)
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("dim2"),
ImmutableMap.of("dim2", "bar", "met1", 2.0)
)
);
private SegmentAllocator allocator;
private BatchAppenderatorTester appenderatorTester;
private BatchAppenderatorDriver driver;
private DataSegmentKiller dataSegmentKiller;
static {
NullHandling.initializeForTests();
}
@Before
public void setup()
{
appenderatorTester = new BatchAppenderatorTester(MAX_ROWS_IN_MEMORY);
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
driver = new BatchAppenderatorDriver(
appenderatorTester.getAppenderator(),
allocator,
new TestUsedSegmentChecker(appenderatorTester.getPushedSegments()),
dataSegmentKiller
);
EasyMock.replay(dataSegmentKiller);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(dataSegmentKiller);
driver.clear();
driver.close();
}
@Test
public void testSimple() throws Exception
{
Assert.assertNull(driver.startJob(null));
for (InputRow row : ROWS) {
Assert.assertTrue(driver.add(row, "dummy").isOk());
}
checkSegmentStates(2, SegmentState.APPENDING);
driver.pushAllAndClear(TIMEOUT);
checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED);
final SegmentsAndCommitMetadata published =
driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
),
published.getSegments()
.stream()
.map(SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toSet())
);
Assert.assertNull(published.getCommitMetadata());
}
@Test
public void testIncrementalPush() throws Exception
{
Assert.assertNull(driver.startJob(null));
int i = 0;
for (InputRow row : ROWS) {
Assert.assertTrue(driver.add(row, "dummy").isOk());
checkSegmentStates(1, SegmentState.APPENDING);
checkSegmentStates(i, SegmentState.PUSHED_AND_DROPPED);
driver.pushAllAndClear(TIMEOUT);
checkSegmentStates(0, SegmentState.APPENDING);
checkSegmentStates(++i, SegmentState.PUSHED_AND_DROPPED);
}
final SegmentsAndCommitMetadata published =
driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(
ImmutableSet.of(
new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(1, 0))
),
published.getSegments()
.stream()
.map(SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toSet())
);
Assert.assertNull(published.getCommitMetadata());
}
@Test
public void testRestart()
{
Assert.assertNull(driver.startJob(null));
driver.close();
appenderatorTester.getAppenderator().close();
Assert.assertNull(driver.startJob(null));
}
private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState expectedState)
{
final SegmentsForSequence segmentsForSequence = driver.getSegments().get("dummy");
Assert.assertNotNull(segmentsForSequence);
final List<SegmentWithState> segmentWithStates = segmentsForSequence
.allSegmentStateStream()
.filter(segmentWithState -> segmentWithState.getState() == expectedState)
.collect(Collectors.toList());
Assert.assertEquals(expectedNumSegmentsInState, segmentWithStates.size());
}
static TransactionalSegmentPublisher makeOkPublisher()
{
return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
}
static class TestSegmentAllocator implements SegmentAllocator
{
private final String dataSource;
private final Granularity granularity;
private final Map<Long, AtomicInteger> counters = new HashMap<>();
public TestSegmentAllocator(String dataSource, Granularity granularity)
{
this.dataSource = dataSource;
this.granularity = granularity;
}
@Override
public SegmentIdWithShardSpec allocate(
final InputRow row,
final String sequenceName,
final String previousSegmentId,
final boolean skipSegmentLineageCheck
)
{
DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
final long timestampTruncated = dateTimeTruncated.getMillis();
counters.putIfAbsent(timestampTruncated, new AtomicInteger());
final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
return new SegmentIdWithShardSpec(
dataSource,
granularity.bucket(dateTimeTruncated),
VERSION,
new NumberedShardSpec(partitionNum, 0)
);
}
}
}