blob: 3a4d78ad66d96a7c6c4e72e868644c6e0a698fed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.stream.storage.impl.metadata;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.MIN_DATA_STREAM_ID;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateNamespaceName;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.validateStreamName;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.bookkeeper.api.kv.op.RangeOp;
import org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.NamespaceMetadata;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamName;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest.IdCase;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
/**
* A statestore backed implementation of {@link RootRangeStore}.
*/
@Slf4j
public class RootRangeStoreImpl
implements RootRangeStore {
// Leading byte of the two store-wide counter keys NS_ID_KEY and STREAM_ID_KEY declared below.
private static final byte SYSTEM_TAG = (byte) 0xff;
// Leading byte of namespace-name keys (see getNamespaceNameKey).
private static final byte NS_NAME_TAG = (byte) 0x01;
// Leading byte of namespace-id keys and of every key that is scoped by a namespace id.
private static final byte NS_ID_TAG = (byte) 0x02;
// Separator placed between the namespace id and a stream name (see getStreamNameKey).
private static final byte NS_STREAM_NAME_SEP = (byte) 0x03;
// Separator placed between the namespace id and a stream id (see getStreamIdKey(long, long)).
private static final byte NS_STREAM_ID_SEP = (byte) 0x04;
// Leading byte of the stream-id keyed reverse index (see getStreamIdKey(long)).
private static final byte STREAM_ID_TAG = (byte) 0x05;
// Trailing byte of the namespace end key (see getNamespaceIdEndKey).
private static final byte NS_END_SEP = (byte) 0xff;
// Key holding the most recently assigned namespace id; read and rewritten on namespace creation.
static final byte[] NS_ID_KEY = new byte[]{SYSTEM_TAG, 'n', 's', 'i', 'd'};
// Key holding the most recently assigned stream id; read and rewritten on stream creation.
static final byte[] STREAM_ID_KEY = new byte[]{SYSTEM_TAG, 's', 't', 'r', 'e', 'a', 'm', 'i', 'd'};
/**
 * Builds the key under which a namespace name is mapped to its namespace id.
 *
 * <p>Layout: {@code [NS_NAME_TAG][ns_name]}, with the name encoded as UTF-8.
 *
 * @param nsName namespace name
 * @return the namespace name key
 */
static final byte[] getNamespaceNameKey(String nsName) {
    byte[] encodedName = nsName.getBytes(UTF_8);
    byte[] key = new byte[1 + encodedName.length];
    key[0] = NS_NAME_TAG;
    System.arraycopy(encodedName, 0, key, 1, encodedName.length);
    return key;
}
/**
 * Builds the key under which the metadata of a namespace is stored.
 *
 * <p>Layout: {@code [NS_ID_TAG][ns_id]}.
 *
 * @param nsId namespace id
 * @return the namespace id key
 */
static final byte[] getNamespaceIdKey(long nsId) {
    byte[] key = new byte[1 + Long.BYTES];
    // the id is written right after the one-byte tag
    Bytes.toBytes(nsId, key, 1);
    key[0] = NS_ID_TAG;
    return key;
}
/**
 * Builds the end key used when deleting everything that belongs to a namespace.
 *
 * <p>Layout: {@code [NS_ID_TAG][ns_id][NS_END_SEP]}.
 *
 * @param nsId namespace id
 * @return the namespace end key
 */
static final byte[] getNamespaceIdEndKey(long nsId) {
    byte[] endKey = new byte[1 + Long.BYTES + 1];
    endKey[0] = NS_ID_TAG;
    Bytes.toBytes(nsId, endKey, 1);
    // the last slot of the array carries the end separator
    endKey[endKey.length - 1] = NS_END_SEP;
    return endKey;
}
/**
 * Builds the key under which a stream name, scoped by its namespace, is mapped to the stream id.
 *
 * <p>Layout: {@code [NS_ID_TAG][ns_id][NS_STREAM_NAME_SEP][stream_name]}, with the name encoded
 * as UTF-8.
 *
 * @param nsId namespace id
 * @param streamName stream name
 * @return the stream name key
 */
static final byte[] getStreamNameKey(long nsId, String streamName) {
    final int sepOffset = 1 + Long.BYTES;
    final int nameOffset = sepOffset + 1;
    byte[] encodedName = streamName.getBytes(UTF_8);
    byte[] key = new byte[nameOffset + encodedName.length];
    key[0] = NS_ID_TAG;
    Bytes.toBytes(nsId, key, 1);
    key[sepOffset] = NS_STREAM_NAME_SEP;
    System.arraycopy(encodedName, 0, key, nameOffset, encodedName.length);
    return key;
}
/**
 * Builds the key under which the properties of a stream are stored within its namespace.
 *
 * <p>Layout: {@code [NS_ID_TAG][ns_id][NS_STREAM_ID_SEP][stream_id]}.
 *
 * @param nsId namespace id
 * @param streamId stream id
 * @return the namespace-scoped stream id key
 */
static final byte[] getStreamIdKey(long nsId, long streamId) {
    final int sepOffset = 1 + Long.BYTES;
    final int streamIdOffset = sepOffset + 1;
    byte[] key = new byte[streamIdOffset + Long.BYTES];
    key[0] = NS_ID_TAG;
    Bytes.toBytes(nsId, key, 1);
    key[sepOffset] = NS_STREAM_ID_SEP;
    Bytes.toBytes(streamId, key, streamIdOffset);
    return key;
}
/**
 * Builds the reverse index key under which a stream id is mapped to its namespace id.
 *
 * <p>Layout: {@code [STREAM_ID_TAG][stream_id]}.
 *
 * @param streamId stream id
 * @return the reverse index key of the stream
 */
static final byte[] getStreamIdKey(long streamId) {
    byte[] key = new byte[1 + Long.BYTES];
    // the id is written right after the one-byte tag
    Bytes.toBytes(streamId, key, 1);
    key[0] = STREAM_ID_TAG;
    return key;
}
// Key/value store that holds all namespace and stream metadata handled by this class.
private final MVCCAsyncStore<byte[], byte[]> store;
// Used at stream creation to choose the storage container id recorded in the stream properties.
private final StorageContainerPlacementPolicy placementPolicy;
// Handed to the create/delete namespace processors together with the request.
private final ScheduledExecutorService executor;
/**
 * Creates a root range store on top of the given state store.
 *
 * @param store store that holds the namespace and stream metadata
 * @param placementPolicy policy used to place a newly created stream
 * @param executor executor passed to the namespace request processors
 */
public RootRangeStoreImpl(MVCCAsyncStore<byte[], byte[]> store,
StorageContainerPlacementPolicy placementPolicy,
ScheduledExecutorService executor) {
this.store = store;
this.placementPolicy = placementPolicy;
this.executor = executor;
}
//
// Namespaces API
//
/**
 * Reads the entry stored under {@code key}.
 *
 * <p>The range result is closed as soon as its first entry has been taken out, and the range op
 * is closed once the returned future completes, successfully or not.
 *
 * @param key key to look up
 * @return a future holding the first entry of the range result, or {@code null} when the
 *         result is empty
 */
CompletableFuture<KeyValue<byte[], byte[]>> getValue(byte[] key) {
    RangeOp<byte[], byte[]> rangeOp = store.getOpFactory().newRange(key, Options.get());
    return store.range(rangeOp)
        // no executor is supplied, so the callback runs on CompletableFuture's default async executor
        .thenApplyAsync(rangeResult -> {
            try {
                return rangeResult.count() > 0 ? rangeResult.getKvsAndClear().get(0) : null;
            } finally {
                rangeResult.close();
            }
        })
        .whenComplete((kv, cause) -> rangeOp.close());
}
/**
 * Handles a create-namespace request by handing it, together with this store and the executor,
 * to a {@code CreateNamespaceProcessor}.
 *
 * @param request create-namespace request
 * @return a future holding the response
 */
@Override
public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
    if (log.isTraceEnabled()) {
        log.trace("Received CreateNamespace request : {}", request);
    }
    return CreateNamespaceProcessor.of().process(this, request, executor);
}
/**
 * Validates the namespace name carried by a create-namespace request.
 *
 * @param request create-namespace request
 * @return {@link StatusCode#SUCCESS} when the name is valid, otherwise
 *         {@link StatusCode#INVALID_NAMESPACE_NAME}
 */
