blob: 0692fb217f9765c4280e4429a559637ad876fcd7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.stream.storage.impl.store;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.coder.ByteArrayCoder;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.exceptions.ObjectClosedException;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SharedResourceManager;
import org.apache.bookkeeper.statelib.StateStores;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
/**
 * A default implementation of {@link MVCCStoreFactory}.
 *
 * <p>The factory keeps one {@link MVCCAsyncStore} per (storage container, stream, range)
 * and hands out the already opened instance on subsequent {@link #openStore} calls.
 * All mutable state ({@code stores}, {@code closed} and {@code checkpointStore}) is
 * read and written while holding this object's monitor.
 */
@Accessors(fluent = true)
@Slf4j
public class MVCCStoreFactoryImpl implements MVCCStoreFactory {

    // store supplier
    private final Supplier<MVCCAsyncStore<byte[], byte[]>> storeSupplier;
    // storage resources
    private final StorageResources storageResources;
    // scheduler
    @Getter(value = AccessLevel.PACKAGE)
    private final OrderedScheduler writeIOScheduler;
    @Getter(value = AccessLevel.PACKAGE)
    private final OrderedScheduler readIOScheduler;
    @Getter(value = AccessLevel.PACKAGE)
    private final OrderedScheduler checkpointScheduler;
    // dirs
    private final File[] localStateDirs;
    // checkpoint manager; created lazily on the first store open, guarded by `this`
    private final Supplier<CheckpointStore> checkpointStoreSupplier;
    private CheckpointStore checkpointStore;
    // stores, keyed by storage container id and then by range id; guarded by `this`
    private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores;
    private final boolean serveReadOnlyTable;
    // guarded by `this`
    private boolean closed = false;
    private final StorageConfiguration storageConf;

