blob: d9269ec83b1796a6f827604b3284483c573c79b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@Slf4j
public class MetaStoreImpl implements MetaStore, Consumer<Notification> {
// Root node under which the managed-ledger names are listed (see getManagedLedgers).
private static final String BASE_NODE = "/managed-ledgers";
// BASE_NODE plus a trailing slash; a ledger's metadata path is PREFIX + ledgerName.
private static final String PREFIX = BASE_NODE + "/";
// Backing metadata store; the constructors register this instance as a listener on it when non-null.
private final MetadataStore store;
// Executor whose per-ledger thread (chooseThread / executeOrdered) runs the user callbacks.
private final OrderedExecutor executor;
// Marker written as a short at the very start of a compressed payload and checked when parsing.
private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
// Compression settings applied by compressLedgerInfo.
private final MetadataCompressionConfig ledgerInfoCompressionConfig;
// Compression settings applied by compressCursorInfo.
private final MetadataCompressionConfig cursorInfoCompressionConfig;
// Update callbacks registered via watchManagedLedgerInfo, keyed by the full path (PREFIX + ledgerName).
private final Map<String, UpdateCallback<ManagedLedgerInfo>> managedLedgerInfoUpdateCallbackMap;
/**
 * Creates an instance that never compresses ledger or cursor info.
 *
 * @param store backing metadata store; when non-null, this instance is registered as a listener on it
 * @param executor ordered executor used to dispatch callbacks
 */
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
    this(store, executor, MetadataCompressionConfig.noCompression, MetadataCompressionConfig.noCompression);
}
/**
 * Creates an instance with explicit compression settings for ledger info and cursor info.
 *
 * @param store backing metadata store; when non-null, this instance is registered as a listener on it
 * @param executor ordered executor used to dispatch callbacks
 * @param ledgerInfoCompressionConfig compression settings used by {@link #compressLedgerInfo}
 * @param cursorInfoCompressionConfig compression settings used by {@link #compressCursorInfo}
 */
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor,
        MetadataCompressionConfig ledgerInfoCompressionConfig,
        MetadataCompressionConfig cursorInfoCompressionConfig) {
    this.managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
    this.ledgerInfoCompressionConfig = ledgerInfoCompressionConfig;
    this.cursorInfoCompressionConfig = cursorInfoCompressionConfig;
    this.executor = executor;
    this.store = store;
    // Registration comes last, after every field has been assigned.
    if (this.store != null) {
        this.store.registerListener(this);
    }
}
/**
 * Reads the managed-ledger info stored at {@code PREFIX + ledgerName}, optionally creating an empty
 * node when none exists.
 *
 * @param ledgerName managed ledger name
 * @param createIfMissing when {@code true} and the node is absent, an empty node is written and the
 *        callback receives an info object carrying only {@code properties}
 * @param properties properties to put on a newly created ledger info; may be {@code null}
 * @param callback completed with the info and its {@link Stat}, or failed with a
 *        {@link MetadataNotFoundException} when the node is absent and must not be created
 */
@Override
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> properties,
        MetaStoreCallback<ManagedLedgerInfo> callback) {
    final String path = PREFIX + ledgerName;
    store.get(path)
            .thenAcceptAsync(optResult -> {
                if (optResult.isPresent()) {
                    // Node exists: decode it and make sure every ledger carries a timestamp.
                    try {
                        ManagedLedgerInfo parsed = parseManagedLedgerInfo(optResult.get().getValue());
                        callback.operationComplete(updateMLInfoTimestamp(parsed), optResult.get().getStat());
                    } catch (InvalidProtocolBufferException e) {
                        callback.operationFailed(getException(e));
                    }
                    return;
                }
                if (!createIfMissing) {
                    // Tried to open a managed ledger that doesn't exist and we must not create it here.
                    callback.operationFailed(new MetadataNotFoundException("Managed ledger not found"));
                    return;
                }
                // Node doesn't exist: create it empty and report an info holding just the properties.
                log.info("Creating '{}'", path);
                store.put(path, new byte[0], Optional.of(-1L))
                        .thenAccept(stat -> {
                            ManagedLedgerInfo.Builder fresh = ManagedLedgerInfo.newBuilder();
                            if (properties != null) {
                                for (Map.Entry<String, String> entry : properties.entrySet()) {
                                    MLDataFormats.KeyValue.Builder kv = MLDataFormats.KeyValue.newBuilder();
                                    kv.setKey(entry.getKey());
                                    kv.setValue(entry.getValue());
                                    fresh.addProperties(kv.build());
                                }
                            }
                            callback.operationComplete(fresh.build(), stat);
                        })
                        .exceptionally(putFailure -> {
                            callback.operationFailed(getException(putFailure));
                            return null;
                        });
            }, executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                try {
                    executor.executeOrdered(ledgerName,
                            () -> callback.operationFailed(getException(failure)));
                } catch (RejectedExecutionException rejected) {
                    // The executor may be shut down; use the common pool to run the callback.
                    CompletableFuture.runAsync(() -> callback.operationFailed(getException(failure)));
                }
                return null;
            });
}
/**
 * Fetches the properties stored on a managed ledger without creating it.
 *
 * @param name managed ledger name
 * @return a future holding the key/value properties; it completes with an empty map when the
 *         managed ledger metadata is not found and exceptionally on any other failure
 */
public CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name) {
    CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
    getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
        @Override
        public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
            HashMap<String, String> collected = new HashMap<>(mlInfo.getPropertiesCount());
            for (MLDataFormats.KeyValue property : mlInfo.getPropertiesList()) {
                collected.put(property.getKey(), property.getValue());
            }
            future.complete(collected);
        }

        @Override
        public void operationFailed(MetaStoreException e) {
            if (!(e instanceof MetadataNotFoundException)) {
                future.completeExceptionally(e);
                return;
            }
            // A missing ledger simply has no properties.
            future.complete(new HashMap<>());
        }
    });
    return future;
}
/**
 * Writes the (possibly compressed) managed-ledger info, conditional on the version in {@code stat}.
 *
 * @param ledgerName managed ledger name
 * @param mlInfo info to persist
 * @param stat stat whose version is used as the expected version of the write
 * @param callback completed with the new {@link Stat}, or failed with the mapped exception
 */
@Override
public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
        MetaStoreCallback<Void> callback) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
    }
    final byte[] payload = compressLedgerInfo(mlInfo);
    final Optional<Long> expectedVersion = Optional.of(stat.getVersion());
    store.put(PREFIX + ledgerName, payload, expectedVersion)
            .thenAcceptAsync(updatedStat -> callback.operationComplete(null, updatedStat),
                    executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                executor.executeOrdered(ledgerName,
                        () -> callback.operationFailed(getException(failure)));
                return null;
            });
}
/**
 * Lists the children of the managed ledger's node and reports them as cursor names.
 *
 * @param ledgerName managed ledger name
 * @param callback completed with the child names (and a {@code null} stat), or failed
 */
@Override
public void getCursors(String ledgerName, MetaStoreCallback<List<String>> callback) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Get cursors list", ledgerName);
    }
    store.getChildren(PREFIX + ledgerName)
            .thenAcceptAsync(names -> callback.operationComplete(names, null),
                    executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                executor.executeOrdered(ledgerName,
                        () -> callback.operationFailed(getException(failure)));
                return null;
            });
}
/**
 * Reads and decodes the cursor info stored at {@code PREFIX + ledgerName + "/" + cursorName}.
 *
 * @param ledgerName managed ledger name
 * @param cursorName cursor name
 * @param callback completed with the decoded info and its {@link Stat}; failed with a
 *        {@link MetadataNotFoundException} when the node does not exist
 */
@Override
public void asyncGetCursorInfo(String ledgerName, String cursorName,
        MetaStoreCallback<ManagedCursorInfo> callback) {
    final String path = PREFIX + ledgerName + "/" + cursorName;
    if (log.isDebugEnabled()) {
        log.debug("Reading from {}", path);
    }
    store.get(path)
            .thenAcceptAsync(optRes -> {
                if (!optRes.isPresent()) {
                    callback.operationFailed(new MetadataNotFoundException("Cursor metadata not found"));
                    return;
                }
                try {
                    ManagedCursorInfo decoded = parseManagedCursorInfo(optRes.get().getValue());
                    callback.operationComplete(decoded, optRes.get().getStat());
                } catch (InvalidProtocolBufferException e) {
                    callback.operationFailed(getException(e));
                }
            }, executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                executor.executeOrdered(ledgerName,
                        () -> callback.operationFailed(getException(failure)));
                return null;
            });
}
/**
 * Creates or updates the stored info of a cursor.
 *
 * <p>When {@code stat} is non-null the write is conditional on {@code stat.getVersion()} (an update of
 * an existing node). When {@code stat} is {@code null} the expected version is {@code -1}, the same
 * value this class uses when it creates a node that must not exist yet.
 *
 * @param ledgerName managed ledger name
 * @param cursorName cursor name
 * @param info cursor info to persist (compressed according to the cursor compression config)
 * @param stat stat of the existing node, or {@code null} when the cursor is being created
 * @param callback completed with the new {@link Stat}, or failed with the mapped exception
 */
