blob: 38aedb83da66df6fe7c988c676356be719717494 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import java.io.IOException;
import java.util.Collections;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
/** Tests {@link ShardRecordsIterator}. */
@RunWith(MockitoJUnitRunner.Silent.class)
public class ShardRecordsIteratorTest {
private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
private static final String STREAM_NAME = "STREAM_NAME";
private static final String SHARD_ID = "SHARD_ID";
private static final Instant NOW = Instant.now();
@Mock private SimplifiedKinesisClient kinesisClient;
@Mock private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
@Mock private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
@Mock private KinesisRecord a, b, c, d;
@Mock private RecordFilter recordFilter;
private ShardRecordsIterator iterator;
@Before
public void setUp() throws IOException, TransientKinesisException {
when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(firstResult);
when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(secondResult);
when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)).thenReturn(thirdResult);
when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
when(firstResult.getRecords()).thenReturn(Collections.emptyList());
when(secondResult.getRecords()).thenReturn(Collections.emptyList());
when(thirdResult.getRecords()).thenReturn(Collections.emptyList());
when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint.class)))
.thenAnswer(new IdentityAnswer());
WatermarkPolicyFactory watermarkPolicyFactory = WatermarkPolicyFactory.withArrivalTimePolicy();
iterator =
new ShardRecordsIterator(
firstCheckpoint, kinesisClient, watermarkPolicyFactory, recordFilter);
}
@Test
public void goesThroughAvailableRecords()
throws IOException, TransientKinesisException, KinesisShardClosedException {
when(firstResult.getRecords()).thenReturn(asList(a, b, c));
when(secondResult.getRecords()).thenReturn(singletonList(d));
when(thirdResult.getRecords()).thenReturn(Collections.emptyList());
assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
assertThat(iterator.readNextBatch()).isEqualTo(asList(a, b, c));
assertThat(iterator.readNextBatch()).isEqualTo(singletonList(d));
assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
}
@Test
public void conformingRecordsMovesCheckpoint() throws IOException, TransientKinesisException {
when(firstResult.getRecords()).thenReturn(asList(a, b, c));
when(secondResult.getRecords()).thenReturn(singletonList(d));
when(thirdResult.getRecords()).thenReturn(Collections.emptyList());
when(a.getApproximateArrivalTimestamp()).thenReturn(NOW);
when(b.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(1)));
when(c.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(2)));
when(d.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(3)));
iterator.ackRecord(a);
assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
iterator.ackRecord(b);
assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
iterator.ackRecord(c);
assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
iterator.ackRecord(d);
assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
}
@Test
public void refreshesExpiredIterator()
throws IOException, TransientKinesisException, KinesisShardClosedException {
when(firstResult.getRecords()).thenReturn(singletonList(a));
when(secondResult.getRecords()).thenReturn(singletonList(b));
when(a.getApproximateArrivalTimestamp()).thenReturn(NOW);
when(b.getApproximateArrivalTimestamp()).thenReturn(NOW.plus(Duration.standardSeconds(1)));
when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
.thenThrow(ExpiredIteratorException.class);
when(aCheckpoint.getShardIterator(kinesisClient)).thenReturn(SECOND_REFRESHED_ITERATOR);
when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
.thenReturn(secondResult);
assertThat(iterator.readNextBatch()).isEqualTo(singletonList(a));
iterator.ackRecord(a);
assertThat(iterator.readNextBatch()).isEqualTo(singletonList(b));
assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
}
private static class IdentityAnswer implements Answer<Object> {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return invocation.getArguments()[0];
}
}
}