blob: f53992a9891ba58dff11ac67d6d01bc99fd01e4b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.abdera2.common.protocol.servlet.async;
import java.io.IOException;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.abdera2.common.pusher.ChannelManager;
import org.apache.abdera2.common.pusher.Listener;
import org.apache.abdera2.common.pusher.Receiver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@WebServlet(asyncSupported=true)
@SuppressWarnings({ "rawtypes", "unchecked" })
public abstract class AbderaChannelServlet extends HttpServlet {
private final static Log log = LogFactory.getLog(AbderaChannelServlet.class);
private static final long serialVersionUID = 3751815744618869423L;
protected abstract String getChannel(AsyncContext context);
protected abstract AsyncListener<?> createListener(AsyncContext context);
protected abstract long getTimeout(ServletConfig config, ServletContext context);
protected void doGet(
final HttpServletRequest request,
final HttpServletResponse response)
throws ServletException, IOException {
final ServletContext sc = getServletContext();
final ChannelManager cm = (ChannelManager) sc.getAttribute(AbderaAsyncService.CM);
if (cm == null || !cm.isShutdown()) {
final AsyncContext context = request.startAsync(request, response);
context.setTimeout(getTimeout(getServletConfig(),sc));
context.start(
new Runnable() {
public void run() {
String channel = getChannel(context);
log.debug(String.format("Selected Channel Name: %s",channel));
if (channel != null) {
final Receiver receiver = cm.getReceiver(channel);
log.debug(String.format("Selected Receiver: %s",receiver));
if (receiver != null) {
final Listener listener = createListener(context);
context.addListener(
new javax.servlet.AsyncListener() {
public void onComplete(AsyncEvent event) throws IOException {
System.out.println("on complete");
try {
receiver.stopListening(listener);
} catch (Throwable t) {}
}
public void onError(AsyncEvent event) throws IOException {
System.out.println("on error");
event.getThrowable().printStackTrace();
try {
receiver.stopListening(listener);
} catch (Throwable t) {}
}
public void onStartAsync(AsyncEvent event)
throws IOException {
}
public void onTimeout(AsyncEvent event) throws IOException {
System.out.println("on timeout");
try {
receiver.stopListening(listener);
} catch (Throwable t) {}
}
}
);
log.debug(String.format("Listener: %s",listener));
if (listener != null) {
request.setAttribute("AbderaChannel", channel);
request.setAttribute("AbderaReceiver", receiver);
request.setAttribute("AbderaListener", listener);
receiver.startListening(listener);
}
}
}
}
}
);
} else {
response.sendError(
HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Abdera Service in unavailable");
}
}
public abstract static class AsyncListener<T> implements Listener<T> {
private final AsyncContext context;
private boolean done = false;
protected AsyncListener(AsyncContext context) {
this.context = context;
}
protected HttpServletRequest getRequest() {
return (HttpServletRequest) context.getRequest();
}
protected HttpServletResponse getResponse() {
return (HttpServletResponse) context.getResponse();
}
protected boolean isDone() {
return done;
}
public void afterItems() {
if (!done) {
try {
finish();
getResponse().flushBuffer();
} catch (Throwable t) {
// whoops, must have lost the connection before the request completed.
}
context.complete();
done = true;
}
}
protected void finish() {
// by default do nothing
}
public void beforeItems() {
// by default do nothing
}
}
}