| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.metastorage; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.configuration.schemas.runner.NodeConfiguration; |
| import org.apache.ignite.internal.configuration.ConfigurationManager; |
| import org.apache.ignite.internal.manager.IgniteComponent; |
| import org.apache.ignite.internal.metastorage.client.CompactedException; |
| import org.apache.ignite.internal.metastorage.client.Condition; |
| import org.apache.ignite.internal.metastorage.client.Entry; |
| import org.apache.ignite.internal.metastorage.client.MetaStorageService; |
| import org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl; |
| import org.apache.ignite.internal.metastorage.client.Operation; |
| import org.apache.ignite.internal.metastorage.client.OperationTimeoutException; |
| import org.apache.ignite.internal.metastorage.client.WatchListener; |
| import org.apache.ignite.internal.metastorage.server.KeyValueStorage; |
| import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; |
| import org.apache.ignite.internal.metastorage.watch.AggregatedWatch; |
| import org.apache.ignite.internal.metastorage.watch.KeyCriterion; |
| import org.apache.ignite.internal.metastorage.watch.WatchAggregator; |
| import org.apache.ignite.internal.raft.Loza; |
| import org.apache.ignite.internal.util.Cursor; |
| import org.apache.ignite.internal.util.IgniteSpinBusyLock; |
| import org.apache.ignite.internal.util.IgniteUtils; |
| import org.apache.ignite.internal.vault.VaultManager; |
| import org.apache.ignite.lang.ByteArray; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteInternalException; |
| import org.apache.ignite.lang.IgniteLogger; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.lang.NodeStoppingException; |
| import org.apache.ignite.network.ClusterNode; |
| import org.apache.ignite.network.ClusterService; |
| import org.apache.ignite.network.TopologyEventHandler; |
| import org.apache.ignite.raft.client.service.RaftGroupService; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.util.ByteUtils.bytesToLong; |
| import static org.apache.ignite.internal.util.ByteUtils.longToBytes; |
| |
| /** |
| * MetaStorage manager is responsible for: |
| * <ul> |
| * <li>Handling cluster init message.</li> |
| * <li>Managing the meta storage lifecycle, including instantiation of the meta storage raft group.</li> |
| * <li>Providing the corresponding meta storage service proxy interface.</li> |
| * </ul> |
| */ |
| // TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided. |
| @SuppressWarnings("unused") |
| public class MetaStorageManager implements IgniteComponent { |
| /** Logger. */ |
| private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageManager.class); |
| |
| /** Meta storage raft group name. */ |
| private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group"; |
| |
| /** |
| * Special key for the vault where the applied revision for {@link MetaStorageManager#storeEntries} |
| * operation is stored. This mechanism is needed for committing processed watches to {@link VaultManager}. |
| */ |
| public static final ByteArray APPLIED_REV = ByteArray.fromString("applied_revision"); |
| |
| /** Vault manager in order to commit processed watches with corresponding applied revision. */ |
| private final VaultManager vaultMgr; |
| |
| /** Configuration manager that handles local configuration. */ |
| private final ConfigurationManager locCfgMgr; |
| |
| /** Cluster network service that is used in order to handle cluster init message. */ |
| private final ClusterService clusterNetSvc; |
| |
| /** Raft manager that is used for metastorage raft group handling. */ |
| private final Loza raftMgr; |
| |
| /** Meta storage service. */ |
| private volatile CompletableFuture<MetaStorageService> metaStorageSvcFut; |
| |
| /** Raft group service. */ |
| private volatile CompletableFuture<RaftGroupService> raftGroupServiceFut; |
| |
| /** |
| * Aggregator of multiple watches to deploy them as one batch. |
| * |
| * @see WatchAggregator |
| */ |
| private final WatchAggregator watchAggregator = new WatchAggregator(); |
| |
| /** |
| * Future which will be completed with {@link IgniteUuid}, |
| * when aggregated watch will be successfully deployed. |
| * Can be resolved to {@link Optional#empty()} if no watch deployed at the moment. |
| */ |
| private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>(); |
| |
| /** |
| * If true - all new watches will be deployed immediately. |
| * |
| * If false - all new watches will be aggregated to one batch |
| * for further deploy by {@link MetaStorageManager#deployWatches()} |
| */ |
| private boolean deployed; |
| |
| /** Flag indicates if meta storage nodes were set on start */ |
| private boolean metaStorageNodesOnStart; |
| |
| /** Actual storage for the Metastorage. */ |
| private final KeyValueStorage storage; |
| |
| /** Busy lock for stop synchronisation. */ |
| private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); |
| |
/**
 * Creates the manager. Only stores the collaborators; nothing is started here.
 *
 * @param vaultMgr Vault manager.
 * @param locCfgMgr Local configuration manager.
 * @param clusterNetSvc Cluster network service.
 * @param raftMgr Raft manager.
 * @param storage Storage. This component owns this resource and will manage its lifecycle.
 */
