blob: 3479ac761a3294bc3a9f7be5c4f22f10b80bcfb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.mina.filter.reqres;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.filter.util.WriteRequestFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TODO Add documentation
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
* @org.apache.xbean.XBean
*/
public class RequestResponseFilter extends WriteRequestFilter {
private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector");
private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore");
private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore");
private final ResponseInspectorFactory responseInspectorFactory;
private final ScheduledExecutorService timeoutScheduler;
private final static Logger LOGGER = LoggerFactory.getLogger(RequestResponseFilter.class);
public RequestResponseFilter(final ResponseInspector responseInspector, ScheduledExecutorService timeoutScheduler) {
if (responseInspector == null) {
throw new IllegalArgumentException("responseInspector");
}
if (timeoutScheduler == null) {
throw new IllegalArgumentException("timeoutScheduler");
}
this.responseInspectorFactory = new ResponseInspectorFactory() {
public ResponseInspector getResponseInspector() {
return responseInspector;
}
};
this.timeoutScheduler = timeoutScheduler;
}
public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory,
ScheduledExecutorService timeoutScheduler) {
if (responseInspectorFactory == null) {
throw new IllegalArgumentException("responseInspectorFactory");
}
if (timeoutScheduler == null) {
throw new IllegalArgumentException("timeoutScheduler");
}
this.responseInspectorFactory = responseInspectorFactory;
this.timeoutScheduler = timeoutScheduler;
}
@Override
public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
if (parent.contains(this)) {
throw new IllegalArgumentException(
"You can't add the same filter instance more than once. Create another instance and add it.");
}
IoSession session = parent.getSession();
session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory.getResponseInspector());
session.setAttribute(REQUEST_STORE, createRequestStore(session));
session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session));
}
@Override
public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
IoSession session = parent.getSession();
destroyUnrespondedRequestStore(getUnrespondedRequestStore(session));
destroyRequestStore(getRequestStore(session));
session.removeAttribute(UNRESPONDED_REQUEST_STORE);
session.removeAttribute(REQUEST_STORE);
session.removeAttribute(RESPONSE_INSPECTOR);
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
ResponseInspector responseInspector = (ResponseInspector) session.getAttribute(RESPONSE_INSPECTOR);
Object requestId = responseInspector.getRequestId(message);
if (requestId == null) {
// Not a response message. Ignore.
nextFilter.messageReceived(session, message);
return;
}
// Retrieve (or remove) the corresponding request.
ResponseType type = responseInspector.getResponseType(message);
if (type == null) {
nextFilter.exceptionCaught(session, new IllegalStateException(responseInspector.getClass().getName()
+ "#getResponseType() may not return null."));
return;
}
Map<Object, Request> requestStore = getRequestStore(session);
Request request;
switch (type) {
case WHOLE:
case PARTIAL_LAST:
synchronized (requestStore) {
request = requestStore.remove(requestId);
}
break;
case PARTIAL:
synchronized (requestStore) {
request = requestStore.get(requestId);
}
break;
default:
throw new InternalError();
}
if (request == null) {
// A response message without request. Swallow the event because
// the response might have arrived too late.
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Unknown request ID '" + requestId + "' for the response message. Timed out already?: "
+ message);
}
} else {
// Found a matching request.
// Cancel the timeout task if needed.
if (type != ResponseType.PARTIAL) {
ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture();
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
synchronized (unrespondedRequests) {
unrespondedRequests.remove(request);
}
}
}
// And forward the event.
Response response = new Response(request, message, type);
request.signal(response);
nextFilter.messageReceived(session, response);
}
}
@Override
protected Object doFilterWrite(final NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
throws Exception {
Object message = writeRequest.getMessage();
if (!(message instanceof Request)) {
return null;
}
final Request request = (Request) message;
if (request.getTimeoutFuture() != null) {
throw new IllegalArgumentException("Request can not be reused.");
}
Map<Object, Request> requestStore = getRequestStore(session);
Object oldValue = null;
Object requestId = request.getId();
synchronized (requestStore) {
oldValue = requestStore.get(requestId);
if (oldValue == null) {
requestStore.put(requestId, request);
}
}
if (oldValue != null) {
throw new IllegalStateException("Duplicate request ID: " + request.getId());
}
// Schedule a task to be executed on timeout.
TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(timeoutTask, request.getTimeoutMillis(),
TimeUnit.MILLISECONDS);
request.setTimeoutTask(timeoutTask);
request.setTimeoutFuture(timeoutFuture);
// Add the timeout task to the unfinished task set.
Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
synchronized (unrespondedRequests) {
unrespondedRequests.add(request);
}
return request.getMessage();
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
// Copy the unfinished task set to avoid unnecessary lock acquisition.
// Copying will be cheap because there won't be that many requests queued.
Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
List<Request> unrespondedRequestsCopy;
synchronized (unrespondedRequests) {
unrespondedRequestsCopy = new ArrayList<Request>(unrespondedRequests);
unrespondedRequests.clear();
}
// Generate timeout artificially.
for (Request r : unrespondedRequestsCopy) {
if (r.getTimeoutFuture().cancel(false)) {
r.getTimeoutTask().run();
}
}
// Clear the request store just in case we missed something, though it's unlikely.
Map<Object, Request> requestStore = getRequestStore(session);
synchronized (requestStore) {
requestStore.clear();
}
// Now tell the main subject.
nextFilter.sessionClosed(session);
}
@SuppressWarnings("unchecked")
private Map<Object, Request> getRequestStore(IoSession session) {
return (Map<Object, Request>) session.getAttribute(REQUEST_STORE);
}
@SuppressWarnings("unchecked")
private Set<Request> getUnrespondedRequestStore(IoSession session) {
return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE);
}
/**
* Returns a {@link Map} which stores {@code messageId}-{@link Request}
* pairs whose {@link Response}s are not received yet. Please override
* this method if you need to use other {@link Map} implementation
* than the default one ({@link HashMap}).
*/
protected Map<Object, Request> createRequestStore(IoSession session) {
return new ConcurrentHashMap<Object, Request>();
}
/**
* Returns a {@link Set} which stores {@link Request} whose
* {@link Response}s are not received yet. Please override
* this method if you need to use other {@link Set} implementation
* than the default one ({@link LinkedHashSet}). Please note that
* the {@link Iterator} of the returned {@link Set} have to iterate
* its elements in the insertion order to ensure that
* {@link RequestTimeoutException}s are thrown in the order which
* {@link Request}s were written. If you don't need to guarantee
* the order of thrown exceptions, any {@link Set} implementation
* can be used.
*/
protected Set<Request> createUnrespondedRequestStore(IoSession session) {
return new LinkedHashSet<Request>();
}
/**
* Releases any resources related with the {@link Map} created by
* {@link #createRequestStore(IoSession)}. This method is useful
* if you override {@link #createRequestStore(IoSession)}.
*
* @param requestStore what you returned in {@link #createRequestStore(IoSession)}
*/
protected void destroyRequestStore(Map<Object, Request> requestStore) {
// Do nothing
}
/**
* Releases any resources related with the {@link Set} created by
* {@link #createUnrespondedRequestStore(IoSession)}. This method is
* useful if you override {@link #createUnrespondedRequestStore(IoSession)}.
*
* @param unrespondedRequestStore what you returned in {@link #createUnrespondedRequestStore(IoSession)}
*/
protected void destroyUnrespondedRequestStore(Set<Request> unrespondedRequestStore) {
// Do nothing
}
private class TimeoutTask implements Runnable {
private final NextFilter filter;
private final Request request;
private final IoSession session;
private TimeoutTask(NextFilter filter, Request request, IoSession session) {
this.filter = filter;
this.request = request;
this.session = session;
}
public void run() {
Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
if (unrespondedRequests != null) {
synchronized (unrespondedRequests) {
unrespondedRequests.remove(request);
}
}
Map<Object, Request> requestStore = getRequestStore(session);
Object requestId = request.getId();
boolean timedOut;
synchronized (requestStore) {
if (requestStore.get(requestId) == request) {
requestStore.remove(requestId);
timedOut = true;
} else {
timedOut = false;
}
}
if (timedOut) {
// Throw the exception only when it's really timed out.
RequestTimeoutException e = new RequestTimeoutException(request);
request.signal(e);
filter.exceptionCaught(session, e);
}
}
}
}