blob: 46de480ec99368c346462db9ce7e7a4b238c72da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.stream.protocol.util;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.bookkeeper.common.util.Revisioned;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamMetadata.LifecycleState;
import org.apache.bookkeeper.stream.proto.StreamMetadata.ServingState;
import org.apache.bookkeeper.stream.proto.StreamName;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest;
import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
import org.apache.commons.lang3.tuple.Pair;
/**
 * Protocol related utils.
 *
 * <p>A collection of static helpers for comparing hash-key ranges, validating
 * namespace and stream names, splitting the hash-key space into initial ranges,
 * inspecting stream states and building storage protocol requests/responses.
 */
public class ProtoUtils {

    /**
     * Check if two key ranges overlaps with each other.
     *
     * <p>Ranges that merely touch (the end key of one equals the start key of the
     * other) are <b>not</b> considered overlapping.
     *
     * @param meta1 first key range
     * @param meta2 second key range
     * @return true if two key ranges overlaps
     */
    public static boolean keyRangeOverlaps(RangeProperties meta1, RangeProperties meta2) {
        return keyRangeOverlaps(
            meta1.getStartHashKey(), meta1.getEndHashKey(),
            meta2.getStartHashKey(), meta2.getEndHashKey());
    }

    /**
     * Check if two key ranges, each given as a (start key, end key) pair, overlap.
     *
     * @param range1 first key range; left is the start key, right is the end key
     * @param range2 second key range; left is the start key, right is the end key
     * @return true if two key ranges overlaps
     */
    public static boolean keyRangeOverlaps(Pair<Long, Long> range1,
                                           Pair<Long, Long> range2) {
        return keyRangeOverlaps(range1.getLeft(), range1.getRight(),
            range2.getLeft(), range2.getRight());
    }

    /**
     * Check if a range described by {@link RangeProperties} overlaps a
     * (start key, end key) pair.
     *
     * @param range1 first key range
     * @param range2 second key range; left is the start key, right is the end key
     * @return true if two key ranges overlaps
     */
    public static boolean keyRangeOverlaps(RangeProperties range1,
                                           Pair<Long, Long> range2) {
        return keyRangeOverlaps(range1.getStartHashKey(), range1.getEndHashKey(),
            range2.getLeft(), range2.getRight());
    }

    /**
     * Check if a (start key, end key) pair overlaps a range described by
     * {@link RangeProperties}.
     *
     * @param range1 first key range; left is the start key, right is the end key
     * @param range2 second key range
     * @return true if two key ranges overlaps
     */
    public static boolean keyRangeOverlaps(Pair<Long, Long> range1,
                                           RangeProperties range2) {
        return keyRangeOverlaps(range1.getLeft(), range1.getRight(),
            range2.getStartHashKey(), range2.getEndHashKey());
    }

    /**
     * Check if the key ranges (startKey1, endKey1) and (startKey2, endKey2) overlap.
     *
     * <p>Both comparisons are strict, so two ranges sharing only a boundary key
     * do not overlap.
     *
     * @param startKey1 start key of the first range
     * @param endKey1 end key of the first range
     * @param startKey2 start key of the second range
     * @param endKey2 end key of the second range
     * @return true if two key ranges overlaps
     */
    static boolean keyRangeOverlaps(long startKey1,
                                    long endKey1,
                                    long startKey2,
                                    long endKey2) {
        return endKey2 > startKey1 && startKey2 < endKey1;
    }

    /**
     * Validate namespace name.
     *
     * <p>A namespace name is subject to the same rules as a stream name.
     *
     * @param name namespace name to validate
     * @return true if it is a valid namespace name. otherwise false.
     */
    public static boolean validateNamespaceName(String name) {
        return validateStreamName(name);
    }