StatusCode verifyCreateNamespaceRequest(CreateNamespaceRequest request) {
    String nsName = request.getName();
    if (validateNamespaceName(nsName)) {
        return StatusCode.SUCCESS;
    }
    log.error("Failed to create namespace due to invalid namespace name {}", nsName);
    return StatusCode.INVALID_NAMESPACE_NAME;
}
/**
 * Creates a namespace: reads the namespace id counter stored under {@code NS_ID_KEY} and then
 * runs the create transaction against the value and revision that were read.
 *
 * @param request create-namespace request
 * @return a future holding the response
 */
CompletableFuture<CreateNamespaceResponse> doProcessCreateNamespaceRequest(CreateNamespaceRequest request) {
    return getValue(NS_ID_KEY).thenCompose(counterKv -> {
        if (null == counterKv) {
            // no counter stored yet: -1 marks both the id and the revision as absent
            return executeCreateNamespaceTxn(-1L, -1L, request);
        }
        long lastNsId = Bytes.toLong(counterKv.value(), 0);
        long lastNsIdRev = counterKv.modifiedRevision();
        counterKv.close();
        return executeCreateNamespaceTxn(lastNsId, lastNsIdRev, request);
    });
}
/**
 * Runs the transaction that creates a namespace with id {@code currentNsId + 1}.
 *
 * <p>The transaction is guarded by two comparisons: one on the namespace name key against
 * {@code null}, and one on {@code NS_ID_KEY} (against {@code null} when no counter was read,
 * otherwise against the revision that was read). When the guards hold it writes the
 * name-to-id entry, the namespace metadata and the updated id counter.
 *
 * @param currentNsId last assigned namespace id, or a negative value when none was stored
 * @param currentNsIdRev modification revision of the id counter, or a negative value when none was stored
 * @param request create-namespace request
 * @return a future holding {@code SUCCESS} with the namespace properties, or
 *         {@code INTERNAL_SERVER_ERROR} when the transaction did not succeed
 */
