blob: ac2e746ac4676f54540b60e9c7b64d61e0cec121 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
@Slf4j
public class MetaStoreImpl implements MetaStore {
private static final String BASE_NODE = "/managed-ledgers";
private static final String PREFIX = BASE_NODE + "/";
private final MetadataStore store;
private final OrderedExecutor executor;
private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
private final CompressionType compressionType;
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
this.compressionType = CompressionType.NONE;
}
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
this.store = store;
this.executor = executor;
CompressionType finalCompressionType;
if (compressionType != null) {
try {
finalCompressionType = CompressionType.valueOf(compressionType);
} catch (Exception e) {
log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
throw e;
}
} else {
finalCompressionType = CompressionType.NONE;
}
this.compressionType = finalCompressionType;
}
@Override
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
String path = PREFIX + ledgerName;
store.get(path)
.thenAcceptAsync(optResult -> {
if (optResult.isPresent()) {
ManagedLedgerInfo info;
try {
info = parseManagedLedgerInfo(optResult.get().getValue());
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, optResult.get().getStat());
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(getException(e));
}
} else {
// Z-node doesn't exist
if (createIfMissing) {
log.info("Creating '{}'", path);
store.put(path, new byte[0], Optional.of(-1L))
.thenAccept(stat -> {
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, stat);
}).exceptionally(ex -> {
callback.operationFailed(getException(ex));
return null;
});
} else {
// Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
// point
callback.operationFailed(new MetadataNotFoundException("Managed ledger not found"));
}
}
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
try {
executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
} catch (RejectedExecutionException e) {
//executor maybe shutdown, use common pool to run callback.
CompletableFuture.runAsync(() -> callback.operationFailed(getException(ex)));
}
return null;
});
}
@Override
public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
MetaStoreCallback<Void> callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
}
String path = PREFIX + ledgerName;
store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
.thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion),
executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public void getCursors(String ledgerName, MetaStoreCallback<List<String>> callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] Get cursors list", ledgerName);
}
String path = PREFIX + ledgerName;
store.getChildren(path)
.thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public void asyncGetCursorInfo(String ledgerName, String cursorName,
MetaStoreCallback<ManagedCursorInfo> callback) {
String path = PREFIX + ledgerName + "/" + cursorName;
if (log.isDebugEnabled()) {
log.debug("Reading from {}", path);
}
store.get(path)
.thenAcceptAsync(optRes -> {
if (optRes.isPresent()) {
try {
ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue());
callback.operationComplete(info, optRes.get().getStat());
} catch (InvalidProtocolBufferException e) {
callback.operationFailed(getException(e));
}
} else {
callback.operationFailed(new MetadataNotFoundException("Cursor metadata not found"));
}
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat stat,
MetaStoreCallback<Void> callback) {
log.info("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{}", ledgerName, cursorName,
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
String path = PREFIX + ledgerName + "/" + cursorName;
byte[] content = info.toByteArray(); // Binary format
long expectedVersion;
if (stat != null) {
expectedVersion = stat.getVersion();
if (log.isDebugEnabled()) {
log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
} else {
expectedVersion = -1;
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
}
store.put(path, content, Optional.of(expectedVersion))
.thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public void asyncRemoveCursor(String ledgerName, String cursorName, MetaStoreCallback<Void> callback) {
String path = PREFIX + ledgerName + "/" + cursorName;
log.info("[{}] Remove consumer={}", ledgerName, cursorName);
store.delete(path, Optional.empty())
.thenAcceptAsync(v -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] cursor delete done", ledgerName, cursorName);
}
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callback) {
log.info("[{}] Remove ManagedLedger", ledgerName);
String path = PREFIX + ledgerName;
store.delete(path, Optional.empty())
.thenAcceptAsync(v -> {
if (log.isDebugEnabled()) {
log.debug("[{}] managed ledger delete done", ledgerName);
}
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
return null;
});
}
@Override
public Iterable<String> getManagedLedgers() throws MetaStoreException {
try {
return store.getChildren(BASE_NODE).join();
} catch (CompletionException e) {
throw getException(e);
}
}
@Override
public CompletableFuture<Boolean> asyncExists(String path) {
return store.exists(PREFIX + path);
}
//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
// - timestamp is 0 for a ledger in recovery
// - ledger has timestamp which is the normal case now
private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
List<ManagedLedgerInfo.LedgerInfo> infoList = new ArrayList<>(info.getLedgerInfoCount());
long currentTime = System.currentTimeMillis();
for (ManagedLedgerInfo.LedgerInfo ledgerInfo : info.getLedgerInfoList()) {
if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0) {
ManagedLedgerInfo.LedgerInfo.Builder singleInfoBuilder = ledgerInfo.toBuilder();
singleInfoBuilder.setTimestamp(currentTime);
infoList.add(singleInfoBuilder.build());
} else {
infoList.add(ledgerInfo);
}
}
ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder();
mlInfo.addAllLedgerInfo(infoList);
if (info.hasTerminatedPosition()) {
mlInfo.setTerminatedPosition(info.getTerminatedPosition());
}
mlInfo.addAllProperties(info.getPropertiesList());
return mlInfo.build();
}
private static MetaStoreException getException(Throwable t) {
if (t.getCause() instanceof MetadataStoreException.BadVersionException) {
return new ManagedLedgerException.BadVersionException(t.getMessage());
} else {
return new MetaStoreException(t);
}
}
/**
* Compress ManagedLedgerInfo data.
*
* compression data structure
* [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
*/
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
return managedLedgerInfo.toByteArray();
}
ByteBuf metadataByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
.newBuilder()
.setCompressionType(compressionType)
.setUncompressedSize(managedLedgerInfo.getSerializedSize())
.build();
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
encodeByteBuf = getCompressionCodec(compressionType)
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
compositeByteBuf.readBytes(dataBytes);
return dataBytes;
} finally {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
}
}
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
byteBuf.readBytes(metadataBytes);
MLDataFormats.ManagedLedgerInfoMetadata metadata =
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
long unpressedSize = metadata.getUncompressedSize();
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
decodeBytes = decodeByteBuf.array();
} else {
decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
decodeByteBuf.readBytes(decodeBytes);
}
return ManagedLedgerInfo.parseFrom(decodeBytes);
} catch (Exception e) {
log.error("Failed to parse managedLedgerInfo metadata, "
+ "fall back to parse managedLedgerInfo directly.", e);
return ManagedLedgerInfo.parseFrom(data);
} finally {
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
byteBuf.release();
}
} else {
return ManagedLedgerInfo.parseFrom(data);
}
}
private CompressionCodec getCompressionCodec(CompressionType compressionType) {
return CompressionCodecProvider.getCompressionCodec(
org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
}
}