| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.stream.storage.impl.store; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import java.io.File; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.time.Duration; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.function.Supplier; |
| import lombok.AccessLevel; |
| import lombok.Getter; |
| import lombok.experimental.Accessors; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.common.coder.ByteArrayCoder; |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.bookkeeper.common.exceptions.ObjectClosedException; |
| import org.apache.bookkeeper.common.util.OrderedScheduler; |
| import org.apache.bookkeeper.common.util.SharedResourceManager; |
| import org.apache.bookkeeper.statelib.StateStores; |
| import org.apache.bookkeeper.statelib.api.StateStoreSpec; |
| import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore; |
| import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore; |
| import org.apache.bookkeeper.stream.protocol.RangeId; |
| import org.apache.bookkeeper.stream.storage.StorageResources; |
| import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; |
| import org.apache.distributedlog.api.namespace.Namespace; |
| |
| /** |
| * A default implementation of {@link MVCCStoreFactory}. |
| */ |
| @Accessors(fluent = true) |
| @Slf4j |
| public class MVCCStoreFactoryImpl implements MVCCStoreFactory { |
| |
| // store supplier |
| private final Supplier<MVCCAsyncStore<byte[], byte[]>> storeSupplier; |
| // storage resources |
| private final StorageResources storageResources; |
| // scheduler |
| @Getter(value = AccessLevel.PACKAGE) |
| private final OrderedScheduler writeIOScheduler; |
| @Getter(value = AccessLevel.PACKAGE) |
| private final OrderedScheduler readIOScheduler; |
| @Getter(value = AccessLevel.PACKAGE) |
| private final OrderedScheduler checkpointScheduler; |
| // dirs |
| private final File[] localStateDirs; |
| // checkpoint manager |
| private final Supplier<CheckpointStore> checkpointStoreSupplier; |
| private CheckpointStore checkpointStore; |
| // stores |
| private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores; |
| private final boolean serveReadOnlyTable; |
| private boolean closed = false; |
| private final StorageConfiguration storageConf; |
| |
| public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier, |
| Supplier<CheckpointStore> checkpointStoreSupplier, |
| File[] localStoreDirs, |
| StorageResources storageResources, |
| boolean serveReadOnlyTable, StorageConfiguration storageConf) { |
| this.storeSupplier = StateStores.mvccKvBytesStoreSupplier(namespaceSupplier); |
| this.storageResources = storageResources; |
| this.writeIOScheduler = |
| SharedResourceManager.shared().get(storageResources.ioWriteScheduler()); |
| this.readIOScheduler = |
| SharedResourceManager.shared().get(storageResources.ioReadScheduler()); |
| this.checkpointScheduler = |
| SharedResourceManager.shared().get(storageResources.checkpointScheduler()); |
| this.localStateDirs = localStoreDirs; |
| this.checkpointStoreSupplier = checkpointStoreSupplier; |
| this.storageConf = storageConf; |
| this.stores = Maps.newHashMap(); |
| this.serveReadOnlyTable = serveReadOnlyTable; |
| } |
| |
| private ScheduledExecutorService chooseWriteIOExecutor(long streamId) { |
| return writeIOScheduler.chooseThread(streamId); |
| } |
| |
| private ScheduledExecutorService chooseReadIOExecutor(long streamId) { |
| return readIOScheduler.chooseThread(streamId); |
| } |
| |
| private ScheduledExecutorService chooseCheckpointIOExecutor(long streamId) { |
| return checkpointScheduler.chooseThread(streamId); |
| } |
| |
| private File chooseLocalStoreDir(long streamId) { |
| int idx = (int) (streamId % localStateDirs.length); |
| return localStateDirs[idx]; |
| } |
| |
| static String normalizedName(long id) { |
| return String.format("%018d", id); |
| } |
| |
| static String streamName(long scId, |
| long streamId, |
| long rangeId) { |
| // TODO: change to filesystem path |
| return String.format( |
| "%s_%018d_%018d_%018d", |
| "streams", |
| scId, |
| streamId, |
| rangeId); |
| } |
| |
| private synchronized void addStore(long scId, long streamId, long rangeId, |
| MVCCAsyncStore<byte[], byte[]> store) { |
| Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId); |
| if (null == scStores) { |
| scStores = Maps.newHashMap(); |
| stores.putIfAbsent(scId, scStores); |
| } |
| RangeId rid = RangeId.of(streamId, rangeId); |
| MVCCAsyncStore<byte[], byte[]> oldStore = scStores.get(rid); |
| if (null != oldStore) { |
| store.closeAsync(); |
| } else { |
| log.info("Add store (scId = {}, streamId = {}, rangeId = {}) at storage container ({})", |
| scId, streamId, rangeId, scId); |
| scStores.put(rid, store); |
| } |
| } |
| |
| private synchronized MVCCAsyncStore<byte[], byte[]> getStore(long scId, long streamId, long rangeId) { |
| Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores = stores.get(scId); |
| if (null == scStores) { |
| return null; |
| } else { |
| RangeId rid = RangeId.of(streamId, rangeId); |
| return scStores.get(rid); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<MVCCAsyncStore<byte[], byte[]>> openStore(long scId, long streamId, long rangeId, |
| int ttlSeconds) { |
| MVCCAsyncStore<byte[], byte[]> store = getStore(scId, streamId, rangeId); |
| if (null == store) { |
| return newStore(scId, streamId, rangeId, ttlSeconds); |
| } else { |
| return FutureUtils.value(store); |
| } |
| } |
| |
| CompletableFuture<MVCCAsyncStore<byte[], byte[]>> newStore(long scId, long streamId, long rangeId, int ttlSeconds) { |
| synchronized (this) { |
| if (closed) { |
| return FutureUtils.exception(new ObjectClosedException("MVCCStoreFactory")); |
| } |
| } |
| |
| log.info("Initializing stream({})/range({}) at storage container ({})", |
| streamId, rangeId, scId); |
| |
| MVCCAsyncStore<byte[], byte[]> store = storeSupplier.get(); |
| |
| File targetDir = chooseLocalStoreDir(streamId); |
| // used for store ranges |
| Path rangeStorePath = Paths.get( |
| targetDir.getAbsolutePath(), |
| "ranges", |
| normalizedName(scId), |
| normalizedName(streamId), |
| normalizedName(rangeId)); |
| String storeName = String.format( |
| "%s/%s/%s", |
| normalizedName(scId), |
| normalizedName(streamId), |
| normalizedName(rangeId)); |
| |
| if (null == checkpointStore) { |
| checkpointStore = checkpointStoreSupplier.get(); |
| } |
| |
| // build a spec |
| StateStoreSpec spec = StateStoreSpec.builder() |
| .name(storeName) |
| .keyCoder(ByteArrayCoder.of()) |
| .valCoder(ByteArrayCoder.of()) |
| .localStateStoreDir(rangeStorePath.toFile()) |
| .stream(streamName(scId, streamId, rangeId)) |
| .writeIOScheduler(chooseWriteIOExecutor(streamId)) |
| .readIOScheduler(chooseReadIOExecutor(streamId)) |
| .checkpointStore(checkpointStore) |
| .checkpointDuration(Duration.ofMinutes(15)) |
| .checkpointIOScheduler(chooseCheckpointIOExecutor(streamId)) |
| .isReadonly(serveReadOnlyTable) |
| .checkpointChecksumEnable(storageConf.getCheckpointChecksumEnable()) |
| .checkpointChecksumCompatible(storageConf.getCheckpointChecksumCompatible()) |
| .localStorageCleanupEnable(storageConf.getLocalStorageCleanupEnable()) |
| .checkpointRestoreIdleLimit( |
| Duration.ofMillis(storageConf.getCheckpointRestoreIdleLimitMs())) |
| .ttlSeconds(ttlSeconds) |
| .build(); |
| |
| |
| return store.init(spec).whenComplete((ignored, throwable) -> { |
| // since the store has not been added, so can't release its resources during close sc |
| if (null != throwable) { |
| log.info("Clearing resources hold by stream({})/range({}) at storage container ({}) ", |
| streamId, rangeId, scId); |
| store.closeAsync().whenComplete((i, t) -> { |
| if (null != t) { |
| log.error("Clear resources hold by {} fail", store.name()); |
| } |
| }); |
| } |
| }).thenApply(ignored -> { |
| log.info("Successfully initialize stream({})/range({}) at storage container ({})", |
| streamId, rangeId, scId); |
| addStore(scId, streamId, rangeId, store); |
| return store; |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Void> closeStores(long scId) { |
| Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores; |
| synchronized (this) { |
| scStores = stores.remove(scId); |
| } |
| if (null == scStores) { |
| log.info("scStores for {} on store factory is null, return directly", scId); |
| return FutureUtils.Void(); |
| } |
| |
| List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); |
| for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) { |
| log.info("Closing {} of sc {}", store.name(), scId); |
| closeFutures.add(store.closeAsync()); |
| } |
| |
| return FutureUtils.collect(closeFutures).thenApply(ignored -> null); |
| } |
| |
| @Override |
| public void close() { |
| Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> storesToClose; |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| storesToClose = Maps.newHashMap(stores); |
| closed = true; |
| } |
| |
| List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); |
| for (Map<RangeId, MVCCAsyncStore<byte[], byte[]>> scStores : storesToClose.values()) { |
| for (MVCCAsyncStore<byte[], byte[]> store : scStores.values()) { |
| closeFutures.add(store.closeAsync()); |
| } |
| } |
| try { |
| FutureUtils.result(FutureUtils.collect(closeFutures)); |
| log.info("Successfully closed all the range stores opened by this range factory"); |
| } catch (Exception e) { |
| log.info("Encountered issue on closing all the range stores opened by this range factory"); |
| } |
| if (null != checkpointStore) { |
| checkpointStore.close(); |
| checkpointStore = null; |
| } |
| |
| SharedResourceManager.shared().release( |
| storageResources.ioWriteScheduler(), writeIOScheduler); |
| SharedResourceManager.shared().release( |
| storageResources.ioReadScheduler(), readIOScheduler); |
| SharedResourceManager.shared().release( |
| storageResources.checkpointScheduler(), checkpointScheduler); |
| } |
| } |