@Override
public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat stat,
        MetaStoreCallback<Void> callback) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{} lastActive={}",
                ledgerName, cursorName, info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(),
                info.getMarkDeleteEntryId(), info.getLastActive());
    }
    String path = PREFIX + ledgerName + "/" + cursorName;
    byte[] content = compressCursorInfo(info);
    long expectedVersion;
    if (stat != null) {
        // A known version means the node already exists: this is an update.
        expectedVersion = stat.getVersion();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
        }
    } else {
        // No previous stat: the node is being created.
        expectedVersion = -1;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
        }
    }
    store.put(path, content, Optional.of(expectedVersion))
            .thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
                    .chooseThread(ledgerName))
            .exceptionally(ex -> {
                executor.executeOrdered(ledgerName,
                        () -> callback.operationFailed(getException(ex)));
                return null;
            });
}
/**
 * Deletes the node of a cursor. A cursor that does not exist is treated as successfully removed.
 *
 * @param ledgerName managed ledger name
 * @param cursorName cursor name
 * @param callback completed (with {@code null} result and stat) on success or when the node was
 *        already absent; failed with the mapped exception otherwise
 */
@Override
public void asyncRemoveCursor(String ledgerName, String cursorName, MetaStoreCallback<Void> callback) {
    final String path = PREFIX + ledgerName + "/" + cursorName;
    log.info("[{}] Remove cursor={}", ledgerName, cursorName);
    store.delete(path, Optional.empty())
            .thenAcceptAsync(ignored -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] cursor delete done", ledgerName, cursorName);
                }
                callback.operationComplete(null, null);
            }, executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                executor.executeOrdered(ledgerName, () -> {
                    Throwable rootCause = FutureUtil.unwrapCompletionException(failure);
                    if (rootCause instanceof MetadataStoreException.NotFoundException) {
                        log.info("[{}] [{}] cursor delete done because it did not exist.", ledgerName, cursorName);
                        callback.operationComplete(null, null);
                    } else {
                        callback.operationFailed(getException(failure));
                    }
                });
                return null;
            });
}
/**
 * Deletes the node of a managed ledger.
 *
 * @param ledgerName managed ledger name
 * @param callback completed (with {@code null} result and stat) on success, failed otherwise
 */
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callback) {
    log.info("[{}] Remove ManagedLedger", ledgerName);
    final String path = PREFIX + ledgerName;
    store.delete(path, Optional.empty())
            .thenAcceptAsync(ignored -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] managed ledger delete done", ledgerName);
                }
                callback.operationComplete(null, null);
            }, executor.chooseThread(ledgerName))
            .exceptionally(failure -> {
                executor.executeOrdered(ledgerName,
                        () -> callback.operationFailed(getException(failure)));
                return null;
            });
}
/**
 * Synchronously lists the children of the base node.
 *
 * @return the names found directly under {@code BASE_NODE}
 * @throws MetaStoreException when the underlying lookup completes exceptionally
 */
@Override
public Iterable<String> getManagedLedgers() throws MetaStoreException {
    try {
        List<String> names = store.getChildren(BASE_NODE).join();
        return names;
    } catch (CompletionException joinFailure) {
        throw getException(joinFailure);
    }
}
/**
 * Checks whether a node exists under the managed-ledgers prefix.
 *
 * @param path path relative to {@code PREFIX}
 * @return the future returned by the underlying store's existence check
 */
@Override
public CompletableFuture<Boolean> asyncExists(String path) {
    final String fullPath = PREFIX + path;
    return store.exists(fullPath);
}
/**
 * Registers (or replaces) the update callback for a managed ledger.
 *
 * @param ledgerName managed ledger name
 * @param callback callback stored under the ledger's full metadata path
 */
