// blob: 996bfd48e8b44ac0ca4a8de978cf9622f594600d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.jms;
import java.io.IOException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.Stateless;
/**
 * This transactionable store commits the messages sent within a window along with the windowId of the completed window
 * to JMS. WindowIds will be sent to a subject specific meta-data queue with the name of the form '{subject}.metadata'.
 * It is the responsibility of the user to create the meta-data queue in the JMS provider.
 * A MessageSelector and a unique 'appOperatorId' message property ensure each operator receives its own windowId.
 * This store ensures that the JMS output operator is capable of outputting data to JMS exactly once.
 *
 * @since 2.0.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
  private static final Logger logger = LoggerFactory.getLogger(JMSTransactionableStore.class);
  /**
   * Name of the message property holding the '{appId}_{operatorId}' key used in the message selector.
   */
  private static final String APP_OPERATOR_ID = "appOperatorId";

  private transient MessageProducer producer;
  private transient MessageConsumer consumer;
  private String metaQueueName;
  /**
   * Indicates whether the store is connected or not.
   */
  private transient boolean connected = false;
  /**
   * Indicates whether the store is in a transaction or not.
   */
  private transient boolean inTransaction = false;

  public JMSTransactionableStore()
  {
  }

  /**
   * Get the meta queue name for this store
   *
   * @return the metaQueueName
   */
  public String getMetaQueueName()
  {
    return metaQueueName;
  }

  /**
   * Set the meta queue name for this store
   *
   * @param metaQueueName the metaQueueName to set
   */
  public void setMetaQueueName(String metaQueueName)
  {
    this.metaQueueName = metaQueueName;
  }

  /**
   * Reads the committed windowId from the meta-data queue.
   * <p>
   * The stored message is consumed, the same windowId is written back to the queue so that it can be
   * read again later, and the session is committed.
   *
   * @param appId the application id; used for logging and for tagging the re-written message
   * @param operatorId the operator id; used for logging and for tagging the re-written message
   * @return the windowId read from the consumed message
   * @throws IllegalStateException if no message could be received because the consumer was closed
   * @throws RuntimeException if a transaction is already open or the JMS provider reports an error
   */
  @Override
  public long getCommittedWindowId(String appId, int operatorId)
  {
    logger.debug("Getting committed windowId appId {} operatorId {}", appId, operatorId);

    try {
      beginTransaction();
      BytesMessage message = (BytesMessage)consumer.receive();

      // JMS receive() returns null when the consumer is closed while waiting; fail with a clear
      // message instead of a NullPointerException on the next line.
      if (message == null) {
        throw new IllegalStateException("No committed window message could be received from queue "
            + metaQueueName + "; the consumer was closed while waiting.");
      }

      logger.debug("Retrieved committed window messageId: {}, messageAppOperatorIdProp: {}", message.getJMSMessageID(),
          message.getStringProperty(APP_OPERATOR_ID));
      long windowId = message.readLong();

      // The message was consumed above, so put the value back before committing.
      writeWindowId(appId, operatorId, windowId);
      commitTransaction();
      logger.debug("metaQueueName: {}", metaQueueName);
      logger.debug("Retrieved windowId {}", windowId);
      return windowId;
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Replaces the stored windowId with the given one. The previous windowId message is consumed and
   * a new one is sent; nothing is committed here, the caller is expected to commit the transaction.
   *
   * @param appId the application id used to tag the new message
   * @param operatorId the operator id used to tag the new message
   * @param windowId the windowId to store
   * @throws RuntimeException if no transaction is open or the JMS provider reports an error
   */
  @Override
  public void storeCommittedWindowId(String appId, int operatorId, long windowId)
  {
    if (!inTransaction) {
      throw new RuntimeException("This should be called while you are in an existing transaction");
    }

    logger.debug("storing window appId {} operatorId {} windowId {}",
        appId, operatorId, windowId);

    try {
      removeCommittedWindowId(appId, operatorId);
      writeWindowId(appId, operatorId, windowId);
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Consumes the next windowId message available to this store's consumer. The consumer's selector
   * is fixed in {@link #connect()}, so the arguments are not used to pick the message.
   *
   * @param appId not used by this implementation
   * @param operatorId not used by this implementation
   * @throws RuntimeException if the JMS provider reports an error
   */
  @Override
  public void removeCommittedWindowId(String appId, int operatorId)
  {
    try {
      consumer.receive();
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Sends a bytes message carrying the windowId, tagged with the '{appId}_{operatorId}' property.
   *
   * @param appId the application id
   * @param operatorId the operator id
   * @param windowId the windowId written into the message body
   * @throws JMSException if the message cannot be created or sent
   */
  private void writeWindowId(String appId, int operatorId, long windowId) throws JMSException
  {
    BytesMessage message = getBase().getSession().createBytesMessage();
    message.setStringProperty(APP_OPERATOR_ID, appId + "_" + operatorId);
    message.writeLong(windowId);
    producer.send(message);
    logger.debug("Message with windowId {} sent", windowId);
  }

  /**
   * Marks the start of a transaction.
   *
   * @throws RuntimeException if a transaction is already open
   */
  @Override
  public void beginTransaction()
  {
    logger.debug("beginning transaction");

    if (inTransaction) {
      throw new RuntimeException("Cannot start a transaction twice.");
    }

    inTransaction = true;
  }

  /**
   * Commits the session and marks the transaction as finished.
   *
   * @throws RuntimeException if no transaction is open or the commit fails
   */
  @Override
  public void commitTransaction()
  {
    logger.debug("committing transaction.");

    if (!inTransaction) {
      throw new RuntimeException("Cannot commit a transaction if you are not in one.");
    }

    try {
      getBase().getSession().commit();
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }

    inTransaction = false;
    logger.debug("finished committing transaction.");
  }

  /**
   * Rolls back the session and marks the transaction as finished, so that a new transaction can be
   * started afterwards.
   *
   * @throws RuntimeException if the rollback fails
   */
  @Override
  public void rollbackTransaction()
  {
    try {
      getBase().getSession().rollback();
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }

    // Mirror commitTransaction(): once the session work is discarded the transaction is over.
    // Without this, isInTransaction() keeps returning true and beginTransaction() fails.
    inTransaction = false;
  }

  @Override
  public boolean isInTransaction()
  {
    return inTransaction;
  }

  /**
   * Creates the producer and consumer on the meta-data queue. When the queue holds no windowId
   * message for this operator yet, an initial message carrying {@code Stateless.WINDOW_ID} is
   * written and committed.
   * <p>
   * If no meta queue name has been set, it defaults to '{subject}.metadata'.
   *
   * @throws IOException declared by the overridden method; JMS failures are rethrown as RuntimeException
   */
  @Override
  public void connect() throws IOException
  {
    logger.debug("Entering connect. is in transaction: {}", inTransaction);

    try {
      logger.debug("Base is null: {}", getBase() == null);

      if (getBase() != null) {
        logger.debug("Session is null: {}", getBase().getSession() == null);
      }

      if (metaQueueName == null) {
        metaQueueName = getBase().getSubject() + ".metadata";
      }

      String appOperatorId = getAppId() + "_" + getOperatorId();
      // Selector restricting both the browser and the consumer to this operator's messages.
      String selector = APP_OPERATOR_ID + " = '" + appOperatorId + "'";

      Queue queue = getBase().getSession().createQueue(metaQueueName);
      QueueBrowser browser = getBase().getSession().createBrowser(queue, selector);
      boolean hasStore;

      try {
        Enumeration<?> enumeration = browser.getEnumeration();
        hasStore = enumeration.hasMoreElements();
      } finally {
        // The browser is only needed for the existence check; release it right away.
        browser.close();
      }

      producer = getBase().getSession().createProducer(queue);
      consumer = getBase().getSession().createConsumer(queue, selector);

      connected = true;
      logger.debug("Connected. is in transaction: {}", inTransaction);

      if (!hasStore) {
        beginTransaction();
        writeWindowId(getAppId(), getOperatorId(), Stateless.WINDOW_ID);
        commitTransaction();
      }
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    }

    logger.debug("Exiting connect. is in transaction: {}", inTransaction);
  }

  /**
   * Closes the producer and the consumer and resets the connection state. Both are closed even if
   * closing the first one fails, and calling this method without a prior successful
   * {@link #connect()} is a no-op apart from resetting the state flags.
   *
   * @throws IOException declared by the overridden method; JMS failures are rethrown as RuntimeException
   */
  @Override
  public void disconnect() throws IOException
  {
    logger.debug("disconnectiong");

    try {
      try {
        if (producer != null) {
          producer.close();
        }
      } finally {
        if (consumer != null) {
          consumer.close();
        }
      }
    } catch (JMSException ex) {
      throw new RuntimeException(ex);
    } finally {
      // Whatever happened above, this store no longer holds usable JMS resources.
      producer = null;
      consumer = null;
      inTransaction = false;
      connected = false;
    }

    logger.debug("done disconnectiong");
  }

  @Override
  public boolean isConnected()
  {
    return connected;
  }
}