blob: e025abe8739715c6032396505f38415a4410a4dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.aws.ddbstream;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.Stream;
import com.amazonaws.services.dynamodbv2.model.StreamDescription;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class ShardIteratorHandlerTest {
private ShardIteratorHandler undertest;
@Mock
private AmazonDynamoDBStreams amazonDynamoDBStreams;
private final CamelContext context = new DefaultCamelContext();
private final DdbStreamComponent component = new DdbStreamComponent(context);
private final DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, new DdbStreamConfiguration(), component);
@Before
public void setup() throws Exception {
endpoint.getConfiguration().setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
endpoint.start();
undertest = new ShardIteratorHandler(endpoint);
when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(
new ListStreamsResult()
.withStreams(new Stream()
.withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp")
)
);
when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(
new DescribeStreamResult()
.withStreamDescription(
new StreamDescription()
.withTableName("table_name")
.withShards(
ShardListTest.createShardsWithSequenceNumbers(null,
"a", "1", "5",
"b", "8", "15",
"c", "16", "16",
"d", "20", null
)
)
)
);
when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() {
@Override
public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
return new GetShardIteratorResult()
.withShardIterator("shard_iterator_"
+ ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId()
+ "_000");
}
});
}
@Test
public void latestOnlyUsesTheLastShard() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.LATEST);
String shardIterator = undertest.getShardIterator(null);
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
assertThat(shardIterator, is("shard_iterator_d_000"));
}
@Test
public void cachesRecentShardId() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.LATEST);
undertest.updateShardIterator("bar");
String shardIterator = undertest.getShardIterator(null);
verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class));
assertThat(shardIterator, is("bar"));
}
@Test
public void trimHorizonStartsWithTheFirstShard() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.TRIM_HORIZON);
String shardIterator = undertest.getShardIterator(null);
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
assertThat(getIteratorCaptor.getValue().getShardId(), is("a"));
assertThat(shardIterator, is("shard_iterator_a_000"));
}
@Test
public void trimHorizonWalksAllShards() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.TRIM_HORIZON);
String[] shardIterators = new String[4];
for (int i = 0; i < shardIterators.length; ++i) {
shardIterators[i] = undertest.getShardIterator(null);
undertest.updateShardIterator(null);
}
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture());
String[] shards = new String[]{"a", "b", "c", "d"};
for (int i = 0; i < shards.length; ++i) {
assertThat(getIteratorCaptor.getAllValues().get(i).getShardId(), is(shards[i]));
}
assertThat(shardIterators, is(new String[]{"shard_iterator_a_000", "shard_iterator_b_000", "shard_iterator_c_000", "shard_iterator_d_000"}));
}
@Test
public void atSeqNumber12StartsWithShardB() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
endpoint.getConfiguration().setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));
String shardIterator = undertest.getShardIterator(null);
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
assertThat(shardIterator, is("shard_iterator_b_000"));
}
@Test
public void afterSeqNumber16StartsWithShardD() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
endpoint.getConfiguration().setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));
String shardIterator = undertest.getShardIterator(null);
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
assertThat(shardIterator, is("shard_iterator_d_000"));
}
@Test
public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() throws Exception {
endpoint.getConfiguration().setIteratorType(ShardIteratorType.LATEST);
String shardIterator = undertest.getShardIterator("12");
ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
assertThat(shardIterator, is("shard_iterator_b_000"));
assertThat(getIteratorCaptor.getValue().getShardIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name()));
assertThat(getIteratorCaptor.getValue().getSequenceNumber(), is("12"));
}
}