blob: c9e6def287e52fa05cd39af200d84e7e8f0996c0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.SyncCallbackUtils.LastAddConfirmedCallback;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Policy that decides when the client pushes an explicit Last Add Confirmed (LAC)
 * update for a ledger, as opposed to relying on the LAC that is piggy-backed on
 * regular traffic.
 */
interface ExplicitLacFlushPolicy {

    /**
     * Stops any explicit LAC flushing that this policy has scheduled.
     */
    void stopExplicitLacFlush();

    /**
     * Records the most recent LAC value that was piggy-backed.
     *
     * @param piggyBackedLac the piggy-backed LAC value to remember
     */
    void updatePiggyBackedLac(long piggyBackedLac);

    /**
     * No-op policy: both operations do nothing.
     */
    ExplicitLacFlushPolicy VOID_EXPLICITLAC_FLUSH_POLICY = new ExplicitLacFlushPolicy() {
        @Override
        public void stopExplicitLacFlush() {
            // void method
        }

        @Override
        public void updatePiggyBackedLac(long piggyBackedLac) {
            // void method
        }
    };

    /**
     * Policy implementation that schedules a recurring task on the client scheduler
     * (ordered by ledger id). On each run the task either catches up with the
     * piggy-backed LAC, or, if the ledger handle's LAC is ahead of the last value
     * recorded as explicitly flushed, submits a {@link PendingWriteLacOp}.
     */
    class ExplicitLacFlushPolicyImpl implements ExplicitLacFlushPolicy {
        static final Logger LOG = LoggerFactory.getLogger(ExplicitLacFlushPolicyImpl.class);

        // Latest LAC reported through updatePiggyBackedLac().
        volatile long piggyBackedLac = LedgerHandle.INVALID_ENTRY_ID;
        // Latest LAC this policy considers already propagated (piggy-backed or explicitly flushed).
        volatile long explicitLac = LedgerHandle.INVALID_ENTRY_ID;
        final LedgerHandle lh;
        final ClientContext clientCtx;
        // Handle of the recurring task; stays null when scheduling was rejected.
        ScheduledFuture<?> scheduledFuture;

        /**
         * Creates the policy and immediately schedules the recurring flush task.
         *
         * @param lh        ledger handle whose LAC is flushed
         * @param clientCtx client context supplying configuration, scheduler and worker pool
         */
        ExplicitLacFlushPolicyImpl(LedgerHandle lh,
                                   ClientContext clientCtx) {
            this.lh = lh;
            this.clientCtx = clientCtx;
            scheduleExplictLacFlush();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduled Explicit Last Add Confirmed Update");
            }
        }

        private long getExplicitLac() {
            return explicitLac;
        }

        private void setExplicitLac(long explicitLac) {
            this.explicitLac = explicitLac;
        }

        private long getPiggyBackedLac() {
            return piggyBackedLac;
        }

        public void setPiggyBackedLac(long piggyBackedLac) {
            this.piggyBackedLac = piggyBackedLac;
        }

        /**
         * Builds the recurring flush task and registers it with the client scheduler,
         * using {@code explicitLacInterval} (milliseconds) from the client configuration
         * as both initial delay and period. If the scheduler rejects the task the
         * failure is logged and {@link #scheduledFuture} is left unset.
         */
        private void scheduleExplictLacFlush() {
            final SafeRunnable updateLacTask = new SafeRunnable() {
                @Override
                public void safeRun() {
                    // Snapshot the piggy-backed value once so that the value we compare
                    // against is exactly the value we record.
                    final long piggyBacked = getPiggyBackedLac();
                    // Made progress since previous explicitLAC through
                    // Piggyback, so no need to send an explicit LAC update to
                    // bookies.
                    if (getExplicitLac() < piggyBacked) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("ledgerid: {}", lh.getId());
                            LOG.debug("explicitLac:{} piggybackLac:{}", getExplicitLac(), piggyBacked);
                        }
                        setExplicitLac(piggyBacked);
                        return;
                    }

                    // Snapshot the handle's LAC once: the value compared, the value
                    // handed to the flush and the value recorded must be the same.
                    // Re-reading it could record a newer LAC than the one flushed,
                    // and that newer LAC would then be skipped on the next run.
                    final long lac = lh.getLastAddConfirmed();
                    if (lac > getExplicitLac()) {
                        // Send Explicit LAC
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("ledgerid: {}", lh.getId());
                        }
                        asyncExplicitLacFlush(lac);
                        setExplicitLac(lac);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("After sending explict LAC lac: {} explicitLac:{}", lh.getLastAddConfirmed(),
                                    getExplicitLac());
                        }
                    }
                }

                @Override
                public String toString() {
                    return String.format("UpdateLacTask ledgerId - (%d)", lh.getId());
                }
            };
            try {
                long explicitLacIntervalInMs = clientCtx.getConf().explicitLacInterval;
                scheduledFuture = clientCtx.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask,
                        explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException re) {
                LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}",
                        lh.getId(), re);
            }
        }

        /**
         * Make a LastAddUpdate request.
         *
         * <p>Creates a {@link PendingWriteLacOp} for the ledger's current ensemble, sets
         * its LAC to {@code explicitLac}, and submits a task to the main worker pool that
         * packages the payload and initiates the op. If the pool rejects the task, the
         * callback is completed with an error code instead.
         *
         * @param explicitLac the LAC value set on the write-LAC operation
         */
        void asyncExplicitLacFlush(final long explicitLac) {
            final LastAddConfirmedCallback cb = LastAddConfirmedCallback.INSTANCE;
            final PendingWriteLacOp op = new PendingWriteLacOp(lh, clientCtx, lh.getCurrentEnsemble(), cb, null);
            op.setLac(explicitLac);
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending Explicit LAC: {}", explicitLac);
                }
                clientCtx.getMainWorkerPool().submit(new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        // NOTE(review): the payload is built from the handle's LAC at the time
                        // this task runs, not from the explicitLac parameter set on the op;
                        // presumably the handle's LAC never decreases, so the payload is at
                        // least explicitLac -- confirm whether the two should be identical.
                        ByteBufList toSend = lh.macManager
                                .computeDigestAndPackageForSendingLac(lh.getLastAddConfirmed());
                        op.initiate(toSend);
                    }
                });
            } catch (RejectedExecutionException e) {
                cb.addLacComplete(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
                        BKException.Code.InterruptedException),
                        lh, null);
            }
        }

        /**
         * Cancels the recurring flush task, interrupting it if it is running.
         * Does nothing when the task was never scheduled (scheduling was rejected).
         */
        @Override
        public void stopExplicitLacFlush() {
            final ScheduledFuture<?> future = scheduledFuture;
            // scheduledFuture is null when scheduleExplictLacFlush() caught a
            // RejectedExecutionException; there is nothing to cancel in that case.
            if (future != null) {
                future.cancel(true);
            }
        }

        @Override
        public void updatePiggyBackedLac(long piggyBackedLac) {
            setPiggyBackedLac(piggyBackedLac);
        }
    }
}