blob: bac0816c9fe4896f89f784fa48b1ca8e8c2756bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.jms;
import java.io.IOException;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
/**
* This is the base implementation of a JMS output operator.
* A concrete operator should be created from this skeleton implementation.
* <p>
* This operator receives tuples from the Malhar streaming platform through its input ports.
* When a tuple arrives on an input port, the operator converts it into a JMS message and sends
* it to a message bus. A concrete subclass has to implement the abstract method that
* converts a tuple into a JMS message.
* </p>
* <p>
* Ports:<br>
* <b>Input</b>: Can have any number of input ports<br>
* <b>Output</b>: No output port<br>
* <br>
* </p>
* @displayName Abstract JMS Output
* @category Messaging
* @tags jms, output operator
*
* @since 0.3.2
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public abstract class AbstractJMSOutputOperator extends JMSBase implements Operator
{
  private static final Logger logger = LoggerFactory.getLogger(AbstractJMSOutputOperator.class);

  /**
   * Tuples accepted since the last flush, from which messages are created.
   * Not transient: {@link #setup(OperatorContext)} rebuilds {@link #messageBatch} from it.
   */
  private List<Object> tupleBatch = Lists.newArrayList();
  /**
   * Messages waiting to be sent in one batch. Always updated together with {@link #tupleBatch}.
   */
  private List<Message> messageBatch = Lists.newArrayList();
  private transient String appId;
  private transient int operatorId;
  /**
   * The last window id recorded as committed in the store.
   */
  private transient long committedWindowId;
  /**
   * The id of the current window.
   * Note this is not transient to handle the case that the operator restarts from a checkpoint that is
   * in the middle of the application window.
   */
  private long currentWindowId;
  private ProcessingMode mode;
  private transient MessageProducer producer;
  protected JMSBaseTransactionableStore store = new JMSTransactionableStore();

  /**
   * Wires the store to this operator, opens the JMS connection and the store, rebuilds the
   * pending message batch from the pending tuples and reads the last committed window id.
   *
   * @param context the operator context supplying application id, operator id and processing mode
   * @throws RuntimeException if the JMS connection cannot be created or the store cannot connect
   */
  @Override
  public void setup(OperatorContext context)
  {
    appId = context.getValue(DAG.APPLICATION_ID);
    operatorId = context.getId();
    logger.debug("Application Id {} operatorId {}", appId, operatorId);

    store.setBase(this);
    store.setAppId(appId);
    store.setOperatorId(operatorId);
    transacted = store.isTransactable();

    try {
      createConnection();
    } catch (JMSException ex) {
      logger.debug(ex.getLocalizedMessage());
      throw new RuntimeException(ex);
    }

    logger.debug("Session is null {}:", getSession() == null);

    try {
      store.connect();
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }

    logger.debug("Done connecting store.");

    mode = context.getValue(OperatorContext.PROCESSING_MODE);

    if (mode == ProcessingMode.AT_MOST_ONCE) {
      //Batch must be cleared to avoid writing same data twice
      tupleBatch.clear();
    }

    // messageBatch is rebuilt entirely from tupleBatch below. Because messageBatch is not
    // transient it may already hold entries here; dropping them first prevents duplicates and
    // keeps an AT_MOST_ONCE restart from sending messages whose tuples were just discarded.
    messageBatch.clear();

    for (Object pendingTuple : tupleBatch) {
      messageBatch.add(createMessage(pendingTuple));
    }

    committedWindowId = store.getCommittedWindowId(appId, operatorId);
    logger.debug("committedWindowId {}", committedWindowId);
    logger.debug("End of setup store in transaction: {}", store.isInTransaction());
  }

  /**
   * Discards any pending batch, disconnects the store and releases the JMS resources.
   * The JMS resources are released even when disconnecting the store fails.
   *
   * @throws RuntimeException if the store fails to disconnect
   */
  @Override
  public void teardown()
  {
    tupleBatch.clear();
    messageBatch.clear();

    logger.debug("beginning teardown");
    try {
      store.disconnect();
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    } finally {
      // Always release the producer/connection, otherwise a store failure would leak them.
      cleanup();
    }

    logger.debug("ending teardown");
  }

  /**
   * Records the window id and starts a store transaction for the window.
   *
   * @param windowId the id of the window that is starting
   */
  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
    store.beginTransaction();
    logger.debug("Transaction started for window {}", windowId);
  }

  /**
   * Flushes the remaining batch, commits the store transaction and records the current window
   * as committed. The order of these steps depends on whether the store supports exactly once.
   */
  @Override
  public void endWindow()
  {
    logger.debug("Ending window {}", currentWindowId);

    if (store.isExactlyOnce()) {
      //Store committed window and data in same transaction
      if (committedWindowId < currentWindowId) {
        store.storeCommittedWindowId(appId, operatorId, currentWindowId);
        committedWindowId = currentWindowId;
      }

      flushBatch();
      store.commitTransaction();
    } else {
      //For transactionable stores which cannot support exactly once, at least
      //once can be ensured by first storing the data and then the committed window
      //id.
      flushBatch();
      store.commitTransaction();

      if (committedWindowId < currentWindowId) {
        store.storeCommittedWindowId(appId, operatorId, currentWindowId);
        committedWindowId = currentWindowId;
      }
    }

    logger.debug("done ending window {}", currentWindowId);
  }

  /**
   * This is a helper method which flushes all the batched data: every pending message is sent
   * through the producer and both batches are emptied afterwards.
   *
   * @throws RuntimeException if sending a message fails; the batches are left untouched in that case
   */
  protected void flushBatch()
  {
    logger.debug("flushing batch, batch size {}", tupleBatch.size());

    for (Message message : messageBatch) {
      try {
        producer.send(message);
      } catch (JMSException ex) {
        throw new RuntimeException(ex);
      }
    }

    tupleBatch.clear();
    messageBatch.clear();

    logger.debug("done flushing batch");
  }

  /**
   * This is a helper method which should be called to send a message.
   * Data arriving in a window that is not newer than the committed window is ignored.
   * The batch is flushed as soon as it reaches the configured batch size.
   *
   * @param data The data which will be converted into a message.
   */
  protected void sendMessage(Object data)
  {
    if (currentWindowId <= committedWindowId) {
      // This window was already committed, so its data must not be sent again.
      return;
    }

    tupleBatch.add(data);
    Message message = createMessage(data);
    messageBatch.add(message);

    if (tupleBatch.size() >= this.getBatch()) {
      flushBatch();
    }
  }

  /**
   * Sets the store used to track the committed window id.
   *
   * @param store the transactionable store to use
   */
  public void setStore(JMSBaseTransactionableStore store)
  {
    this.store = store;
  }

  /**
   * @return the store used to track the committed window id
   */
  public JMSBaseTransactionableStore getStore()
  {
    return store;
  }

  /**
   * Release resources: closes the producer (if one was created) and then runs the base class
   * cleanup. Failures are logged rather than propagated, and the base class cleanup runs
   * even when closing the producer fails.
   */
  @Override
  public void cleanup()
  {
    try {
      try {
        // The producer is null when createConnection() never completed or cleanup already ran.
        if (producer != null) {
          producer.close();
        }
      } finally {
        producer = null;
        super.cleanup();
      }
    } catch (JMSException ex) {
      logger.error("Error while releasing JMS resources", ex);
    }
  }

  /**
   * Connection specific setup for JMS service: creates the base connection and then a
   * producer for the configured destination.
   *
   * @throws JMSException if the connection or the producer cannot be created
   */
  @Override
  public void createConnection() throws JMSException
  {
    super.createConnection();
    // Create producer
    producer = getSession().createProducer(getDestination());
  }

  /**
   * Convert tuple into JMS message. Tuple can be any Java Object.
   *
   * @param tuple the tuple to convert
   * @return the JMS message created from the tuple
   */
  protected abstract Message createMessage(Object tuple);
}