blob: d9ab2e6e75b372eaea9fabb8bd42fbf9668a53b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto;
import com.google.common.base.Stopwatch;
import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Processor handling long poll read entry request.
*/
class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watcher<LastAddConfirmedUpdateNotification> {
private static final Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
private final Long previousLAC;
private Optional<Long> lastAddConfirmedUpdateTime = Optional.empty();
// long poll execution state
private final ExecutorService longPollThreadPool;
private final HashedWheelTimer requestTimer;
private Timeout expirationTimerTask = null;
private Future<?> deferredTask = null;
private boolean shouldReadEntry = false;
LongPollReadEntryProcessorV3(Request request,
Channel channel,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool,
ExecutorService longPollThreadPool,
HashedWheelTimer requestTimer) {
super(request, channel, requestProcessor, fenceThreadPool);
this.previousLAC = readRequest.getPreviousLAC();
this.longPollThreadPool = longPollThreadPool;
this.requestTimer = requestTimer;
}
@Override
protected Long getPreviousLAC() {
return previousLAC;
}
private synchronized boolean shouldReadEntry() {
return shouldReadEntry;
}
@Override
protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
long entryId,
Stopwatch startTimeSw)
throws IOException {
if (RequestUtils.shouldPiggybackEntry(readRequest)) {
if (!readRequest.hasPreviousLAC() || (BookieProtocol.LAST_ADD_CONFIRMED != entryId)) {
// This is not a valid request - client bug?
logger.error("Incorrect read request, entry piggyback requested incorrectly for ledgerId {} entryId {}",
ledgerId, entryId);
return buildResponse(readResponseBuilder, StatusCode.EBADREQ, startTimeSw);
} else {
long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId);
readResponseBuilder.setMaxLAC(knownLAC);
if (knownLAC > previousLAC) {
entryId = previousLAC + 1;
readResponseBuilder.setMaxLAC(knownLAC);
if (lastAddConfirmedUpdateTime.isPresent()) {
readResponseBuilder.setLacUpdateTimestamp(lastAddConfirmedUpdateTime.get());
}
if (logger.isDebugEnabled()) {
logger.debug("ReadLAC Piggy Back reading entry:{} from ledger: {}", entryId, ledgerId);
}
try {
return super.readEntry(readResponseBuilder, entryId, true, startTimeSw);
} catch (Bookie.NoEntryException e) {
requestProcessor.getRequestStats().getReadLastEntryNoEntryErrorCounter().inc();
logger.info(
"No entry found while piggyback reading entry {} from ledger {} : previous lac = {}",
entryId, ledgerId, previousLAC);
// piggy back is best effort and this request can fail genuinely because of striping
// entries across the ensemble
return buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw);
}
} else {
if (knownLAC < previousLAC) {
if (logger.isDebugEnabled()) {
logger.debug("Found smaller lac when piggy back reading lac and entry from ledger {} :"
+ " previous lac = {}, known lac = {}",
ledgerId, previousLAC, knownLAC);
}
}
return buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw);
}
}
} else {
return super.readEntry(readResponseBuilder, entryId, false, startTimeSw);
}
}
private ReadResponse buildErrorResponse(StatusCode statusCode, Stopwatch sw) {
ReadResponse.Builder builder = ReadResponse.newBuilder()
.setLedgerId(ledgerId)
.setEntryId(entryId);
return buildResponse(builder, statusCode, sw);
}
private ReadResponse getLongPollReadResponse() {
if (!shouldReadEntry() && readRequest.hasTimeOut()) {
if (logger.isTraceEnabled()) {
logger.trace("Waiting For LAC Update {}", previousLAC);
}
final Stopwatch startTimeSw = Stopwatch.createStarted();
final boolean watched;
try {
watched = requestProcessor.getBookie().waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
} catch (Bookie.NoLedgerException e) {
logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
ledgerId, previousLAC);
return buildErrorResponse(StatusCode.ENOLEDGER, startTimeSw);
} catch (IOException ioe) {
logger.error("IOException while longpoll reading ledger {}, previous lac = {} : ",
ledgerId, previousLAC, ioe);
return buildErrorResponse(StatusCode.EIO, startTimeSw);
}
registerSuccessfulEvent(requestProcessor.getRequestStats().getLongPollPreWaitStats(), startTimeSw);
lastPhaseStartTime.reset().start();
if (watched) {
// successfully registered watcher to lac updates
if (logger.isTraceEnabled()) {
logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut());
}
synchronized (this) {
expirationTimerTask = requestTimer.newTimeout(timeout -> {
requestProcessor.getBookie().cancelWaitForLastAddConfirmedUpdate(ledgerId, this);
// When the timeout expires just get whatever is the current
// readLastConfirmed
LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
}, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
}
return null;
}
}
// request doesn't have timeout or fail to wait, proceed to read entry
return getReadResponse();
}
@Override
protected void executeOp() {
ReadResponse readResponse = getLongPollReadResponse();
if (null != readResponse) {
sendResponse(readResponse);
}
}
@Override
public void update(LastAddConfirmedUpdateNotification newLACNotification) {
if (newLACNotification.getLastAddConfirmed() > previousLAC) {
if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) {
lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp());
}
if (logger.isTraceEnabled()) {
logger.trace("Last Add Confirmed Advanced to {} for request {}",
newLACNotification.getLastAddConfirmed(), request);
}
scheduleDeferredRead(false);
}
newLACNotification.recycle();
}
private synchronized void scheduleDeferredRead(boolean timeout) {
if (null == deferredTask) {
if (logger.isTraceEnabled()) {
logger.trace("Deferred Task, expired: {}, request: {}", timeout, request);
}
try {
shouldReadEntry = true;
deferredTask = longPollThreadPool.submit(this);
} catch (RejectedExecutionException exc) {
// If the threadPool has been shutdown, simply drop the task
}
if (null != expirationTimerTask) {
expirationTimerTask.cancel();
}
registerEvent(timeout, requestProcessor.getRequestStats().getLongPollWaitStats(), lastPhaseStartTime);
lastPhaseStartTime.reset().start();
}
}
}