blob: 676ff20313d896c50091a1030a437136066ba4fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageCommandListener;
import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
import org.apache.ignite.internal.metastorage.watch.KeyCriterion;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.internal.metastorage.client.CompactedException;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.MetaStorageService;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.metastorage.client.OperationTimeoutException;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* MetaStorage manager is responsible for:
* <ul>
* <li>Handling cluster init message.</li>
* <li>Managing meta storage lifecycle including instantiation meta storage raft group.</li>
* <li>Providing corresponding meta storage service proxy interface</li>
* </ul>
*/
// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
@SuppressWarnings("unused")
public class MetaStorageManager {
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
/** Vault manager in order to commit processed watches with corresponding applied revision. */
private final VaultManager vaultMgr;
/** Cluster network service that is used in order to handle cluster init message. */
private final ClusterService clusterNetSvc;
/** Raft manager that is used for metastorage raft group handling. */
private final Loza raftMgr;
/** Meta storage service. */
private CompletableFuture<MetaStorageService> metaStorageSvcFut;
/**
* Aggregator of multiple watches to deploy them as one batch.
*
* @see WatchAggregator
*/
private final WatchAggregator watchAggregator;
/**
* Future which will be completed with {@link IgniteUuid},
* when aggregated watch will be successfully deployed.
* Can be resolved to {@link Optional#empty()} if no watch deployed at the moment.
*/
private CompletableFuture<Optional<IgniteUuid>> deployFut;
/**
* If true - all new watches will be deployed immediately.
*
* If false - all new watches will be aggregated to one batch
* for further deploy by {@link MetaStorageManager#deployWatches()}
*/
private boolean deployed;
/**
* The constructor.
*
* @param vaultMgr Vault manager.
* @param clusterNetSvc Cluster network service.
* @param raftMgr Raft manager.
*/
public MetaStorageManager(
VaultManager vaultMgr,
ConfigurationManager locCfgMgr,
ClusterService clusterNetSvc,
Loza raftMgr
) {
this.vaultMgr = vaultMgr;
this.clusterNetSvc = clusterNetSvc;
this.raftMgr = raftMgr;
watchAggregator = new WatchAggregator();
deployFut = new CompletableFuture<>();
String locNodeName = locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
.name().value();
String[] metastorageNodes = locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
.metastorageNodes().value();
Predicate<ClusterNode> metaStorageNodesContainsLocPred =
clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
if (metastorageNodes.length > 0) {
this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
raftMgr.startRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
clusterNetSvc.topologyService().allMembers().stream().filter(
metaStorageNodesContainsLocPred).
collect(Collectors.toList()),
new MetaStorageCommandListener(new SimpleInMemoryKeyValueStorage())
)
)
);
}
else
this.metaStorageSvcFut = new CompletableFuture<>();
// TODO: IGNITE-14088: Uncomment and use real serializer factory
// Arrays.stream(MetaStorageMessageTypes.values()).forEach(
// msgTypeInstance -> net.registerMessageMapper(
// msgTypeInstance.msgType(),
// new DefaultMessageMapperProvider()
// )
// );
// TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
clusterNetSvc.messagingService().addMessageHandler((message, sender, correlationId) -> {});
}
/**
* Deploy all registered watches.
*/
public synchronized void deployWatches() {
try {
var watch = watchAggregator.watch(
vaultMgr.appliedRevision() + 1,
this::storeEntries
);
if (watch.isEmpty())
deployFut.complete(Optional.empty());
else
dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id))).join();
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't receive applied revision during deploy watches", e);
}
deployed = true;
}
/**
* Register watch listener by key.
*
* @param key The target key.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
* subscription
*/
public synchronized CompletableFuture<Long> registerWatch(
@Nullable ByteArray key,
@NotNull WatchListener lsnr
) {
return waitForReDeploy(watchAggregator.add(key, lsnr));
}
/**
* Register watch listener by key prefix.
*
* @param key Prefix to listen.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
* subscription
*/
public synchronized CompletableFuture<Long> registerWatchByPrefix(
@Nullable ByteArray key,
@NotNull WatchListener lsnr
) {
return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
}
/**
* Register watch listener by collection of keys.
*
* @param keys Collection listen.
* @param lsnr Listener which will be notified for each update.
* @return Subscription identifier. Could be used in {@link #unregisterWatch} method in order to cancel
* subscription
*/
public synchronized CompletableFuture<Long> registerWatch(
@NotNull Collection<ByteArray> keys,
@NotNull WatchListener lsnr
) {
return waitForReDeploy(watchAggregator.add(keys, lsnr));
}
/**
* Register watch listener by range of keys.
*
* @param from Start key of range.
* @param to End key of range (exclusively).
* @param lsnr Listener which will be notified for each update.
* @return future with id of registered watch.
*/
public synchronized CompletableFuture<Long> registerWatch(
@NotNull ByteArray from,
@NotNull ByteArray to,
@NotNull WatchListener lsnr
) {
return waitForReDeploy(watchAggregator.add(from, to, lsnr));
}
/**
* Unregister watch listener by id.
*
* @param id of watch to unregister.
* @return future, which will be completed when unregister finished.
*/
public synchronized CompletableFuture<Void> unregisterWatch(long id) {
watchAggregator.cancel(id);
if (deployed)
return updateWatches().thenAccept(v -> {});
else
return deployFut.thenAccept(uuid -> {});
}
/**
* @see MetaStorageService#get(ByteArray)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
}
/**
* @see MetaStorageService#get(ByteArray, long)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) {
return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
}
/**
* @see MetaStorageService#getAll(Set)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
}
/**
* @see MetaStorageService#getAll(Set, long)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
}
/**
* @see MetaStorageService#put(ByteArray, byte[])
*/
public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) {
return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
}
/**
* @see MetaStorageService#getAndPut(ByteArray, byte[])
*/
public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
}
/**
* @see MetaStorageService#putAll(Map)
*/
public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
}
/**
* @see MetaStorageService#getAndPutAll(Map)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
}
/**
* @see MetaStorageService#remove(ByteArray)
*/
public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
}
/**
* @see MetaStorageService#getAndRemove(ByteArray)
*/
public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
}
/**
* @see MetaStorageService#removeAll(Set)
*/
public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
}
/**
* @see MetaStorageService#getAndRemoveAll(Set)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
}
/**
* Invoke with single success/failure operation.
*
* @see MetaStorageService#invoke(Condition, Operation, Operation)
*/
public @NotNull CompletableFuture<Boolean> invoke(
@NotNull Condition cond,
@NotNull Operation success,
@NotNull Operation failure
) {
return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
}
/**
* @see MetaStorageService#invoke(Condition, Collection, Collection)
*/
public @NotNull CompletableFuture<Boolean> invoke(
@NotNull Condition cond,
@NotNull Collection<Operation> success,
@NotNull Collection<Operation> failure
) {
return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
}
/**
* @see MetaStorageService#range(ByteArray, ByteArray, long)
*/
public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
return new CursorWrapper<>(
metaStorageSvcFut,
metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
);
}
/**
* Retrieves entries for the given key range in lexicographic order.
* Entries will be filtered out by the current applied revision as an upper bound.
* Applied revision is a revision of the last successful vault update.
*
* @param keyFrom Start key of range (inclusive). Couldn't be {@code null}.
* @param keyTo End key of range (exclusive). Could be {@code null}.
* @return Cursor built upon entries corresponding to the given range and applied revision.
* @throws OperationTimeoutException If the operation is timed out.
* @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
* @see ByteArray
* @see Entry
*/
public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
return new CursorWrapper<>(
metaStorageSvcFut,
metaStorageSvcFut.thenApply(svc -> {
try {
return svc.range(keyFrom, keyTo, vaultMgr.appliedRevision());
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException(e);
}
})
);
}
/**
* @see MetaStorageService#range(ByteArray, ByteArray)
*/
public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
return new CursorWrapper<>(
metaStorageSvcFut,
metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
);
}
/**
* @see MetaStorageService#compact()
*/
public @NotNull CompletableFuture<Void> compact() {
return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
}
/**
* Stop current batch of consolidated watches and register new one from current {@link WatchAggregator}.
*
* @return Ignite UUID of new consolidated watch.
*/
private CompletableFuture<Optional<IgniteUuid>> updateWatches() {
Long revision;
try {
revision = vaultMgr.appliedRevision() + 1;
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't receive applied revision during watch redeploy", e);
}
final var finalRevision = revision;
deployFut = deployFut
.thenCompose(idOpt -> idOpt.map(id -> metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id))).
orElse(CompletableFuture.completedFuture(null))
.thenCompose(r -> {
var watch = watchAggregator.watch(finalRevision, this::storeEntries);
if (watch.isEmpty())
return CompletableFuture.completedFuture(Optional.empty());
else
return dispatchAppropriateMetaStorageWatch(watch.get()).thenApply(Optional::of);
}));
return deployFut;
}
/**
* Store entries with appropriate associated revision.
*
* @param entries to store.
* @param revision associated revision.
* @return future, which will be completed when store action finished.
*/
private CompletableFuture<Void> storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>> entries, long revision) {
try {
return vaultMgr.putAll(entries.stream().collect(
Collectors.toMap(e -> e.getKey(), IgniteBiTuple::getValue)), revision);
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't put entries with considered revision.", e);
}
}
/**
* @param id of watch to redeploy.
* @return future, which will be completed after redeploy finished.
*/
private CompletableFuture<Long> waitForReDeploy(long id) {
if (deployed)
return updateWatches().thenApply(uid -> id);
else
return deployFut.thenApply(uid -> id);
}
/**
* Checks whether the given node hosts meta storage.
*
* @param nodeName Node unique name.
* @param metastorageMembers Meta storage members names.
* @return {@code true} if the node has meta storage, {@code false} otherwise.
*/
public static boolean hasMetastorage(String nodeName, String[] metastorageMembers) {
boolean isNodeHasMetasorage = false;
for (String name : metastorageMembers) {
if (name.equals(nodeName)) {
isNodeHasMetasorage = true;
break;
}
}
return isNodeHasMetasorage;
}
/**
* Checks whether the local node hosts meta storage.
*
* @param configurationMgr Configuration manager.
* @return {@code true} if the node has meta storage, {@code false} otherwise.
*/
public static boolean hasMetastorageLocally(ConfigurationManager configurationMgr) {
String locNodeName = configurationMgr
.configurationRegistry()
.getConfiguration(NodeConfiguration.KEY)
.name()
.value();
String[] metastorageMembers = configurationMgr
.configurationRegistry()
.getConfiguration(NodeConfiguration.KEY)
.metastorageNodes()
.value();
return hasMetastorage(locNodeName, metastorageMembers);
}
// TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
/** Cursor wrapper. */
private final class CursorWrapper<T> implements Cursor<T> {
/** Meta storage service future. */
private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
/** Inner cursor future. */
private final CompletableFuture<Cursor<T>> innerCursorFut;
/** Inner iterator future. */
private final CompletableFuture<Iterator<T>> innerIterFut;
private final InnerIterator it = new InnerIterator();
/**
* @param metaStorageSvcFut Meta storage service future.
* @param innerCursorFut Inner cursor future.
*/
CursorWrapper(
CompletableFuture<MetaStorageService> metaStorageSvcFut,
CompletableFuture<Cursor<T>> innerCursorFut
) {
this.metaStorageSvcFut = metaStorageSvcFut;
this.innerCursorFut = innerCursorFut;
this.innerIterFut = innerCursorFut.thenApply(Iterable::iterator);
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
innerCursorFut.thenCompose(cursor -> {
try {
cursor.close();
return null;
}
catch (Exception e) {
throw new IgniteInternalException(e);
}
}).get();
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<T> iterator() {
return it;
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public T next() {
return it.next();
}
private class InnerIterator implements Iterator<T> {
@Override public boolean hasNext() {
try {
return innerIterFut.thenApply(Iterator::hasNext).get();
}
catch (InterruptedException | ExecutionException e) {
throw new IgniteInternalException(e);
}
}
/** {@inheritDoc} */
@Override public T next() {
try {
return innerIterFut.thenApply(Iterator::next).get();
}
catch (InterruptedException | ExecutionException e) {
throw new IgniteInternalException(e);
}
}
}
}
/**
* Dispatches appropriate metastorage watch method according to inferred watch criterion.
*
* @param aggregatedWatch Aggregated watch.
* @return Future, which will be completed after new watch registration finished.
*/
private CompletableFuture<IgniteUuid> dispatchAppropriateMetaStorageWatch(AggregatedWatch aggregatedWatch) {
if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.CollectionCriterion) {
var criterion = (KeyCriterion.CollectionCriterion) aggregatedWatch.keyCriterion();
return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch(
criterion.keys(),
aggregatedWatch.revision(),
aggregatedWatch.listener()));
}
else if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.ExactCriterion) {
var criterion = (KeyCriterion.ExactCriterion) aggregatedWatch.keyCriterion();
return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch(
criterion.key(),
aggregatedWatch.revision(),
aggregatedWatch.listener()));
}
else if (aggregatedWatch.keyCriterion() instanceof KeyCriterion.RangeCriterion) {
var criterion = (KeyCriterion.RangeCriterion) aggregatedWatch.keyCriterion();
return metaStorageSvcFut.thenCompose(metaStorageSvc -> metaStorageSvc.watch(
criterion.from(),
criterion.to(),
aggregatedWatch.revision(),
aggregatedWatch.listener()));
}
else
throw new UnsupportedOperationException("Unsupported type of criterion");
}
}