blob: ec292a7cdb7b016f33cecc4e77951fdac736ccb9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.message.processors.forward;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.axis2.Axis2BlockingClient;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.AbstractEndpoint;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.store.MessageStore;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class MessageForwardingProcessorView implements MessageForwardingProcessorViewMBean {
private static final Log log = LogFactory.getLog(MessageForwardingProcessorView.class);
private MessageStore messageStore;
private Axis2BlockingClient sender;
private ScheduledMessageForwardingProcessor processor;
public MessageForwardingProcessorView(MessageStore messageStore, Axis2BlockingClient sender,
ScheduledMessageForwardingProcessor processor) {
if (messageStore != null) {
this.messageStore = messageStore;
} else {
throw new SynapseException("Cannot create Message Forwarding Processor " +
"view with null message store");
}
if (sender != null) {
this.sender = sender;
} else {
throw new SynapseException("Cannot create Message Forwarding Processor " +
"view with null message sender");
}
if (processor != null) {
this.processor = processor;
} else {
throw new SynapseException("Cannot create Message Forwarding Processor " +
"view with null message processor");
}
}
public void resendAll() throws Exception {
if (!processor.isActive()) {
while (messageStore.peek() != null) {
sendMessage(messageStore.peek() , true);
}
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
}
public void deleteAll() throws Exception {
if (!processor.isActive()) {
messageStore.clear();
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
}
public List<String> messageIdList() throws Exception {
if (!processor.isActive()) {
int size = messageStore.size();
List<String> idList = new ArrayList<String>();
for (int i = 0; i < size; i++) {
MessageContext context = messageStore.get(i);
if (context != null) {
idList.add(context.getMessageID());
} else {
break;
}
}
return idList;
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
}
public void resend(String messageID) throws Exception {
if (!processor.isActive()) {
if (messageID != null && !"".equals(messageID.trim())) {
MessageContext msgCtx = messageStore.get(messageID);
if (msgCtx != null) {
sendMessage(msgCtx ,false);
messageStore.remove(messageID);
}
}
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
}
public void delete(String messageID) throws Exception {
if (!processor.isActive()) {
if (messageID != null && !"".equals(messageID.trim())) {
messageStore.remove(messageID);
}
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
}
public String getEnvelope(String messageID) throws Exception {
if (!processor.isActive()) {
if (messageID != null && !"".equals(messageID.trim())) {
MessageContext msgCtx = messageStore.get(messageID);
if (msgCtx != null) {
SOAPEnvelope env =
((Axis2MessageContext) msgCtx).getAxis2MessageContext().getEnvelope();
if(env != null) {
return env.toString();
}
}
}
} else {
throw new Exception("Message Processor is Active, Manual operations are " +
"not supported!");
}
return null;
}
public int getSize() {
return messageStore.size();
}
public boolean isActive() {
assert processor != null;
return processor.isActive();
}
public void activate() {
assert processor != null;
processor.resetSentAttemptCount();
processor.activate();
}
public void deactivate() {
assert processor != null;
processor.deactivate();
}
private void sendMessage(MessageContext messageContext, boolean delete) throws Exception {
if (messageContext != null) {
Set proSet = messageContext.getPropertyKeySet();
if (proSet != null && proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
}
String targetEp = (String) messageContext.getProperty(
ForwardingProcessorConstants.TARGET_ENDPOINT);
if (targetEp != null) {
Endpoint ep = messageContext.getEndpoint(targetEp);
if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
try {
sender.send(ep, messageContext);
// If no Exception Occurred We remove the Message
if (delete) {
messageStore.poll();
}
} catch (Exception e) {
log.error("Error Forwarding Message ", e);
throw new Exception(e);
}
} else {
String logMsg;
if (ep == null) {
logMsg = "Endpoint named " + targetEp + "not found. Hence removing " +
"the message form store";
} else {
logMsg = "Unsupported endpoint type. Only address/wsdl/default endpoint " +
"types supported";
}
log.warn(logMsg);
if (delete) {
messageStore.poll();
}
throw new Exception(logMsg);
}
} else {
//No Target Endpoint defined for the Message
//So we do not have a place to deliver.
//Here we log a warning and remove the message
//todo: we can improve this by implementing a target inferring mechanism
String logMsg = "Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
" not found in the message context , Hence removing the message ";
log.warn(logMsg);
if (delete) {
messageStore.poll();
}
throw new Exception(logMsg);
}
} else {
throw new Exception("MessageContext must not be null");
}
}
}