private CompletableFuture<CreateNamespaceResponse> executeCreateNamespaceTxn(long currentNsId,
                                                                             long currentNsIdRev,
                                                                             CreateNamespaceRequest request) {
    final String nsName = request.getName();
    final long newNsId = currentNsId + 1;

    NamespaceProperties.Builder propsBuilder = NamespaceProperties.newBuilder()
        .setNamespaceId(newNsId)
        .setNamespaceName(nsName)
        .setDefaultStreamConf(request.getNsConf().getDefaultStreamConf());
    NamespaceMetadata nsMetadata = NamespaceMetadata.newBuilder()
        .setProps(propsBuilder)
        .build();

    byte[] nameIndexKey = getNamespaceNameKey(nsName);
    byte[] nameIndexVal = Bytes.toBytes(newNsId);
    byte[] metadataKey = getNamespaceIdKey(newNsId);
    byte[] metadataVal = nsMetadata.toByteArray();

    TxnOp<byte[], byte[]> createTxn = store.newTxn()
        .If(
            store.newCompareValue(CompareResult.EQUAL, nameIndexKey, null),
            currentNsIdRev >= 0
                ? store.newCompareModRevision(CompareResult.EQUAL, NS_ID_KEY, currentNsIdRev)
                : store.newCompareValue(CompareResult.EQUAL, NS_ID_KEY, null)
        )
        .Then(
            store.newPut(nameIndexKey, nameIndexVal),
            store.newPut(metadataKey, metadataVal),
            store.newPut(NS_ID_KEY, Bytes.toBytes(newNsId)))
        .build();

    return store.txn(createTxn)
        .thenApply(txnResult -> {
            try {
                if (!txnResult.isSuccess()) {
                    // TODO: differentiate the error code
                    return CreateNamespaceResponse.newBuilder()
                        .setCode(StatusCode.INTERNAL_SERVER_ERROR)
                        .build();
                }
                return CreateNamespaceResponse.newBuilder()
                    .setCode(StatusCode.SUCCESS)
                    .setNsProps(nsMetadata.getProps())
                    .build();
            } finally {
                txnResult.close();
            }
        })
        .whenComplete((resp, cause) -> createTxn.close());
}
/**
 * Handles a delete-namespace request by handing it, together with this store and the executor,
 * to a {@code DeleteNamespaceProcessor}.
 *
 * @param request delete-namespace request
 * @return a future holding the response
 */
@Override
public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
    return DeleteNamespaceProcessor.of().process(this, request, executor);
}
/**
 * Validates the namespace name carried by a delete-namespace request.
 *
 * @param request delete-namespace request
 * @return {@link StatusCode#SUCCESS} when the name is valid, otherwise
 *         {@link StatusCode#INVALID_NAMESPACE_NAME}
 */
StatusCode verifyDeleteNamespaceRequest(DeleteNamespaceRequest request) {
    String nsName = request.getName();
    if (validateNamespaceName(nsName)) {
        return StatusCode.SUCCESS;
    }
    log.error("Failed to delete namespace due to invalid namespace name {}", nsName);
    return StatusCode.INVALID_NAMESPACE_NAME;
}
/**
 * Deletes a namespace: resolves its metadata by name and then runs the delete transaction.
 *
 * @param request delete-namespace request
 * @return a future holding the response
 */
