| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.proto; |
| |
| import java.io.IOException; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.ArrayDeque; |
| import java.util.Queue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import com.google.protobuf.ByteString; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeperClientStats; |
| import org.apache.bookkeeper.conf.ClientConfiguration; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.Response; |
| import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.util.MathUtils; |
| import org.apache.bookkeeper.util.OrderedSafeExecutor; |
| import org.apache.bookkeeper.util.SafeRunnable; |
| import org.jboss.netty.bootstrap.ClientBootstrap; |
| import org.jboss.netty.buffer.ChannelBuffer; |
| import org.jboss.netty.buffer.ChannelBuffers; |
| import org.jboss.netty.channel.Channel; |
| import org.jboss.netty.channel.ChannelFuture; |
| import org.jboss.netty.channel.ChannelFutureListener; |
| import org.jboss.netty.channel.ChannelHandlerContext; |
| import org.jboss.netty.channel.ChannelPipeline; |
| import org.jboss.netty.channel.ChannelPipelineFactory; |
| import org.jboss.netty.channel.ChannelStateEvent; |
| import org.jboss.netty.channel.Channels; |
| import org.jboss.netty.channel.ExceptionEvent; |
| import org.jboss.netty.channel.MessageEvent; |
| import org.jboss.netty.channel.SimpleChannelHandler; |
| import org.jboss.netty.channel.socket.ClientSocketChannelFactory; |
| import org.jboss.netty.handler.codec.frame.CorruptedFrameException; |
| import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; |
| import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; |
| import org.jboss.netty.handler.codec.frame.TooLongFrameException; |
| import org.jboss.netty.handler.timeout.ReadTimeoutHandler; |
| import org.jboss.netty.util.HashedWheelTimer; |
| import org.jboss.netty.util.Timeout; |
| import org.jboss.netty.util.TimerTask; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class manages all details of connection to a particular bookie. It also |
| * has reconnect logic if a connection to a bookie fails. |
| * |
| */ |
| public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory { |
| |
| static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class); |
| |
| public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M |
| public static final AtomicLong txnIdGenerator = new AtomicLong(0); |
| |
| final BookieSocketAddress addr; |
| final ClientSocketChannelFactory channelFactory; |
| final OrderedSafeExecutor executor; |
| final HashedWheelTimer requestTimer; |
| final int addEntryTimeout; |
| final int readEntryTimeout; |
| |
| private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>(); |
| |
| private final StatsLogger statsLogger; |
| private final OpStatsLogger readEntryOpLogger; |
| private final OpStatsLogger readTimeoutOpLogger; |
| private final OpStatsLogger addEntryOpLogger; |
| private final OpStatsLogger addTimeoutOpLogger; |
| |
| /** |
| * The following member variables do not need to be concurrent, or volatile |
| * because they are always updated under a lock |
| */ |
| private volatile Queue<GenericCallback<PerChannelBookieClient>> pendingOps = |
| new ArrayDeque<GenericCallback<PerChannelBookieClient>>(); |
| volatile Channel channel = null; |
| |
| enum ConnectionState { |
| DISCONNECTED, CONNECTING, CONNECTED, CLOSED |
| } |
| |
| volatile ConnectionState state; |
| final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); |
| private final ClientConfiguration conf; |
| |
| public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, |
| BookieSocketAddress addr) { |
| this(new ClientConfiguration(), executor, channelFactory, addr, null, NullStatsLogger.INSTANCE); |
| } |
| |
| public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, |
| ClientSocketChannelFactory channelFactory, BookieSocketAddress addr, |
| HashedWheelTimer requestTimer, StatsLogger parentStatsLogger) { |
| this.conf = conf; |
| this.addr = addr; |
| this.executor = executor; |
| this.channelFactory = channelFactory; |
| this.state = ConnectionState.DISCONNECTED; |
| this.requestTimer = requestTimer; |
| this.addEntryTimeout = conf.getAddEntryTimeout(); |
| this.readEntryTimeout = conf.getReadEntryTimeout(); |
| |
| StringBuilder nameBuilder = new StringBuilder(); |
| nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_')) |
| .append("_").append(addr.getPort()); |
| |
| this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE) |
| .scope(nameBuilder.toString()); |
| |
| readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP); |
| addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP); |
| readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ); |
| addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD); |
| } |
| |
| private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) { |
| closeLock.readLock().lock(); |
| try { |
| if (ConnectionState.CLOSED == state) { |
| op.operationComplete(BKException.Code.ClientClosedException, this); |
| } else { |
| op.operationComplete(rc, this); |
| } |
| } finally { |
| closeLock.readLock().unlock(); |
| } |
| } |
| |
| private void connect() { |
| LOG.debug("Connecting to bookie: {}", addr); |
| |
| // Set up the ClientBootStrap so we can create a new Channel connection |
| // to the bookie. |
| ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); |
| bootstrap.setPipelineFactory(this); |
| bootstrap.setOption("tcpNoDelay", conf.getClientTcpNoDelay()); |
| bootstrap.setOption("keepAlive", true); |
| bootstrap.setOption("connectTimeoutMillis", conf.getClientConnectTimeoutMillis()); |
| bootstrap.setOption("child.sendBufferSize", conf.getClientSendBufferSize()); |
| bootstrap.setOption("child.receiveBufferSize", conf.getClientReceiveBufferSize()); |
| bootstrap.setOption("writeBufferLowWaterMark", conf.getClientWriteBufferLowWaterMark()); |
| bootstrap.setOption("writeBufferHighWaterMark", conf.getClientWriteBufferHighWaterMark()); |
| |
| ChannelFuture future = bootstrap.connect(addr.getSocketAddress()); |
| future.addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.getChannel()); |
| int rc; |
| Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps; |
| |
| synchronized (PerChannelBookieClient.this) { |
| if (future.isSuccess() && state == ConnectionState.CONNECTING) { |
| LOG.info("Successfully connected to bookie: {}", future.getChannel()); |
| rc = BKException.Code.OK; |
| channel = future.getChannel(); |
| state = ConnectionState.CONNECTED; |
| } else if (future.isSuccess() && (state == ConnectionState.CLOSED |
| || state == ConnectionState.DISCONNECTED)) { |
| LOG.warn("Closed before connection completed, clean up: {}, current state {}", |
| future.getChannel(), state); |
| closeChannel(future.getChannel()); |
| rc = BKException.Code.BookieHandleNotAvailableException; |
| channel = null; |
| } else if (future.isSuccess() && state == ConnectionState.CONNECTED) { |
| LOG.debug("Already connected with another channel({}), so close the new channel({})", |
| channel, future.getChannel()); |
| closeChannel(future.getChannel()); |
| return; // pendingOps should have been completed when other channel connected |
| } else { |
| LOG.error("Could not connect to bookie: {}/{}, current state {} : ", |
| new Object[] { future.getChannel(), addr, |
| state, future.getCause() }); |
| rc = BKException.Code.BookieHandleNotAvailableException; |
| closeChannel(future.getChannel()); |
| channel = null; |
| if (state != ConnectionState.CLOSED) { |
| state = ConnectionState.DISCONNECTED; |
| } |
| } |
| |
| // trick to not do operations under the lock, take the list |
| // of pending ops and assign it to a new variable, while |
| // emptying the pending ops by just assigning it to a new |
| // list |
| oldPendingOps = pendingOps; |
| pendingOps = new ArrayDeque<GenericCallback<PerChannelBookieClient>>(); |
| } |
| |
| for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) { |
| completeOperation(pendingOp, rc); |
| } |
| } |
| }); |
| } |
| |
| void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) { |
| boolean completeOpNow = false; |
| int opRc = BKException.Code.OK; |
| // common case without lock first |
| if (channel != null && state == ConnectionState.CONNECTED) { |
| completeOpNow = true; |
| } else { |
| |
| synchronized (this) { |
| // check the channel status again under lock |
| if (channel != null && state == ConnectionState.CONNECTED) { |
| completeOpNow = true; |
| opRc = BKException.Code.OK; |
| } else if (state == ConnectionState.CLOSED) { |
| completeOpNow = true; |
| opRc = BKException.Code.BookieHandleNotAvailableException; |
| } else { |
| // channel is either null (first connection attempt), or the |
| // channel is disconnected. Connection attempt is still in |
| // progress, queue up this op. Op will be executed when |
| // connection attempt either fails or succeeds |
| pendingOps.add(op); |
| |
| if (state == ConnectionState.CONNECTING) { |
| // just return as connection request has already send |
| // and waiting for the response. |
| return; |
| } |
| // switch state to connecting and do connection attempt |
| state = ConnectionState.CONNECTING; |
| } |
| } |
| if (!completeOpNow) { |
| // Start connection attempt to the input server host. |
| connect(); |
| } |
| } |
| |
| if (completeOpNow) { |
| completeOperation(op, opRc); |
| } |
| |
| } |
| |
| /** |
| * This method should be called only after connection has been checked for |
| * {@link #connectIfNeededAndDoOp(GenericCallback)} |
| * |
| * @param ledgerId |
| * Ledger Id |
| * @param masterKey |
| * Master Key |
| * @param entryId |
| * Entry Id |
| * @param toSend |
| * Buffer to send |
| * @param cb |
| * Write callback |
| * @param ctx |
| * Write callback context |
| * @param options |
| * Add options |
| */ |
| void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb, |
| Object ctx, final int options) { |
| final long txnId = getTxnId(); |
| final int entrySize = toSend.readableBytes(); |
| final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY); |
| completionObjects.put(completionKey, |
| new AddCompletion(addEntryOpLogger, cb, ctx, ledgerId, entryId, |
| scheduleTimeout(completionKey, addEntryTimeout))); |
| |
| // Build the request and calculate the total size to be included in the packet. |
| BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() |
| .setVersion(ProtocolVersion.VERSION_THREE) |
| .setOperation(OperationType.ADD_ENTRY) |
| .setTxnId(txnId); |
| |
| AddRequest.Builder addBuilder = AddRequest.newBuilder() |
| .setLedgerId(ledgerId) |
| .setEntryId(entryId) |
| .setMasterKey(ByteString.copyFrom(masterKey)) |
| .setBody(ByteString.copyFrom(toSend.toByteBuffer())); |
| |
| if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) { |
| addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD); |
| } |
| |
| final Request addRequest = Request.newBuilder() |
| .setHeader(headerBuilder) |
| .setAddRequest(addBuilder) |
| .build(); |
| |
| final Channel c = channel; |
| if (c == null) { |
| errorOutAddKey(completionKey); |
| return; |
| } |
| try { |
| ChannelFuture future = c.write(addRequest); |
| future.addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| if (future.isSuccess()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId |
| + " bookie: " + c.getRemoteAddress() + " entry length: " + entrySize); |
| } |
| // totalBytesOutstanding.addAndGet(entrySize); |
| } else { |
| if (!(future.getCause() instanceof ClosedChannelException)) { |
| LOG.warn("Writing addEntry(lid={}, eid={}) to channel {} failed : ", |
| new Object[] { ledgerId, entryId, c, future.getCause() }); |
| } |
| errorOutAddKey(completionKey); |
| } |
| } |
| }); |
| } catch (Throwable e) { |
| LOG.warn("Add entry operation failed", e); |
| errorOutAddKey(completionKey); |
| } |
| } |
| |
| public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey, |
| final long entryId, |
| ReadEntryCallback cb, Object ctx) { |
| final long txnId = getTxnId(); |
| final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); |
| completionObjects.put(completionKey, |
| new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId, |
| scheduleTimeout(completionKey, readEntryTimeout))); |
| |
| // Build the request and calculate the total size to be included in the packet. |
| BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() |
| .setVersion(ProtocolVersion.VERSION_THREE) |
| .setOperation(OperationType.READ_ENTRY) |
| .setTxnId(txnId); |
| |
| ReadRequest.Builder readBuilder = ReadRequest.newBuilder() |
| .setLedgerId(ledgerId) |
| .setEntryId(entryId) |
| .setMasterKey(ByteString.copyFrom(masterKey)) |
| .setFlag(ReadRequest.Flag.FENCE_LEDGER); |
| |
| final Request readRequest = Request.newBuilder() |
| .setHeader(headerBuilder) |
| .setReadRequest(readBuilder) |
| .build(); |
| |
| final Channel c = channel; |
| if (c == null) { |
| errorOutReadKey(completionKey); |
| return; |
| } |
| |
| try { |
| ChannelFuture future = c.write(readRequest); |
| future.addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| if (future.isSuccess()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Successfully wrote request {} to {}", |
| readRequest, c.getRemoteAddress()); |
| } |
| } else { |
| if (!(future.getCause() instanceof ClosedChannelException)) { |
| LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ", |
| new Object[] { ledgerId, entryId, c, future.getCause() }); |
| } |
| errorOutReadKey(completionKey); |
| } |
| } |
| }); |
| } catch(Throwable e) { |
| LOG.warn("Read entry operation {} failed", completionKey, e); |
| errorOutReadKey(completionKey); |
| } |
| } |
| |
| public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) { |
| final long txnId = getTxnId(); |
| final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY); |
| completionObjects.put(completionKey, |
| new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId, |
| scheduleTimeout(completionKey, readEntryTimeout))); |
| |
| // Build the request and calculate the total size to be included in the packet. |
| BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() |
| .setVersion(ProtocolVersion.VERSION_THREE) |
| .setOperation(OperationType.READ_ENTRY) |
| .setTxnId(txnId); |
| |
| ReadRequest.Builder readBuilder = ReadRequest.newBuilder() |
| .setLedgerId(ledgerId) |
| .setEntryId(entryId); |
| |
| final Request readRequest = Request.newBuilder() |
| .setHeader(headerBuilder) |
| .setReadRequest(readBuilder) |
| .build(); |
| |
| final Channel c = channel; |
| if (c == null) { |
| errorOutReadKey(completionKey); |
| return; |
| } |
| |
| try{ |
| ChannelFuture future = c.write(readRequest); |
| future.addListener(new ChannelFutureListener() { |
| @Override |
| public void operationComplete(ChannelFuture future) throws Exception { |
| if (future.isSuccess()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Successfully wrote request {} to {}", |
| readRequest, c.getRemoteAddress()); |
| } |
| } else { |
| if (!(future.getCause() instanceof ClosedChannelException)) { |
| LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ", |
| new Object[] { ledgerId, entryId, c, future.getCause() }); |
| } |
| errorOutReadKey(completionKey); |
| } |
| } |
| }); |
| } catch(Throwable e) { |
| LOG.warn("Read entry operation {} failed", readRequest, e); |
| errorOutReadKey(completionKey); |
| } |
| } |
| |
| /** |
| * Disconnects the bookie client. It can be reused. |
| */ |
| public void disconnect() { |
| disconnect(true); |
| } |
| |
| public void disconnect(boolean wait) { |
| LOG.info("Disconnecting the per channel bookie client for {}", addr); |
| closeInternal(false, wait); |
| } |
| |
| /** |
| * Closes the bookie client permanently. It cannot be reused. |
| */ |
| public void close() { |
| close(true); |
| } |
| |
| public void close(boolean wait) { |
| LOG.info("Closing the per channel bookie client for {}", addr); |
| closeLock.writeLock().lock(); |
| try { |
| if (ConnectionState.CLOSED == state) { |
| return; |
| } |
| state = ConnectionState.CLOSED; |
| errorOutOutstandingEntries(BKException.Code.ClientClosedException); |
| } finally { |
| closeLock.writeLock().unlock(); |
| } |
| closeInternal(true, wait); |
| } |
| |
| private void closeInternal(boolean permanent, boolean wait) { |
| Channel toClose = null; |
| synchronized (this) { |
| if (permanent) { |
| state = ConnectionState.CLOSED; |
| } else if (state != ConnectionState.CLOSED) { |
| state = ConnectionState.DISCONNECTED; |
| } |
| toClose = channel; |
| channel = null; |
| } |
| if (toClose != null) { |
| ChannelFuture cf = closeChannel(toClose); |
| if (wait) { |
| cf.awaitUninterruptibly(); |
| } |
| } |
| } |
| |
| private ChannelFuture closeChannel(Channel c) { |
| LOG.debug("Closing channel {}", c); |
| ReadTimeoutHandler timeout = c.getPipeline().get(ReadTimeoutHandler.class); |
| if (timeout != null) { |
| timeout.releaseExternalResources(); |
| } |
| return c.close(); |
| } |
| |
| void errorOutReadKey(final CompletionKey key) { |
| errorOutReadKey(key, BKException.Code.BookieHandleNotAvailableException); |
| } |
| |
| void errorOutReadKey(final CompletionKey key, final int rc) { |
| final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key); |
| if (null == readCompletion) { |
| return; |
| } |
| executor.submitOrdered(readCompletion.ledgerId, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| String bAddress = "null"; |
| Channel c = channel; |
| if (c != null) { |
| bAddress = c.getRemoteAddress().toString(); |
| } |
| |
| LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}", |
| new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress }); |
| |
| readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId, |
| null, readCompletion.ctx); |
| } |
| }); |
| } |
| |
| void errorOutAddKey(final CompletionKey key) { |
| errorOutAddKey(key, BKException.Code.BookieHandleNotAvailableException); |
| } |
| |
| void errorOutAddKey(final CompletionKey key, final int rc) { |
| final AddCompletion addCompletion = (AddCompletion)completionObjects.remove(key); |
| if (null == addCompletion) { |
| return; |
| } |
| executor.submitOrdered(addCompletion.ledgerId, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| String bAddress = "null"; |
| Channel c = channel; |
| if(c != null) { |
| bAddress = c.getRemoteAddress().toString(); |
| } |
| LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}", |
| new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress }); |
| |
| addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId, |
| addr, addCompletion.ctx); |
| LOG.debug("Invoked callback method: {}", addCompletion.entryId); |
| } |
| }); |
| } |
| |
| /** |
| * Errors out pending entries. We call this method from one thread to avoid |
| * concurrent executions to QuorumOpMonitor (implements callbacks). It seems |
| * simpler to call it from BookieHandle instead of calling directly from |
| * here. |
| */ |
| |
| void errorOutOutstandingEntries(int rc) { |
| |
| // DO NOT rewrite these using Map.Entry iterations. We want to iterate |
| // on keys and see if we are successfully able to remove the key from |
| // the map. Because the add and the read methods also do the same thing |
| // in case they get a write failure on the socket. The one who |
| // successfully removes the key from the map is the one responsible for |
| // calling the application callback. |
| for (CompletionKey key : completionObjects.keySet()) { |
| switch (key.operationType) { |
| case ADD_ENTRY: |
| errorOutAddKey(key, rc); |
| break; |
| case READ_ENTRY: |
| errorOutReadKey(key, rc); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| /** |
| * In the netty pipeline, we need to split packets based on length, so we |
| * use the {@link LengthFieldBasedFrameDecoder}. Other than that all actions |
| * are carried out in this class, e.g., making sense of received messages, |
| * prepending the length to outgoing packets etc. |
| */ |
| @Override |
| public ChannelPipeline getPipeline() throws Exception { |
| ChannelPipeline pipeline = Channels.pipeline(); |
| |
| pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4)); |
| pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); |
| pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder()); |
| pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder()); |
| pipeline.addLast("mainhandler", this); |
| return pipeline; |
| } |
| |
| /** |
| * If our channel has disconnected, we just error out the pending entries |
| */ |
| @Override |
| public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { |
| Channel c = ctx.getChannel(); |
| LOG.info("Disconnected from bookie channel {}", c); |
| if (c != null) { |
| closeChannel(c); |
| } |
| |
| errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException); |
| |
| synchronized (this) { |
| if (this.channel == c |
| && state != ConnectionState.CLOSED) { |
| state = ConnectionState.DISCONNECTED; |
| } |
| } |
| |
| // we don't want to reconnect right away. If someone sends a request to |
| // this address, we will reconnect. |
| } |
| |
| /** |
| * Called by netty when an exception happens in one of the netty threads |
| * (mostly due to what we do in the netty threads) |
| */ |
| @Override |
| public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { |
| Throwable t = e.getCause(); |
| if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) { |
| LOG.error("Corrupted frame received from bookie: {}", |
| e.getChannel().getRemoteAddress()); |
| return; |
| } |
| |
| if (t instanceof IOException) { |
| // these are thrown when a bookie fails, logging them just pollutes |
| // the logs (the failure is logged from the listeners on the write |
| // operation), so I'll just ignore it here. |
| return; |
| } |
| |
| synchronized (this) { |
| if (state == ConnectionState.CLOSED) { |
| LOG.debug("Unexpected exception caught by bookie client channel handler, " |
| + "but the client is closed, so it isn't important", t); |
| } else { |
| LOG.error("Unexpected exception caught by bookie client channel handler", t); |
| } |
| } |
| // Since we are a library, cant terminate App here, can we? |
| } |
| |
| /** |
| * Called by netty when a message is received on a channel |
| */ |
| @Override |
| public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { |
| if (!(e.getMessage() instanceof Response)) { |
| ctx.sendUpstream(e); |
| return; |
| } |
| |
| final Response response = (Response) e.getMessage(); |
| final BKPacketHeader header = response.getHeader(); |
| |
| final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(), |
| header.getOperation())); |
| |
| if (null == completionValue) { |
| // Unexpected response, so log it. The txnId should have been present. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header. getOperation() + |
| " and txnId : " + header.getTxnId()); |
| } |
| |
| } else { |
| long orderingKey = completionValue.ledgerId; |
| executor.submitOrdered(orderingKey, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| OperationType type = header.getOperation(); |
| switch (type) { |
| case ADD_ENTRY: |
| handleAddResponse(response.getAddResponse(), completionValue); |
| break; |
| case READ_ENTRY: |
| handleReadResponse(response.getReadResponse(), completionValue); |
| break; |
| default: |
| LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring", |
| type, addr); |
| break; |
| } |
| } |
| }); |
| } |
| } |
| |
| void handleAddResponse(AddResponse response, CompletionValue completionValue) { |
| // The completion value should always be an instance of an AddCompletion object when we reach here. |
| AddCompletion ac = (AddCompletion)completionValue; |
| |
| long ledgerId = response.getLedgerId(); |
| long entryId = response.getEntryId(); |
| StatusCode status = response.getStatus(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " |
| + entryId + " rc: " + status); |
| } |
| // convert to BKException code because thats what the uppper |
| // layers expect. This is UGLY, there should just be one set of |
| // error codes. |
| Integer rcToRet = statusCodeToExceptionCode(status); |
| if (null == rcToRet) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr |
| + " with code:" + status); |
| } |
| rcToRet = BKException.Code.WriteException; |
| } |
| ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx); |
| } |
| |
| void handleReadResponse(ReadResponse response, CompletionValue completionValue) { |
| // The completion value should always be an instance of a ReadCompletion object when we reach here. |
| ReadCompletion rc = (ReadCompletion)completionValue; |
| |
| long ledgerId = response.getLedgerId(); |
| long entryId = response.getEntryId(); |
| StatusCode status = response.getStatus(); |
| ChannelBuffer buffer = ChannelBuffers.buffer(0); |
| |
| if (response.hasBody()) { |
| buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer()); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: " |
| + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes()); |
| } |
| |
| // convert to BKException code because thats what the uppper |
| // layers expect. This is UGLY, there should just be one set of |
| // error codes. |
| Integer rcToRet = statusCodeToExceptionCode(status); |
| if (null == rcToRet) { |
| LOG.error("Read entry for ledger:{}, entry:{} failed on bookie:{} with code:{}", |
| new Object[] { ledgerId, entryId, addr, status }); |
| rcToRet = BKException.Code.ReadException; |
| } |
| rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx); |
| } |
| |
| /** |
| * Boiler-plate wrapper classes follow |
| * |
| */ |
| |
| // visible for testing |
| static abstract class CompletionValue { |
| final Object ctx; |
| protected final long ledgerId; |
| protected final long entryId; |
| protected final Timeout timeout; |
| |
| public CompletionValue(Object ctx, long ledgerId, long entryId, |
| Timeout timeout) { |
| this.ctx = ctx; |
| this.ledgerId = ledgerId; |
| this.entryId = entryId; |
| this.timeout = timeout; |
| } |
| |
| void cancelTimeout() { |
| if (null != timeout) { |
| timeout.cancel(); |
| } |
| } |
| } |
| |
| // visible for testing |
| static class ReadCompletion extends CompletionValue { |
| final ReadEntryCallback cb; |
| |
| public ReadCompletion(ReadEntryCallback cb, Object ctx, |
| long ledgerId, long entryId) { |
| this(null, cb, ctx, ledgerId, entryId, null); |
| } |
| |
| public ReadCompletion(final OpStatsLogger readEntryOpLogger, |
| final ReadEntryCallback originalCallback, |
| final Object originalCtx, final long ledgerId, final long entryId, |
| final Timeout timeout) { |
| super(originalCtx, ledgerId, entryId, timeout); |
| final long requestTimeMillis = MathUtils.now(); |
| this.cb = null == readEntryOpLogger ? originalCallback : new ReadEntryCallback() { |
| @Override |
| public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) { |
| cancelTimeout(); |
| long latencyMillis = MathUtils.now() - requestTimeMillis; |
| if (rc != BKException.Code.OK) { |
| readEntryOpLogger.registerFailedEvent(latencyMillis); |
| } else { |
| readEntryOpLogger.registerSuccessfulEvent(latencyMillis); |
| } |
| originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx); |
| } |
| }; |
| } |
| } |
| |
| // visible for testing |
| static class AddCompletion extends CompletionValue { |
| final WriteCallback cb; |
| |
| public AddCompletion(WriteCallback cb, Object ctx, |
| long ledgerId, long entryId) { |
| this(null, cb, ctx, ledgerId, entryId, null); |
| } |
| |
| public AddCompletion(final OpStatsLogger addEntryOpLogger, |
| final WriteCallback originalCallback, |
| final Object originalCtx, final long ledgerId, final long entryId, |
| final Timeout timeout) { |
| super(originalCtx, ledgerId, entryId, timeout); |
| final long requestTimeMillis = MathUtils.now(); |
| this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() { |
| @Override |
| public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { |
| cancelTimeout(); |
| long latencyMillis = MathUtils.now() - requestTimeMillis; |
| if (rc != BKException.Code.OK) { |
| addEntryOpLogger.registerFailedEvent(latencyMillis); |
| } else { |
| addEntryOpLogger.registerSuccessfulEvent(latencyMillis); |
| } |
| originalCallback.writeComplete(rc, ledgerId, entryId, addr, originalCtx); |
| } |
| }; |
| } |
| } |
| |
| // visable for testing |
| CompletionKey newCompletionKey(long txnId, OperationType operationType) { |
| return new CompletionKey(txnId, operationType); |
| } |
| |
| Timeout scheduleTimeout(CompletionKey key, long timeout) { |
| if (null != requestTimer) { |
| return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS); |
| } else { |
| return null; |
| } |
| } |
| |
| class CompletionKey implements TimerTask { |
| final long txnId; |
| final OperationType operationType; |
| final long requestAt; |
| |
| CompletionKey(long txnId, OperationType operationType) { |
| this.txnId = txnId; |
| this.operationType = operationType; |
| this.requestAt = MathUtils.nowInNano(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (!(obj instanceof CompletionKey)) { |
| return false; |
| } |
| CompletionKey that = (CompletionKey) obj; |
| return this.txnId == that.txnId && this.operationType == that.operationType; |
| } |
| |
| @Override |
| public int hashCode() { |
| return ((int) txnId); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("TxnId(%d), OperationType(%s)", txnId, operationType); |
| } |
| |
| private long elapsedTime() { |
| return MathUtils.elapsedMSec(requestAt); |
| } |
| |
| @Override |
| public void run(Timeout timeout) throws Exception { |
| if (timeout.isCancelled()) { |
| return; |
| } |
| if (OperationType.ADD_ENTRY == operationType) { |
| errorOutAddKey(this); |
| addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime()); |
| } else { |
| errorOutReadKey(this); |
| readTimeoutOpLogger.registerSuccessfulEvent(elapsedTime()); |
| } |
| } |
| } |
| |
| /** |
| * Note : Helper functions follow |
| */ |
| |
| /** |
| * @param status |
| * @return null if the statuscode is unknown. |
| */ |
| private Integer statusCodeToExceptionCode(StatusCode status) { |
| Integer rcToRet = null; |
| switch (status) { |
| case EOK: |
| rcToRet = BKException.Code.OK; |
| break; |
| case ENOENTRY: |
| rcToRet = BKException.Code.NoSuchEntryException; |
| break; |
| case ENOLEDGER: |
| rcToRet = BKException.Code.NoSuchLedgerExistsException; |
| break; |
| case EBADVERSION: |
| rcToRet = BKException.Code.ProtocolVersionException; |
| break; |
| case EUA: |
| rcToRet = BKException.Code.UnauthorizedAccessException; |
| break; |
| case EFENCED: |
| rcToRet = BKException.Code.LedgerFencedException; |
| break; |
| default: |
| break; |
| } |
| return rcToRet; |
| } |
| |
| private long getTxnId() { |
| return txnIdGenerator.incrementAndGet(); |
| } |
| |
| } |