@Override
public void watchManagedLedgerInfo(String ledgerName, UpdateCallback<ManagedLedgerInfo> callback) {
    final String watchedPath = PREFIX + ledgerName;
    managedLedgerInfoUpdateCallbackMap.put(watchedPath, callback);
}
/**
 * Removes the update callback registered for a managed ledger, if any.
 *
 * @param ledgerName managed ledger name
 */
@Override
public void unwatchManagedLedgerInfo(String ledgerName) {
    final String watchedPath = PREFIX + ledgerName;
    managedLedgerInfoUpdateCallbackMap.remove(watchedPath);
}
/**
 * Metadata-store listener. For a {@code Modified} notification on a path that has a registered
 * update callback, re-reads the node and hands the decoded info to that callback.
 *
 * @param notification notification delivered by the metadata store
 */
@Override
public void accept(Notification notification) {
    final String path = notification.getPath();
    boolean isWatchedKind = path.startsWith(PREFIX) && notification.getType() == NotificationType.Modified;
    if (!isWatchedKind) {
        return;
    }
    UpdateCallback<ManagedLedgerInfo> listener = managedLedgerInfoUpdateCallbackMap.get(path);
    if (listener == null) {
        return;
    }
    final String ledgerName = path.substring(PREFIX.length());
    store.get(path).thenAcceptAsync(optResult -> {
        if (!optResult.isPresent()) {
            return;
        }
        try {
            ManagedLedgerInfo parsed = parseManagedLedgerInfo(optResult.get().getValue());
            listener.onUpdate(updateMLInfoTimestamp(parsed), optResult.get().getStat());
        } catch (InvalidProtocolBufferException e) {
            log.error("[{}] Error when parseManagedLedgerInfo", ledgerName, e);
        }
    }, executor.chooseThread(ledgerName)).exceptionally(failure -> {
        log.error("[{}] Error when read ManagedLedgerInfo", ledgerName, failure);
        return null;
    });
}
/**
 * Returns a copy of {@code info} in which every ledger has a non-zero timestamp.
 *
 * <p>Three cases are handled per ledger:
 * <ul>
 *   <li>no timestamp field (ledgers serialized before the field existed): set to the current time</li>
 *   <li>timestamp equal to 0 (a ledger in recovery): set to the current time</li>
 *   <li>timestamp already present: kept as is (the normal case)</li>
 * </ul>
 * The terminated position (when set) and the properties are copied over unchanged.
 */
private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
    final long now = System.currentTimeMillis();
    List<ManagedLedgerInfo.LedgerInfo> stamped = new ArrayList<>(info.getLedgerInfoCount());
    for (ManagedLedgerInfo.LedgerInfo ledger : info.getLedgerInfoList()) {
        boolean hasUsableTimestamp = ledger.hasTimestamp() && ledger.getTimestamp() != 0;
        stamped.add(hasUsableTimestamp ? ledger : ledger.toBuilder().setTimestamp(now).build());
    }
    ManagedLedgerInfo.Builder rebuilt = ManagedLedgerInfo.newBuilder();
    rebuilt.addAllLedgerInfo(stamped);
    if (info.hasTerminatedPosition()) {
        rebuilt.setTerminatedPosition(info.getTerminatedPosition());
    }
    rebuilt.addAllProperties(info.getPropertiesList());
    return rebuilt.build();
}
/**
 * Maps a metadata-store failure to the managed-ledger exception hierarchy.
 *
 * <p>A {@link MetadataStoreException.BadVersionException} is recognised both when it is the direct
 * cause of {@code t} (the shape produced by a failed {@code CompletableFuture} stage) and when
 * {@code t} itself is that exception, so an unwrapped failure is not misreported as a generic
 * {@link MetaStoreException}.
 *
 * @param t the failure to translate
 * @return a {@link ManagedLedgerException.BadVersionException} carrying {@code t}'s message for
 *         version conflicts, otherwise a {@link MetaStoreException} wrapping {@code t}
 */