CompletableFuture<DeleteNamespaceResponse> doProcessDeleteNamespaceRequest(DeleteNamespaceRequest request) {
    final String targetNs = request.getName();
    CompletableFuture<NamespaceMetadata> lookup = getNamespace(targetNs);
    return lookup.thenCompose(found -> deleteNamespace(targetNs, found));
}
/**
 * Runs the transaction that removes a namespace.
 *
 * <p>The transaction is guarded by {@code NOT_EQUAL} value comparisons against {@code null} on
 * the namespace name key and the namespace id key. When the guards hold it deletes the name key
 * and issues a range delete from the namespace id key to the namespace end key.
 *
 * <p>NOTE(review): the reverse-index entries written under {@code STREAM_ID_TAG} at stream
 * creation do not start with {@code NS_ID_TAG}, so they fall outside the deleted range; confirm
 * whether they are cleaned up elsewhere.
 *
 * @param nsName namespace name
 * @param nsMetadata metadata of the namespace, or {@code null} when it was not found
 * @return a future holding {@code NAMESPACE_NOT_FOUND} when {@code nsMetadata} is {@code null},
 *         {@code SUCCESS} when the transaction succeeded, and {@code INTERNAL_SERVER_ERROR} otherwise
 */
CompletableFuture<DeleteNamespaceResponse> deleteNamespace(String nsName, NamespaceMetadata nsMetadata) {
    if (null == nsMetadata) {
        DeleteNamespaceResponse notFound = DeleteNamespaceResponse.newBuilder()
            .setCode(StatusCode.NAMESPACE_NOT_FOUND)
            .build();
        return FutureUtils.value(notFound);
    }

    final long nsId = nsMetadata.getProps().getNamespaceId();
    byte[] nameIndexKey = getNamespaceNameKey(nsName);
    byte[] rangeStartKey = getNamespaceIdKey(nsId);
    byte[] rangeEndKey = getNamespaceIdEndKey(nsId);

    TxnOp<byte[], byte[]> deleteTxn = store.newTxn()
        .If(
            store.newCompareValue(CompareResult.NOT_EQUAL, nameIndexKey, null),
            store.newCompareValue(CompareResult.NOT_EQUAL, rangeStartKey, null)
        )
        .Then(
            store.newDelete(nameIndexKey),
            store.newDeleteRange(rangeStartKey, rangeEndKey)
        )
        .build();

    return store.txn(deleteTxn)
        .thenApply(txnResult -> {
            try {
                // TODO: differentiate the error code
                StatusCode outcome = txnResult.isSuccess()
                    ? StatusCode.SUCCESS
                    : StatusCode.INTERNAL_SERVER_ERROR;
                return DeleteNamespaceResponse.newBuilder().setCode(outcome).build();
            } finally {
                txnResult.close();
            }
        })
        .whenComplete((resp, cause) -> deleteTxn.close());
}
/**
 * Looks up a namespace by the name carried in the request.
 *
 * @param request get-namespace request
 * @return a future holding {@code INVALID_NAMESPACE_NAME} when the name fails validation,
 *         {@code NAMESPACE_NOT_FOUND} when no metadata is found, and otherwise {@code SUCCESS}
 *         together with the namespace properties
 */
@Override
public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
    String nsName = request.getName();
    if (!validateNamespaceName(nsName)) {
        log.error("Failed to get namespace due to invalid namespace name {}", nsName);
        GetNamespaceResponse invalid = GetNamespaceResponse.newBuilder()
            .setCode(StatusCode.INVALID_NAMESPACE_NAME)
            .build();
        return FutureUtils.value(invalid);
    }
    return getNamespace(nsName).thenApply(nsMetadata -> {
        if (null == nsMetadata) {
            return GetNamespaceResponse.newBuilder()
                .setCode(StatusCode.NAMESPACE_NOT_FOUND)
                .build();
        }
        return GetNamespaceResponse.newBuilder()
            .setCode(StatusCode.SUCCESS)
            .setNsProps(nsMetadata.getProps())
            .build();
    });
}
/**
 * Loads the metadata of a namespace by id.
 *
 * @param nsId namespace id
 * @return a future holding the parsed metadata, or {@code null} when nothing is stored under
 *         the namespace id key; the future fails when the stored bytes cannot be parsed
 */
private CompletableFuture<NamespaceMetadata> getNamespace(long nsId) {
    return store.get(getNamespaceIdKey(nsId)).thenCompose(metadataBytes -> {
        if (null == metadataBytes) {
            return FutureUtils.value(null);
        }
        try {
            return FutureUtils.value(NamespaceMetadata.parseFrom(metadataBytes));
        } catch (InvalidProtocolBufferException parseError) {
            return FutureUtils.exception(parseError);
        }
    });
}
/**
 * Loads the metadata of a namespace by name: resolves the name to an id and then loads the
 * metadata stored for that id.
 *
 * @param nsName namespace name
 * @return a future holding the metadata, or {@code null} when the name is not mapped to an id
 */
