// blob: 5ea5419c66f20f52263c7003b1429820930b144d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.jaxrs.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationCallback;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.jaxrs.utils.HttpUtils;
import org.apache.cxf.message.Message;
/**
 * {@link AsyncResponse} implementation that drives a CXF {@link Continuation}.
 *
 * <p>On construction the instance stores itself on the inbound message under
 * {@code AsyncResponse.class}, stores itself on the exchange under
 * {@code ContinuationCallback.class}, and obtains the continuation from the
 * {@link ContinuationProvider} found on the message.
 *
 * <p>The instance also acts as the {@link ContinuationCallback}, forwarding
 * completion, error and disconnect notifications to the callbacks registered
 * through the {@code register(...)} methods.
 */
public class AsyncResponseImpl implements AsyncResponse, ContinuationCallback {

    /** Current continuation; replaced whenever {@link #prepareContinuation()} is called. */
    private Continuation cont;
    /** Inbound message this asynchronous response belongs to. */
    private final Message inMessage;
    /** Optional application handler consulted by {@link #handleTimeout()}. */
    private TimeoutHandler timeoutHandler;
    /**
     * Set by {@code initContinuation()}; cleared once the continuation has actually been
     * suspended, a timeout has been set, or the application has resumed.
     */
    private volatile boolean initialSuspend;
    /** Set once one of the {@code cancel(...)} methods has succeeded. */
    private volatile boolean cancelled;
    /** Set by {@link #onComplete()}. */
    private volatile boolean done;
    /** Set once a response (or a cancellation response) has been handed to the continuation. */
    private volatile boolean resumedByApplication;
    /**
     * Timeout (milliseconds) requested while the continuation was already pending;
     * applied and cleared by {@link #handleTimeout()}.
     */
    private volatile Long pendingTimeout;
    /** Callbacks notified from {@link #onComplete()} and {@link #onError(Throwable)}. */
    private final List<CompletionCallback> completionCallbacks = new LinkedList<CompletionCallback>();
    /** Callbacks notified from {@link #onDisconnect()}. */
    private final List<ConnectionCallback> connectionCallbacks = new LinkedList<ConnectionCallback>();
    /** Throwable reported to completion callbacks by {@link #onComplete()}; may be {@code null}. */
    private Throwable unmappedThrowable;

    /**
     * Creates the asynchronous response and binds it to the given message and its exchange.
     *
     * @param inMessage the inbound message being processed
     * @throws IllegalArgumentException if the message carries no {@link ContinuationProvider}
     */
    public AsyncResponseImpl(Message inMessage) {
        inMessage.put(AsyncResponse.class, this);
        inMessage.getExchange().put(ContinuationCallback.class, this);
        this.inMessage = inMessage;

        initContinuation();
    }

    /**
     * Resumes processing with the given response object.
     *
     * @param response the response entity or {@link Response}
     * @return {@code false} if this response is cancelled or not suspended, {@code true} otherwise
     */
    @Override
    public boolean resume(Object response) {
        return doResume(response);
    }

    /**
     * Resumes processing with the given throwable.
     *
     * @param response the throwable to resume with
     * @return {@code false} if this response is cancelled or not suspended, {@code true} otherwise
     */
    @Override
    public boolean resume(Throwable response) {
        return doResume(response);
    }

    /** Returns {@code true} when a resume or timeout request must be rejected. */
    private boolean isCancelledOrNotSuspended() {
        return isCancelled() || !isSuspended();
    }

    /** Shared implementation of both {@code resume} overloads. */
    private boolean doResume(Object response) {
        if (isCancelledOrNotSuspended()) {
            return false;
        }
        return doResumeFinal(response);
    }