    /**
     * Create a store factory.
     *
     * <p>The write, read and checkpoint schedulers are acquired from the shared resource
     * manager here and released again in {@link #close()}.
     *
     * @param namespaceSupplier supplier of the namespace used to build the store supplier
     * @param checkpointStoreSupplier supplier of the checkpoint store, invoked lazily
     * @param localStoreDirs local directories among which range stores are spread
     * @param storageResources shared resources providing the schedulers
     * @param serveReadOnlyTable whether stores are opened as read-only
     * @param storageConf storage configuration providing checkpoint/cleanup settings
     */
    public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
                                Supplier<CheckpointStore> checkpointStoreSupplier,
                                File[] localStoreDirs,
                                StorageResources storageResources,
                                boolean serveReadOnlyTable, StorageConfiguration storageConf) {
        this.storeSupplier = StateStores.mvccKvBytesStoreSupplier(namespaceSupplier);
        this.storageResources = storageResources;
        this.writeIOScheduler =
            SharedResourceManager.shared().get(storageResources.ioWriteScheduler());
        this.readIOScheduler =
            SharedResourceManager.shared().get(storageResources.ioReadScheduler());
        this.checkpointScheduler =
            SharedResourceManager.shared().get(storageResources.checkpointScheduler());
        this.localStateDirs = localStoreDirs;
        this.checkpointStoreSupplier = checkpointStoreSupplier;
        this.storageConf = storageConf;
        this.stores = Maps.newHashMap();
        this.serveReadOnlyTable = serveReadOnlyTable;
    }

    private ScheduledExecutorService chooseWriteIOExecutor(long streamId) {
        return writeIOScheduler.chooseThread(streamId);
    }

    private ScheduledExecutorService chooseReadIOExecutor(long streamId) {
        return readIOScheduler.chooseThread(streamId);
    }

    private ScheduledExecutorService chooseCheckpointIOExecutor(long streamId) {
        return checkpointScheduler.chooseThread(streamId);
    }

    /**
     * Pick the local directory for a stream.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} so that a negative
     * stream id still maps to a valid (non-negative) index.
     */
    private File chooseLocalStoreDir(long streamId) {
        int idx = (int) Math.floorMod(streamId, (long) localStateDirs.length);
        return localStateDirs[idx];
    }

    /**
     * Format an id as a zero-padded, 18-digit decimal string.
     */
    static String normalizedName(long id) {
        return String.format("%018d", id);
    }

    /**
     * Build the name of the stream backing the given range.
     */
    static String streamName(long scId,
                             long streamId,
                             long rangeId) {
        // TODO: change to filesystem path
        return String.format(
            "%s_%018d_%018d_%018d",
            "streams",
            scId,
            streamId,
            rangeId);
    }

    /**
     * Register a freshly initialized store.
     *
     * <p>If another store is already registered for the same range, or the factory has been
     * closed in the meantime, the given store is closed because nothing else would ever
     * release it.
     *
     * @return the store that is registered for the range after this call (either the given
     *         store or the one that was registered earlier), or {@code null} if the factory
     *         is closed and the given store was discarded
     */
    private synchronized MVCCAsyncStore<byte[], byte[]> addStore(long scId, long streamId, long rangeId,
                                                                 MVCCAsyncStore<byte[], byte[]> store) {
        if (closed) {
            // close() has already taken its snapshot, so it will never see this store
            store.closeAsync();
            return null;
        }
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId);
        if (null == scStores) {
            scStores = Maps.newHashMap();
            stores.put(scId, scStores);
        }
        RangeId rid = RangeId.of(streamId, rangeId);
        MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid);
        if (null != oldStore) {
            // lost the race against a concurrent open: keep the registered store
            store.closeAsync();
            return oldStore;
        }
        log.info("Add store (scId = {}, streamId = {}, rangeId = {}) at storage container ({})",
            scId, streamId, rangeId, scId);
        scStores.put(rid, store);
        return store;
    }

    /**
     * Look up an already registered store.
     *
     * @return the store, or {@code null} if none is registered for the range
     */
    private synchronized MVCCAsyncStore<byte[], byte[]> getStore(long scId, long streamId, long rangeId) {
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId);
        if (null == scStores) {
            return null;
        } else {
            RangeId rid = RangeId.of(streamId, rangeId);
            return scStores.get(rid);
        }
    }

    /**
     * Return the registered store for the range, initializing a new one if none exists.
     */
    @Override
    public CompletableFuture<MVCCAsyncStore<byte[], byte[]>> openStore(long scId, long streamId, long rangeId,
                                                                       int ttlSeconds) {
        MVCCAsyncStore<byte[], byte[]> store = getStore(scId, streamId, rangeId);
        if (null == store) {
            return newStore(scId, streamId, rangeId, ttlSeconds);
        } else {
            return FutureUtils.value(store);
        }
    }

    /**
     * Create, initialize and register a store for the given range.
     *
     * @return a future completed with the store registered for the range; it fails with
     *         {@link ObjectClosedException} if the factory is (or becomes) closed, or with
     *         the initialization failure, in which case the new store is closed again
     */
    CompletableFuture<MVCCAsyncStore<byte[], byte[]>> newStore(long scId, long streamId, long rangeId, int ttlSeconds) {
        final CheckpointStore cpStore;
        synchronized (this) {
            if (closed) {
                return FutureUtils.exception(new ObjectClosedException("MVCCStoreFactory"));
            }
            // create the checkpoint store under the lock so that concurrent opens share one
            // instance and none is created after close() has released it
            if (null == checkpointStore) {
                checkpointStore = checkpointStoreSupplier.get();
            }
            cpStore = checkpointStore;
        }

        log.info("Initializing stream({})/range({}) at storage container ({})",
            streamId, rangeId, scId);

        MVCCAsyncStore<byte[], byte[]> store = storeSupplier.get();

        File targetDir = chooseLocalStoreDir(streamId);
        // used for store ranges
        Path rangeStorePath = Paths.get(
            targetDir.getAbsolutePath(),
            "ranges",
            normalizedName(scId),
            normalizedName(streamId),
            normalizedName(rangeId));
        String storeName = String.format(
            "%s/%s/%s",
            normalizedName(scId),
            normalizedName(streamId),
            normalizedName(rangeId));

        // build a spec
        StateStoreSpec spec = StateStoreSpec.builder()
            .name(storeName)
            .keyCoder(ByteArrayCoder.of())
            .valCoder(ByteArrayCoder.of())
            .localStateStoreDir(rangeStorePath.toFile())
            .stream(streamName(scId, streamId, rangeId))
            .writeIOScheduler(chooseWriteIOExecutor(streamId))
            .readIOScheduler(chooseReadIOExecutor(streamId))
            .checkpointStore(cpStore)
            .checkpointDuration(Duration.ofMinutes(15))
            .checkpointIOScheduler(chooseCheckpointIOExecutor(streamId))
            .isReadonly(serveReadOnlyTable)
            .checkpointChecksumEnable(storageConf.getCheckpointChecksumEnable())
            .checkpointChecksumCompatible(storageConf.getCheckpointChecksumCompatible())
            .localStorageCleanupEnable(storageConf.getLocalStorageCleanupEnable())
            .checkpointRestoreIdleLimit(
                Duration.ofMillis(storageConf.getCheckpointRestoreIdleLimitMs()))
            .ttlSeconds(ttlSeconds)
            .build();

        return store.init(spec).whenComplete((ignored, throwable) -> {
            // since the store has not been added, so can't release its resources during close sc
            if (null != throwable) {
                log.info("Clearing resources hold by stream({})/range({}) at storage container ({}) ",
                    streamId, rangeId, scId);
                store.closeAsync().whenComplete((i, t) -> {
                    if (null != t) {
                        log.error("Clear resources hold by {} fail", store.name(), t);
                    }
                });
            }
        }).thenCompose(ignored -> {
            log.info("Successfully initialize stream({})/range({}) at storage container ({})",
                streamId, rangeId, scId);
            MVCCAsyncStore<byte[], byte[]> registered = addStore(scId, streamId, rangeId, store);
            if (null == registered) {
                // the factory was closed while the store was initializing
                return FutureUtils.<MVCCAsyncStore<byte[], byte[]>>exception(
                    new ObjectClosedException("MVCCStoreFactory"));
            }
            // hand out the registered instance, which differs from `store` when a
            // concurrent open won the race and `store` has been closed
            return FutureUtils.<MVCCAsyncStore<byte[], byte[]>>value(registered);
        });
    }

    /**
     * Unregister and close all stores of the given storage container.
     *
     * @return a future completed once every store of the container has been closed
     */
    @Override
    public CompletableFuture<Void> closeStores(long scId) {
        Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores;
        synchronized (this) {
            scStores = stores.remove(scId);
        }
        if (null == scStores) {
            log.info("scStores for {} on store factory is null, return directly", scId);
            return FutureUtils.Void();
        }

        List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
        for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
            log.info("Closing {} of sc {}", store.name(), scId);
            closeFutures.add(store.closeAsync());
        }

        return FutureUtils.collect(closeFutures).thenApply(ignored -> null);
    }

    /**
     * Close all registered stores, the checkpoint store and release the schedulers.
     *
     * <p>Only the first call has an effect. Failures while closing the range stores are
     * logged and do not prevent the remaining resources from being released.
     */
    @Override
    public void close() {
        Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> storesToClose;
        synchronized (this) {
            if (closed) {
                return;
            }
            storesToClose = Maps.newHashMap(stores);
            // drop the references so that closed stores are never handed out again
            stores.clear();
            closed = true;
        }

        List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
        for (Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores : storesToClose.values()) {
            for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) {
                closeFutures.add(store.closeAsync());
            }
        }
        try {
            FutureUtils.result(FutureUtils.collect(closeFutures));
            log.info("Successfully closed all the range stores opened by this range factory");
        } catch (Exception e) {
            log.warn("Encountered issue on closing all the range stores opened by this range factory", e);
        }

        CheckpointStore checkpointStoreToClose;
        synchronized (this) {
            checkpointStoreToClose = checkpointStore;
            checkpointStore = null;
        }
        if (null != checkpointStoreToClose) {
            checkpointStoreToClose.close();
        }

        SharedResourceManager.shared().release(
            storageResources.ioWriteScheduler(), writeIOScheduler);
        SharedResourceManager.shared().release(
            storageResources.ioReadScheduler(), readIOScheduler);
        SharedResourceManager.shared().release(
            storageResources.checkpointScheduler(), checkpointScheduler);
    }
}