private CompletableFuture<NamespaceMetadata> getNamespace(String nsName) {
    return store.get(getNamespaceNameKey(nsName)).thenCompose(idBytes -> {
        if (null != idBytes) {
            return getNamespace(Bytes.toLong(idBytes, 0));
        }
        return FutureUtils.value(null);
    });
}
//
// Stream API
//
/**
 * Creates a stream after validating the namespace and stream names carried by the request.
 *
 * @param request create-stream request
 * @return a future holding the response; a failed validation is reported through the
 *         response code
 */
@Override
public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
    String nsName = request.getNsName();
    String streamName = request.getName();
    StatusCode validation = verifyStreamRequest(nsName, streamName);
    if (StatusCode.SUCCESS == validation) {
        return createStream(nsName, streamName, request.getStreamConf());
    }
    return FutureUtils.value(
        CreateStreamResponse.newBuilder().setCode(validation).build());
}
/**
 * Validates a namespace name and a stream name; the namespace name is checked first.
 *
 * @param nsName namespace name
 * @param streamName stream name
 * @return {@link StatusCode#INVALID_NAMESPACE_NAME} or {@link StatusCode#INVALID_STREAM_NAME}
 *         for the first name that fails validation, otherwise {@link StatusCode#SUCCESS}
 */
StatusCode verifyStreamRequest(String nsName, String streamName) {
    if (!validateNamespaceName(nsName)) {
        log.error("Invalid namespace name {}", nsName);
        return StatusCode.INVALID_NAMESPACE_NAME;
    }
    if (!validateStreamName(streamName)) {
        log.error("Invalid stream name {}", streamName);
        return StatusCode.INVALID_STREAM_NAME;
    }
    return StatusCode.SUCCESS;
}
/**
 * Creates a stream in the namespace with the given name: resolves the namespace metadata and
 * then creates the stream under it.
 *
 * <p>Any failure along the way is logged and reported as {@code INTERNAL_SERVER_ERROR}, so the
 * returned future does not complete exceptionally.
 *
 * @param nsName namespace name
 * @param streamName stream name
 * @param streamConf configuration of the new stream
 * @return a future holding the response
 */
private CompletableFuture<CreateStreamResponse> createStream(String nsName,
                                                             String streamName,
                                                             StreamConfiguration streamConf) {
    return getNamespace(nsName)
        .thenCompose(nsMetadata -> createStream(
            nsMetadata,
            streamName,
            streamConf
        ))
        .exceptionally(cause -> {
            // keep the cause visible: the response only carries a generic error code
            log.error("Failed to create stream {} under namespace {}", streamName, nsName, cause);
            return CreateStreamResponse.newBuilder().setCode(StatusCode.INTERNAL_SERVER_ERROR).build();
        });
}
/**
 * Creates a stream under an already resolved namespace: reads the stream id counter stored
 * under {@code STREAM_ID_KEY} and then runs the create transaction against the value and
 * revision that were read.
 *
 * @param nsMetadata metadata of the namespace, or {@code null} when it was not found
 * @param streamName stream name
 * @param streamConf configuration of the new stream
 * @return a future holding {@code NAMESPACE_NOT_FOUND} when {@code nsMetadata} is {@code null},
 *         otherwise the outcome of the create transaction
 */
private CompletableFuture<CreateStreamResponse> createStream(NamespaceMetadata nsMetadata,
                                                             String streamName,
                                                             StreamConfiguration streamConf) {
    if (null == nsMetadata) {
        CreateStreamResponse notFound = CreateStreamResponse.newBuilder()
            .setCode(StatusCode.NAMESPACE_NOT_FOUND)
            .build();
        return FutureUtils.value(notFound);
    }
    return getValue(STREAM_ID_KEY).thenCompose(counterKv -> {
        if (null == counterKv) {
            // no counter stored yet: -1 marks both the id and the revision as absent
            return executeCreateStreamTxn(
                nsMetadata.getProps().getNamespaceId(), streamName, streamConf, -1L, -1L);
        }
        long lastStreamId = Bytes.toLong(counterKv.value(), 0);
        long lastStreamIdRev = counterKv.modifiedRevision();
        counterKv.close();
        return executeCreateStreamTxn(
            nsMetadata.getProps().getNamespaceId(), streamName, streamConf, lastStreamId, lastStreamIdRev);
    });
}
/**
 * Runs the transaction that creates a stream.
 *
 * <p>The new stream id is {@code MIN_DATA_STREAM_ID} when no counter was stored, otherwise
 * {@code currentStreamId + 1}. The transaction is guarded by three comparisons: the namespace
 * id key ({@code NOT_EQUAL} to {@code null}), {@code STREAM_ID_KEY} (against {@code null} when
 * no counter was read, otherwise against the revision that was read) and the stream name key
 * ({@code EQUAL} to {@code null}). When the guards hold it writes the name-to-id entry, the
 * stream properties, the stream-id-to-namespace-id reverse index and the updated id counter.
 *
 * <p>The transaction op is closed exactly once, when the transaction future completes. Any
 * failure is logged and reported as {@code INTERNAL_SERVER_ERROR}, so the returned future does
 * not complete exceptionally.
 *
 * @param nsId id of the namespace that owns the stream
 * @param streamName stream name
 * @param streamConf configuration of the new stream
 * @param currentStreamId last assigned stream id, or a negative value when none was stored
 * @param currentStreamIdRev modification revision of the id counter, or a negative value when none was stored
 * @return a future holding {@code SUCCESS} with the stream properties, or
 *         {@code INTERNAL_SERVER_ERROR} when the transaction did not succeed or failed
 */