private static MetaStoreException getException(Throwable t) {
    boolean badVersion = t instanceof MetadataStoreException.BadVersionException
            || t.getCause() instanceof MetadataStoreException.BadVersionException;
    if (badVersion) {
        return new ManagedLedgerException.BadVersionException(t.getMessage());
    }
    return new MetaStoreException(t);
}
/**
 * Serializes a managed-ledger info, compressing it when the configured compression type is not
 * {@code NONE} and the serialized size exceeds the configured threshold.
 *
 * @param managedLedgerInfo info to serialize
 * @return the plain protobuf bytes, or the compressed payload prefixed with its metadata header
 */
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
    final CompressionType type = ledgerInfoCompressionConfig.getCompressionType();
    final boolean shouldCompress = !type.equals(CompressionType.NONE)
            && managedLedgerInfo.getSerializedSize() > ledgerInfoCompressionConfig.getCompressSizeThresholdInBytes();
    if (!shouldCompress) {
        return managedLedgerInfo.toByteArray();
    }
    MLDataFormats.ManagedLedgerInfoMetadata.Builder header = MLDataFormats.ManagedLedgerInfoMetadata.newBuilder();
    header.setCompressionType(type);
    header.setUncompressedSize(managedLedgerInfo.getSerializedSize());
    MLDataFormats.ManagedLedgerInfoMetadata builtHeader = header.build();
    byte[] rawInfo = managedLedgerInfo.toByteArray();
    byte[] rawHeader = builtHeader.toByteArray();
    return compressManagedInfo(rawInfo, rawHeader, builtHeader.getSerializedSize(), type);
}
/**
 * Serializes a cursor info, compressing it when the configured compression type is not
 * {@code NONE} and the serialized size exceeds the configured threshold.
 *
 * @param managedCursorInfo info to serialize
 * @return the plain protobuf bytes, or the compressed payload prefixed with its metadata header
 */
public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
    final CompressionType type = cursorInfoCompressionConfig.getCompressionType();
    final boolean shouldCompress = !type.equals(CompressionType.NONE)
            && managedCursorInfo.getSerializedSize() > cursorInfoCompressionConfig.getCompressSizeThresholdInBytes();
    if (!shouldCompress) {
        return managedCursorInfo.toByteArray();
    }
    MLDataFormats.ManagedCursorInfoMetadata.Builder header = MLDataFormats.ManagedCursorInfoMetadata.newBuilder();
    header.setCompressionType(type);
    header.setUncompressedSize(managedCursorInfo.getSerializedSize());
    MLDataFormats.ManagedCursorInfoMetadata builtHeader = header.build();
    byte[] rawInfo = managedCursorInfo.toByteArray();
    byte[] rawHeader = builtHeader.toByteArray();
    return compressManagedInfo(rawInfo, rawHeader, builtHeader.getSerializedSize(), type);
}
/**
 * Decodes a managed-ledger info from its stored bytes.
 *
 * <p>When the bytes start with the compression header, the payload is decompressed with the codec
 * named in that header and then parsed. If anything in that path fails, the error is logged and
 * the raw bytes are parsed directly instead. Bytes without the header are parsed directly.
 *
 * @param data stored bytes
 * @return the decoded info
 * @throws InvalidProtocolBufferException when the (fallback) direct parse fails
 */
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
    if (metadataBytes == null) {
        return ManagedLedgerInfo.parseFrom(data);
    }
    // Buffer handed back by the codec; we own it and must release it once parsing is done.
    ByteBuf decoded = null;
    try {
        MLDataFormats.ManagedLedgerInfoMetadata metadata =
                MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
        decoded = getCompressionCodec(metadata.getCompressionType())
                .decode(byteBuf, metadata.getUncompressedSize());
        return ManagedLedgerInfo.parseFrom(decoded.nioBuffer());
    } catch (Exception e) {
        log.error("Failed to parse managedLedgerInfo metadata, "
                + "fall back to parse managedLedgerInfo directly.", e);
        return ManagedLedgerInfo.parseFrom(data);
    } finally {
        if (decoded != null) {
            decoded.release();
        }
        byteBuf.release();
    }
}
/**
 * Decodes a cursor info from its stored bytes.
 *
 * <p>When the bytes start with the compression header, the payload is decompressed with the codec
 * named in that header and then parsed. If anything in that path fails, the error is logged and
 * the raw bytes are parsed directly instead. Bytes without the header are parsed directly.
 *
 * @param data stored bytes
 * @return the decoded info
 * @throws InvalidProtocolBufferException when the (fallback) direct parse fails
 */
