blob: 6563fc42f2e8a06ed2b60e54dbef6e939a4e1a3f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.kinesis;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.Pair;
/**
 * Base implementation of Kinesis Output Operator. Converts tuples to records and emits them to Kinesis.<br/>
 *
 * Configurations:<br/>
 * {@link #accessKey} : AWS Credentials AccessKeyId <br/>
 * {@link #secretKey} : AWS Credentials SecretAccessKey <br/>
 * streamName : Name of the stream to which the records are written
 *
 * @param <V> type of the value part of a tuple; converted to record data by {@link #getRecord}
 * @param <T> type of the incoming tuple
 * @since 2.0.0
 */
public abstract class AbstractKinesisOutputOperator<V, T> implements Operator, Operator.CheckpointNotificationListener
{
  private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisOutputOperator.class);

  protected String streamName;
  @NotNull
  private String accessKey;
  @NotNull
  private String secretKey;
  private String endPoint;
  // NOTE(review): this field is static, so every operator instance in the JVM shares the
  // client most recently assigned via setup()/setClient(). Kept static for interface
  // compatibility, but this is fragile if partitions are configured with different
  // credentials or endpoints — confirm before relying on multiple distinct clients.
  protected static transient AmazonKinesisClient client = null;
  // Count of tuples successfully handed to the Kinesis client (or buffered for batching).
  protected int sendCount;
  // When true (default), tuples are buffered and sent via putRecords in batches;
  // when false, each tuple is sent individually via putRecord.
  protected boolean isBatchProcessing = true;

  /**
   * Converts a value to record data. The value is the value of the key/value pair
   * produced by {@link #tupleToKeyValue}.
   *
   * @param value the value portion of a tuple
   * @return the serialized record data
   */
  protected abstract byte[] getRecord(V value);

  /**
   * Converts a tuple to a pair of key and value. The key is used as the record's
   * PartitionKey and the value as its Data.
   *
   * @param tuple the incoming tuple
   * @return pair of (partition key, value)
   */
  protected abstract Pair<String, V> tupleToKeyValue(T tuple);

  // Buffer of pending record entries while batch processing is enabled.
  List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();

  // Max size of each record: 50KB, Max size of putRecords: 4.5MB
  // So, default capacity would be 4.5MB/50KB = 92
  @Min(2)
  @Max(500)
  protected int batchSize = 92;

  /**
   * Implement Operator Interface.
   */
  @Override
  public void beginWindow(long windowId)
  {
  }

  @Override
  public void endWindow()
  {
  }

  /**
   * Implement Component Interface.
   */
  @Override
  public void teardown()
  {
  }

  /**
   * Implement CheckpointNotificationListener Interface.
   * Flushes any buffered records before the checkpoint so the checkpointed state
   * does not contain records that were never sent.
   */
  @Override
  public void beforeCheckpoint(long windowId)
  {
    if (isBatchProcessing && !putRecordsRequestEntryList.isEmpty()) {
      try {
        flushRecords();
      } catch (AmazonClientException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
  }

  /**
   * Implement Component Interface.
   * Creates the Kinesis client from the configured credentials and endpoint,
   * and resets the batch buffer.
   *
   * @param context the operator context
   */
  @Override
  public void setup(OperatorContext context)
  {
    try {
      KinesisUtil.getInstance().createKinesisClient(accessKey, secretKey, endPoint);
    } catch (Exception e) {
      throw new RuntimeException("Unable to load Credentials", e);
    }
    this.setClient(KinesisUtil.getInstance().getClient());
    if (isBatchProcessing) {
      // Drop any entries restored with checkpointed state; beforeCheckpoint flushed
      // the buffer, so anything left here would be stale.
      putRecordsRequestEntryList.clear();
    }
  }

  /**
   * This input port receives tuples that will be written out to Kinesis.
   */
  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Sends a single tuple to Kinesis. In batch mode the tuple is buffered and the
   * buffer is flushed once it reaches {@link #batchSize}; otherwise the tuple is
   * sent immediately with a putRecord call.
   *
   * @param tuple the tuple to send
   * @throws RuntimeException wrapping any {@link AmazonClientException} from the client
   */
  public void processTuple(T tuple)
  {
    // Send out single data
    try {
      if (isBatchProcessing) {
        // Flush before adding so the buffer never exceeds batchSize entries.
        if (putRecordsRequestEntryList.size() == batchSize) {
          flushRecords();
          logger.debug("flushed {} records.", batchSize);
        }
        addRecord(tuple);
      } else {
        Pair<String, V> keyValue = tupleToKeyValue(tuple);
        PutRecordRequest requestRecord = new PutRecordRequest();
        requestRecord.setStreamName(streamName);
        requestRecord.setPartitionKey(keyValue.first);
        requestRecord.setData(ByteBuffer.wrap(getRecord(keyValue.second)));
        client.putRecord(requestRecord);
      }
      sendCount++;
    } catch (AmazonClientException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Converts the tuple to a record entry and appends it to the batch buffer.
   */
  private void addRecord(T tuple)
  {
    try {
      Pair<String, V> keyValue = tupleToKeyValue(tuple);
      PutRecordsRequestEntry putRecordsEntry = new PutRecordsRequestEntry();
      putRecordsEntry.setData(ByteBuffer.wrap(getRecord(keyValue.second)));
      putRecordsEntry.setPartitionKey(keyValue.first);
      putRecordsRequestEntryList.add(putRecordsEntry);
    } catch (AmazonClientException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Sends all buffered entries in a single putRecords call and clears the buffer.
   * On failure the buffer is left intact so the entries can be retried.
   */
  private void flushRecords()
  {
    try {
      PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
      putRecordsRequest.setStreamName(streamName);
      putRecordsRequest.setRecords(putRecordsRequestEntryList);
      client.putRecords(putRecordsRequest);
      putRecordsRequestEntryList.clear();
      logger.debug("Records flushed.");
    } catch (AmazonClientException e) {
      logger.warn("PutRecordsRequest exception.", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Sets the (shared, static) Kinesis client used by all instances.
   *
   * @param _client the client to use
   */
  public void setClient(AmazonKinesisClient _client)
  {
    client = _client;
  }

  public String getStreamName()
  {
    return streamName;
  }

  public void setStreamName(String streamName)
  {
    this.streamName = streamName;
  }

  public int getBatchSize()
  {
    return batchSize;
  }

  public void setBatchSize(int batchSize)
  {
    this.batchSize = batchSize;
  }

  public boolean isBatchProcessing()
  {
    return isBatchProcessing;
  }

  public void setBatchProcessing(boolean isBatchProcessing)
  {
    this.isBatchProcessing = isBatchProcessing;
  }

  public String getAccessKey()
  {
    return accessKey;
  }

  public void setAccessKey(String accessKey)
  {
    this.accessKey = accessKey;
  }

  public String getSecretKey()
  {
    return secretKey;
  }

  public void setSecretKey(String secretKey)
  {
    this.secretKey = secretKey;
  }

  public String getEndPoint()
  {
    return endPoint;
  }

  public void setEndPoint(String endPoint)
  {
    this.endPoint = endPoint;
  }
}