blob: c8313bb31e7bb7463c9438a064e17a6f31dbbe46 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private static final Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);
private ExecutorService fenceThreadPool;
private boolean throttleReadResponses;
public static ReadEntryProcessor create(ReadRequest request,
Channel channel,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool,
boolean throttleReadResponses) {
ReadEntryProcessor rep = RECYCLER.get();
rep.init(request, channel, requestProcessor);
rep.fenceThreadPool = fenceThreadPool;
rep.throttleReadResponses = throttleReadResponses;
return rep;
}
@Override
protected void processPacket() {
if (LOG.isDebugEnabled()) {
LOG.debug("Received new read request: {}", request);
}
int errorCode = BookieProtocol.EOK;
long startTimeNanos = MathUtils.nowInNano();
ByteBuf data = null;
try {
CompletableFuture<Boolean> fenceResult = null;
if (request.isFencing()) {
LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(), channel.remoteAddress());
if (request.hasMasterKey()) {
fenceResult = requestProcessor.getBookie().fenceLedger(request.getLedgerId(),
request.getMasterKey());
} else {
LOG.error("Password not provided, Not safe to fence {}", request.getLedgerId());
throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
data = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId());
if (LOG.isDebugEnabled()) {
LOG.debug("##### Read entry ##### {} -- ref-count: {}", data.readableBytes(), data.refCnt());
}
if (fenceResult != null) {
handleReadResultForFenceRead(fenceResult, data, startTimeNanos);
return;
}
} catch (Bookie.NoLedgerException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error reading {}", request, e);
}
errorCode = BookieProtocol.ENOLEDGER;
} catch (Bookie.NoEntryException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error reading {}", request, e);
}
errorCode = BookieProtocol.ENOENTRY;
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error reading {}", request, e);
}
errorCode = BookieProtocol.EIO;
} catch (BookieException e) {
LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e);
errorCode = BookieProtocol.EUA;
} catch (Throwable t) {
LOG.error("Unexpected exception reading at {}:{} : {}", request.getLedgerId(), request.getEntryId(),
t.getMessage(), t);
errorCode = BookieProtocol.EBADREQ;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Read entry rc = {} for {}", errorCode, request);
}
sendResponse(data, errorCode, startTimeNanos);
}
private void sendResponse(ByteBuf data, int errorCode, long startTimeNanos) {
final RequestStats stats = requestProcessor.getRequestStats();
final OpStatsLogger logger = stats.getReadEntryStats();
BookieProtocol.Response response;
if (errorCode == BookieProtocol.EOK) {
logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
response = ResponseBuilder.buildReadResponse(data, request);
} else {
if (data != null) {
ReferenceCountUtil.release(data);
}
logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
response = ResponseBuilder.buildErrorResponse(errorCode, request);
}
if (throttleReadResponses) {
sendResponseAndWait(errorCode, response, stats.getReadRequestStats());
} else {
sendResponse(errorCode, response, stats.getReadRequestStats());
}
recycle();
}
private void sendFenceResponse(Boolean result, ByteBuf data, long startTimeNanos) {
final int retCode = result != null && result ? BookieProtocol.EOK : BookieProtocol.EIO;
sendResponse(data, retCode, startTimeNanos);
}
private void handleReadResultForFenceRead(CompletableFuture<Boolean> fenceResult,
ByteBuf data,
long startTimeNanos) {
if (null != fenceThreadPool) {
fenceResult.whenCompleteAsync(new FutureEventListener<Boolean>() {
@Override
public void onSuccess(Boolean result) {
sendFenceResponse(result, data, startTimeNanos);
}
@Override
public void onFailure(Throwable t) {
LOG.error("Error processing fence request", t);
// if failed to fence, fail the read request to make it retry.
sendResponse(data, BookieProtocol.EIO, startTimeNanos);
}
}, fenceThreadPool);
} else {
try {
Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
sendFenceResponse(fenced, data, startTimeNanos);
return;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupting fence read entry {}", request, ie);
} catch (ExecutionException ee) {
LOG.error("Failed to fence read entry {}", request, ee.getCause());
} catch (TimeoutException te) {
LOG.error("Timeout to fence read entry {}", request, te);
}
sendResponse(data, BookieProtocol.EIO, startTimeNanos);
}
}
@Override
public String toString() {
return String.format("ReadEntry(%d, %d)", request.getLedgerId(), request.getEntryId());
}
private void recycle() {
super.reset();
this.recyclerHandle.recycle(this);
}
private final Recycler.Handle<ReadEntryProcessor> recyclerHandle;
private ReadEntryProcessor(Recycler.Handle<ReadEntryProcessor> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private static final Recycler<ReadEntryProcessor> RECYCLER = new Recycler<ReadEntryProcessor>() {
@Override
protected ReadEntryProcessor newObject(Recycler.Handle<ReadEntryProcessor> handle) {
return new ReadEntryProcessor(handle);
}
};
}