public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
    byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
    if (metadataBytes == null) {
        return ManagedCursorInfo.parseFrom(data);
    }
    // Buffer handed back by the codec; we own it and must release it once parsing is done.
    ByteBuf decoded = null;
    try {
        MLDataFormats.ManagedCursorInfoMetadata metadata =
                MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
        decoded = getCompressionCodec(metadata.getCompressionType())
                .decode(byteBuf, metadata.getUncompressedSize());
        return ManagedCursorInfo.parseFrom(decoded.nioBuffer());
    } catch (Exception e) {
        log.error("Failed to parse ManagedCursorInfo metadata, "
                + "fall back to parse ManagedCursorInfo directly", e);
        return ManagedCursorInfo.parseFrom(data);
    } finally {
        if (decoded != null) {
            decoded.release();
        }
        byteBuf.release();
    }
}
/**
 * Compress Managed Info data such as LedgerInfo, CursorInfo.
 *
 * <p>Compression data structure:
 * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
 *
 * @param info serialized, uncompressed info
 * @param metadata serialized compression metadata
 * @param metadataSerializedSize size of {@code metadata} in bytes, written as the METADATA_SIZE field
 * @param compressionType codec to apply; {@code null} or {@code NONE} returns {@code info} untouched
 * @return header followed by the compressed payload, copied into a fresh array
 */
private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize,
        MLDataFormats.CompressionType compressionType) {
    if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
        return info;
    }
    // 2 bytes of magic number followed by the 4-byte metadata length.
    final int headerPrefixSize = Short.BYTES + Integer.BYTES;
    ByteBuf metadataByteBuf = null;
    ByteBuf encodeByteBuf = null;
    CompositeByteBuf compositeByteBuf = null;
    try {
        metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + headerPrefixSize,
                metadataSerializedSize + headerPrefixSize);
        metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
        metadataByteBuf.writeInt(metadataSerializedSize);
        metadataByteBuf.writeBytes(metadata);
        encodeByteBuf = getCompressionCodec(compressionType)
                .encode(Unpooled.wrappedBuffer(info));
        compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
        // addComponent takes over one reference of each component, so give it its own (retain)
        // and keep ours: the composite and both buffers are then each released exactly once below.
        compositeByteBuf.addComponent(true, metadataByteBuf.retain());
        compositeByteBuf.addComponent(true, encodeByteBuf.retain());
        byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
        compositeByteBuf.readBytes(dataBytes);
        return dataBytes;
    } finally {
        if (compositeByteBuf != null) {
            compositeByteBuf.release();
        }
        if (metadataByteBuf != null) {
            metadataByteBuf.release();
        }
        if (encodeByteBuf != null) {
            encodeByteBuf.release();
        }
    }
}
/**
 * Reads the compression header from the start of {@code data}, if there is one.
 *
 * <p>The header is the 2-byte magic number, a 4-byte metadata length and the metadata bytes. On
 * success the reader index of {@code data} is left at the first byte after the metadata. When the
 * buffer does not start with the magic number, or the header is truncated or declares an
 * impossible length, {@code null} is returned so the caller can fall back to parsing the raw
 * bytes instead of failing with an index or array-size error.
 *
 * @param data buffer wrapping the stored bytes; its reader index may be advanced
 * @return the serialized compression metadata, or {@code null} when no valid header is present
 */
private byte[] extractCompressMetadataBytes(ByteBuf data) {
    // readShort() needs two readable bytes, not just a non-empty buffer.
    if (data.readableBytes() < Short.BYTES || data.readShort() != MAGIC_MANAGED_INFO_METADATA) {
        return null;
    }
    if (data.readableBytes() < Integer.BYTES) {
        return null;
    }
    int metadataSize = data.readInt();
    if (metadataSize < 0 || metadataSize > data.readableBytes()) {
        return null;
    }
    byte[] metadataBytes = new byte[metadataSize];
    data.readBytes(metadataBytes);
    return metadataBytes;
}
/**
 * Looks up the codec for a managed-ledger compression type by mapping it, by enum name, to the
 * Pulsar wire-protocol compression type.
 *
 * @param compressionType managed-ledger compression type
 * @return the codec provided by {@link CompressionCodecProvider} for the same-named wire type
 */
private CompressionCodec getCompressionCodec(CompressionType compressionType) {
    org.apache.pulsar.common.api.proto.CompressionType wireType =
            org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name());
    return CompressionCodecProvider.getCompressionCodec(wireType);
}
}