private CompletableFuture<CreateStreamResponse> executeCreateStreamTxn(long nsId,
                                                                       String streamName,
                                                                       StreamConfiguration streamConf,
                                                                       long currentStreamId,
                                                                       long currentStreamIdRev) {
    long streamId;
    if (currentStreamId < 0) {
        streamId = MIN_DATA_STREAM_ID;
    } else {
        streamId = currentStreamId + 1;
    }
    long scId = placementPolicy.placeStreamRange(streamId, 0L);
    StreamProperties streamProps = StreamProperties.newBuilder()
        .setStreamId(streamId)
        .setStreamName(streamName)
        .setStorageContainerId(scId)
        .setStreamConf(streamConf)
        .build();
    byte[] nsIdKey = getNamespaceIdKey(nsId);
    byte[] streamNameKey = getStreamNameKey(nsId, streamName);
    byte[] streamNameVal = Bytes.toBytes(streamId);
    byte[] streamIdKey = getStreamIdKey(nsId, streamId);
    byte[] streamIdVal = streamProps.toByteArray();
    byte[] streamReverseIndexKey = getStreamIdKey(streamId);
    byte[] streamReverseIndexValue = Bytes.toBytes(nsId);
    TxnOp<byte[], byte[]> txn = store.newTxn()
        .If(
            store.newCompareValue(CompareResult.NOT_EQUAL, nsIdKey, null),
            currentStreamIdRev < 0
                ? store.newCompareValue(CompareResult.EQUAL, STREAM_ID_KEY, null) :
                store.newCompareModRevision(CompareResult.EQUAL, STREAM_ID_KEY, currentStreamIdRev),
            store.newCompareValue(CompareResult.EQUAL, streamNameKey, null)
        )
        .Then(
            store.newPut(streamNameKey, streamNameVal),
            store.newPut(streamIdKey, streamIdVal),
            store.newPut(streamReverseIndexKey, streamReverseIndexValue),
            store.newPut(STREAM_ID_KEY, Bytes.toBytes(streamId))
        )
        .build();
    return store.txn(txn)
        .thenApply(txnResult -> {
            try {
                CreateStreamResponse.Builder respBuilder = CreateStreamResponse.newBuilder();
                if (txnResult.isSuccess()) {
                    respBuilder.setCode(StatusCode.SUCCESS);
                    respBuilder.setStreamProps(streamProps);
                } else {
                    // TODO: differentiate the error codes
                    respBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR);
                }
                return respBuilder.build();
            } finally {
                txnResult.close();
            }
        })
        // close the op in one place so it is released once on both the success and failure paths
        .whenComplete((resp, cause) -> txn.close())
        .exceptionally(cause -> {
            log.error("Failed to execute the create transaction for stream {} (id = {}) in namespace {}",
                streamName, streamId, nsId, cause);
            return CreateStreamResponse.newBuilder().setCode(StatusCode.INTERNAL_SERVER_ERROR).build();
        });
}
/**
 * Deletes a stream after validating the namespace and stream names carried by the request.
 *
 * @param request delete-stream request
 * @return a future holding the response; a failed validation is reported through the
 *         response code
 */
@Override
public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
    String nsName = request.getNsName();
    String streamName = request.getName();
    StatusCode validation = verifyStreamRequest(nsName, streamName);
    if (StatusCode.SUCCESS == validation) {
        return deleteStream(nsName, streamName);
    }
    return FutureUtils.value(
        DeleteStreamResponse.newBuilder().setCode(validation).build());
}
/**
 * Deletes a stream from the namespace with the given name: resolves the namespace metadata and
 * then deletes the stream under it.
 *
 * @param nsName namespace name
 * @param streamName stream name
 * @return a future holding the response
 */
