blob: ac4febaa1b66e810761fe31a77ed437bf7d7dda1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Protocol;
import org.apache.avro.ipc.NettyTransportCodec.NettyDataPack;
import org.apache.avro.ipc.NettyTransportCodec.NettyFrameDecoder;
import org.apache.avro.ipc.NettyTransportCodec.NettyFrameEncoder;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Netty-based {@link Transceiver} implementation.
*/
public class NettyTransceiver extends Transceiver {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
.getName());
private ChannelFactory channelFactory;
private Channel channel;
private AtomicInteger serialGenerator = new AtomicInteger(0);
private Map<Integer, CallFuture> requests =
new ConcurrentHashMap<Integer, CallFuture>();
private Protocol remote;
NettyTransceiver() {}
public NettyTransceiver(InetSocketAddress addr) {
this(addr, new NioClientSocketChannelFactory(Executors.
newCachedThreadPool(), Executors.newCachedThreadPool()));
}
public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
// Set up.
this.channelFactory = channelFactory;
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = Channels.pipeline();
p.addLast("frameDecoder", new NettyFrameDecoder());
p.addLast("frameEncoder", new NettyFrameEncoder());
p.addLast("handler", new NettyClientAvroHandler());
return p;
}
});
bootstrap.setOption("tcpNoDelay", true);
// Make a new connection.
ChannelFuture channelFuture = bootstrap.connect(addr);
channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
channelFuture.getCause().printStackTrace();
throw new RuntimeException(channelFuture.getCause());
}
channel = channelFuture.getChannel();
}
public void close() {
// Close the connection.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
channelFactory.releaseExternalResources();
}
@Override
public String getRemoteName() {
return channel.getRemoteAddress().toString();
}
/**
* Override as non-synchronized method because the method is thread safe.
*/
@Override
public List<ByteBuffer> transceive(List<ByteBuffer> request)
throws IOException {
int serial = serialGenerator.incrementAndGet();
NettyDataPack dataPack = new NettyDataPack(serial, request);
CallFuture callFuture = new CallFuture();
requests.put(serial, callFuture);
channel.write(dataPack);
try {
return callFuture.get();
} catch (InterruptedException e) {
LOG.warn("failed to get the response", e);
return null;
} catch (ExecutionException e) {
LOG.warn("failed to get the response", e);
return null;
} finally {
requests.remove(serial);
}
}
@Override
public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public List<ByteBuffer> readBuffers() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Protocol getRemote() {
return remote;
}
@Override
public boolean isConnected() {
return remote!=null;
}
@Override
public void setRemote(Protocol protocol) {
this.remote = protocol;
}
/**
* Future class for a RPC call
*/
class CallFuture implements Future<List<ByteBuffer>>{
private Semaphore sem = new Semaphore(0);
private List<ByteBuffer> response = null;
public void setResponse(List<ByteBuffer> response) {
this.response = response;
sem.release();
}
public void releaseSemphore() {
sem.release();
}
public List<ByteBuffer> getResponse() {
return response;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public List<ByteBuffer> get() throws InterruptedException,
ExecutionException {
sem.acquire();
return response;
}
@Override
public List<ByteBuffer> get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (sem.tryAcquire(timeout, unit)) {
return response;
} else {
throw new TimeoutException();
}
}
@Override
public boolean isDone() {
return sem.availablePermits()>0;
}
}
/**
* Avro client handler for the Netty transport
*/
class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
LOG.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// channel = e.getChannel();
super.channelOpen(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
NettyDataPack dataPack = (NettyDataPack)e.getMessage();
CallFuture callFuture = requests.get(dataPack.getSerial());
if (callFuture==null) {
throw new RuntimeException("Missing previous call info");
}
callFuture.setResponse(dataPack.getDatas());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Unexpected exception from downstream.", e.getCause());
e.getChannel().close();
// let the blocking waiting exit
Iterator<CallFuture> it = requests.values().iterator();
while(it.hasNext()) {
it.next().releaseSemphore();
it.remove();
}
}
}
}