    /**
     * Hands the response to the continuation and marks this instance as resumed.
     *
     * <p>If the initial suspend is still outstanding ({@code initialSuspend} is set) only the
     * flag is cleared; otherwise the continuation is resumed.
     *
     * @param response the object to store on the continuation
     * @return always {@code true}
     */
    private synchronized boolean doResumeFinal(Object response) {
        inMessage.getExchange().put(AsyncResponse.class, this);
        cont.setObject(response);
        resumedByApplication = true;
        if (!initialSuspend) {
            cont.resume();
        } else {
            initialSuspend = false;
        }
        return true;
    }

    /**
     * Cancels the suspended request without a {@code Retry-After} header.
     *
     * @return see {@code doCancel}
     */
    @Override
    public boolean cancel() {
        return doCancel(null);
    }

    /**
     * Cancels the suspended request, sending the given value as the {@code Retry-After} header.
     *
     * @param retryAfter value written to the header using {@link Integer#toString(int)}
     * @return see {@code doCancel}
     */
    @Override
    public boolean cancel(int retryAfter) {
        return doCancel(Integer.toString(retryAfter));
    }

    /**
     * Cancels the suspended request, sending the given date as the {@code Retry-After} header.
     *
     * @param retryAfter date formatted with {@code HttpUtils.getHttpDateFormat()}
     * @return see {@code doCancel}
     */
    @Override
    public boolean cancel(Date retryAfter) {
        return doCancel(HttpUtils.getHttpDateFormat().format(retryAfter));
    }

    /**
     * Shared implementation of the {@code cancel} overloads: resumes with a 503 response.
     *
     * @param retryAfterHeader value for the {@code Retry-After} header, or {@code null} for none
     * @return {@code true} if already cancelled or cancelled by this call,
     *         {@code false} if the response is not suspended
     */
    private boolean doCancel(String retryAfterHeader) {
        if (cancelled) {
            return true;
        }
        if (!isSuspended()) {
            return false;
        }

        cancelled = true;
        ResponseBuilder rb = Response.status(503);
        if (retryAfterHeader != null) {
            rb.header(HttpHeaders.RETRY_AFTER, retryAfterHeader);
        }
        doResumeFinal(rb.build());
        return cancelled;
    }

    /**
     * @return {@code false} once cancelled or resumed by the application; otherwise {@code true}
     *         while the initial suspend is outstanding or the continuation is pending
     */
    @Override
    public boolean isSuspended() {
        if (cancelled || resumedByApplication) {
            return false;
        }
        return initialSuspend || cont.isPending();
    }

    /** @return {@code true} once a {@code cancel(...)} call has succeeded */
    @Override
    public synchronized boolean isCancelled() {
        return cancelled;
    }

    /** @return {@code true} once {@link #onComplete()} has been invoked */
    @Override
    public boolean isDone() {
        return done;
    }

    /**
     * Sets or updates the suspend timeout.
     *
     * <p>If the continuation is not yet pending it is suspended with the new timeout. If it is
     * already pending, the timeout is remembered in {@code pendingTimeout} and the continuation
     * is resumed; {@link #handleTimeout()} then re-suspends it with the remembered value.
     *
     * @param time timeout value
     * @param unit unit of {@code time}; converted to milliseconds
     * @return {@code false} if this response is cancelled or not suspended, {@code true} otherwise
     */
    @Override
    public synchronized boolean setTimeout(long time, TimeUnit unit) throws IllegalStateException {
        if (isCancelledOrNotSuspended()) {
            return false;
        }
        setAsyncResponseOnExchange();
        long timeout = TimeUnit.MILLISECONDS.convert(time, unit);
        initialSuspend = false;
        if (!cont.isPending()) {
            cont.suspend(timeout);
        } else {
            pendingTimeout = timeout;
            cont.resume();
        }
        return true;
    }

    /** Stores this instance on the exchange under {@code AsyncResponse.class}. */
    private void setAsyncResponseOnExchange() {
        inMessage.getExchange().put(AsyncResponse.class, this);
    }

    /**
     * Sets the handler invoked by {@link #handleTimeout()} when no pending timeout is queued.
     *
     * @param handler the handler, or {@code null} to remove it
     */
    @Override
    public void setTimeoutHandler(TimeoutHandler handler) {
        timeoutHandler = handler;
    }