private CompletableFuture<DeleteStreamResponse> deleteStream(String nsName,
                                                             String streamName) {
    CompletableFuture<NamespaceMetadata> lookup = getNamespace(nsName);
    return lookup.thenCompose(found -> deleteStream(found, streamName));
}
/**
 * Deletes a stream under an already resolved namespace: resolves the stream name to its id and
 * then runs the delete transaction.
 *
 * @param nsMetadata metadata of the namespace, or {@code null} when it was not found
 * @param streamName stream name
 * @return a future holding {@code NAMESPACE_NOT_FOUND} when {@code nsMetadata} is {@code null},
 *         {@code STREAM_NOT_FOUND} when the name is not mapped to an id, otherwise the outcome
 *         of the delete transaction
 */
private CompletableFuture<DeleteStreamResponse> deleteStream(NamespaceMetadata nsMetadata,
                                                             String streamName) {
    if (null == nsMetadata) {
        DeleteStreamResponse nsMissing = DeleteStreamResponse.newBuilder()
            .setCode(StatusCode.NAMESPACE_NOT_FOUND)
            .build();
        return FutureUtils.value(nsMissing);
    }
    final long nsId = nsMetadata.getProps().getNamespaceId();
    return store.get(getStreamNameKey(nsId, streamName)).thenCompose(idBytes -> {
        if (null != idBytes) {
            return deleteStream(nsId, Bytes.toLong(idBytes, 0), streamName);
        }
        return FutureUtils.value(
            DeleteStreamResponse.newBuilder()
                .setCode(StatusCode.STREAM_NOT_FOUND)
                .build());
    });
}
/**
 * Runs the transaction that removes a stream.
 *
 * <p>The transaction is guarded by {@code NOT_EQUAL} value comparisons against {@code null} on
 * the namespace id key, the stream name key, the namespace-scoped stream id key and the reverse
 * index key. When the guards hold it deletes the latter three keys.
 *
 * @param nsId id of the namespace that owns the stream
 * @param streamId stream id
 * @param streamName stream name
 * @return a future holding {@code SUCCESS} when the transaction succeeded and
 *         {@code INTERNAL_SERVER_ERROR} otherwise
 */
private CompletableFuture<DeleteStreamResponse> deleteStream(long nsId,
                                                             long streamId,
                                                             String streamName) {
    byte[] namespaceKey = getNamespaceIdKey(nsId);
    byte[] nameIndexKey = getStreamNameKey(nsId, streamName);
    byte[] propsKey = getStreamIdKey(nsId, streamId);
    byte[] reverseIndexKey = getStreamIdKey(streamId);

    TxnOp<byte[], byte[]> deleteTxn = store.newTxn()
        .If(
            store.newCompareValue(CompareResult.NOT_EQUAL, namespaceKey, null),
            store.newCompareValue(CompareResult.NOT_EQUAL, nameIndexKey, null),
            store.newCompareValue(CompareResult.NOT_EQUAL, propsKey, null),
            store.newCompareValue(CompareResult.NOT_EQUAL, reverseIndexKey, null)
        )
        .Then(
            store.newDelete(propsKey),
            store.newDelete(nameIndexKey),
            store.newDelete(reverseIndexKey)
        )
        .build();

    return store.txn(deleteTxn)
        .thenApply(txnResult -> {
            try {
                StatusCode outcome = txnResult.isSuccess()
                    ? StatusCode.SUCCESS
                    : StatusCode.INTERNAL_SERVER_ERROR;
                return DeleteStreamResponse.newBuilder().setCode(outcome).build();
            } finally {
                txnResult.close();
            }
        })
        .whenComplete((resp, cause) -> deleteTxn.close());
}
/**
 * Converts a stream properties lookup into a {@link GetStreamResponse}.
 *
 * <p>A failed lookup is logged and reported as {@code INTERNAL_SERVER_ERROR}, so the returned
 * future does not complete exceptionally. Each outcome builds its response from a fresh
 * builder, so no builder state is shared between the success and the failure callbacks.
 *
 * @param propsFuture future holding the stream properties, or {@code null} when the stream
 *                    was not found
 * @return a future holding {@code STREAM_NOT_FOUND} for a {@code null} result, {@code SUCCESS}
 *         with the properties for a non-null result, and {@code INTERNAL_SERVER_ERROR} when
 *         the lookup failed
 */
