blob: eb72a5bf25a18b29bbd4e2ca744a283527262fc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.transport.passthru;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import org.apache.synapse.transport.passthru.connections.TargetConnections;
import org.apache.synapse.transport.passthru.util.TargetRequestFactory;
import java.io.OutputStream;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class acts as a gateway for differed delivery of the messages. When a message is to be
* delivered it is submitted to this class. If a connection is available to the target this
* class will try to deliver the message immediately over that connection. If a connection is
* not available it will queue the message and request a connection from the pool. When a new
* connection is available a queued message will be sent through it.
*/
public class DeliveryAgent {
private static final Log log = LogFactory.getLog(DeliveryAgent.class);
/**
* This Map holds the messages that need to be delivered. But at the moment maximum
* number of connections to the host:pair is being used. So these messages has to wait
* until a new connection is available.
*/
private Map<String, Queue<MessageContext>> waitingMessages =
new ConcurrentHashMap<String, Queue<MessageContext>>();
/** The connection management */
private TargetConnections targetConnections;
/** Configuration of the sender */
private TargetConfiguration targetConfiguration;
private TargetErrorHandler targetErrorHandler;
/** Lock for synchronizing access */
private Lock lock = new ReentrantLock();
/**
* Create a delivery agent with the target configuration and connection management.
*
* @param targetConfiguration configuration of the sender
* @param targetConnections connection management
*/
public DeliveryAgent(TargetConfiguration targetConfiguration,
TargetConnections targetConnections) {
this.targetConfiguration = targetConfiguration;
this.targetConnections = targetConnections;
this.targetErrorHandler = new TargetErrorHandler(targetConfiguration);
}
/**
* This method queues the message for delivery. If a connection is already existing for
* the destination epr, the message will be delivered immediately. Otherwise message has
* to wait until a connection is established. In this case this method will inform the
* system about the need for a connection.
*
* @param msgContext the message context to be sent
* @param host host name of epr
* @param port port of the of epr
* @throws AxisFault if an error occurs
*/
public void submit(MessageContext msgContext, String host, int port)
throws AxisFault {
String key = host + ":" + port;
// first we queue the message
Queue<MessageContext> queue = null;
lock.lock();
try {
queue = waitingMessages.get(key);
if (queue == null) {
queue = new ConcurrentLinkedQueue<MessageContext>();
waitingMessages.put(key, queue);
}
if (queue.size() == Integer.MAX_VALUE) {
MessageContext msgCtx = queue.poll();
targetErrorHandler.handleError(msgCtx,
ErrorCodes.CONNECTION_TIMEOUT,
"Error connecting to the back end",
null,
ProtocolState.REQUEST_READY);
}
queue.add(msgContext);
} finally {
lock.unlock();
}
NHttpClientConnection conn = targetConnections.getConnection(host, port);
if (conn != null) {
conn.resetInput();
conn.resetOutput();
MessageContext messageContext = queue.poll();
if (messageContext != null) {
tryNextMessage(messageContext, conn);
}
}
}
public void errorConnecting(String host, int port, int errorCode, String message) {
String key = host + ":" + port;
Queue<MessageContext> queue = waitingMessages.get(key);
if (queue != null) {
MessageContext msgCtx = queue.poll();
if (msgCtx != null) {
String errorMessage = "Error while connecting to the endpoint";
if (message != null) {
errorMessage += " (" + message + ")";
}
targetErrorHandler.handleError(msgCtx, errorCode, errorMessage,
null, ProtocolState.REQUEST_READY);
}
} else {
throw new IllegalStateException("Queue cannot be null for: " + key);
}
}
/**
* Notification for a connection availability. When this occurs a message in the
* queue for delivery will be tried.
*
* @param host name of the remote host
* @param port remote port number
*/
public void connected(String host, int port) {
Queue<MessageContext> queue = null;
lock.lock();
try {
queue = waitingMessages.get(host + ":" + port);
} finally {
lock.unlock();
}
while (queue.size() > 0) {
NHttpClientConnection conn = targetConnections.getConnection(host, port);
if (conn != null) {
MessageContext messageContext = queue.poll();
if (messageContext != null) {
tryNextMessage(messageContext, conn);
}
} else {
break;
}
}
}
private void tryNextMessage(MessageContext messageContext, NHttpClientConnection conn) {
if (conn != null) {
try {
TargetContext.get(conn).setRequestMsgCtx(messageContext);
submitRequest(conn, messageContext);
} catch (AxisFault e) {
log.error("IO error while sending the request out", e);
}
}
}
private void submitRequest(NHttpClientConnection conn, MessageContext msgContext) throws AxisFault {
if (log.isDebugEnabled()) {
log.debug("Submitting new request to the connection: " + conn);
}
TargetRequest request = TargetRequestFactory.create(msgContext, targetConfiguration);
TargetContext.setRequest(conn, request);
Pipe pipe = (Pipe) msgContext.getProperty(PassThroughConstants.PASS_THROUGH_PIPE);
if (pipe != null) {
pipe.attachConsumer(conn);
request.connect(pipe);
if (Boolean.TRUE.equals(msgContext.getProperty(
PassThroughConstants.MESSAGE_BUILDER_INVOKED))) {
synchronized (msgContext) {
OutputStream out = pipe.getOutputStream();
msgContext.setProperty("GET_ME_OUT", out);
msgContext.setProperty("READY2ROCK", Boolean.TRUE);
msgContext.notifyAll();
}
return;
}
}
conn.requestOutput();
}
}