blob: f5295696440b895bc55e9e8d2318e76646a90d05 [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activeio.adapter;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.Channel;
import org.apache.activeio.ChannelFactory;
import org.apache.activeio.packet.EOSPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.packet.async.AsyncChannel;
import org.apache.activeio.packet.async.AsyncChannelListener;
import org.apache.activeio.packet.sync.SyncChannel;
import java.io.IOException;
/**
* Adapts a {@see org.apache.activeio.SynchChannel} so that it provides an
* {@see org.apache.activeio.AsyncChannel} interface. When this channel
* is started, a background thread is used to poll the {@see org.apache.activeio.SynchChannel}
* for packets comming up the channel which are then delivered to the
* {@see org.apache.activeio.ChannelConsumer}.
*
* @version $Revision$
*/
public class SyncToAsyncChannel implements AsyncChannel, Runnable {
private final AtomicBoolean running = new AtomicBoolean(false);
private final SyncChannel syncChannel;
private final Executor executor;
private AsyncChannelListener channelListener;
private CountDownLatch doneCountDownLatch;
static public AsyncChannel adapt(Channel channel) {
return adapt(channel, ChannelFactory.DEFAULT_EXECUTOR);
}
static public AsyncChannel adapt(Channel channel, Executor executor) {
// It might not need adapting
if( channel instanceof AsyncChannel ) {
return (AsyncChannel) channel;
}
// Can we just just undo the adaptor
if( channel.getClass() == SyncToAsyncChannel.class ) {
return ((AsyncToSyncChannel)channel).getAsyncChannel();
}
return new SyncToAsyncChannel((SyncChannel) channel, executor);
}
/**
* @deprecated {@see #adapt(SynchChannel)}
*/
public SyncToAsyncChannel(SyncChannel syncChannel) {
this(syncChannel, ChannelFactory.DEFAULT_EXECUTOR);
}
/**
* @deprecated {@see #adapt(SynchChannel, Executor)}
*/
public SyncToAsyncChannel(SyncChannel syncChannel, Executor executor) {
this.syncChannel = syncChannel;
this.executor = executor;
}
synchronized public void start() throws IOException {
if (running.compareAndSet(false, true)) {
if (channelListener == null)
throw new IllegalStateException("UpPacketListener must be set before object can be started.");
syncChannel.start();
doneCountDownLatch = new CountDownLatch(1);
executor.execute(this);
}
}
synchronized public void stop() throws IOException {
if (running.compareAndSet(true, false)) {
try {
doneCountDownLatch.await(5, TimeUnit.SECONDS);
} catch (Throwable e) {
}
syncChannel.stop();
}
}
/**
* reads packets from a Socket
*/
public void run() {
// Change the thread name.
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName( syncChannel.toString() );
try {
while (running.get()) {
try {
Packet packet = syncChannel.read(500);
if( packet==null )
continue;
if( packet == EOSPacket.EOS_PACKET ) {
channelListener.onPacket(packet);
return;
}
if( packet.hasRemaining() ) {
channelListener.onPacket(packet);
}
} catch (IOException e) {
channelListener.onPacketError(e);
} catch (Throwable e) {
channelListener.onPacketError((IOException)new IOException("Unexpected Error: "+e).initCause(e));
}
}
} finally {
if( doneCountDownLatch!=null )
doneCountDownLatch.countDown();
Thread.currentThread().setName(oldName);
}
}
/**
* @see org.apache.activeio.packet.async.AsyncChannel#setAsyncChannelListener(org.apache.activeio.UpPacketListener)
*/
public void setAsyncChannelListener(AsyncChannelListener channelListener) {
if (running.get())
throw new IllegalStateException("Cannot change the UpPacketListener while the object is running.");
this.channelListener = channelListener;
}
/**
* @see org.apache.activeio.Channel#write(org.apache.activeio.packet.Packet)
*/
public void write(org.apache.activeio.packet.Packet packet) throws IOException {
syncChannel.write(packet);
}
/**
* @see org.apache.activeio.Channel#flush()
*/
public void flush() throws IOException {
syncChannel.flush();
}
/**
* @see org.apache.activeio.Disposable#dispose()
*/
public void dispose() {
try {
stop();
} catch ( IOException ignore) {
}
syncChannel.dispose();
}
public AsyncChannelListener getAsyncChannelListener() {
return channelListener;
}
public Object getAdapter(Class target) {
if( target.isAssignableFrom(getClass()) ) {
return this;
}
return syncChannel.getAdapter(target);
}
public SyncChannel getSynchChannel() {
return syncChannel;
}
public String toString() {
return syncChannel.toString();
}
}