| /** |
| * |
| * Copyright 2005-2006 The Apache Software Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activeio.packet.async.vmpipe; |
| |
| import java.io.IOException; |
| import java.lang.reflect.InvocationHandler; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.apache.activeio.packet.Packet; |
| import org.apache.activeio.packet.async.AsyncChannel; |
| import org.apache.activeio.packet.async.AsyncChannelFactory; |
| import org.apache.activeio.packet.async.AsyncChannelListener; |
| import org.apache.activeio.packet.async.AsyncChannelServer; |
| |
| /** |
| * |
| * @version $Revision$ |
| */ |
| final public class VMPipeAsyncChannelFactory implements AsyncChannelFactory { |
| |
| // |
| // We do all this crazy stuff of looking the server map using System |
| // properties |
| // because this class could be loaded multiple times in different |
| // classloaders. |
| // |
| private static final String SERVER_MAP_LOCATION = VMPipeAsyncChannelFactory.class.getName() + ".SERVER_MAP"; |
| |
| private static final Map SERVER_MAP; |
| static { |
| Map m = null; |
| m = (Map) System.getProperties().get(SERVER_MAP_LOCATION); |
| if (m == null) { |
| m = Collections.synchronizedMap(new HashMap()); |
| System.getProperties().put(SERVER_MAP_LOCATION, m); |
| } |
| SERVER_MAP = m; |
| } |
| |
| private final static ClassLoader MY_CLASSLOADER = Packet.class.getClassLoader(); |
| |
| |
| /** |
| * Used to marshal calls to a PipeChannel in a different classloader. |
| */ |
| static public class ClassloaderAsyncChannelAdapter implements AsyncChannel { |
| |
| private final ClassLoader cl; |
| private final Object channel; |
| private final Method writeMethod; |
| private final Method setListenerMethod; |
| private final Class listenerClazz; |
| private final Class packetClazz; |
| private final Object listenerProxy; |
| private final Method duplicateMethod; |
| private final Method startMethod; |
| private final Method stopMethod; |
| private final Method disposeMethod; |
| |
| private AsyncChannelListener channelListener; |
| |
| public class ListenerProxyHandler implements InvocationHandler { |
| public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { |
| switch (method.getName().length()) { |
| case 8: // onPacket |
| Object packet = duplicateMethod.invoke(args[0], new Object[]{MY_CLASSLOADER}); |
| channelListener.onPacket((Packet) packet); |
| break; |
| case 13: // onPacketError |
| channelListener.onPacketError((IOException) args[0]); |
| break; |
| default: |
| channelListener.onPacketError(new IOException("Unknown proxy method invocation: "+method.getName())); |
| } |
| return null; |
| } |
| } |
| |
| public ClassloaderAsyncChannelAdapter(Object channel) throws SecurityException, NoSuchMethodException, |
| ClassNotFoundException { |
| this.channel = channel; |
| Class clazz = channel.getClass(); |
| cl = clazz.getClassLoader(); |
| |
| listenerClazz = cl.loadClass(AsyncChannelListener.class.getName()); |
| packetClazz = cl.loadClass(Packet.class.getName()); |
| writeMethod = clazz.getMethod("write", new Class[] { packetClazz }); |
| startMethod = clazz.getMethod("start", new Class[] { }); |
| stopMethod = clazz.getMethod("stop", new Class[] {}); |
| disposeMethod = clazz.getMethod("dispose", new Class[] { }); |
| |
| setListenerMethod = clazz.getMethod("setAsyncChannelListener", new Class[] { listenerClazz }); |
| duplicateMethod = packetClazz.getMethod("duplicate", new Class[] { ClassLoader.class }); |
| |
| ListenerProxyHandler handler = new ListenerProxyHandler(); |
| listenerProxy = Proxy.newProxyInstance(cl, new Class[] { listenerClazz }, handler); |
| } |
| |
| public void write(Packet packet) throws IOException { |
| callIOExceptionMethod(writeMethod, new Object[] { packet.duplicate(cl) }); |
| } |
| |
| public void setAsyncChannelListener(AsyncChannelListener channelListener) { |
| this.channelListener = channelListener; |
| callMethod(setListenerMethod, new Object[] { channelListener == null ? null : listenerProxy }); |
| } |
| |
| public AsyncChannelListener getAsyncChannelListener() { |
| return channelListener; |
| } |
| |
| public void dispose() { |
| callMethod(disposeMethod, new Object[] { }); |
| } |
| |
| public void start() throws IOException { |
| callIOExceptionMethod(startMethod, new Object[] {}); |
| } |
| |
| public void stop() throws IOException { |
| callIOExceptionMethod(stopMethod, new Object[] {}); |
| } |
| |
| private void callMethod(Method method, Object[] args) { |
| try { |
| method.invoke(channel, args); |
| } catch (InvocationTargetException e) { |
| if (e.getTargetException() instanceof RuntimeException) { |
| throw (RuntimeException) e.getTargetException(); |
| } |
| throw new RuntimeException(e.getTargetException()); |
| } catch (Throwable e) { |
| throw new RuntimeException("Reflexive invocation failed: " + e, e); |
| } |
| } |
| |
| private void callIOExceptionMethod(Method method, Object[] args) throws IOException { |
| try { |
| method.invoke(channel, args); |
| } catch (InvocationTargetException e) { |
| if (e.getTargetException() instanceof IOException) { |
| throw (IOException) e.getTargetException(); |
| } |
| if (e.getTargetException() instanceof RuntimeException) { |
| throw (RuntimeException) e.getTargetException(); |
| } |
| throw new RuntimeException(e.getTargetException()); |
| } catch (Throwable e) { |
| throw (IOException) new IOException("Reflexive invocation failed: " + e).initCause(e); |
| } |
| } |
| |
| // |
| // The following methods do not need to delegate since they |
| // are implemented as noops in the PipeChannel |
| // |
| public Object getAdapter(Class target) { |
| if (target.isAssignableFrom(getClass())) { |
| return this; |
| } |
| return null; |
| } |
| |
| public void flush() throws IOException { |
| } |
| |
| } |
| |
| private boolean forceRefelection; |
| |
| public AsyncChannel openAsyncChannel(URI location) throws IOException { |
| |
| Object server = lookupServer(location); |
| if (!forceRefelection && server.getClass() == VMPipeAsyncChannelServer.class) { |
| return ((VMPipeAsyncChannelServer) server).connect(); |
| } |
| |
| // Asume server is in a different classloader. |
| // Use reflection to connect. |
| try { |
| Method method = server.getClass().getMethod("connect", new Class[] {}); |
| Object channel = method.invoke(server, new Object[] {}); |
| return new ClassloaderAsyncChannelAdapter(channel); |
| } catch (Throwable e) { |
| throw (IOException) new IOException("Connection could not be established: " + e).initCause(e); |
| } |
| } |
| |
| public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException { |
| VMPipeAsyncChannelServer server = new VMPipeAsyncChannelServer(bindURI); |
| bindServer(bindURI, server); |
| return server; |
| } |
| |
| private static Map getServerMap() { |
| return SERVER_MAP; |
| } |
| |
| static public String getServerKeyForURI(URI location) { |
| return location.getHost(); |
| } |
| |
| public static void bindServer(URI bindURI, VMPipeAsyncChannelServer server) throws IOException { |
| String key = getServerKeyForURI(bindURI); |
| if (getServerMap().get(key) != null) |
| throw new IOException("Server is allready bound at: " + bindURI); |
| getServerMap().put(key, server); |
| } |
| |
| public static Object lookupServer(URI location) throws IOException { |
| String key = getServerKeyForURI(location); |
| Object server = getServerMap().get(key); |
| if (server == null) { |
| throw new IOException("Connection refused."); |
| } |
| return server; |
| } |
| |
| public static void unbindServer(URI bindURI) { |
| String key = getServerKeyForURI(bindURI); |
| getServerMap().remove(key); |
| } |
| |
| public boolean isForceRefelection() { |
| return forceRefelection; |
| } |
| |
| public void setForceRefelection(boolean forceRefelection) { |
| this.forceRefelection = forceRefelection; |
| } |
| |
| } |