    /**
     * Instantiates and registers a single callback class.
     *
     * @param callback the callback class
     * @return the callback interfaces registered for {@code callback}; {@code null} when the
     *         class could not be instantiated (the delegate then returns an empty map)
     * @throws NullPointerException if {@code callback} is {@code null}
     */
    @Override
    public Collection<Class<?>> register(Class<?> callback) throws NullPointerException {
        return register(callback, new Class<?>[]{}).get(callback);
    }

    /**
     * Instantiates the given callback classes through their no-argument constructors and
     * registers the resulting instances.
     *
     * @param callback the first callback class
     * @param callbacks additional callback classes
     * @return map from callback class to the callback interfaces registered for it, or an empty
     *         map if instantiation or registration failed with anything other than a
     *         {@link NullPointerException}
     * @throws NullPointerException if any class is {@code null}
     */
    @Override
    public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks)
        throws NullPointerException {
        try {
            Object[] extraCallbacks = new Object[callbacks.length];
            for (int i = 0; i < callbacks.length; i++) {
                extraCallbacks[i] = callbacks[i].newInstance();
            }
            return register(callback.newInstance(), extraCallbacks);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable t) {
            // Deliberate best-effort: any failure other than a NullPointerException
            // results in nothing being reported as registered.
            return Collections.emptyMap();
        }
    }

    /**
     * Registers a single callback instance.
     *
     * @param callback the callback instance
     * @return the callback interfaces registered for the instance's class
     * @throws NullPointerException if {@code callback} is {@code null}
     */
    @Override
    public Collection<Class<?>> register(Object callback) throws NullPointerException {
        return register(callback, new Object[]{}).get(callback.getClass());
    }

    /**
     * Registers callback instances for every recognized callback interface they implement.
     *
     * <p>An instance implementing both {@link CompletionCallback} and
     * {@link ConnectionCallback} is registered for both. All arguments are validated before
     * any of them is registered, so a {@code null} entry leaves the registered callbacks
     * untouched.
     *
     * @param callback the first callback instance
     * @param callbacks additional callback instances
     * @return map from each callback's class to the set of callback interfaces registered for
     *         it; the set is empty for instances implementing neither interface
     * @throws NullPointerException if any callback is {@code null}
     */
    @Override
    public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks)
        throws NullPointerException {
        Object[] allCallbacks = new Object[1 + callbacks.length];
        allCallbacks[0] = callback;
        System.arraycopy(callbacks, 0, allCallbacks, 1, callbacks.length);

        // Validate everything up front so a null entry cannot cause a partial registration.
        for (Object candidate : allCallbacks) {
            if (candidate == null) {
                throw new NullPointerException();
            }
        }

        Map<Class<?>, Collection<Class<?>>> map =
            new HashMap<Class<?>, Collection<Class<?>>>();
        for (Object candidate : allCallbacks) {
            Class<?> callbackCls = candidate.getClass();
            Collection<Class<?>> knownCallbacks = map.get(callbackCls);
            if (knownCallbacks == null) {
                knownCallbacks = new HashSet<Class<?>>();
                map.put(callbackCls, knownCallbacks);
            }

            if (candidate instanceof CompletionCallback) {
                knownCallbacks.add(CompletionCallback.class);
                completionCallbacks.add((CompletionCallback)candidate);
            }
            // Checked independently (not "else if") so that a callback implementing both
            // interfaces also receives disconnect notifications.
            if (candidate instanceof ConnectionCallback) {
                knownCallbacks.add(ConnectionCallback.class);
                connectionCallbacks.add((ConnectionCallback)candidate);
            }
        }
        return map;
    }

    /** Marks this response as done and notifies completion callbacks with the unmapped throwable. */
    @Override
    public void onComplete() {
        done = true;
        updateCompletionCallbacks(unmappedThrowable);
    }

    /**
     * Notifies completion callbacks of the given error.
     *
     * @param error the reported error
     */
    @Override
    public void onError(Throwable error) {
        updateCompletionCallbacks(error);
    }

    /**
     * Invokes every registered {@link CompletionCallback}.
     *
     * @param error the error to report, or {@code null}; a {@link Fault} is replaced by its cause
     */
    private void updateCompletionCallbacks(Throwable error) {
        Throwable actualError = error instanceof Fault ? ((Fault)error).getCause() : error;
        for (CompletionCallback completionCallback : completionCallbacks) {
            completionCallback.onComplete(actualError);
        }
    }

    /** Invokes every registered {@link ConnectionCallback} with this response. */
    @Override
    public void onDisconnect() {
        for (ConnectionCallback connectionCallback : connectionCallbacks) {
            connectionCallback.onDisconnect(this);
        }
    }

    /**
     * Suspends the continuation without a timeout unless the application has already resumed
     * or the continuation is already pending or resumed.
     *
     * @return {@code true} if the continuation was suspended by this call
     */
    public synchronized boolean suspendContinuationIfNeeded() {
        if (!resumedByApplication && !cont.isPending() && !cont.isResumed()) {
            cont.suspend(AsyncResponse.NO_TIMEOUT);
            initialSuspend = false;
            return true;
        } else {
            return false;
        }
    }

    /**
     * Returns the object stored on the continuation, normalized for response processing.
     *
     * @return the stored {@link Response} or {@link Throwable} as is; a 204 response when
     *         nothing is stored; otherwise a 200 response wrapping the stored object as entity
     */
    @SuppressWarnings("resource") // Response that is built here shouldn't be closed here
    public Object getResponseObject() {
        Object obj = cont.getObject();
        if (!(obj instanceof Response) && !(obj instanceof Throwable)) {
            if (obj == null) {
                obj = Response.noContent().build();
            } else {
                obj = Response.ok().entity(obj).build();
            }
        }
        return obj;
    }

    /** @return {@code true} once a response has been handed over via resume or cancel */
    public boolean isResumedByApplication() {
        return resumedByApplication;
    }

    /**
     * Handles a continuation wake-up that was not caused by the application resuming.
     *
     * <p>In order of precedence: re-suspends with a queued {@code pendingTimeout}; otherwise
     * delegates to the registered {@link TimeoutHandler}; otherwise stores a
     * {@link ServiceUnavailableException} on the continuation. Does nothing if the application
     * has already resumed.
     */
    public synchronized void handleTimeout() {
        if (!resumedByApplication) {
            if (pendingTimeout != null) {
                setAsyncResponseOnExchange();
                cont.suspend(pendingTimeout);
                pendingTimeout = null;
            } else if (timeoutHandler != null) {
                timeoutHandler.handleTimeout(this);
            } else {
                cont.setObject(new ServiceUnavailableException());
            }
        }
    }

    /**
     * Looks up the {@link ContinuationProvider} on the message, fetches its continuation and
     * marks the initial suspend as outstanding.
     *
     * @throws IllegalArgumentException if the message carries no provider
     */
    private void initContinuation() {
        ContinuationProvider provider =
            (ContinuationProvider)inMessage.get(ContinuationProvider.class.getName());
        if (provider == null) {
            throw new IllegalArgumentException(
                "Continuation not supported. "
                + "Please ensure that all servlets and servlet filters support async operations");
        }
        cont = provider.getContinuation();
        initialSuspend = true;
    }

    /** Re-reads the continuation from the message's provider and re-arms the initial suspend. */
    public void prepareContinuation() {
        initContinuation();
    }

    /**
     * Records the throwable to be reported to completion callbacks by {@link #onComplete()}.
     *
     * @param t the throwable, or {@code null}
     */
    public void setUnmappedThrowable(Throwable t) {
        unmappedThrowable = t;
    }

    /** Resets the underlying continuation. */
    public void reset() {
        cont.reset();
    }
}