private CompletableFuture<GetStreamResponse> streamPropertiesToResponse(
    CompletableFuture<StreamProperties> propsFuture
) {
    return propsFuture
        .thenApply(streamProps -> {
            GetStreamResponse.Builder respBuilder = GetStreamResponse.newBuilder();
            if (null == streamProps) {
                respBuilder.setCode(StatusCode.STREAM_NOT_FOUND);
            } else {
                respBuilder
                    .setCode(StatusCode.SUCCESS)
                    .setStreamProps(streamProps);
            }
            return respBuilder.build();
        })
        .exceptionally(cause -> {
            // keep the cause visible: the response only carries a generic error code
            log.error("Failed to retrieve stream properties", cause);
            return GetStreamResponse.newBuilder()
                .setCode(StatusCode.INTERNAL_SERVER_ERROR)
                .build();
        });
}
/**
 * Looks up a stream either by id or by name, depending on which identifier the request carries.
 *
 * @param request get-stream request
 * @return a future holding the response; {@code ILLEGAL_OP} is returned when the request
 *         carries neither a stream id nor a stream name
 */
@Override
public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
    IdCase idCase = request.getIdCase();
    if (IdCase.STREAM_ID == idCase) {
        CompletableFuture<StreamProperties> byId = getStreamProps(request.getStreamId());
        return streamPropertiesToResponse(byId);
    }
    if (IdCase.STREAM_NAME == idCase) {
        return getStreamProps(request.getStreamName());
    }
    GetStreamResponse illegalOp = GetStreamResponse.newBuilder()
        .setCode(StatusCode.ILLEGAL_OP)
        .build();
    return FutureUtils.value(illegalOp);
}
/**
 * Loads the properties of a stream by id: resolves the owning namespace id through the reverse
 * index and then loads the properties stored within that namespace.
 *
 * @param streamId stream id
 * @return a future holding the properties, or {@code null} when the reverse index has no entry
 *         for the stream id
 */
CompletableFuture<StreamProperties> getStreamProps(long streamId) {
    return store.get(getStreamIdKey(streamId)).thenCompose(ownerNsIdBytes -> {
        if (null != ownerNsIdBytes) {
            return getStreamProps(Bytes.toLong(ownerNsIdBytes, 0), streamId);
        }
        return FutureUtils.value(null);
    });
}
/**
 * Looks up a stream by its fully qualified name: validates both names, resolves the namespace
 * name to an id and then loads the stream properties by name within that namespace.
 *
 * @param streamName namespace name and stream name of the stream
 * @return a future holding the validation error code when a name is invalid,
 *         {@code NAMESPACE_NOT_FOUND} when the namespace name is not mapped to an id, otherwise
 *         the response built from the stream properties lookup
 */
CompletableFuture<GetStreamResponse> getStreamProps(StreamName streamName) {
    StatusCode validation = verifyStreamRequest(
        streamName.getNamespaceName(),
        streamName.getStreamName());
    if (StatusCode.SUCCESS != validation) {
        GetStreamResponse invalid = GetStreamResponse.newBuilder().setCode(validation).build();
        return FutureUtils.value(invalid);
    }
    return store.get(getNamespaceNameKey(streamName.getNamespaceName())).thenCompose(nsIdBytes -> {
        if (null != nsIdBytes) {
            long nsId = Bytes.toLong(nsIdBytes, 0);
            CompletableFuture<StreamProperties> props = getStreamProps(nsId, streamName.getStreamName());
            return streamPropertiesToResponse(props);
        }
        return FutureUtils.value(GetStreamResponse.newBuilder()
            .setCode(StatusCode.NAMESPACE_NOT_FOUND).build());
    });
}
/**
 * Loads the properties of a stream by name within a namespace: resolves the name to a stream id
 * and then loads the properties stored for that id.
 *
 * @param nsId namespace id
 * @param streamName stream name
 * @return a future holding the properties, or {@code null} when the name is not mapped to an id
 */
CompletableFuture<StreamProperties> getStreamProps(long nsId, String streamName) {
    return store.get(getStreamNameKey(nsId, streamName)).thenCompose(idBytes -> {
        if (null != idBytes) {
            return getStreamProps(nsId, Bytes.toLong(idBytes, 0));
        }
        return FutureUtils.value(null);
    });
}
/**
 * Loads the properties of a stream by id within a namespace.
 *
 * @param nsId namespace id
 * @param streamId stream id
 * @return a future holding the parsed properties, or {@code null} when nothing is stored under
 *         the stream id key; the future fails when the stored bytes cannot be parsed
 */
CompletableFuture<StreamProperties> getStreamProps(long nsId, long streamId) {
    return store.get(getStreamIdKey(nsId, streamId)).thenCompose(propsBytes -> {
        if (null == propsBytes) {
            return FutureUtils.value(null);
        }
        try {
            return FutureUtils.value(StreamProperties.parseFrom(propsBytes));
        } catch (InvalidProtocolBufferException parseError) {
            return FutureUtils.exception(parseError);
        }
    });
}
}