blob: 89863a3e0216f8f52b3f63d1991b47e970c75534 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto;
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiPredicate;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import lombok.SneakyThrows;
import org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.StartTLSCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.GetListOfEntriesOfLedgerResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadLacResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
* This class manages all details of connection to a particular bookie. It also
* has reconnect logic if a connection to a bookie fails.
*/
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_SCOPE,
help = "Per channel bookie client stats"
)
@Sharable
public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
// this set contains the bookie error return codes that we do not consider for a bookie to be "faulty"
private static final Set<Integer> expectedBkOperationErrors = Collections.unmodifiableSet(Sets
.newHashSet(BKException.Code.BookieHandleNotAvailableException,
BKException.Code.NoSuchEntryException,
BKException.Code.NoSuchLedgerExistsException,
BKException.Code.LedgerFencedException,
BKException.Code.LedgerExistException,
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
final EventLoopGroup eventLoopGroup;
final ByteBufAllocator allocator;
final OrderedExecutor executor;
final long addEntryTimeoutNanos;
final long readEntryTimeoutNanos;
final int maxFrameSize;
final long getBookieInfoTimeoutNanos;
final int startTLSTimeout;
private final ConcurrentOpenHashMap<CompletionKey, CompletionValue> completionObjects =
new ConcurrentOpenHashMap<CompletionKey, CompletionValue>();
// Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
new SynchronizedHashMultiMap<>();
private final StatsLogger statsLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_READ_OP,
help = "channel stats of read entries requests"
)
private final OpStatsLogger readEntryOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ,
help = "timeout stats of read entries requests"
)
private final OpStatsLogger readTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_ADD_OP,
help = "channel stats of add entries requests"
)
private final OpStatsLogger addEntryOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_WRITE_LAC_OP,
help = "channel stats of write_lac requests"
)
private final OpStatsLogger writeLacOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_FORCE_OP,
help = "channel stats of force requests"
)
private final OpStatsLogger forceLedgerOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_READ_LAC_OP,
help = "channel stats of read_lac requests"
)
private final OpStatsLogger readLacOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_ADD,
help = "timeout stats of add entries requests"
)
private final OpStatsLogger addTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC,
help = "timeout stats of write_lac requests"
)
private final OpStatsLogger writeLacTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE,
help = "timeout stats of force requests"
)
private final OpStatsLogger forceLedgerTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC,
help = "timeout stats of read_lac requests"
)
private final OpStatsLogger readLacTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.GET_BOOKIE_INFO_OP,
help = "channel stats of get_bookie_info requests"
)
private final OpStatsLogger getBookieInfoOpLogger;
@StatsDoc(
name = BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO,
help = "timeout stats of get_bookie_info requests"
)
private final OpStatsLogger getBookieInfoTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_START_TLS_OP,
help = "channel stats of start_tls requests"
)
private final OpStatsLogger startTLSOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP,
help = "timeout stats of start_tls requests"
)
private final OpStatsLogger startTLSTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.CLIENT_CONNECT_TIMER,
help = "channel stats of connect requests"
)
private final OpStatsLogger connectTimer;
private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
private final OpStatsLogger getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
@StatsDoc(
name = BookKeeperClientStats.NETTY_EXCEPTION_CNT,
help = "the number of exceptions received from this channel"
)
private final Counter exceptionCounter;
@StatsDoc(
name = BookKeeperClientStats.ADD_OP_OUTSTANDING,
help = "the number of outstanding add_entry requests"
)
private final Counter addEntryOutstanding;
@StatsDoc(
name = BookKeeperClientStats.READ_OP_OUTSTANDING,
help = "the number of outstanding add_entry requests"
)
private final Counter readEntryOutstanding;
/* collect stats on all Ops that flows through netty pipeline */
@StatsDoc(
name = BookKeeperClientStats.NETTY_OPS,
help = "channel stats for all operations flowing through netty pipeline"
)
private final OpStatsLogger nettyOpLogger;
@StatsDoc(
name = BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER,
help = "the number of active non-tls channels"
)
private final Counter activeNonTlsChannelCounter;
@StatsDoc(
name = BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER,
help = "the number of active tls channels"
)
private final Counter activeTlsChannelCounter;
@StatsDoc(
name = BookKeeperClientStats.FAILED_CONNECTION_COUNTER,
help = "the number of failed connections"
)
private final Counter failedConnectionCounter;
@StatsDoc(
name = BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER,
help = "the number of failed tls handshakes"
)
private final Counter failedTlsHandshakeCounter;
private final boolean useV2WireProtocol;
private final boolean preserveMdcForTaskExecution;
/**
* The following member variables do not need to be concurrent, or volatile
* because they are always updated under a lock.
*/
private volatile Queue<GenericCallback<PerChannelBookieClient>> pendingOps =
new ArrayDeque<GenericCallback<PerChannelBookieClient>>();
volatile Channel channel = null;
private final ClientConnectionPeer connectionPeer;
private volatile BookKeeperPrincipal authorizedId = BookKeeperPrincipal.ANONYMOUS;
@SneakyThrows
private FailedChannelFutureImpl processBookieNotResolvedError(long startTime,
BookieAddressResolver.BookieIdNotResolvedException err) {
FailedChannelFutureImpl failedFuture = new FailedChannelFutureImpl(err);
contextPreservingListener(new ConnectionFutureListener(startTime)).operationComplete(failedFuture);
return failedFuture;
}
enum ConnectionState {
DISCONNECTED, CONNECTING, CONNECTED, CLOSED, START_TLS
}
volatile ConnectionState state;
final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
private final ClientConfiguration conf;
private final PerChannelBookieClientPool pcbcPool;
private final ClientAuthProvider.Factory authProviderFactory;
private final ExtensionRegistry extRegistry;
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null,
null, bookieAddressResolver);
}
public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId bookieId,
ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry, BookieAddressResolver bookieAddressResolver)
throws SecurityException {
this(new ClientConfiguration(), executor, eventLoopGroup, bookieId,
NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, null, bookieAddressResolver);
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
EventLoopGroup eventLoopGroup, BookieId bookieId,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool, BookieAddressResolver bookieAddressResolver)
throws SecurityException {
this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, bookieId, NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, pcbcPool, null, bookieAddressResolver);
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
EventLoopGroup eventLoopGroup,
ByteBufAllocator allocator,
BookieId bookieId,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
SecurityHandlerFactory shFactory,
BookieAddressResolver bookieAddressResolver) throws SecurityException {
this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
this.conf = conf;
this.bookieId = bookieId;
this.bookieAddressResolver = bookieAddressResolver;
this.executor = executor;
if (LocalBookiesRegistry.isLocalBookie(bookieId)) {
this.eventLoopGroup = new DefaultEventLoopGroup();
} else {
this.eventLoopGroup = eventLoopGroup;
}
this.allocator = allocator;
this.state = ConnectionState.DISCONNECTED;
this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
this.getBookieInfoTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getBookieInfoTimeout());
this.startTLSTimeout = conf.getStartTLSTimeout();
this.useV2WireProtocol = conf.getUseV2WireProtocol();
this.preserveMdcForTaskExecution = conf.getPreserveMdcForTaskExecution();
this.authProviderFactory = authProviderFactory;
this.extRegistry = extRegistry;
this.shFactory = shFactory;
if (shFactory != null) {
shFactory.init(NodeType.Client, conf, allocator);
}
this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)
.scope(buildStatsLoggerScopeName(bookieId));
readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP);
addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP);
writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP);
forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP);
readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP);
getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP);
getListOfEntriesOfLedgerCompletionOpLogger = statsLogger
.getOpStatsLogger(BookKeeperClientStats.GET_LIST_OF_ENTRIES_OF_LEDGER_OP);
readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ);
addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD);
writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC);
forceLedgerTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE);
readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC);
getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
getListOfEntriesOfLedgerCompletionTimeoutOpLogger = statsLogger
.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER);
exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
activeNonTlsChannelCounter = statsLogger.getCounter(BookKeeperClientStats.ACTIVE_NON_TLS_CHANNEL_COUNTER);
activeTlsChannelCounter = statsLogger.getCounter(BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER);
failedConnectionCounter = statsLogger.getCounter(BookKeeperClientStats.FAILED_CONNECTION_COUNTER);
failedTlsHandshakeCounter = statsLogger.getCounter(BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER);
this.pcbcPool = pcbcPool;
this.connectionPeer = new ClientConnectionPeer() {
@Override
public SocketAddress getRemoteAddr() {
Channel c = channel;
if (c != null) {
return c.remoteAddress();
} else {
return null;
}
}
@Override
public Collection<Object> getProtocolPrincipals() {
Channel c = channel;
if (c == null) {
return Collections.emptyList();
}
SslHandler ssl = c.pipeline().get(SslHandler.class);
if (ssl == null) {
return Collections.emptyList();
}
try {
Certificate[] certificates = ssl.engine().getSession().getPeerCertificates();
if (certificates == null) {
return Collections.emptyList();
}
List<Object> result = new ArrayList<>();
result.addAll(Arrays.asList(certificates));
return result;
} catch (SSLPeerUnverifiedException err) {
return Collections.emptyList();
}
}
@Override
public void disconnect() {
Channel c = channel;
if (c != null) {
c.close().addListener(x -> makeWritable());
}
LOG.info("authplugin disconnected channel {}", channel);
}
@Override
public void setAuthorizedId(BookKeeperPrincipal principal) {
authorizedId = principal;
LOG.info("connection {} authenticated as {}", channel, principal);
}
@Override
public BookKeeperPrincipal getAuthorizedId() {
return authorizedId;
}
@Override
public boolean isSecure() {
Channel c = channel;
if (c == null) {
return false;
} else {
return c.pipeline().get(SslHandler.class) != null;
}
}
};
}
public static String buildStatsLoggerScopeName(BookieId addr) {
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append(addr.toString().replace('.', '_').replace('-', '_').replace(":", "_"));
return nameBuilder.toString();
}
private void completeOperation(GenericCallback<PerChannelBookieClient> op, int rc) {
//Thread.dumpStack();
closeLock.readLock().lock();
try {
if (ConnectionState.CLOSED == state) {
op.operationComplete(BKException.Code.ClientClosedException, this);
} else {
op.operationComplete(rc, this);
}
} finally {
closeLock.readLock().unlock();
}
}
protected long getNumPendingCompletionRequests() {
return completionObjects.size();
}
protected ChannelFuture connect() {
final long startTime = MathUtils.nowInNano();
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to bookie: {}", bookieId);
}
BookieSocketAddress addr;
try {
addr = bookieAddressResolver.resolve(bookieId);
} catch (BookieAddressResolver.BookieIdNotResolvedException err) {
LOG.error("Cannot connect to {} as endpoint resolution failed (probably bookie is down)",
bookieId, err.toString());
return processBookieNotResolvedError(startTime, err);
}
// Set up the ClientBootStrap so we can create a new Channel connection to the bookie.
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.channel(EpollSocketChannel.class);
} else if (eventLoopGroup instanceof DefaultEventLoopGroup) {
bootstrap.channel(LocalChannel.class);
} else {
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.option(ChannelOption.ALLOCATOR, this.allocator);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));
if (!(eventLoopGroup instanceof DefaultEventLoopGroup)) {
bootstrap.option(ChannelOption.TCP_NODELAY, conf.getClientTcpNoDelay());
bootstrap.option(ChannelOption.SO_KEEPALIVE, conf.getClientSockKeepalive());
// if buffer sizes are 0, let OS auto-tune it
if (conf.getClientSendBufferSize() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, conf.getClientSendBufferSize());
}
if (conf.getClientReceiveBufferSize() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.getClientReceiveBufferSize());
}
}
// In the netty pipeline, we need to split packets based on length, so we
// use the {@link LengthFieldBasedFramDecoder}. Other than that all actions
// are carried out in this class, e.g., making sense of received messages,
// prepending the length to outgoing packets etc.
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
pipeline.addLast(
"bookieProtoDecoder",
new BookieProtoEncoding.ResponseDecoder(extRegistry, useV2WireProtocol, shFactory != null));
pipeline.addLast("authHandler", new AuthHandler.ClientSideHandler(authProviderFactory, txnIdGenerator,
connectionPeer, useV2WireProtocol));
pipeline.addLast("mainhandler", PerChannelBookieClient.this);
}
});
SocketAddress bookieAddr = addr.getSocketAddress();
if (eventLoopGroup instanceof DefaultEventLoopGroup) {
bookieAddr = new LocalAddress(bookieId.toString());
}
ChannelFuture future = bootstrap.connect(bookieAddr);
future.addListener(contextPreservingListener(new ConnectionFutureListener(startTime)));
future.addListener(x -> makeWritable());
return future;
}
void cleanDisconnectAndClose() {
disconnect();
close();
}
/**
*
* @return boolean, true is PCBC is writable
*/
public boolean isWritable() {
return isWritable;
}
public void setWritable(boolean val) {
isWritable = val;
}
private void makeWritable() {
setWritable(true);
}
void connectIfNeededAndDoOp(GenericCallback<PerChannelBookieClient> op) {
boolean completeOpNow = false;
int opRc = BKException.Code.OK;
// common case without lock first
if (channel != null && state == ConnectionState.CONNECTED) {
completeOpNow = true;
} else {
synchronized (this) {
// check the channel status again under lock
if (channel != null && state == ConnectionState.CONNECTED) {
completeOpNow = true;
opRc = BKException.Code.OK;
} else if (state == ConnectionState.CLOSED) {
completeOpNow = true;
opRc = BKException.Code.BookieHandleNotAvailableException;
} else {
// channel is either null (first connection attempt), or the
// channel is disconnected. Connection attempt is still in
// progress, queue up this op. Op will be executed when
// connection attempt either fails or succeeds
pendingOps.add(op);
if (state == ConnectionState.CONNECTING
|| state == ConnectionState.START_TLS) {
// the connection request has already been sent and it is waiting for the response.
return;
}
// switch state to connecting and do connection attempt
state = ConnectionState.CONNECTING;
}
}
if (!completeOpNow) {
// Start connection attempt to the input server host.
connect();
}
}
if (completeOpNow) {
completeOperation(op, opRc);
}
}
void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteBufList toSend, WriteLacCallback cb,
Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
OperationType.WRITE_LAC);
// writeLac is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
new WriteLacCompletion(completionKey, cb,
ctx, lac));
// Build the request
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.WRITE_LAC)
.setTxnId(txnId);
ByteString body;
if (toSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
} else if (toSend.size() == 1) {
body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
} else {
body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
}
WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
.setLedgerId(ledgerId)
.setLac(lac)
.setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
.setBody(body);
final Request writeLacRequest = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setWriteLacRequest(writeLacBuilder)
.build();
writeAndFlush(channel, completionKey, writeLacRequest);
}
void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
if (useV2WireProtocol) {
LOG.error("force is not allowed with v2 protocol");
executor.executeOrdered(ledgerId, () -> {
cb.forceLedgerComplete(BKException.Code.IllegalOpException, ledgerId, bookieId, ctx);
});
return;
}
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId,
OperationType.FORCE_LEDGER);
// force is mostly like addEntry hence uses addEntryTimeout
completionObjects.put(completionKey,
new ForceLedgerCompletion(completionKey, cb,
ctx, ledgerId));
// Build the request
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.FORCE_LEDGER)
.setTxnId(txnId);
ForceLedgerRequest.Builder writeLacBuilder = ForceLedgerRequest.newBuilder()
.setLedgerId(ledgerId);
final Request forceLedgerRequest = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setForceLedgerRequest(writeLacBuilder)
.build();
writeAndFlush(channel, completionKey, forceLedgerRequest);
}
/**
* This method should be called only after connection has been checked for
* {@link #connectIfNeededAndDoOp(GenericCallback)}.
*
* @param ledgerId
* Ledger Id
* @param masterKey
* Master Key
* @param entryId
* Entry Id
* @param toSend
* Buffer to send
* @param cb
* Write callback
* @param ctx
* Write callback context
* @param allowFastFail
* allowFastFail flag
* @param writeFlags
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
executor.executeOrdered(ledgerId, () -> {
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
});
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
request = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.ADD_ENTRY)
.setTxnId(txnId);
if (((short) options & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
ByteString body = null;
if (toSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
} else {
for (int i = 0; i < toSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
}
AddRequest.Builder addBuilder = AddRequest.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId)
.setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
.setBody(body);
if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
}
if (!writeFlags.isEmpty()) {
// add flags only if needed, in order to be able to talk with old bookies
addBuilder.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
}
request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setAddRequest(addBuilder)
.build();
}
putCompletionKeyValue(completionKey,
acquireAddCompletion(completionKey,
cb, ctx, ledgerId, entryId));
final Channel c = channel;
if (c == null) {
// usually checked in writeAndFlush, but we have extra check
// because we need to release toSend.
errorOut(completionKey);
toSend.release();
return;
} else {
// addEntry times out on backpressure
writeAndFlush(c, completionKey, request, allowFastFail);
}
}
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0, null);
completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_LAC);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.READ_LAC)
.setTxnId(txnId);
ReadLacRequest.Builder readLacBuilder = ReadLacRequest.newBuilder()
.setLedgerId(ledgerId);
request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setReadLacRequest(readLacBuilder)
.build();
}
putCompletionKeyValue(completionKey,
new ReadLacCompletion(completionKey, cb,
ctx, ledgerId));
writeAndFlush(channel, completionKey, request);
}
public void getListOfEntriesOfLedger(final long ledgerId, GetListOfEntriesOfLedgerCallback cb) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
completionObjects.put(completionKey, new GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
// Build the request.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder().setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER).setTxnId(txnId);
GetListOfEntriesOfLedgerRequest.Builder getListOfEntriesOfLedgerRequestBuilder =
GetListOfEntriesOfLedgerRequest.newBuilder().setLedgerId(ledgerId);
final Request getListOfEntriesOfLedgerRequest = Request.newBuilder().setHeader(headerBuilder)
.setGetListOfEntriesOfLedgerRequest(getListOfEntriesOfLedgerRequestBuilder).build();
writeAndFlush(channel, completionKey, getListOfEntriesOfLedgerRequest);
}
/**
* Long Poll Reads.
*/
public void readEntryWaitForLACUpdate(final long ledgerId,
final long entryId,
final long previousLAC,
final long timeOutInMillis,
final boolean piggyBackEntry,
ReadEntryCallback cb,
Object ctx) {
readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
piggyBackEntry, cb, ctx, (short) 0, null, false);
}
/**
* Normal Reads.
*/
public void readEntry(final long ledgerId,
final long entryId,
ReadEntryCallback cb,
Object ctx,
int flags,
byte[] masterKey,
boolean allowFastFail) {
readEntryInternal(ledgerId, entryId, null, null, false,
cb, ctx, (short) flags, masterKey, allowFastFail);
}
private void readEntryInternal(final long ledgerId,
final long entryId,
final Long previousLAC,
final Long timeOutInMillis,
final boolean piggyBackEntry,
final ReadEntryCallback cb,
final Object ctx,
int flags,
byte[] masterKey,
boolean allowFastFail) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) flags, masterKey);
completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.READ_ENTRY);
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.READ_ENTRY)
.setTxnId(txnId);
if (((short) flags & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
}
ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId);
if (null != previousLAC) {
readBuilder = readBuilder.setPreviousLAC(previousLAC);
}
if (null != timeOutInMillis) {
// Long poll requires previousLAC
if (null == previousLAC) {
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
ledgerId, entryId, null, ctx);
return;
}
readBuilder = readBuilder.setTimeOut(timeOutInMillis);
}
if (piggyBackEntry) {
// Long poll requires previousLAC
if (null == previousLAC) {
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
ledgerId, entryId, null, ctx);
return;
}
readBuilder = readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
}
// Only one flag can be set on the read requests
if (((short) flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING) {
readBuilder.setFlag(ReadRequest.Flag.FENCE_LEDGER);
if (masterKey == null) {
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
ledgerId, entryId, null, ctx);
return;
}
readBuilder.setMasterKey(ByteString.copyFrom(masterKey));
}
request = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setReadRequest(readBuilder)
.build();
}
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
putCompletionKeyValue(completionKey, readCompletion);
writeAndFlush(channel, completionKey, request, allowFastFail);
}
public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.GET_BOOKIE_INFO);
completionObjects.put(completionKey,
new GetBookieInfoCompletion(
completionKey, cb, ctx));
// Build the request and calculate the total size to be included in the packet.
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.GET_BOOKIE_INFO)
.setTxnId(txnId);
GetBookieInfoRequest.Builder getBookieInfoBuilder = GetBookieInfoRequest.newBuilder()
.setRequested(requested);
final Request getBookieInfoRequest = withRequestContext(Request.newBuilder())
.setHeader(headerBuilder)
.setGetBookieInfoRequest(getBookieInfoBuilder)
.build();
writeAndFlush(channel, completionKey, getBookieInfoRequest);
}
private static final BiPredicate<CompletionKey, CompletionValue> timeoutCheck = (key, value) -> {
return value.maybeTimeout();
};
public void checkTimeoutOnPendingOperations() {
int timedOutOperations = completionObjects.removeIf(timeoutCheck);
timedOutOperations += completionObjectsV2Conflicts.removeIf(timeoutCheck);
if (timedOutOperations > 0) {
LOG.info("Timed-out {} operations to channel {} for {}",
timedOutOperations, channel, bookieId);
}
}
/**
* Disconnects the bookie client. It can be reused.
*/
public void disconnect() {
disconnect(true);
}
public void disconnect(boolean wait) {
LOG.info("Disconnecting the per channel bookie client for {}", bookieId);
closeInternal(false, wait);
}
/**
* Closes the bookie client permanently. It cannot be reused.
*/
public void close() {
close(true);
}
public void close(boolean wait) {
LOG.info("Closing the per channel bookie client for {}", bookieId);
closeLock.writeLock().lock();
try {
if (ConnectionState.CLOSED == state) {
return;
}
state = ConnectionState.CLOSED;
errorOutOutstandingEntries(BKException.Code.ClientClosedException);
} finally {
closeLock.writeLock().unlock();
}
if (channel != null && channel.pipeline().get(SslHandler.class) != null) {
activeTlsChannelCounter.dec();
} else {
activeNonTlsChannelCounter.dec();
}
closeInternal(true, wait);
}
private void closeInternal(boolean permanent, boolean wait) {
Channel toClose = null;
synchronized (this) {
if (permanent) {
state = ConnectionState.CLOSED;
} else if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
toClose = channel;
channel = null;
makeWritable();
}
if (toClose != null) {
ChannelFuture cf = closeChannel(toClose);
if (wait) {
cf.awaitUninterruptibly();
}
}
}
private ChannelFuture closeChannel(Channel c) {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel {}", c);
}
return c.close().addListener(x -> makeWritable());
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
final Channel c = channel;
if (c == null || c.isWritable()) {
makeWritable();
}
super.channelWritabilityChanged(ctx);
}
private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request) {
writeAndFlush(channel, key, request, false);
}
private void writeAndFlush(final Channel channel,
final CompletionKey key,
final Object request,
final boolean allowFastFail) {
if (channel == null) {
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
errorOut(key);
return;
}
final boolean isChannelWritable = channel.isWritable();
if (isWritable != isChannelWritable) {
// isWritable is volatile so simple "isWritable = channel.isWritable()" would be slower
isWritable = isChannelWritable;
}
if (allowFastFail && !isWritable) {
LOG.warn("Operation {} failed: TooManyRequestsException",
StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
return;
}
try {
final long startTime = MathUtils.nowInNano();
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
channel.writeAndFlush(request, promise);
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
}
}
void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut();
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut());
}
}
void errorOut(final CompletionKey key, final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
}
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
completionObjectsV2Conflicts.removeAny(key).ifPresent(c -> c.errorOut(rc));
}
}
/**
* Errors out pending ops from per channel bookie client. As the channel
* is being closed, all the operations waiting on the connection
* will be sent to completion with error.
*/
void errorOutPendingOps(int rc) {
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
synchronized (this) {
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
}
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
}
/**
* Errors out pending entries. We call this method from one thread to avoid
* concurrent executions to QuorumOpMonitor (implements callbacks). It seems
* simpler to call it from BookieHandle instead of calling directly from
* here.
*/
void errorOutOutstandingEntries(int rc) {
Optional<CompletionKey> multikey = completionObjectsV2Conflicts.getAnyKey();
while (multikey.isPresent()) {
multikey.ifPresent(k -> errorOut(k, rc));
multikey = completionObjectsV2Conflicts.getAnyKey();
}
for (CompletionKey key : completionObjects.keys()) {
errorOut(key, rc);
}
}
void recordError() {
if (pcbcPool != null) {
pcbcPool.recordError();
}
}
/**
* If our channel has disconnected, we just error out the pending entries.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Disconnected from bookie channel {}", ctx.channel());
if (ctx.channel() != null) {
closeChannel(ctx.channel());
if (ctx.channel().pipeline().get(SslHandler.class) != null) {
activeTlsChannelCounter.dec();
} else {
activeNonTlsChannelCounter.dec();
}
}
errorOutOutstandingEntries(BKException.Code.BookieHandleNotAvailableException);
errorOutPendingOps(BKException.Code.BookieHandleNotAvailableException);
synchronized (this) {
if (this.channel == ctx.channel()
&& state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
}
// we don't want to reconnect right away. If someone sends a request to
// this address, we will reconnect.
}
/**
* Called by netty when an exception happens in one of the netty threads
* (mostly due to what we do in the netty threads).
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exceptionCounter.inc();
if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
ctx.close();
return;
}
if (cause instanceof AuthHandler.AuthenticationException) {
LOG.error("Error authenticating connection", cause);
errorOutOutstandingEntries(BKException.Code.UnauthorizedAccessException);
Channel c = ctx.channel();
if (c != null) {
closeChannel(c);
}
return;
}
// TLSv1.3 doesn't throw SSLHandshakeException for certificate issues
// see https://stackoverflow.com/a/62465859 for details about the reason
// therefore catch SSLException to also cover TLSv1.3
if (cause instanceof DecoderException && cause.getCause() instanceof SSLException) {
LOG.error("TLS handshake failed", cause);
errorOutPendingOps(BKException.Code.SecurityException);
Channel c = ctx.channel();
if (c != null) {
closeChannel(c);
}
}
if (cause instanceof IOException) {
if (cause instanceof NativeIoException) {
// Stack trace is not very interesting for native IO exceptio, the important part is in
// the exception message
LOG.warn("Exception caught on:{} cause: {}", ctx.channel(), cause.getMessage());
} else {
LOG.warn("Exception caught on:{} cause:", ctx.channel(), cause);
}
ctx.close();
return;
}
synchronized (this) {
if (state == ConnectionState.CLOSED) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected exception caught by bookie client channel handler, "
+ "but the client is closed, so it isn't important", cause);
}
} else {
LOG.error("Unexpected exception caught by bookie client channel handler", cause);
}
}
// Since we are a library, cant terminate App here, can we?
ctx.close();
}
/**
* Called by netty when a message is received on a channel.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof BookieProtocol.Response) {
BookieProtocol.Response response = (BookieProtocol.Response) msg;
readV2Response(response);
} else if (msg instanceof Response) {
Response response = (Response) msg;
readV3Response(response);
} else {
ctx.fireChannelRead(msg);
}
}
private void readV2Response(final BookieProtocol.Response response) {
OperationType operationType = getOperationType(response.getOpCode());
StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
CompletionKey key = acquireV2Key(response.ledgerId, response.entryId, operationType);
CompletionValue completionValue = getCompletionValue(key);
key.release();
if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected response received from bookie : " + bookieId + " for type : " + operationType
+ " and ledger:entry : " + response.ledgerId + ":" + response.entryId);
}
response.release();
} else {
long orderingKey = completionValue.ledgerId;
executor.executeOrdered(orderingKey,
ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId,
status, response));
}
}
private static class ReadV2ResponseCallback extends SafeRunnable {
CompletionValue completionValue;
long ledgerId;
long entryId;
StatusCode status;
BookieProtocol.Response response;
static ReadV2ResponseCallback create(CompletionValue completionValue, long ledgerId, long entryId,
StatusCode status, BookieProtocol.Response response) {
ReadV2ResponseCallback callback = RECYCLER.get();
callback.completionValue = completionValue;
callback.ledgerId = ledgerId;
callback.entryId = entryId;
callback.status = status;
callback.response = response;
return callback;
}
@Override
public void safeRun() {
completionValue.handleV2Response(ledgerId, entryId, status, response);
response.release();
response.recycle();
recycle();
}
void recycle() {
completionValue = null;
ledgerId = -1;
entryId = -1;
status = null;
response = null;
recyclerHandle.recycle(this);
}
private final Handle<ReadV2ResponseCallback> recyclerHandle;
private ReadV2ResponseCallback(Handle<ReadV2ResponseCallback> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private static final Recycler<ReadV2ResponseCallback> RECYCLER = new Recycler<ReadV2ResponseCallback>() {
@Override
protected ReadV2ResponseCallback newObject(Handle<ReadV2ResponseCallback> handle) {
return new ReadV2ResponseCallback(handle);
}
};
}
private static OperationType getOperationType(byte opCode) {
switch (opCode) {
case BookieProtocol.ADDENTRY:
return OperationType.ADD_ENTRY;
case BookieProtocol.READENTRY:
return OperationType.READ_ENTRY;
case BookieProtocol.AUTH:
return OperationType.AUTH;
case BookieProtocol.READ_LAC:
return OperationType.READ_LAC;
case BookieProtocol.WRITE_LAC:
return OperationType.WRITE_LAC;
case BookieProtocol.GET_BOOKIE_INFO:
return OperationType.GET_BOOKIE_INFO;
default:
throw new IllegalArgumentException("Invalid operation type " + opCode);
}
}
private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
switch (errorCode) {
case BookieProtocol.EOK:
return StatusCode.EOK;
case BookieProtocol.ENOLEDGER:
return StatusCode.ENOLEDGER;
case BookieProtocol.ENOENTRY:
return StatusCode.ENOENTRY;
case BookieProtocol.EBADREQ:
return StatusCode.EBADREQ;
case BookieProtocol.EIO:
return StatusCode.EIO;
case BookieProtocol.EUA:
return StatusCode.EUA;
case BookieProtocol.EBADVERSION:
return StatusCode.EBADVERSION;
case BookieProtocol.EFENCED:
return StatusCode.EFENCED;
case BookieProtocol.EREADONLY:
return StatusCode.EREADONLY;
case BookieProtocol.ETOOMANYREQUESTS:
return StatusCode.ETOOMANYREQUESTS;
default:
throw new IllegalArgumentException("Invalid error code: " + errorCode);
}
}
private void readV3Response(final Response response) {
final BKPacketHeader header = response.getHeader();
final CompletionKey key = newCompletionKey(header.getTxnId(), header.getOperation());
final CompletionValue completionValue = completionObjects.get(key);
if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected response received from bookie : " + bookieId + " for type : "
+ header.getOperation() + " and txnId : " + header.getTxnId());
}
} else {
long orderingKey = completionValue.ledgerId;
executor.executeOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
completionValue.restoreMdcContext();
completionValue.handleV3Response(response);
}
@Override
public String toString() {
return String.format("HandleResponse(Txn=%d, Type=%s, Entry=(%d, %d))",
header.getTxnId(), header.getOperation(),
completionValue.ledgerId, completionValue.entryId);
}
});
}
completionObjects.remove(key);
}
void initTLSHandshake() {
// create TLS handler
PerChannelBookieClient parentObj = PerChannelBookieClient.this;
SslHandler handler = parentObj.shFactory.newTLSHandler();
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
int rc;
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
synchronized (PerChannelBookieClient.this) {
if (future.isSuccess() && state == ConnectionState.CONNECTING) {
LOG.error("Connection state changed before TLS handshake completed {}/{}", bookieId, state);
rc = BKException.Code.BookieHandleNotAvailableException;
closeChannel(channel);
channel = null;
if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
} else if (future.isSuccess() && state == ConnectionState.START_TLS) {
rc = BKException.Code.OK;
LOG.info("Successfully connected to bookie using TLS: " + bookieId);
state = ConnectionState.CONNECTED;
AuthHandler.ClientSideHandler authHandler = future.get().pipeline()
.get(AuthHandler.ClientSideHandler.class);
if (conf.getHostnameVerificationEnabled() && !authHandler.verifyTlsHostName(channel)) {
// add HostnameVerification or private classes not
// for validation
rc = BKException.Code.UnauthorizedAccessException;
} else {
authHandler.authProvider.onProtocolUpgrade();
activeTlsChannelCounter.inc();
}
} else if (future.isSuccess()
&& (state == ConnectionState.CLOSED || state == ConnectionState.DISCONNECTED)) {
LOG.warn("Closed before TLS handshake completed, clean up: {}, current state {}",
channel, state);
closeChannel(channel);
rc = BKException.Code.BookieHandleNotAvailableException;
channel = null;
} else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
LOG.debug("Already connected with another channel({}), so close the new channel({})",
channel, channel);
closeChannel(channel);
return; // pendingOps should have been completed when other channel connected
} else {
LOG.error("TLS handshake failed with bookie: {}/{}, current state {} : ",
channel, bookieId, state, future.cause());
rc = BKException.Code.SecurityException;
closeChannel(channel);
channel = null;
if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
failedTlsHandshakeCounter.inc();
}
// trick to not do operations under the lock, take the list
// of pending ops and assign it to a new variable, while
// emptying the pending ops by just assigning it to a new
// list
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
}
makeWritable();
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
}
});
}
/**
* Boiler-plate wrapper classes follow.
*
*/
// visible for testing
abstract class CompletionValue {
private final OpStatsLogger opLogger;
private final OpStatsLogger timeoutOpLogger;
private final String operationName;
private final Map<String, String> mdcContextMap;
protected Object ctx;
protected long ledgerId;
protected long entryId;
protected long startTime;
public CompletionValue(String operationName,
Object ctx,
long ledgerId, long entryId,
OpStatsLogger opLogger,
OpStatsLogger timeoutOpLogger) {
this.operationName = operationName;
this.ctx = ctx;
this.ledgerId = ledgerId;
this.entryId = entryId;
this.startTime = MathUtils.nowInNano();
this.opLogger = opLogger;
this.timeoutOpLogger = timeoutOpLogger;
this.mdcContextMap = preserveMdcForTaskExecution ? MDC.getCopyOfContextMap() : null;
}
private long latency() {
return MathUtils.elapsedNanos(startTime);
}
void logOpResult(int rc) {
if (rc != BKException.Code.OK) {
opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS);
} else {
opLogger.registerSuccessfulEvent(latency(), TimeUnit.NANOSECONDS);
}
if (rc != BKException.Code.OK
&& !expectedBkOperationErrors.contains(rc)) {
recordError();
}
}
boolean maybeTimeout() {
if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) {
timeout();
return true;
} else {
return false;
}
}
void timeout() {
errorOut(BKException.Code.TimeoutException);
timeoutOpLogger.registerSuccessfulEvent(latency(),
TimeUnit.NANOSECONDS);
}
protected void logResponse(StatusCode status, Object... extraInfo) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got {} response from bookie:{} rc:{}, {}", operationName, bookieId, status,
Joiner.on(":").join(extraInfo));
}
}
protected int convertStatus(StatusCode status, int defaultStatus) {
// convert to BKException code
int rcToRet = statusCodeToExceptionCode(status);
if (rcToRet == BKException.Code.UNINITIALIZED) {
LOG.error("{} for failed on bookie {} code {}",
operationName, bookieId, status);
return defaultStatus;
} else {
return rcToRet;
}
}
public void restoreMdcContext() {
MdcUtils.restoreContext(mdcContextMap);
}
public abstract void errorOut();
public abstract void errorOut(int rc);
public void setOutstanding() {
// no-op
}
protected void errorOutAndRunCallback(final Runnable callback) {
executor.executeOrdered(ledgerId,
new SafeRunnable() {
@Override
public void safeRun() {
String bAddress = "null";
Channel c = channel;
if (c != null && c.remoteAddress() != null) {
bAddress = c.remoteAddress().toString();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Could not write {} request to bookie {} for ledger {}, entry {}",
operationName, bAddress,
ledgerId, entryId);
}
callback.run();
}
});
}
public void handleV2Response(
long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
LOG.warn("Unhandled V2 response {}", response);
}
public abstract void handleV3Response(
BookkeeperProtocol.Response response);
}
// visible for testing
class WriteLacCompletion extends CompletionValue {
final WriteLacCallback cb;
public WriteLacCompletion(final CompletionKey key,
final WriteLacCallback originalCallback,
final Object originalCtx,
final long ledgerId) {
super("WriteLAC",
originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
writeLacOpLogger, writeLacTimeoutOpLogger);
this.cb = new WriteLacCallback() {
@Override
public void writeLacComplete(int rc, long ledgerId,
BookieId addr,
Object ctx) {
logOpResult(rc);
originalCallback.writeLacComplete(rc, ledgerId,
addr, originalCtx);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> cb.writeLacComplete(rc, ledgerId, bookieId, ctx));
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
WriteLacResponse writeLacResponse = response.getWriteLacResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? writeLacResponse.getStatus() : response.getStatus();
long ledgerId = writeLacResponse.getLedgerId();
if (LOG.isDebugEnabled()) {
logResponse(status, "ledger", ledgerId);
}
int rc = convertStatus(status, BKException.Code.WriteException);
cb.writeLacComplete(rc, ledgerId, bookieId, ctx);
}
}
class ForceLedgerCompletion extends CompletionValue {
final ForceLedgerCallback cb;
public ForceLedgerCompletion(final CompletionKey key,
final ForceLedgerCallback originalCallback,
final Object originalCtx,
final long ledgerId) {
super("ForceLedger",
originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
forceLedgerOpLogger, forceLedgerTimeoutOpLogger);
this.cb = new ForceLedgerCallback() {
@Override
public void forceLedgerComplete(int rc, long ledgerId,
BookieId addr,
Object ctx) {
logOpResult(rc);
originalCallback.forceLedgerComplete(rc, ledgerId,
addr, originalCtx);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx));
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
ForceLedgerResponse forceLedgerResponse = response.getForceLedgerResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? forceLedgerResponse.getStatus() : response.getStatus();
long ledgerId = forceLedgerResponse.getLedgerId();
if (LOG.isDebugEnabled()) {
logResponse(status, "ledger", ledgerId);
}
int rc = convertStatus(status, BKException.Code.WriteException);
cb.forceLedgerComplete(rc, ledgerId, bookieId, ctx);
}
}
// visible for testing
class ReadLacCompletion extends CompletionValue {
final ReadLacCallback cb;
public ReadLacCompletion(final CompletionKey key,
ReadLacCallback originalCallback,
final Object ctx, final long ledgerId) {
super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED,
readLacOpLogger, readLacTimeoutOpLogger);
this.cb = new ReadLacCallback() {
@Override
public void readLacComplete(int rc, long ledgerId,
ByteBuf lacBuffer,
ByteBuf lastEntryBuffer,
Object ctx) {
logOpResult(rc);
originalCallback.readLacComplete(
rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> cb.readLacComplete(rc, ledgerId, null, null, ctx));
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
ReadLacResponse readLacResponse = response.getReadLacResponse();
ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
StatusCode status = response.getStatus() == StatusCode.EOK
? readLacResponse.getStatus() : response.getStatus();
if (readLacResponse.hasLacBody()) {
lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
}
if (readLacResponse.hasLastEntryBody()) {
lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
}
if (LOG.isDebugEnabled()) {
logResponse(status, "ledgerId", ledgerId);
}
int rc = convertStatus(status, BKException.Code.ReadException);
cb.readLacComplete(rc, ledgerId, lacBuffer.slice(),
lastEntryBuffer.slice(), ctx);
}
}
// visible for testing
class ReadCompletion extends CompletionValue {
final ReadEntryCallback cb;
public ReadCompletion(final CompletionKey key,
final ReadEntryCallback originalCallback,
final Object originalCtx,
long ledgerId, final long entryId) {
super("Read", originalCtx, ledgerId, entryId,
readEntryOpLogger, readTimeoutOpLogger);
this.cb = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long ledgerId,
long entryId, ByteBuf buffer,
Object ctx) {
logOpResult(rc);
originalCallback.readEntryComplete(rc,
ledgerId, entryId,
buffer, originalCtx);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> cb.readEntryComplete(rc, ledgerId,
entryId, null, ctx));
}
@Override
public void setOutstanding() {
readEntryOutstanding.inc();
}
@Override
public void handleV2Response(long ledgerId, long entryId,
StatusCode status,
BookieProtocol.Response response) {
readEntryOutstanding.dec();
if (!(response instanceof BookieProtocol.ReadResponse)) {
return;
}
BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse) response;
handleReadResponse(ledgerId, entryId, status, readResponse.getData(),
INVALID_ENTRY_ID, -1L);
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
readEntryOutstanding.dec();
ReadResponse readResponse = response.getReadResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? readResponse.getStatus() : response.getStatus();
ByteBuf buffer = Unpooled.EMPTY_BUFFER;
if (readResponse.hasBody()) {
buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
}
long maxLAC = INVALID_ENTRY_ID;
if (readResponse.hasMaxLAC()) {
maxLAC = readResponse.getMaxLAC();
}
long lacUpdateTimestamp = -1L;
if (readResponse.hasLacUpdateTimestamp()) {
lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
}
handleReadResponse(readResponse.getLedgerId(),
readResponse.getEntryId(),
status, buffer, maxLAC, lacUpdateTimestamp);
buffer.release(); // meaningless using unpooled, but client may expect to hold the last reference
}
private void handleReadResponse(long ledgerId,
long entryId,
StatusCode status,
ByteBuf buffer,
long maxLAC, // max known lac piggy-back from bookies
long lacUpdateTimestamp) { // the timestamp when the lac is updated.
int readableBytes = buffer.readableBytes();
if (LOG.isDebugEnabled()) {
logResponse(status, "ledger", ledgerId, "entry", entryId, "entryLength", readableBytes);
}
int rc = convertStatus(status, BKException.Code.ReadException);
if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof ReadEntryCallbackCtx)) {
((ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
}
if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
}
cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), ctx);
}
}
class StartTLSCompletion extends CompletionValue {
final StartTLSCallback cb;
public StartTLSCompletion(final CompletionKey key) {
super("StartTLS", null, -1, -1,
startTLSOpLogger, startTLSTimeoutOpLogger);
this.cb = new StartTLSCallback() {
@Override
public void startTLSComplete(int rc, Object ctx) {
logOpResult(rc);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
failTLS(rc);
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
StatusCode status = response.getStatus();
if (LOG.isDebugEnabled()) {
logResponse(status);
}
int rc = convertStatus(status, BKException.Code.SecurityException);
// Cancel START_TLS request timeout
cb.startTLSComplete(rc, null);
if (state != ConnectionState.START_TLS) {
LOG.error("Connection state changed before TLS response received");
failTLS(BKException.Code.BookieHandleNotAvailableException);
} else if (status != StatusCode.EOK) {
LOG.error("Client received error {} during TLS negotiation", status);
failTLS(BKException.Code.SecurityException);
} else {
initTLSHandshake();
}
}
}
// visible for testing
class GetBookieInfoCompletion extends CompletionValue {
final GetBookieInfoCallback cb;
public GetBookieInfoCompletion(final CompletionKey key,
final GetBookieInfoCallback origCallback,
final Object origCtx) {
super("GetBookieInfo", origCtx, 0L, 0L,
getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger);
this.cb = new GetBookieInfoCallback() {
@Override
public void getBookieInfoComplete(int rc, BookieInfo bInfo,
Object ctx) {
logOpResult(rc);
origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
key.release();
}
};
}
@Override
boolean maybeTimeout() {
if (MathUtils.elapsedNanos(startTime) >= getBookieInfoTimeoutNanos) {
timeout();
return true;
} else {
return false;
}
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> cb.getBookieInfoComplete(rc, new BookieInfo(), ctx));
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
GetBookieInfoResponse getBookieInfoResponse = response.getGetBookieInfoResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? getBookieInfoResponse.getStatus() : response.getStatus();
long freeDiskSpace = getBookieInfoResponse.getFreeDiskSpace();
long totalDiskSpace = getBookieInfoResponse.getTotalDiskCapacity();
if (LOG.isDebugEnabled()) {
logResponse(status, "freeDisk", freeDiskSpace, "totalDisk", totalDiskSpace);
}
int rc = convertStatus(status, BKException.Code.ReadException);
cb.getBookieInfoComplete(rc,
new BookieInfo(totalDiskSpace,
freeDiskSpace), ctx);
}
}
class GetListOfEntriesOfLedgerCompletion extends CompletionValue {
final GetListOfEntriesOfLedgerCallback cb;
public GetListOfEntriesOfLedgerCompletion(final CompletionKey key,
final GetListOfEntriesOfLedgerCallback origCallback, final long ledgerId) {
super("GetListOfEntriesOfLedger", null, ledgerId, 0L, getListOfEntriesOfLedgerCompletionOpLogger,
getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
this.cb = new GetListOfEntriesOfLedgerCallback() {
@Override
public void getListOfEntriesOfLedgerComplete(int rc, long ledgerId,
AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
logOpResult(rc);
origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
key.release();
}
};
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(() -> cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, null));
}
@Override
public void handleV3Response(BookkeeperProtocol.Response response) {
GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response
.getGetListOfEntriesOfLedgerResponse();
ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
StatusCode status = response.getStatus() == StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus()
: response.getStatus();
if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(
getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
}
if (LOG.isDebugEnabled()) {
logResponse(status, "ledgerId", ledgerId);
}
int rc = convertStatus(status, BKException.Code.ReadException);
AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
if (rc == BKException.Code.OK) {
availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
availabilityOfEntriesOfLedgerBuffer.slice());
}
cb.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
}
}
private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
@Override
protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
return new AddCompletion(handle);
}
};
AddCompletion acquireAddCompletion(final CompletionKey key,
final WriteCallback originalCallback,
final Object originalCtx,
final long ledgerId, final long entryId) {
AddCompletion completion = addCompletionRecycler.get();
completion.reset(key, originalCallback, originalCtx, ledgerId, entryId);
return completion;
}
// visible for testing
class AddCompletion extends CompletionValue implements WriteCallback {
final Recycler.Handle<AddCompletion> handle;
CompletionKey key = null;
WriteCallback originalCallback = null;
AddCompletion(Recycler.Handle<AddCompletion> handle) {
super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger);
this.handle = handle;
}
void reset(final CompletionKey key,
final WriteCallback originalCallback,
final Object originalCtx,
final long ledgerId, final long entryId) {
this.key = key;
this.originalCallback = originalCallback;
this.ctx = originalCtx;
this.ledgerId = ledgerId;
this.entryId = entryId;
this.startTime = MathUtils.nowInNano();
}
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieId addr,
Object ctx) {
logOpResult(rc);
originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
key.release();
handle.recycle(this);
}
@Override
boolean maybeTimeout() {
if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) {
timeout();
return true;
} else {
return false;
}
}
@Override
public void errorOut() {
errorOut(BKException.Code.BookieHandleNotAvailableException);
}
@Override
public void errorOut(final int rc) {
errorOutAndRunCallback(
() -> writeComplete(rc, ledgerId, entryId, bookieId, ctx));
}
@Override
public void setOutstanding() {
addEntryOutstanding.inc();
}
@Override
public void handleV2Response(
long ledgerId, long entryId, StatusCode status,
BookieProtocol.Response response) {
addEntryOutstanding.dec();
handleResponse(ledgerId, entryId, status);
}
@Override
public void handleV3Response(
BookkeeperProtocol.Response response) {
addEntryOutstanding.dec();
AddResponse addResponse = response.getAddResponse();
StatusCode status = response.getStatus() == StatusCode.EOK
? addResponse.getStatus() : response.getStatus();
handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
status);
}
private void handleResponse(long ledgerId, long entryId,
StatusCode status) {
if (LOG.isDebugEnabled()) {
logResponse(status, "ledger", ledgerId, "entry", entryId);
}
int rc = convertStatus(status, BKException.Code.WriteException);
writeComplete(rc, ledgerId, entryId, bookieId, ctx);
}
}
// visable for testing
CompletionKey newCompletionKey(long txnId, OperationType operationType) {
return new V3CompletionKey(txnId, operationType);
}
class V3CompletionKey extends CompletionKey {
public V3CompletionKey(long txnId, OperationType operationType) {
super(txnId, operationType);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof V3CompletionKey)) {
return false;
}
V3CompletionKey that = (V3CompletionKey) obj;
return this.txnId == that.txnId && this.operationType == that.operationType;
}
@Override
public int hashCode() {
return ((int) txnId);
}
@Override
public String toString() {
return String.format("TxnId(%d), OperationType(%s)", txnId, operationType);
}
}
abstract class CompletionKey {
final long txnId;
OperationType operationType;
CompletionKey(long txnId,
OperationType operationType) {
this.txnId = txnId;
this.operationType = operationType;
}
public void release() {}
}
/**
* Note : Helper functions follow
*/
/**
* @param status
* @return {@link BKException.Code.UNINITIALIZED} if the statuscode is unknown.
*/
private int statusCodeToExceptionCode(StatusCode status) {
switch (status) {
case EOK:
return BKException.Code.OK;
case ENOENTRY:
return BKException.Code.NoSuchEntryException;
case ENOLEDGER:
return BKException.Code.NoSuchLedgerExistsException;
case EBADVERSION:
return BKException.Code.ProtocolVersionException;
case EUA:
return BKException.Code.UnauthorizedAccessException;
case EFENCED:
return BKException.Code.LedgerFencedException;
case EREADONLY:
return BKException.Code.WriteOnReadOnlyBookieException;
case ETOOMANYREQUESTS:
return BKException.Code.TooManyRequestsException;
default:
return BKException.Code.UNINITIALIZED;
}
}
private void putCompletionKeyValue(CompletionKey key, CompletionValue value) {
CompletionValue existingValue = completionObjects.putIfAbsent(key, value);
if (existingValue != null) { // will only happen for V2 keys, as V3 have unique txnid
// There's a pending read request on same ledger/entry. Use the multimap to track all of them
completionObjectsV2Conflicts.put(key, value);
}
}
private CompletionValue getCompletionValue(CompletionKey key) {
CompletionValue completionValue = completionObjects.remove(key);
if (completionValue == null) {
// If there's no completion object here, try in the multimap
completionValue = completionObjectsV2Conflicts.removeAny(key).orElse(null);
}
return completionValue;
}
private long getTxnId() {
return txnIdGenerator.incrementAndGet();
}
private final Recycler<V2CompletionKey> v2KeyRecycler = new Recycler<V2CompletionKey>() {
@Override
protected V2CompletionKey newObject(
Recycler.Handle<V2CompletionKey> handle) {
return new V2CompletionKey(handle);
}
};
V2CompletionKey acquireV2Key(long ledgerId, long entryId,
OperationType operationType) {
V2CompletionKey key = v2KeyRecycler.get();
key.reset(ledgerId, entryId, operationType);
return key;
}
private class V2CompletionKey extends CompletionKey {
private final Handle<V2CompletionKey> recyclerHandle;
long ledgerId;
long entryId;
private V2CompletionKey(Handle<V2CompletionKey> handle) {
super(-1, null);
this.recyclerHandle = handle;
}
void reset(long ledgerId, long entryId, OperationType operationType) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.operationType = operationType;
}
@Override
public boolean equals(Object object) {
if (!(object instanceof V2CompletionKey)) {
return false;
}
V2CompletionKey that = (V2CompletionKey) object;
return this.entryId == that.entryId
&& this.ledgerId == that.ledgerId
&& this.operationType == that.operationType;
}
@Override
public int hashCode() {
return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
}
@Override
public String toString() {
return String.format("%d:%d %s", ledgerId, entryId, operationType);
}
@Override
public void release() {
recyclerHandle.recycle(this);
}
}
Request.Builder withRequestContext(Request.Builder builder) {
if (preserveMdcForTaskExecution) {
return appendRequestContext(builder);
}
return builder;
}
static Request.Builder appendRequestContext(Request.Builder builder) {
final Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
if (mdcContextMap == null || mdcContextMap.isEmpty()) {
return builder;
}
for (Map.Entry<String, String> kv : mdcContextMap.entrySet()) {
final BookkeeperProtocol.ContextPair context = BookkeeperProtocol.ContextPair.newBuilder()
.setKey(kv.getKey())
.setValue(kv.getValue())
.build();
builder.addRequestContext(context);
}
return builder;
}
ChannelFutureListener contextPreservingListener(ChannelFutureListener listener) {
return preserveMdcForTaskExecution ? new ContextPreservingFutureListener(listener) : listener;
}
/**
* Decorator to preserve MDC for connection listener.
*/
static class ContextPreservingFutureListener implements ChannelFutureListener {
private final ChannelFutureListener listener;
private final Map<String, String> mdcContextMap;
ContextPreservingFutureListener(ChannelFutureListener listener) {
this.listener = listener;
this.mdcContextMap = MDC.getCopyOfContextMap();
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
MdcUtils.restoreContext(mdcContextMap);
try {
listener.operationComplete(future);
} finally {
MDC.clear();
}
}
}
/**
* Connection listener.
*/
class ConnectionFutureListener implements ChannelFutureListener {
private final long startTime;
ConnectionFutureListener(long startTime) {
this.startTime = startTime;
}
@Override
public void operationComplete(ChannelFuture future) {
if (LOG.isDebugEnabled()) {
LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
}
int rc;
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
/* We fill in the timer based on whether the connect operation itself succeeded regardless of
* whether there was a race */
if (future.isSuccess()) {
PerChannelBookieClient.this
.connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
} else {
PerChannelBookieClient.this
.connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
synchronized (PerChannelBookieClient.this) {
if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
rc = BKException.Code.OK;
channel = future.channel();
if (shFactory != null) {
LOG.info("Successfully connected to bookie: {} {} initiate TLS", bookieId, future.channel());
makeWritable();
initiateTLS();
return;
} else {
LOG.info("Successfully connected to bookie: {} {}", bookieId, future.channel());
state = ConnectionState.CONNECTED;
activeNonTlsChannelCounter.inc();
}
} else if (future.isSuccess() && state == ConnectionState.START_TLS) {
rc = BKException.Code.OK;
LOG.info("Successfully connected to bookie using TLS: " + bookieId);
state = ConnectionState.CONNECTED;
AuthHandler.ClientSideHandler authHandler = future.channel().pipeline()
.get(AuthHandler.ClientSideHandler.class);
if (conf.getHostnameVerificationEnabled() && !authHandler.verifyTlsHostName(channel)) {
rc = BKException.Code.UnauthorizedAccessException;
} else {
authHandler.authProvider.onProtocolUpgrade();
activeTlsChannelCounter.inc();
}
} else if (future.isSuccess() && (state == ConnectionState.CLOSED
|| state == ConnectionState.DISCONNECTED)) {
LOG.warn("Closed before connection completed, clean up: {}, current state {}",
future.channel(), state);
closeChannel(future.channel());
rc = BKException.Code.BookieHandleNotAvailableException;
channel = null;
} else if (future.isSuccess() && state == ConnectionState.CONNECTED) {
if (LOG.isDebugEnabled()) {
LOG.debug("Already connected with another channel({}), so close the new channel({})", channel,
future.channel());
}
closeChannel(future.channel());
return; // pendingOps should have been completed when other channel connected
} else {
Throwable cause = future.cause();
if (cause instanceof UnknownHostException || cause instanceof NativeIoException) {
// Don't log stack trace for common errors
logBookieUnavailable(() -> LOG.warn("Could not connect to bookie: {}/{}, current state {} : {}",
future.channel(), bookieId, state, future.cause().getMessage()));
} else {
// Regular exceptions, include stack trace
logBookieUnavailable(() -> LOG.error("Could not connect to bookie: {}/{}, current state {} : ",
future.channel(), bookieId, state, future.cause()));
}
rc = BKException.Code.BookieHandleNotAvailableException;
Channel failedChannel = future.channel();
if (failedChannel != null) { // can be null in case of dummy failed ChannelFuture
closeChannel(failedChannel);
}
channel = null;
if (state != ConnectionState.CLOSED) {
state = ConnectionState.DISCONNECTED;
}
failedConnectionCounter.inc();
}
// trick to not do operations under the lock, take the list
// of pending ops and assign it to a new variable, while
// emptying the pending ops by just assigning it to a new
// list
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
}
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, PerChannelBookieClient.this);
}
makeWritable();
}
private void logBookieUnavailable(Runnable logger) {
final long now = System.currentTimeMillis();
if ((now - lastBookieUnavailableLogTimestamp) > conf.getClientConnectBookieUnavailableLogThrottlingMs()) {
logger.run();
lastBookieUnavailableLogTimestamp = now;
}
}
}
private void initiateTLS() {
LOG.info("Initializing TLS to {}", channel);
assert state == ConnectionState.CONNECTING;
final long txnId = getTxnId();
final CompletionKey completionKey = new V3CompletionKey(txnId, OperationType.START_TLS);
completionObjects.put(completionKey,
new StartTLSCompletion(completionKey));
BookkeeperProtocol.Request.Builder h = withRequestContext(BookkeeperProtocol.Request.newBuilder());
BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
.setVersion(ProtocolVersion.VERSION_THREE)
.setOperation(OperationType.START_TLS)
.setTxnId(txnId);
h.setHeader(headerBuilder.build());
h.setStartTLSRequest(BookkeeperProtocol.StartTLSRequest.newBuilder().build());
state = ConnectionState.START_TLS;
writeAndFlush(channel, completionKey, h.build());
}
private void failTLS(int rc) {
LOG.error("TLS failure on: {}, rc: {}", channel, rc);
Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
synchronized (this) {
disconnect();
oldPendingOps = pendingOps;
pendingOps = new ArrayDeque<>();
}
for (GenericCallback<PerChannelBookieClient> pendingOp : oldPendingOps) {
pendingOp.operationComplete(rc, null);
}
failedTlsHandshakeCounter.inc();
}
private static class FailedChannelFutureImpl implements ChannelFuture {
private final Throwable failureCause;
public FailedChannelFutureImpl(Throwable failureCause) {
this.failureCause = failureCause;
}
@Override
public Channel channel() {
// used only for log
return null;
}
@Override
public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
throw new UnsupportedOperationException("Not supported");
}
@Override
@SuppressWarnings({"unchecked", "varargs"})
public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
throw new UnsupportedOperationException("Not supported");
}
@Override
@SuppressWarnings({"unchecked", "varargs"})
public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
throw new UnsupportedOperationException("Not supported");
}
@Override
public ChannelFuture sync() throws InterruptedException {
throw new UnsupportedOperationException("Not supported");
}
@Override
public ChannelFuture syncUninterruptibly() {
throw new UnsupportedOperationException("Not supported");
}
@Override
public ChannelFuture await() throws InterruptedException {
throw new UnsupportedOperationException("Not supported");
}
@Override
public ChannelFuture awaitUninterruptibly() {
throw new UnsupportedOperationException("Not supported");
}
@Override
public boolean isVoid() {
throw new UnsupportedOperationException("Not supported");
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public boolean isCancellable() {
return false;
}
@Override
public Throwable cause() {
return failureCause;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return true;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
@Override
public Void getNow() {
throw new UnsupportedOperationException("Not supported");
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
throw new ExecutionException(failureCause);
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new ExecutionException(failureCause);
}
}
}