blob: 6b67839360554ccd1282a54b15da6a5e63fecd4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.stream.storage.impl.service;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STREAM_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.bookkeeper.stream.storage.api.metadata.RootRangeStore;
import org.apache.bookkeeper.stream.storage.impl.kv.TableStoreFactory;
import org.apache.bookkeeper.stream.storage.impl.metadata.MetaRangeStoreFactory;
import org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreFactory;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Unit test of {@link RangeStoreServiceImpl}.
*/
public class RangeStoreServiceImplTest {
private static final long SCID = 3456L;
private static final long STREAM_ID = 1234L;
private static final long RANGE_ID = 3456L;
private static final RangeId RID = RangeId.of(STREAM_ID, RANGE_ID);
private MVCCStoreFactory mvccStoreFactory;
private RootRangeStoreFactory rrStoreFactory;
private MetaRangeStoreFactory mrStoreFactory;
private TableStoreFactory tableStoreFactory;
private RangeStoreServiceImpl container;
private OrderedScheduler scheduler;
private RootRangeStore rrStore;
private MVCCAsyncStore<byte[], byte[]> rrMvccStore;
private MetaRangeStore mrStore;
private MVCCAsyncStore<byte[], byte[]> mrMvccStore;
private TableStore trStore;
private MVCCAsyncStore<byte[], byte[]> trMvccStore;
private StorageServerClientManager clientManager;
@SuppressWarnings("unchecked")
@Before
public void setUp() {
this.scheduler = OrderedScheduler.newSchedulerBuilder()
.name("test-scheduler")
.numThreads(1)
.build();
this.mvccStoreFactory = mock(MVCCStoreFactory.class);
this.rrStoreFactory = mock(RootRangeStoreFactory.class);
this.mrStoreFactory = mock(MetaRangeStoreFactory.class);
this.tableStoreFactory = mock(TableStoreFactory.class);
this.rrMvccStore = mock(MVCCAsyncStore.class);
this.mrMvccStore = mock(MVCCAsyncStore.class);
this.trMvccStore = mock(MVCCAsyncStore.class);
this.clientManager = mock(StorageServerClientManager.class);
this.container = new RangeStoreServiceImpl(
SCID,
scheduler,
mvccStoreFactory,
clientManager,
rrStoreFactory,
mrStoreFactory,
tableStoreFactory);
assertEquals(SCID, this.container.getId());
}
@After
public void tearDown() {
if (null != scheduler) {
scheduler.shutdown();
}
}
private void mockStorageContainer(long scId) {
when(mvccStoreFactory.openStore(
eq(ROOT_STORAGE_CONTAINER_ID),
eq(ROOT_STREAM_ID),
eq(ROOT_RANGE_ID),
eq(0))
).thenReturn(FutureUtils.value(rrMvccStore));
when(mvccStoreFactory.openStore(
eq(scId),
eq(CONTAINER_META_STREAM_ID),
eq(CONTAINER_META_RANGE_ID),
eq(0))
).thenReturn(FutureUtils.value(mrMvccStore));
when(mvccStoreFactory.openStore(
eq(scId),
eq(STREAM_ID),
eq(RANGE_ID),
anyInt())
).thenReturn(FutureUtils.value(trMvccStore));
this.rrStore = mock(RootRangeStore.class);
when(rrStoreFactory.createStore(eq(rrMvccStore)))
.thenReturn(rrStore);
this.mrStore = mock(MetaRangeStore.class);
when(mrStoreFactory.createStore(eq(mrMvccStore)))
.thenReturn(mrStore);
this.trStore = mock(TableStore.class);
when(tableStoreFactory.createStore(eq(trMvccStore)))
.thenReturn(trStore);
when(clientManager.getStreamProperties(eq(STREAM_ID)))
.thenReturn(FutureUtils.value(StreamProperties.getDefaultInstance()));
}
@Test
public void testStart() throws Exception {
mockStorageContainer(SCID);
FutureUtils.result(container.start());
// root range is not started because it is not the root container
verify(mvccStoreFactory, times(0))
.openStore(eq(ROOT_STORAGE_CONTAINER_ID), eq(ROOT_STREAM_ID), eq(ROOT_RANGE_ID), eq(0));
verify(rrStoreFactory, times(0))
.createStore(eq(rrMvccStore));
// meta range should be started
verify(mvccStoreFactory, times(1))
.openStore(eq(SCID), eq(CONTAINER_META_STREAM_ID), eq(CONTAINER_META_RANGE_ID), eq(0));
verify(mrStoreFactory, times(1))
.createStore(eq(mrMvccStore));
}
@Test
public void testStartRootContainer() throws Exception {
mockStorageContainer(ROOT_STORAGE_CONTAINER_ID);
RangeStoreServiceImpl container = new RangeStoreServiceImpl(
ROOT_STORAGE_CONTAINER_ID,
scheduler,
mvccStoreFactory,
clientManager,
rrStoreFactory,
mrStoreFactory,
tableStoreFactory);
FutureUtils.result(container.start());
// root range is not started because it is not the root container
verify(mvccStoreFactory, times(1))
.openStore(eq(ROOT_STORAGE_CONTAINER_ID), eq(ROOT_STREAM_ID), eq(ROOT_RANGE_ID), eq(0));
verify(rrStoreFactory, times(1))
.createStore(eq(rrMvccStore));
// meta range should be started
verify(mvccStoreFactory, times(1))
.openStore(eq(ROOT_STORAGE_CONTAINER_ID), eq(CONTAINER_META_STREAM_ID), eq(CONTAINER_META_RANGE_ID), eq(0));
verify(mrStoreFactory, times(1))
.createStore(eq(mrMvccStore));
}
@Test
public void testClose() throws Exception {
mockStorageContainer(SCID);
when(mvccStoreFactory.closeStores(eq(SCID)))
.thenReturn(FutureUtils.Void());
FutureUtils.result(container.stop());
verify(mvccStoreFactory, times(1))
.closeStores(eq(SCID));
}
//
// Root Range Methods
//
@Test
public void testCreateNamespace() throws Exception {
mockStorageContainer(SCID);
CreateNamespaceResponse expectedResp = CreateNamespaceResponse.getDefaultInstance();
when(rrStore.createNamespace(any(CreateNamespaceRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
CreateNamespaceRequest expectedReq = CreateNamespaceRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.createNamespace(expectedReq)));
verify(rrStore, times(1))
.createNamespace(same(expectedReq));
}
@Test
public void testDeleteNamespace() throws Exception {
mockStorageContainer(SCID);
DeleteNamespaceResponse expectedResp = DeleteNamespaceResponse.getDefaultInstance();
when(rrStore.deleteNamespace(any(DeleteNamespaceRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
DeleteNamespaceRequest expectedReq = DeleteNamespaceRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.deleteNamespace(expectedReq)));
verify(rrStore, times(1))
.deleteNamespace(same(expectedReq));
}
@Test
public void testGetNamespace() throws Exception {
mockStorageContainer(SCID);
GetNamespaceResponse expectedResp = GetNamespaceResponse.getDefaultInstance();
when(rrStore.getNamespace(any(GetNamespaceRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
GetNamespaceRequest expectedReq = GetNamespaceRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.getNamespace(expectedReq)));
verify(rrStore, times(1))
.getNamespace(same(expectedReq));
}
@Test
public void testCreateStream() throws Exception {
mockStorageContainer(SCID);
CreateStreamResponse expectedResp = CreateStreamResponse.getDefaultInstance();
when(rrStore.createStream(any(CreateStreamRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
CreateStreamRequest expectedReq = CreateStreamRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.createStream(expectedReq)));
verify(rrStore, times(1))
.createStream(same(expectedReq));
}
@Test
public void testDeleteStream() throws Exception {
mockStorageContainer(SCID);
DeleteStreamResponse expectedResp = DeleteStreamResponse.getDefaultInstance();
when(rrStore.deleteStream(any(DeleteStreamRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
DeleteStreamRequest expectedReq = DeleteStreamRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.deleteStream(expectedReq)));
verify(rrStore, times(1))
.deleteStream(same(expectedReq));
}
@Test
public void testGetStream() throws Exception {
mockStorageContainer(SCID);
GetStreamResponse expectedResp = GetStreamResponse.getDefaultInstance();
when(rrStore.getStream(any(GetStreamRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
GetStreamRequest expectedReq = GetStreamRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(rrStore.getStream(expectedReq)));
verify(rrStore, times(1))
.getStream(same(expectedReq));
}
//
// Meta Range Methods
//
@Test
public void testGetActiveRanges() throws Exception {
mockStorageContainer(SCID);
GetActiveRangesResponse expectedResp = GetActiveRangesResponse.getDefaultInstance();
when(mrStore.getActiveRanges(any(GetActiveRangesRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
GetActiveRangesRequest expectedReq = GetActiveRangesRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(mrStore.getActiveRanges(expectedReq)));
verify(mrStore, times(1))
.getActiveRanges(same(expectedReq));
}
//
// Table API
//
private PutRequest newPutRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
return PutRequest.newBuilder()
.setHeader(header)
.build();
}
private DeleteRangeRequest newDeleteRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
return DeleteRangeRequest.newBuilder()
.setHeader(header)
.build();
}
private RangeRequest newRangeRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
return RangeRequest.newBuilder()
.setHeader(header)
.build();
}
private IncrementRequest newIncrRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
return IncrementRequest.newBuilder()
.setHeader(header)
.build();
}
private TxnRequest newTxnRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
return TxnRequest.newBuilder()
.setHeader(header)
.build();
}
@Test
public void testRangeWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
RangeResponse expectedResp = RangeResponse.getDefaultInstance();
when(trStore.range(any(RangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
RangeRequest request = newRangeRequest();
RangeResponse response = FutureUtils.result(container.range(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@Test
public void testRangeWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
RangeResponse expectedResp = RangeResponse.getDefaultInstance();
when(trStore.range(any(RangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
RangeRequest request = newRangeRequest();
RangeResponse response = FutureUtils.result(container.range(request));
assertSame(expectedResp, response);
}
@Test
public void testPutWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
PutResponse expectedResp = PutResponse.getDefaultInstance();
when(trStore.put(any(PutRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
PutRequest request = newPutRequest();
PutResponse response = FutureUtils.result(container.put(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@Test
public void testPutWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
PutResponse expectedResp = PutResponse.getDefaultInstance();
when(trStore.put(any(PutRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
PutRequest request = newPutRequest();
PutResponse response = FutureUtils.result(container.put(request));
assertSame(expectedResp, response);
}
@Test
public void testDeleteWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
when(trStore.delete(any(DeleteRangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
DeleteRangeRequest request = newDeleteRequest();
DeleteRangeResponse response = FutureUtils.result(container.delete(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@Test
public void testDeleteWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
when(trStore.delete(any(DeleteRangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
DeleteRangeRequest request = newDeleteRequest();
DeleteRangeResponse response = FutureUtils.result(container.delete(request));
assertSame(expectedResp, response);
}
@Test
public void testTxnWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
TxnResponse expectedResp = TxnResponse.getDefaultInstance();
when(trStore.txn(any(TxnRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
TxnRequest request = newTxnRequest();
TxnResponse response = FutureUtils.result(container.txn(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@Test
public void testTxnWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
TxnResponse expectedResp = TxnResponse.getDefaultInstance();
when(trStore.txn(any(TxnRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
TxnRequest request = newTxnRequest();
TxnResponse response = FutureUtils.result(container.txn(request));
assertSame(expectedResp, response);
}
}