blob: d974d4b6d3130fa1c522504770f3a83e6fd6ec14 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.metadata.etcd.helpers;
import com.google.common.annotations.VisibleForTesting;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.common.exception.ClosedClientException;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.metadata.etcd.EtcdWatchClient;
import org.apache.bookkeeper.metadata.etcd.EtcdWatcher;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
/**
* A helper class to read the stream of values of a given key.
*/
@Slf4j
public class ValueStream<T> implements BiConsumer<WatchResponse, Throwable>, AutoCloseable {
private final Client client;
private final boolean ownWatchClient;
private final EtcdWatchClient watchClient;
private final Function<ByteSequence, T> encoder;
private final ByteSequence key;
private final Map<Consumer<Versioned<T>>, RevisionedConsumer<T>> consumers =
new HashMap<>();
private volatile T localValue = null;
private volatile long revision = -1L;
private CompletableFuture<EtcdWatcher> watchFuture = null;
private CompletableFuture<Void> closeFuture = null;
public ValueStream(Client client,
Function<ByteSequence, T> encoder,
ByteSequence key) {
this(client, new EtcdWatchClient(client), encoder, key);
}
public ValueStream(Client client,
EtcdWatchClient watchClient,
Function<ByteSequence, T> encoder,
ByteSequence key) {
this.client = client;
this.watchClient = watchClient;
this.ownWatchClient = false;
this.encoder = encoder;
this.key = key;
}
public CompletableFuture<Versioned<T>> read() {
return client.getKVClient().get(
key
).thenApply(getResp -> {
boolean updated = updateLocalValue(getResp);
Versioned<T> localValue = getLocalValue();
try {
return localValue;
} finally {
if (updated) {
notifyConsumers(localValue);
}
}
});
}
@VisibleForTesting
public int getNumConsumers() {
synchronized (consumers) {
return consumers.size();
}
}
private void notifyConsumers(Versioned<T> localValue) {
synchronized (consumers) {
consumers.values().forEach(consumer -> consumer.accept(localValue));
}
}
private synchronized boolean updateLocalValue(GetResponse response) {
if (revision < response.getHeader().getRevision()) {
revision = response.getHeader().getRevision();
if (response.getCount() > 0) {
localValue = encoder.apply(response.getKvs().get(0).getValue());
} else {
localValue = null;
}
return true;
} else {
return false;
}
}
private synchronized Versioned<T> processWatchResponse(WatchResponse response) {
if (null != closeFuture) {
return null;
}
if (log.isDebugEnabled()) {
log.debug("Received watch response : revision = {}, {} events = {}",
response.getHeader().getRevision(), response.getEvents().size(), response.getEvents());
}
if (response.getHeader().getRevision() <= revision) {
return null;
}
revision = response.getHeader().getRevision();
response.getEvents().forEach(event -> {
switch (event.getEventType()) {
case PUT:
this.localValue = encoder.apply(event.getKeyValue().getValue());
break;
case DELETE:
this.localValue = null;
break;
default:
// ignore
break;
}
});
return getLocalValue();
}
@VisibleForTesting
synchronized Versioned<T> getLocalValue() {
return new Versioned<>(
localValue,
new LongVersion(revision)
);
}
private CompletableFuture<Versioned<T>> getOrRead() {
boolean shouldRead = false;
synchronized (this) {
if (revision < 0L) {
// the value is never cached.
shouldRead = true;
}
}
if (shouldRead) {
return read();
} else {
return FutureUtils.value(getLocalValue());
}
}
@VisibleForTesting
synchronized boolean isWatcherSet() {
return null != watchFuture;
}
private synchronized CompletableFuture<EtcdWatcher> getWatchFuture() {
return this.watchFuture;
}
@VisibleForTesting
public CompletableFuture<EtcdWatcher> waitUntilWatched() {
CompletableFuture<EtcdWatcher> wf;
while ((wf = getWatchFuture()) == null) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
if (log.isDebugEnabled()) {
log.debug("Interrupted at waiting until the key is watched", e);
}
}
}
return wf;
}
public CompletableFuture<Versioned<T>> readAndWatch(Consumer<Versioned<T>> consumer) {
final RevisionedConsumer<T> revisionedConsumer = new RevisionedConsumer<>(consumer);
final boolean consumerExisted;
synchronized (consumers) {
consumerExisted = (null != consumers.put(consumer, revisionedConsumer));
}
if (consumerExisted) {
return getOrRead();
}
return getOrRead()
.thenCompose(versionedVal -> {
long revision = ((LongVersion) versionedVal.getVersion()).getLongVersion();
synchronized (this) {
notifyConsumers(versionedVal);
}
return watchIfNeeded(revision).thenApply(ignored -> versionedVal);
});
}
public CompletableFuture<Boolean> unwatch(Consumer<Versioned<T>> consumer) {
boolean lastConsumer;
synchronized (consumers) {
lastConsumer = (null != consumers.remove(consumer) && consumers.isEmpty());
}
if (lastConsumer) {
return closeOrRewatch(false).thenApply(ignored -> true);
} else {
return FutureUtils.value(false);
}
}
private synchronized CompletableFuture<EtcdWatcher> watchIfNeeded(long revision) {
if (null != watchFuture) {
return watchFuture;
}
watchFuture = watch(revision);
return watchFuture;
}
private CompletableFuture<EtcdWatcher> watch(long revision) {
WatchOption.Builder optionBuilder = WatchOption.newBuilder()
.withRevision(revision);
return watchClient.watch(key, optionBuilder.build(), this)
.whenComplete((watcher, cause) -> {
if (null != cause) {
synchronized (ValueStream.this) {
watchFuture = null;
}
}
});
}
private CompletableFuture<Void> closeOrRewatch(boolean rewatch) {
CompletableFuture<EtcdWatcher> oldWatcherFuture;
synchronized (this) {
oldWatcherFuture = watchFuture;
if (rewatch && null == closeFuture) {
watchFuture = watch(revision);
} else {
watchFuture = null;
}
}
if (null != oldWatcherFuture) {
return oldWatcherFuture.thenCompose(EtcdWatcher::closeAsync);
} else {
return FutureUtils.Void();
}
}
@Override
public void accept(WatchResponse watchResponse, Throwable throwable) {
if (null == throwable) {
if (log.isDebugEnabled()) {
log.debug("Received watch response : revision = {}, {} events = {}",
watchResponse.getHeader().getRevision(),
watchResponse.getEvents().size(),
watchResponse.getEvents());
}
synchronized (this) {
Versioned<T> localValue = processWatchResponse(watchResponse);
if (null != localValue) {
notifyConsumers(localValue);
}
}
} else {
// rewatch if it is not a `ClosedClientException`
closeOrRewatch(!(throwable instanceof ClosedClientException));
}
}
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> future;
synchronized (this) {
if (null == closeFuture) {
closeFuture = closeOrRewatch(false).thenCompose(ignored -> {
if (ownWatchClient) {
return watchClient.closeAsync();
} else {
return FutureUtils.Void();
}
});
}
future = closeFuture;
}
return future;
}
@Override
public void close() {
try {
FutureUtils.result(closeAsync());
} catch (Exception e) {
log.warn("Encountered exceptions on closing key reader : {}", e.getMessage());
}
}
}