    /**
     * Validate stream name.
     *
     * <p>follow the rules that dlog uses for validating stream name.
     *
     * <p>A name is rejected when it is null or empty, or when it contains a space,
     * {@code '<'}, {@code '>'}, or any character in one of these ranges (bounds
     * inclusive): U+0000 - U+001F, U+007F - U+009F, U+D800 - U+F8FF,
     * U+FFF0 - U+FFFF.
     *
     * @param name stream name to validate
     * @return true if it is a valid stream name. otherwise false.
     */
    public static boolean validateStreamName(String name) {
        if (Strings.isNullOrEmpty(name)) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            // All range bounds are inclusive so that the boundary characters
            // (e.g. DEL and the last C0/C1 control characters) are rejected too.
            if (c == ' '
                || c == '<'
                || c == '>'
                || c <= '\u001f'
                || (c >= '\u007f' && c <= '\u009f')
                || (c >= '\ud800' && c <= '\uf8ff')
                || c >= '\ufff0') {
                return false;
            }
        }
        return true;
    }

    /**
     * Split the whole hash-key space into contiguous initial ranges for a stream.
     *
     * <p>The number of ranges is at least 2 and is rounded up to the next even
     * number. The first range starts at {@link Long#MIN_VALUE}, each following
     * range starts at the end key of the previous one, and the last range ends at
     * {@link Long#MAX_VALUE}. Range ids are assigned sequentially starting from
     * {@code nextRangeId}.
     *
     * @param streamId stream id, passed to the placement policy
     * @param numInitialRanges requested number of initial ranges
     * @param nextRangeId range id assigned to the first range
     * @param placementPolicy policy used to pick the storage container of each range
     * @return the list of range properties, ordered by ascending start key
     */
    public static List<RangeProperties> split(long streamId,
                                              int numInitialRanges,
                                              long nextRangeId,
                                              StorageContainerPlacementPolicy placementPolicy) {
        int numRanges = Math.max(2, numInitialRanges);
        if (numRanges % 2 != 0) { // round up to even number
            numRanges = numRanges + 1;
        }
        // The key space spans Long.MIN_VALUE..Long.MAX_VALUE, i.e. roughly twice
        // Long.MAX_VALUE, so the width of one range is Long.MAX_VALUE divided by
        // half the number of ranges.
        long rangeSize = Long.MAX_VALUE / (numRanges / 2);
        long startKey = Long.MIN_VALUE;
        List<RangeProperties> ranges = Lists.newArrayListWithExpectedSize(numRanges);
        for (int idx = 0; idx < numRanges; ++idx) {
            long endKey = startKey + rangeSize;
            if (numRanges - 1 == idx) {
                // the last range absorbs the remainder left by the integer division
                endKey = Long.MAX_VALUE;
            }
            long rangeId = nextRangeId++;
            RangeProperties props = RangeProperties.newBuilder()
                .setStartHashKey(startKey)
                .setEndHashKey(endKey)
                .setStorageContainerId(placementPolicy.placeStreamRange(streamId, rangeId))
                .setRangeId(rangeId)
                .build();
            startKey = endKey;
            ranges.add(props);
        }
        return ranges;
    }

    /**
     * Check if the stream is created.
     *
     * <p>A stream counts as created when its state is neither
     * {@link LifecycleState#UNINIT} nor {@link LifecycleState#CREATING}.
     *
     * @param state stream state
     * @return true if the stream is in created state. otherwise, false.
     * @throws IllegalArgumentException if the state is {@link LifecycleState#UNRECOGNIZED}
     */
    public static boolean isStreamCreated(LifecycleState state) {
        checkArgument(state != LifecycleState.UNRECOGNIZED);
        return LifecycleState.UNINIT != state
            && LifecycleState.CREATING != state;
    }

    /**
     * Check if the stream is writable.
     *
     * @param state stream state
     * @return true if the stream is writable. otherwise, false.
     * @throws IllegalArgumentException if the state is {@link ServingState#UNRECOGNIZED}
     */
    public static boolean isStreamWritable(ServingState state) {
        checkArgument(state != ServingState.UNRECOGNIZED);
        return ServingState.WRITABLE == state;
    }

    //
    // Location API
    //

    /**
     * Create a {@link GetStorageContainerEndpointRequest}.
     *
     * <p>One {@link OneStorageContainerEndpointRequest} is added per entry, carrying
     * the entry's value as the storage container and the entry's revision.
     *
     * @param storageContainers list of storage containers
     * @return a get storage container endpoint request.
     */
    public static GetStorageContainerEndpointRequest createGetStorageContainerEndpointRequest(
        List<Revisioned<Long>> storageContainers) {
        GetStorageContainerEndpointRequest.Builder builder = GetStorageContainerEndpointRequest.newBuilder();
        for (Revisioned<Long> storageContainer : storageContainers) {
            builder.addRequests(
                OneStorageContainerEndpointRequest.newBuilder()
                    .setStorageContainer(storageContainer.getValue())
                    .setRevision(storageContainer.getRevision()));
        }
        return builder.build();
    }

    /**
     * Create a {@link GetStorageContainerEndpointResponse}.
     *
     * <p>The response itself and every per-endpoint
     * {@link OneStorageContainerEndpointResponse} are marked with
     * {@link StatusCode#SUCCESS}.
     *
     * @param endpoints list of storage container endpoints
     * @return a get storage container endpoint response.
     */
    public static GetStorageContainerEndpointResponse createGetStorageContainerEndpointResponse(
        List<StorageContainerEndpoint> endpoints) {
        GetStorageContainerEndpointResponse.Builder builder = GetStorageContainerEndpointResponse.newBuilder();
        builder.setStatusCode(StatusCode.SUCCESS);
        for (StorageContainerEndpoint endpoint : endpoints) {
            builder.addResponses(
                OneStorageContainerEndpointResponse.newBuilder()
                    .setStatusCode(StatusCode.SUCCESS)
                    .setEndpoint(endpoint));
        }
        return builder.build();
    }

    //
    // Meta Range API
    //

    /**
     * Create a {@link GetActiveRangesRequest} that carries only the stream id.
     *
     * @param streamId stream id
     * @return a get active ranges request.
     */
    public static GetActiveRangesRequest createGetActiveRangesRequest(long streamId) {
        return GetActiveRangesRequest.newBuilder()
            .setStreamId(streamId)
            .build();
    }

    /**
     * Create a {@link GetActiveRangesRequest} that carries both the stream id
     * taken from the given properties and the properties themselves.
     *
     * @param streamProps stream properties
     * @return a get active ranges request.
     */
    public static GetActiveRangesRequest createGetActiveRangesRequest(StreamProperties streamProps) {
        return GetActiveRangesRequest.newBuilder()
            .setStreamId(streamProps.getStreamId())
            .setStreamProps(streamProps)
            .build();
    }

    //
    // Namespace API
    //

    /**
     * Create a {@link CreateNamespaceRequest}.
     *
     * @param nsName namespace name
     * @param nsConf namespace conf
     * @return a create namespace request.
     */
    public static CreateNamespaceRequest createCreateNamespaceRequest(String nsName,
                                                                      NamespaceConfiguration nsConf) {
        return CreateNamespaceRequest.newBuilder()
            .setName(nsName)
            .setNsConf(nsConf)
            .build();
    }

    /**
     * Create a {@link DeleteNamespaceRequest}.
     *
     * @param colName namespace name
     * @return a delete namespace request.
     */
    public static DeleteNamespaceRequest createDeleteNamespaceRequest(String colName) {
        return DeleteNamespaceRequest.newBuilder()
            .setName(colName)
            .build();
    }

    /**
     * Create a {@link GetNamespaceRequest}.
     *
     * @param colName namespace name
     * @return a get namespace request.
     */
    public static GetNamespaceRequest createGetNamespaceRequest(String colName) {
        return GetNamespaceRequest.newBuilder()
            .setName(colName)
            .build();
    }

    //
    // Stream API
    //

    /**
     * Create a {@link CreateStreamRequest}.
     *
     * @param nsName namespace name
     * @param streamName stream name
     * @param streamConf stream configuration
     * @return a create stream request.
     */
    public static CreateStreamRequest createCreateStreamRequest(String nsName,
                                                                String streamName,
                                                                StreamConfiguration streamConf) {
        return CreateStreamRequest.newBuilder()
            .setNsName(nsName)
            .setName(streamName)
            .setStreamConf(streamConf)
            .build();
    }

    /**
     * Create a {@link GetStreamRequest} that looks a stream up by name.
     *
     * @param nsName namespace name
     * @param streamName stream name
     * @return a get stream request.
     */
    public static GetStreamRequest createGetStreamRequest(String nsName,
                                                          String streamName) {
        return GetStreamRequest.newBuilder()
            .setStreamName(StreamName.newBuilder()
                .setNamespaceName(nsName)
                .setStreamName(streamName))
            .build();
    }

    /**
     * Create a {@link GetStreamRequest} that looks a stream up by id.
     *
     * @param streamId stream id
     * @return a get stream request.
     */
    public static GetStreamRequest createGetStreamRequest(long streamId) {
        return GetStreamRequest.newBuilder()
            .setStreamId(streamId)
            .build();
    }

    /**
     * Create a {@link DeleteStreamRequest}.
     *
     * @param nsName namespace name
     * @param streamName stream name
     * @return a delete stream request.
     */
    public static DeleteStreamRequest createDeleteStreamRequest(String nsName,
                                                                String streamName) {
        return DeleteStreamRequest.newBuilder()
            .setName(streamName)
            .setNsName(nsName)
            .build();
    }
}