blob: 3c79203e821bcf981ca7360713d15ba0b817d80d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
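/**
* Helpers for working with the asynchronous client callbacks: a method to block on a
* {@link CompletableFuture}, a method to complete one from a return code, and
* callback implementations that relay their outcome to a future or context object.
*/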
@Slf4j
class SyncCallbackUtils {
/**
 * Blocks the calling thread until {@code future} completes and hands back its value.
 *
 * <p>A failure carried by the future is translated before it reaches the caller: if the
 * cause of the {@link ExecutionException} is already a {@link BKException} it is rethrown
 * as is; any other cause is attached (via {@code initCause}) to a new {@link BKException}
 * created from {@code BKException.Code.UnexpectedConditionException}.
 *
 * @param <T> type of the value produced by the future
 * @param future the future to wait on
 * @return the value the future completed with, or {@code null} in the practically
 *         unreachable case that the wait itself times out
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws BKException if the future completed exceptionally
 */
public static <T> T waitForResult(CompletableFuture<T> future) throws InterruptedException, BKException {
    try {
        /*
         * The timed variant is used deliberately. CompletableFuture.get() in JDK8 spins
         * before blocking and wastes CPU time, whereas CompletableFuture.get(long, TimeUnit)
         * blocks immediately when the result is not yet available. JDK9 changed get() so
         * that it no longer spins, but the timed call avoids spinning on every JDK version.
         */
        return future.get(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (TimeoutException ignored) {
        // Only reachable after 2^63 nanoseconds (about 292 years); returning null is fine.
        return null;
    } catch (ExecutionException err) {
        final Throwable cause = err.getCause();
        if (cause instanceof BKException) {
            throw (BKException) cause;
        }
        // Anything that is not a BKException is reported as an unexpected condition,
        // keeping the original failure reachable through getCause().
        final BKException wrapped = BKException.create(BKException.Code.UnexpectedConditionException);
        wrapped.initCause(cause);
        throw wrapped;
    }
}
/**
 * Completes {@code future} according to a return code.
 *
 * <p>When {@code rc} equals {@code BKException.Code.OK} the future is completed normally
 * with {@code result}. For any other code it is completed exceptionally with the
 * exception obtained from {@code BKException.create(rc)}, whose stack trace is refilled
 * at this call site before it is handed to the future.
 *
 * @param <T> type of the result
 * @param rc return code reported by the operation
 * @param result value passed to the future on success; unused on failure
 * @param future the future to complete
 */
public static <T> void finish(int rc, T result, CompletableFuture<? super T> future) {
    if (rc == BKException.Code.OK) {
        future.complete(result);
        return;
    }
    final Throwable failure = BKException.create(rc).fillInStackTrace();
    future.completeExceptionally(failure);
}
/**
 * {@link AsyncCallback.CreateCallback} that relays the outcome of a create call to a
 * {@link CompletableFuture}.
 */
static class SyncCreateCallback implements AsyncCallback.CreateCallback {

    /** Future completed when {@link #createComplete} is invoked. */
    private final CompletableFuture<? super LedgerHandle> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that will receive the created ledger handle or the failure
     */
    public SyncCreateCallback(CompletableFuture<? super LedgerHandle> future) {
        this.future = future;
    }

    /**
     * Forwards the return code and ledger handle to {@code finish}, which completes
     * the future.
     *
     * @param rc return code
     * @param lh ledger handle object
     * @param ctx optional control object; not used here
     */
    @Override
    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
        finish(rc, lh, this.future);
    }
}
/**
 * {@link AsyncCallback.CreateCallback} that relays the outcome of a create call to a
 * {@link CompletableFuture} expecting a {@link LedgerHandleAdv}.
 */
static class SyncCreateAdvCallback implements AsyncCallback.CreateCallback {

    /** Future completed when {@link #createComplete} is invoked. */
    private final CompletableFuture<? super LedgerHandleAdv> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that will receive the created handle or the failure
     */
    public SyncCreateAdvCallback(CompletableFuture<? super LedgerHandleAdv> future) {
        this.future = future;
    }

    /**
     * Completes the future from the create outcome.
     *
     * <p>A {@code null} handle or a {@link LedgerHandleAdv} is passed on together with
     * {@code rc}. A non-null handle of any other type is reported as
     * {@code BKException.Code.UnexpectedConditionException}, regardless of {@code rc}.
     *
     * @param rc return code
     * @param lh ledger handle object
     * @param ctx optional control object; not used here
     */
    @Override
    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
        // A handle that exists but is not a LedgerHandleAdv cannot be given to the future.
        if (lh != null && !(lh instanceof LedgerHandleAdv)) {
            finish(BKException.Code.UnexpectedConditionException, null, future);
            return;
        }
        finish(rc, (LedgerHandleAdv) lh, future);
    }
}
/**
 * {@link AsyncCallback.OpenCallback} that relays the outcome of an open call to a
 * {@link CompletableFuture}.
 */
static class SyncOpenCallback implements AsyncCallback.OpenCallback {

    /** Future completed when {@link #openComplete} is invoked. */
    private final CompletableFuture<? super LedgerHandle> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that will receive the opened ledger handle or the failure
     */
    public SyncOpenCallback(CompletableFuture<? super LedgerHandle> future) {
        this.future = future;
    }

    /**
     * Forwards the return code and ledger handle to {@code finish}, which completes
     * the future.
     *
     * @param rc return code
     * @param lh ledger handle
     * @param ctx optional control object; not used here
     */
    @Override
    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
        finish(rc, lh, this.future);
    }
}
/**
 * {@link AsyncCallback.DeleteCallback} that relays the outcome of a delete call to a
 * {@link CompletableFuture}.
 */
static class SyncDeleteCallback implements AsyncCallback.DeleteCallback {

    /** Future completed when {@link #deleteComplete} is invoked. */
    private final CompletableFuture<Void> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that is completed with {@code null} on success or with the
     *               failure otherwise
     */
    public SyncDeleteCallback(CompletableFuture<Void> future) {
        this.future = future;
    }

    /**
     * Forwards the return code to {@code finish}; there is no value to deliver, so the
     * result passed along is {@code null}.
     *
     * @param rc return code
     * @param ctx optional control object; not used here
     */
    @Override
    public void deleteComplete(int rc, Object ctx) {
        finish(rc, null, this.future);
    }
}
/**
 * {@link AsyncCallback.AddLacCallback} that only logs the outcome of the operation.
 */
static class LastAddConfirmedCallback implements AsyncCallback.AddLacCallback {

    /** Shared instance; the class declares no instance fields. */
    static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();

    /**
     * Logs the result: a warning carrying the message for {@code rc} on failure, or a
     * debug line with the ledger id on success (only when debug logging is enabled).
     *
     * @param rc return code
     * @param lh ledger handle; its id is read only on the success path with debug enabled
     * @param ctx control object; not used here
     */
    @Override
    public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
        if (rc == BKException.Code.OK) {
            if (log.isDebugEnabled()) {
                log.debug("Callback LAC Updated for: {} ", lh.getId());
            }
            return;
        }
        log.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
    }
}
/**
 * {@link AsyncCallback.ReadCallback} that relays the outcome of a read call to a
 * {@link CompletableFuture}.
 */
static class SyncReadCallback implements AsyncCallback.ReadCallback {

    /** Future completed when {@link #readComplete} is invoked. */
    private final CompletableFuture<Enumeration<LedgerEntry>> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that will receive the entries read or the failure
     */
    public SyncReadCallback(CompletableFuture<Enumeration<LedgerEntry>> future) {
        this.future = future;
    }

    /**
     * Forwards the return code and the entries to {@code finish}, which completes
     * the future.
     *
     * @param rc return code
     * @param lh ledger handle; not used here
     * @param seq sequence of entries
     * @param ctx control object; not used here
     */
    @Override
    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
        finish(rc, seq, this.future);
    }
}
/**
 * A {@link CompletableFuture} of the entry id that is also an
 * {@link AsyncCallback.AddCallback}: the add callback completes the future itself.
 */
static class SyncAddCallback extends CompletableFuture<Long> implements AsyncCallback.AddCallback {

    /**
     * Completes this future from the add outcome by delegating to {@code finish}.
     *
     * @param rc return code
     * @param lh ledger handle; not used here
     * @param entry entry identifier, delivered as the future's value on success
     * @param ctx control object; not used here
     */
    @Override
    public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
        final Long entryId = entry;
        finish(rc, entryId, this);
    }
}
/**
 * A {@link CompletableFuture} of the last confirmed value that is also an
 * {@link AsyncCallback.ReadLastConfirmedCallback}: the callback completes the future itself.
 */
static class FutureReadLastConfirmed extends CompletableFuture<Long>
        implements AsyncCallback.ReadLastConfirmedCallback {

    /**
     * Completes this future from the outcome by delegating to {@code finish}.
     *
     * @param rc return code
     * @param lastConfirmed value delivered as the future's result on success
     * @param ctx control object; not used here
     */
    @Override
    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
        final Long confirmed = lastConfirmed;
        finish(rc, confirmed, this);
    }
}
/**
 * {@link AsyncCallback.ReadLastConfirmedCallback} that records its outcome in the
 * context object it is handed and signals that object's monitor.
 */
static class SyncReadLastConfirmedCallback implements AsyncCallback.ReadLastConfirmedCallback {

    /**
     * Stores the return code and the last confirmed value in the context, then calls
     * {@code notify()} on it. All three steps run while holding the context's monitor.
     *
     * @param rc return code, recorded through {@code setRC}
     * @param lastConfirmed value recorded through {@code setLastConfirmed}
     * @param ctx must be a {@code LedgerHandle.LastConfirmedCtx}; it is cast unconditionally
     */
    @Override
    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
        final LedgerHandle.LastConfirmedCtx waiterCtx = (LedgerHandle.LastConfirmedCtx) ctx;
        synchronized (waiterCtx) {
            // Order kept as is: return code first, then the value, then the wake-up.
            waiterCtx.setRC(rc);
            waiterCtx.setLastConfirmed(lastConfirmed);
            waiterCtx.notify();
        }
    }
}
/**
 * {@link AsyncCallback.CloseCallback} that relays the outcome of a close call to a
 * {@link CompletableFuture}.
 */
static class SyncCloseCallback implements AsyncCallback.CloseCallback {

    /** Future completed when {@link #closeComplete} is invoked. */
    private final CompletableFuture<Void> future;

    /**
     * Creates a callback bound to the given future.
     *
     * @param future future that is completed with {@code null} on success or with the
     *               failure otherwise
     */
    public SyncCloseCallback(CompletableFuture<Void> future) {
        this.future = future;
    }

    /**
     * Forwards the return code to {@code finish}; there is no value to deliver, so the
     * result passed along is {@code null}.
     *
     * @param rc return code
     * @param lh ledger handle; not used here
     * @param ctx control object; not used here
     */
    @Override
    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
        finish(rc, null, this.future);
    }
}
/**
 * A {@link CompletableFuture} of {@link LastConfirmedAndEntry} that is also an
 * {@link AsyncCallback.ReadLastConfirmedAndEntryCallback}: the callback completes the
 * future itself.
 */
static class FutureReadLastConfirmedAndEntry
        extends CompletableFuture<LastConfirmedAndEntry>
        implements AsyncCallback.ReadLastConfirmedAndEntryCallback {

    /**
     * Wraps the last confirmed value and the entry with
     * {@code LastConfirmedAndEntryImpl.create} and completes this future through
     * {@code finish}.
     *
     * @param rc return code
     * @param lastConfirmed last confirmed value placed in the result
     * @param entry entry placed in the result
     * @param ctx control object; not used here
     */
    @Override
    public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
        // The result object is built before rc is examined, so it is created on failure too.
        finish(rc, LastConfirmedAndEntryImpl.create(lastConfirmed, entry), this);
    }
}
}