public MetaStorageManager(
    VaultManager vaultMgr,
    ConfigurationManager locCfgMgr,
    ClusterService clusterNetSvc,
    Loza raftMgr,
    KeyValueStorage storage
) {
    this.storage = storage;
    this.raftMgr = raftMgr;
    this.clusterNetSvc = clusterNetSvc;
    this.locCfgMgr = locCfgMgr;
    this.vaultMgr = vaultMgr;
}
| |
| /** {@inheritDoc} */ |
| @Override public void start() { |
| String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY) |
| .metastorageNodes().value(); |
| |
| Predicate<ClusterNode> metaStorageNodesContainsLocPred = |
| clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name()); |
| |
| if (metastorageNodes.length > 0) { |
| metaStorageNodesOnStart = true; |
| |
| List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream() |
| .filter(metaStorageNodesContainsLocPred) |
| .collect(Collectors.toList()); |
| |
| storage.start(); |
| |
| raftGroupServiceFut = raftMgr.prepareRaftGroup( |
| METASTORAGE_RAFT_GROUP_NAME, |
| metaStorageMembers, |
| () -> new MetaStorageListener(storage) |
| ); |
| |
| this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service -> |
| new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id()) |
| ); |
| |
| if (hasMetastorageLocally(locCfgMgr)) { |
| clusterNetSvc.topologyService().addEventHandler(new TopologyEventHandler() { |
| @Override public void onAppeared(ClusterNode member) { |
| // No-op. |
| } |
| |
| @Override public void onDisappeared(ClusterNode member) { |
| metaStorageSvcFut.thenCompose(svc -> svc.closeCursors(member.id())); |
| } |
| }); |
| } |
| } |
| else |
| this.metaStorageSvcFut = new CompletableFuture<>(); |
| |
| // TODO: IGNITE-14088: Uncomment and use real serializer factory |
| // Arrays.stream(MetaStorageMessageTypes.values()).forEach( |
| // msgTypeInstance -> net.registerMessageMapper( |
| // msgTypeInstance.msgType(), |
| // new DefaultMessageMapperProvider() |
| // ) |
| // ); |
| |
| // TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture. |
| // clusterNetSvc.messagingService().addMessageHandler((message, senderAddr, correlationId) -> {}); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop() { |
| busyLock.block(); |
| |
| Optional<IgniteUuid> watchId; |
| |
| try { |
| // If deployed future is not done, that means that stop was called in the middle of |
| // IgniteImpl.start, before deployWatches, or before init phase. |
| // It is correct to check completeness of the future because the method calls are guarded by busy lock. |
| // TODO: add busy lock for init method https://issues.apache.org/jira/browse/IGNITE-14414 |
| if (deployFut.isDone()) { |
| watchId = deployFut.get(); |
| |
| try { |
| if (watchId.isPresent()) |
| metaStorageSvcFut.get().stopWatch(watchId.get()); |
| } |
| catch (InterruptedException | ExecutionException e) { |
| LOG.error("Failed to get meta storage service."); |
| |
| throw new IgniteInternalException(e); |
| } |
| } |
| } catch (InterruptedException | ExecutionException e) { |
| LOG.error("Failed to get watch."); |
| |
| throw new IgniteInternalException(e); |
| } |
| |
| try { |
| if (raftGroupServiceFut != null) { |
| raftGroupServiceFut.get().shutdown(); |
| |
| raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME, metastorageNodes()); |
| } |
| } |
| catch (InterruptedException | ExecutionException e) { |
| LOG.error("Failed to get meta storage raft group service."); |
| |
| throw new IgniteInternalException(e); |
| } |
| |
| try { |
| storage.close(); |
| } |
| catch (Exception e) { |
| throw new IgniteInternalException("Exception when stopping the storage", e); |
| } |
| } |
| |
| /** |
| * Deploy all registered watches. |
| */ |
| public synchronized void deployWatches() throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| var watch = watchAggregator.watch( |
| appliedRevision() + 1, |
| this::storeEntries |
| ); |
| |
| if (watch.isEmpty()) |
| deployFut.complete(Optional.empty()); |
| else { |
| CompletableFuture<Void> fut = |
| dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id))); |
| |
| if (metaStorageNodesOnStart) |
| fut.join(); |
| else { |
| // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-14414 |
| } |
| } |
| |
| deployed = true; |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Register watch listener by key. |
| * |
| * @param key The target key. |
| * @param lsnr Listener which will be notified for each update. |
| * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel |
| * subscription |
| */ |
| public synchronized CompletableFuture<Long> registerWatch( |
| @Nullable ByteArray key, |
| @NotNull WatchListener lsnr |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return waitForReDeploy(watchAggregator.add(key, lsnr)); |
| } finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Register watch listener by key prefix. |
| * |
| * @param key Prefix to listen. |
| * @param lsnr Listener which will be notified for each update. |
| * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel |
| * subscription |
| */ |
| public synchronized CompletableFuture<Long> registerWatchByPrefix( |
| @Nullable ByteArray key, |
| @NotNull WatchListener lsnr |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return waitForReDeploy(watchAggregator.addPrefix(key, lsnr)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Register watch listener by collection of keys. |
| * |
| * @param keys Collection listen. |
| * @param lsnr Listener which will be notified for each update. |
| * @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel |
| * subscription |
| */ |
| public synchronized CompletableFuture<Long> registerWatch( |
| @NotNull Collection<ByteArray> keys, |
| @NotNull WatchListener lsnr |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return waitForReDeploy(watchAggregator.add(keys, lsnr)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Register watch listener by range of keys. |
| * |
| * @param from Start key of range. |
| * @param to End key of range (exclusively). |
| * @param lsnr Listener which will be notified for each update. |
| * @return future with id of registered watch. |
| */ |
| public synchronized CompletableFuture<Long> registerWatch( |
| @NotNull ByteArray from, |
| @NotNull ByteArray to, |
| @NotNull WatchListener lsnr |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return waitForReDeploy(watchAggregator.add(from, to, lsnr)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Unregister watch listener by id. |
| * |
| * @param id of watch to unregister. |
| * @return future, which will be completed when unregister finished. |
| */ |
| public synchronized CompletableFuture<Void> unregisterWatch(long id) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| watchAggregator.cancel(id); |
| if (deployed) |
| return updateWatches().thenAccept(v -> {}); |
| else |
| return deployFut.thenAccept(uuid -> {}); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#get(ByteArray) |
| */ |
| public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.get(key)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#get(ByteArray, long) |
| */ |
| public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAll(Set) |
| */ |
| public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAll(Set, long) |
| */ |
| public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#put(ByteArray, byte[]) |
| */ |
| public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAndPut(ByteArray, byte[]) |
| */ |
| public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#putAll(Map) |
| */ |
| public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAndPutAll(Map) |
| */ |
| public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#remove(ByteArray) |
| */ |
| public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.remove(key)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAndRemove(ByteArray) |
| */ |
| public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#removeAll(Set) |
| */ |
| public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#getAndRemoveAll(Set) |
| */ |
| public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Invoke with single success/failure operation. |
| * |
| * @see MetaStorageService#invoke(Condition, Operation, Operation) |
| */ |
| public @NotNull CompletableFuture<Boolean> invoke( |
| @NotNull Condition cond, |
| @NotNull Operation success, |
| @NotNull Operation failure |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#invoke(Condition, Collection, Collection) |
| */ |
| public @NotNull CompletableFuture<Boolean> invoke( |
| @NotNull Condition cond, |
| @NotNull Collection<Operation> success, |
| @NotNull Collection<Operation> failure |
| ) { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#range(ByteArray, ByteArray, long) |
| */ |
| public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| return new CursorWrapper<>( |
| metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound)) |
| ); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Retrieves entries for the given key range in lexicographic order. |
| * Entries will be filtered out by the current applied revision as an upper bound. |
| * Applied revision is a revision of the last successful vault update. |
| * |
| * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. |
| * @param keyTo End key of range (exclusive). Could be {@code null}. |
| * @return Cursor built upon entries corresponding to the given range and applied revision. |
| * @throws OperationTimeoutException If the operation is timed out. |
| * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. |
| * @see ByteArray |
| * @see Entry |
| */ |
| public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| return new CursorWrapper<>( |
| metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision())) |
| ); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#range(ByteArray, ByteArray) |
| */ |
| public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| return new CursorWrapper<>( |
| metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo)) |
| ); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Retrieves entries for the given key prefix in lexicographic order. |
| * Entries will be filtered out by the current applied revision as an upper bound. |
| * Applied revision is a revision of the last successful vault update. |
| * |
| * Prefix query is a synonym of the range query {@code (prefixKey, nextKey(prefixKey))}. |
| * |
| * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}. |
| * @return Cursor built upon entries corresponding to the given range and applied revision. |
| * @throws OperationTimeoutException If the operation is timed out. |
| * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. |
| * @see ByteArray |
| * @see Entry |
| */ |
| public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray keyPrefix) throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix); |
| |
| return new CursorWrapper<>( |
| metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision())) |
| ); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * Retrieves entries for the given key prefix in lexicographic order. Short cut for |
| * {@link #prefix(ByteArray, long)} where {@code revUpperBound == -1}. |
| * |
| * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}. |
| * @return Cursor built upon entries corresponding to the given range and revision. |
| * @throws OperationTimeoutException If the operation is timed out. |
| * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. |
| * @see ByteArray |
| * @see Entry |
| */ |
| public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix) throws NodeStoppingException { |
| return prefix(keyPrefix, -1); |
| } |
| |
| /** |
| * Retrieves entries for the given key prefix in lexicographic order. Entries will be filtered out by upper bound |
| * of given revision number. |
| * |
| * Prefix query is a synonym of the range query {@code range(prefixKey, nextKey(prefixKey))}. |
| * |
| * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be {@code null}. |
| * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision. |
| * @return Cursor built upon entries corresponding to the given range and revision. |
| * @throws OperationTimeoutException If the operation is timed out. |
| * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. |
| * @see ByteArray |
| * @see Entry |
| */ |
| public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long revUpperBound) throws NodeStoppingException { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix); |
| return new CursorWrapper<>( |
| metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), revUpperBound)) |
| ); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @see MetaStorageService#compact() |
| */ |
| public @NotNull CompletableFuture<Void> compact() { |
| if (!busyLock.enterBusy()) |
| return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping).")); |
| |
| try { |
| return metaStorageSvcFut.thenCompose(MetaStorageService::compact); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @return Applied revision for {@link VaultManager#putAll} operation. |
| */ |
| private long appliedRevision() { |
| byte[] appliedRevision = vaultMgr.get(APPLIED_REV).join().value(); |
| |
| return appliedRevision == null ? 0L : bytesToLong(appliedRevision); |
| } |
| |
| /** |
| * Stop current batch of consolidated watches and register new one from current {@link WatchAggregator}. |
| * |
| * @return Ignite UUID of new consolidated watch. |
| */ |
| private CompletableFuture<Optional<IgniteUuid>> updateWatches() { |
| long revision = appliedRevision() + 1; |
| |
| deployFut = deployFut |
| .thenCompose(idOpt -> |
| idOpt |
| .map(id -> metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id))) |
| .orElseGet(() -> CompletableFuture.completedFuture(null)) |
| ) |
| .thenCompose(r -> |
| watchAggregator.watch(revision, this::storeEntries) |
| .map(watch -> dispatchAppropriateMetaStorageWatch(watch).thenApply(Optional::of)) |
| .orElseGet(() -> CompletableFuture.completedFuture(Optional.empty())) |
| ); |
| |
| return deployFut; |
| } |
| |
| /** |
| * Store entries with appropriate associated revision. |
| * |
| * @param entries to store. |
| * @param revision associated revision. |
| */ |
| private void storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>> entries, long revision) { |
| Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(entries.size() + 1); |
| |
| batch.put(APPLIED_REV, longToBytes(revision)); |
| |
| entries.forEach(e -> batch.put(e.getKey(), e.getValue())); |
| |
| byte[] appliedRevisionBytes = vaultMgr.get(APPLIED_REV).join().value(); |
| |
| long appliedRevision = appliedRevisionBytes == null ? 0L : bytesToLong(appliedRevisionBytes); |
| |
| if (revision <= appliedRevision) { |
| throw new IgniteInternalException(String.format( |
| "Current revision (%d) must be greater than the revision in the Vault (%d)", |
| revision, appliedRevision |
| )); |
| } |
| |
| vaultMgr.putAll(batch).join(); |
| } |
| |
| /** |
| * @param id of watch to redeploy. |
| * @return future, which will be completed after redeploy finished. |
| */ |
| private CompletableFuture<Long> waitForReDeploy(long id) { |
| if (deployed) |
| return updateWatches().thenApply(uid -> id); |
| else |
| return deployFut.thenApply(uid -> id); |
| } |
| |
| /** |
| * Checks whether the given node hosts meta storage. |
| * |
| * @param nodeName Node unique name. |
| * @param metastorageMembers Meta storage members names. |
| * @return {@code true} if the node has meta storage, {@code false} otherwise. |
| */ |
| public static boolean hasMetastorage(String nodeName, String[] metastorageMembers) { |
| boolean isNodeHasMetasorage = false; |
| |
| for (String name : metastorageMembers) { |
| if (name.equals(nodeName)) { |
| isNodeHasMetasorage = true; |
| |
| break; |
| } |
| } |
| |
| return isNodeHasMetasorage; |
| } |
| |
| /** |
| * Checks whether the local node hosts meta storage. |
| * |
| * @param configurationMgr Configuration manager. |
| * @return {@code true} if the node has meta storage, {@code false} otherwise. |
| */ |
| public boolean hasMetastorageLocally(ConfigurationManager configurationMgr) { |
| String[] metastorageMembers = configurationMgr |
| .configurationRegistry() |
| .getConfiguration(NodeConfiguration.KEY) |
| .metastorageNodes() |
| .value(); |
| |
| return hasMetastorage(vaultMgr.name().join(), metastorageMembers); |
| } |
| |
| // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches. |
| /** Cursor wrapper. */ |
| private final class CursorWrapper<T> implements Cursor<T> { |
| /** Inner cursor future. */ |
| private final CompletableFuture<Cursor<T>> innerCursorFut; |
| |
| /** Inner iterator future. */ |
| private final CompletableFuture<Iterator<T>> innerIterFut; |
| |
| private final InnerIterator it = new InnerIterator(); |
| |
| /** |
| * @param innerCursorFut Inner cursor future. |
| */ |
| CursorWrapper( |
| CompletableFuture<Cursor<T>> innerCursorFut |
| ) { |
| this.innerCursorFut = innerCursorFut; |
| this.innerIterFut = innerCursorFut.thenApply(Iterable::iterator); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws Exception { |
| if (!busyLock.enterBusy()) |
| throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| |
| try { |
| innerCursorFut.thenApply(cursor -> { |
| try { |
| cursor.close(); |
| |
| return null; |
| } |
| catch (Exception e) { |
| throw new IgniteInternalException(e); |
| } |
| }).get(); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Iterator<T> iterator() { |
| return it; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return it.hasNext(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T next() { |
| return it.next(); |
| } |
| |
| private class InnerIterator implements Iterator<T> { |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| if (!busyLock.enterBusy()) |
| return false; |
| |
| try { |
| try { |
| return innerIterFut.thenApply(Iterator::hasNext).get(); |
| } |
| catch (InterruptedException | ExecutionException e) { |
| throw new IgniteInternalException(e); |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T next() { |
| if (!busyLock.enterBusy()) |
| throw new NoSuchElementException("No such element because node is stopping."); |
| |
| try { |
| try { |
| return innerIterFut.thenApply(Iterator::next).get(); |
| } |
| catch (InterruptedException | ExecutionException e) { |
| throw new IgniteInternalException(e); |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Dispatches appropriate metastorage watch method according to inferred watch criterion. |
| * |
| * @param aggregatedWatch Aggregated watch. |
| * @return Future, which will be completed after new watch registration finished. |
| */ |
| private CompletableFuture<IgniteUuid> dispatchAppropriateMetaStorageWatch(AggregatedWatch aggregatedWatch) { |
| if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.CollectionCriterion) { |
| var criterion = (KeyCriterion.CollectionCriterion) aggregatedWatch.keyCriterion(); |
| |
| return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch( |
| criterion.keys(), |
| aggregatedWatch.revision(), |
| aggregatedWatch.listener())); |
| } |
| else if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.ExactCriterion) { |
| var criterion = (KeyCriterion.ExactCriterion) aggregatedWatch.keyCriterion(); |
| |
| return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch( |
| criterion.key(), |
| aggregatedWatch.revision(), |
| aggregatedWatch.listener())); |
| } |
| else if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.RangeCriterion) { |
| var criterion = (KeyCriterion.RangeCriterion) aggregatedWatch.keyCriterion(); |
| |
| return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch( |
| criterion.from(), |
| criterion.to(), |
| aggregatedWatch.revision(), |
| aggregatedWatch.listener())); |
| } |
| else |
| throw new UnsupportedOperationException("Unsupported type of criterion"); |
| } |
| |
| /** |
| * Return metastorage nodes. |
| * |
| * This code will be deleted after node init phase is developed. |
| * https://issues.apache.org/jira/browse/IGNITE-14414 |
| */ |
| private List<ClusterNode> metastorageNodes() { |
| String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY) |
| .metastorageNodes().value(); |
| |
| Predicate<ClusterNode> metaStorageNodesContainsLocPred = |
| clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name()); |
| |
| List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream() |
| .filter(metaStorageNodesContainsLocPred) |
| .collect(Collectors.toList()); |
| |
| return metaStorageMembers; |
| } |
| } |