blob: 235c8296c0abc1a48108a079e39f9ff171621fb3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kinesis.consumer;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.model.Record;
import org.apache.samza.system.kinesis.KinesisConfig;
import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
import static org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
import static org.mockito.Mockito.*;
/**
* Tests that exercise KinesisSystemConsumer and KinesisRecordProcessor together.
*/
public class TestKinesisSystemConsumer {
  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string

  // Upper bound on how long readEvents waits for the expected envelopes. Without it, a missing
  // envelope would make the test hang forever instead of failing.
  private static final Duration READ_EVENTS_TIMEOUT = Duration.ofSeconds(30);

  // Timeout handed to each individual consumer.poll call.
  private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

  @Test
  public void testProcessRecords() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
    String system = "kinesis";
    String stream = "stream";
    int numShards = 2;
    int numRecordsPerShard = 5;
    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
  }

  @Test
  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
    String system = "kinesis";
    String stream = "stream";
    int numShards = 1;
    int numRecordsPerShard = 0;
    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
  }

  /**
   * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards
   * 1. Creation of record processors.
   * 2. Initialization of record processors.
   * 3. Processing records via record processors.
   * 4. Calling checkpoint on record processors.
   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
   *
   * @param system system name used for the registered partitions
   * @param stream stream name used for the registered partitions and metrics
   * @param numShards number of shards / partitions / record processors to simulate
   * @param numRecordsPerShard number of records fed to each record processor
   */
  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
      throws InterruptedException, NoSuchFieldException, IllegalAccessException {
    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
    // Create consumer
    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
    initializeMetrics(consumer, stream);

    List<SystemStreamPartition> ssps = new LinkedList<>();
    IntStream.range(0, numShards)
        .forEach(p -> ssps.add(new SystemStreamPartition(system, stream, new Partition(p))));
    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));

    // Create Kinesis record processor factory
    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);

    // Create and initialize Kinesis record processor
    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());

    // Generate records to Kinesis record processor
    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);

    // Verification steps

    // Read events from the BEM queue. readEvents counts envelopes summed over all partitions, so wait
    // for every shard's records, not just one shard's worth.
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
        readEvents(new HashSet<>(ssps), consumer, numShards * numRecordsPerShard);
    if (numRecordsPerShard > 0) {
      Assert.assertEquals(numShards, messages.size());
    } else {
      // No input records and hence no messages
      Assert.assertEquals(0, messages.size());
      return;
    }

    // Live view of the consumer's internal ssp -> processor map (read via reflection).
    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
    ssps.forEach(ssp -> {
      try {
        KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
        Assert.assertNotNull("No record processor is mapped to " + ssp, processor);

        // Verify that the read messages are received in order and are the same as input records
        List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
        Assert.assertNotNull("No messages were received for " + ssp, envelopes);
        Assert.assertEquals(numRecordsPerShard, envelopes.size());
        List<Record> inputRecords = inputRecordMap.get(processor);
        verifyRecords(envelopes, inputRecords, processor.getShardId());

        // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
        IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
        consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
        verify(getCheckpointer(processor)).checkpoint(argument.capture());
        Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());

        // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
        shutDownProcessor(processor, ShutdownReason.ZOMBIE);
        Assert.assertFalse(sspToProcessorMap.containsValue(processor));
        Assert.assertTrue(isSspAvailable(consumer, ssp));
      } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
        throw new RuntimeException(ex);
      }
    });
  }

  /**
   * Creates {@code numShards} record processors from {@code factory}.
   *
   * Each processor is initialized with shard id {@code shard-NNNNN} and extended sequence number "0000".
   *
   * @return the initialized processors keyed by their shard id
   */
  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
    Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
    IntStream.range(0, numShards)
        .forEach(p -> {
          String shardId = String.format("shard-%05d", p);
          // Create Kinesis processor
          KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();

          // Initialize the shard
          ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
          InitializationInput initializationInput =
              new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
          processor.initialize(initializationInput);
          processorMap.put(shardId, processor);
        });
    return processorMap;
  }

  /**
   * Polls {@code consumer} until at least {@code numEvents} envelopes have been received in total across
   * {@code ssps}. Envelopes are accumulated per partition in arrival order.
   *
   * @param ssps partitions to poll
   * @param consumer consumer to poll
   * @param numEvents total number of envelopes, summed over all partitions, to wait for
   * @return the received envelopes keyed by partition
   * @throws InterruptedException if interrupted while polling
   * @throws SamzaException if fewer than {@code numEvents} envelopes arrive within {@link #READ_EVENTS_TIMEOUT}
   */
  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
    int totalEventsConsumed = 0;
    // nanoTime-based deadline; compare via subtraction so the check is overflow-safe.
    long deadline = System.nanoTime() + READ_EVENTS_TIMEOUT.toNanos();

    while (totalEventsConsumed < numEvents && System.nanoTime() - deadline < 0) {
      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
          consumer.poll(ssps, POLL_TIMEOUT.toMillis());
      receivedMessages.forEach((ssp, envelopes) ->
          messages.computeIfAbsent(ssp, k -> new ArrayList<>()).addAll(envelopes));
      totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
    }

    // Reachable now that the loop is bounded by the deadline.
    if (totalEventsConsumed < numEvents) {
      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
      throw new SamzaException(msg);
    }
    return messages;
  }

  /**
   * Asserts that {@code outputRecords} match {@code inputRecords} one-to-one and in order.
   *
   * Compared fields are the key, sequence number, arrival timestamp, shard id, payload and offset.
   * Rewinds each input record's data buffer before comparing payloads.
   */
  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
    Assert.assertEquals("Number of envelopes does not match number of input records",
        inputRecords.size(), outputRecords.size());
    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
    inputRecords.forEach(record -> {
      IncomingMessageEnvelope envelope = outputRecordsIter.next();
      String outputKey = (String) envelope.getKey();
      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
      Assert.assertEquals(record.getPartitionKey(), outputKey);
      Assert.assertEquals(record.getSequenceNumber(), kinesisMessageEnvelope.getSequenceNumber());
      Assert.assertEquals(record.getApproximateArrivalTimestamp(),
          kinesisMessageEnvelope.getApproximateArrivalTimestamp());
      Assert.assertEquals(shardId, kinesisMessageEnvelope.getShardId());
      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
      record.getData().rewind();
      Assert.assertEquals(record.getData(), outputData);
      verifyOffset(envelope.getOffset(), record, shardId);
    });
  }

  /** Asserts that {@code offset} parses to the input record's sequence number and the given shard id. */
  private void verifyOffset(String offset, Record inputRecord, String shardId) {
    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
    Assert.assertEquals(inputRecord.getSequenceNumber(), ckpt.getSeqNumber());
    Assert.assertEquals(shardId, ckpt.getShardId());
  }

  /**
   * Reads the consumer's private {@code metrics} field via reflection.
   *
   * Calls {@code initializeMetrics} on it for {@code stream}.
   */
  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
      throws NoSuchFieldException, IllegalAccessException {
    // Look the field up on the declaring class so this also works if consumer is a subclass instance.
    Field f = KinesisSystemConsumer.class.getDeclaredField("metrics");
    f.setAccessible(true);
    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
    metrics.initializeMetrics(Collections.singleton(stream));
  }

  /** Returns the consumer's private {@code processors} map (the live instance, not a copy) via reflection. */
  @SuppressWarnings("unchecked")
  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
      throws NoSuchFieldException, IllegalAccessException {
    Field f = KinesisSystemConsumer.class.getDeclaredField("processors");
    f.setAccessible(true);
    return (Map<SystemStreamPartition, KinesisRecordProcessor>) f.get(consumer);
  }

  /**
   * Returns whether {@code ssp} is in the consumer's SSP allocator's {@code availableSsps} set.
   *
   * The set is looked up under the ssp's stream name and read via reflection.
   */
  @SuppressWarnings("unchecked")
  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
      throws NoSuchFieldException, IllegalAccessException {
    SSPAllocator sspAllocator = getSspAllocator(consumer);
    Field f = SSPAllocator.class.getDeclaredField("availableSsps");
    f.setAccessible(true);
    Map<String, Set<SystemStreamPartition>> availableSsps =
        (Map<String, Set<SystemStreamPartition>>) f.get(sspAllocator);
    return availableSsps.getOrDefault(ssp.getStream(), Collections.emptySet()).contains(ssp);
  }

  /** Returns the consumer's private {@code sspAllocator} field via reflection. */
  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
      throws NoSuchFieldException, IllegalAccessException {
    Field f = KinesisSystemConsumer.class.getDeclaredField("sspAllocator");
    f.setAccessible(true);
    return (SSPAllocator) f.get(consumer);
  }
}