blob: 3c20dfd48fa2377b61ed24d9f101ced7b9f1f256 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.services.messaging.adapters;
import javax.jms.JMSException;
import javax.jms.Message;
import flex.messaging.log.Log;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* A <code>MessageReceiver</code> that receives messages from JMS using
* synchronous <code>javax.jms.MessageConsumer.receive</code> call.
*/
class SyncMessageReceiver implements MessageReceiver {
private ScheduledExecutorService messageReceiverService;
private boolean isScheduled = false;
private JMSConsumer jmsConsumer;
private int syncMaxReceiveThreads;
private long syncReceiveIntervalMillis;
private long syncReceiveWaitMillis;
/**
* Constructs a new <code>SyncMessageReceiver</code> with default delivery settings.
*
* @param jmsConsumer JMSConsumer associated with the SynMessageReceiver.
*/
public SyncMessageReceiver(JMSConsumer jmsConsumer) {
this.jmsConsumer = jmsConsumer;
syncReceiveIntervalMillis = JMSConfigConstants.defaultSyncReceiveIntervalMillis;
syncReceiveWaitMillis = JMSConfigConstants.defaultSyncReceiveWaitMillis;
syncMaxReceiveThreads = 1; // Always use one thread.
}
/**
* Returns the interval of the sync receive message call.
*
* @return The interval of the sync receive message call.
*/
public long getSyncReceiveIntervalMillis() {
return syncReceiveIntervalMillis;
}
/**
* Sets the interval of the receive message call. This property
* is optional and defaults to 100.
*
* @param syncReceiveIntervalMillis A positive long that indicates
* the interval of the receive message call.
*/
public void setSyncReceiveIntervalMillis(long syncReceiveIntervalMillis) {
if (syncReceiveIntervalMillis < 1)
syncReceiveIntervalMillis = JMSConfigConstants.defaultSyncReceiveIntervalMillis;
this.syncReceiveIntervalMillis = syncReceiveIntervalMillis;
}
/**
* Returns how long a JMS proxy waits for a message before returning.
*
* @return How long a JMS proxy waits for a message before returning.
*/
public long getSyncReceiveWaitMillis() {
return syncReceiveWaitMillis;
}
/**
* Sets how long a JMS proxy waits for a message before returning.
* This property is optional and defaults to zero (no wait).
*
* @param syncReceiveWaitMillis A non-negative value that indicates how
* long a JMS proxy waits for a message before returning. Zero means no
* wait, negative one means wait until a message arrives.
*/
public void setSyncReceiveWaitMillis(long syncReceiveWaitMillis) {
if (syncReceiveWaitMillis < -1)
syncReceiveWaitMillis = JMSConfigConstants.defaultSyncReceiveWaitMillis;
this.syncReceiveWaitMillis = syncReceiveWaitMillis;
}
/**
* Implements MessageReceiver.startReceive.
*/
public void startReceive() {
if (!isScheduled) {
if (Log.isDebug())
Log.getLogger(JMSAdapter.LOG_CATEGORY).debug(Thread.currentThread()
+ " JMS consumer sync receive thread for JMS destination '"
+ jmsConsumer.destinationJndiName + "' is starting to poll the JMS server for new messages.");
ThreadFactory mrtf = new MessageReceiveThreadFactory();
messageReceiverService = Executors.newScheduledThreadPool(syncMaxReceiveThreads, mrtf);
messageReceiverService.scheduleAtFixedRate(new MessageReceiveThread(), syncReceiveIntervalMillis, syncReceiveIntervalMillis, TimeUnit.MILLISECONDS);
isScheduled = true;
}
}
/**
* Implements MessageReceivers.stopReceive.
*/
public void stopReceive() {
if (messageReceiverService != null)
messageReceiverService.shutdown();
}
/**
* Used internally to receive messages as determined by syncReceiveWaitMillis.
*/
private Message receiveMessage() throws JMSException {
if (syncReceiveWaitMillis == -1)
return jmsConsumer.receive();
else if (syncReceiveWaitMillis == 0)
return jmsConsumer.receiveNoWait();
else if (syncReceiveWaitMillis > 0)
return jmsConsumer.receive(syncReceiveWaitMillis);
return null;
}
/**
* Thread Factory used to create message receive threads.
*/
class MessageReceiveThreadFactory implements ThreadFactory {
/**
* Used to uniquely identify each new message receive thread created.
*/
private int receiveThreadCount;
/**
* Creates a new message receive thread.
* Synchronized to uniquely identify each receive thread at construct time safely.
*
* @param r The runnable to assign to the new thread.
*/
public synchronized Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("MessageReceiveThread" + "-" + receiveThreadCount++);
t.setDaemon(true);
if (Log.isDebug())
Log.getLogger(JMSAdapter.LOG_CATEGORY).debug("Created message receive thread: " + t.getName());
return t;
}
}
/**
* Message receive threads that perform sync javax.jms.MessageConsumer.receive
* calls.
*/
class MessageReceiveThread implements Runnable {
public void run() {
try {
while (true) {
Message message = receiveMessage();
if (message == null) break;
jmsConsumer.onMessage(message);
}
} catch (JMSException jmsEx) {
jmsConsumer.onException(jmsEx);
}
}
}
}