blob: b1f94fc8d770dc1974dbd27e2bb73b4ee0c9b125 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.impl;
import static org.apache.ignite.internal.metastorage.command.GetAllCommand.getAllCommand;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.command.GetCommand;
import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
import org.apache.ignite.internal.metastorage.command.InvokeCommand;
import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
import org.apache.ignite.internal.metastorage.command.PutAllCommand;
import org.apache.ignite.internal.metastorage.command.PutCommand;
import org.apache.ignite.internal.metastorage.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.command.RemoveCommand;
import org.apache.ignite.internal.metastorage.command.SyncTimeCommand;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
/**
* {@link MetaStorageService} implementation.
*/
public class MetaStorageServiceImpl implements MetaStorageService {
private static final IgniteLogger LOG = Loggers.forClass(MetaStorageService.class);
/** Default batch size that is requested from the remote server. */
public static final int BATCH_SIZE = 1000;
private final MetaStorageServiceContext context;
private final ClusterTime clusterTime;
/**
* Constructor.
*
* @param metaStorageRaftGrpSvc Meta storage raft group service.
*/
public MetaStorageServiceImpl(
String nodeName,
RaftGroupService metaStorageRaftGrpSvc,
IgniteSpinBusyLock busyLock,
ClusterTime clusterTime
) {
this.context = new MetaStorageServiceContext(
metaStorageRaftGrpSvc,
new MetaStorageCommandsFactory(),
// TODO: Extract the pool size into configuration, see https://issues.apache.org/jira/browse/IGNITE-18735
Executors.newFixedThreadPool(5, NamedThreadFactory.create(nodeName, "metastorage-publisher", LOG)),
busyLock
);
this.clusterTime = clusterTime;
}
public RaftGroupService raftGroupService() {
return context.raftService();
}
@Override
public CompletableFuture<Entry> get(ByteArray key) {
return get(key, MetaStorageManager.LATEST_REVISION);
}
@Override
public CompletableFuture<Entry> get(ByteArray key, long revUpperBound) {
GetCommand getCommand = context.commandsFactory().getCommand().key(key.bytes()).revision(revUpperBound).build();
return context.raftService().run(getCommand);
}
@Override
public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
return getAll(keys, MetaStorageManager.LATEST_REVISION);
}
@Override
public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
GetAllCommand getAllCommand = getAllCommand(context.commandsFactory(), keys, revUpperBound);
return context.raftService().<List<Entry>>run(getAllCommand)
.thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
@Override
public CompletableFuture<Void> put(ByteArray key, byte[] value) {
PutCommand putCommand = context.commandsFactory().putCommand()
.key(key.bytes())
.value(value)
.initiatorTimeLong(clusterTime.nowLong())
.build();
return context.raftService().run(putCommand);
}
@Override
public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), vals, clusterTime.now());
return context.raftService().run(putAllCommand);
}
@Override
public CompletableFuture<Void> remove(ByteArray key) {
RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(key.bytes())
.initiatorTimeLong(clusterTime.nowLong()).build();
return context.raftService().run(removeCommand);
}
@Override
public CompletableFuture<Void> removeAll(Set<ByteArray> keys) {
RemoveAllCommand removeAllCommand = removeAllCommand(context.commandsFactory(), keys, clusterTime.now());
return context.raftService().run(removeAllCommand);
}
@Override
public CompletableFuture<Boolean> invoke(Condition condition, Operation success, Operation failure) {
return invoke(condition, List.of(success), List.of(failure));
}
@Override
public CompletableFuture<Boolean> invoke(
Condition condition,
Collection<Operation> success,
Collection<Operation> failure
) {
InvokeCommand invokeCommand = context.commandsFactory().invokeCommand()
.condition(condition)
.success(success)
.failure(failure)
.initiatorTimeLong(clusterTime.nowLong())
.build();
return context.raftService().run(invokeCommand);
}
@Override
public CompletableFuture<StatementResult> invoke(Iif iif) {
MultiInvokeCommand multiInvokeCommand = context.commandsFactory().multiInvokeCommand()
.iif(iif)
.initiatorTimeLong(clusterTime.nowLong())
.build();
return context.raftService().run(multiInvokeCommand);
}
@Override
public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
return range(keyFrom, keyTo, revUpperBound, false);
}
@Override
public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray keyTo) {
return range(keyFrom, keyTo, false);
}
@Override
public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray keyTo, boolean includeTombstones) {
return range(keyFrom, keyTo, MetaStorageManager.LATEST_REVISION, includeTombstones);
}
@Override
public Publisher<Entry> range(
ByteArray keyFrom,
@Nullable ByteArray keyTo,
long revUpperBound,
boolean includeTombstones
) {
Function<byte[], ReadCommand> getRangeCommand = prevKey -> context.commandsFactory().getRangeCommand()
.keyFrom(keyFrom.bytes())
.keyTo(keyTo == null ? null : keyTo.bytes())
.revUpperBound(revUpperBound)
.includeTombstones(includeTombstones)
.previousKey(prevKey)
.batchSize(BATCH_SIZE)
.build();
return new CursorPublisher(context, getRangeCommand);
}
@Override
public Publisher<Entry> prefix(ByteArray prefix, long revUpperBound) {
Function<byte[], ReadCommand> getPrefixCommand = prevKey -> context.commandsFactory().getPrefixCommand()
.prefix(prefix.bytes())
.revUpperBound(revUpperBound)
.includeTombstones(false)
.previousKey(prevKey)
.batchSize(BATCH_SIZE)
.build();
return new CursorPublisher(context, getPrefixCommand);
}
/**
* Sends idle safe time sync message. Should be called only on the leader node.
*
* @param safeTime New safe time.
* @return Future that will be completed when message is sent.
*/
public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) {
SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand()
.initiatorTimeLong(safeTime.longValue())
.initiatorTerm(term)
.build();
return context.raftService().run(syncTimeCommand);
}
// TODO: IGNITE-19417 Implement.
@Override
public CompletableFuture<Void> compact() {
throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Long> currentRevision() {
GetCurrentRevisionCommand cmd = context.commandsFactory().getCurrentRevisionCommand().build();
return context.raftService().run(cmd);
}
@Override
public void close() {
context.close();
}
private static Map<ByteArray, Entry> multipleEntryResult(List<Entry> entries) {
Map<ByteArray, Entry> res = IgniteUtils.newHashMap(entries.size());
for (Entry e : entries) {
res.put(new ByteArray(e.key()), e);
}
return res;
}
/**
* Creates put all command.
*
* @param commandsFactory Commands factory.
* @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
* @param ts Local time.
*/
private PutAllCommand putAllCommand(MetaStorageCommandsFactory commandsFactory, Map<ByteArray, byte[]> vals, HybridTimestamp ts) {
assert !vals.isEmpty();
int size = vals.size();
List<byte[]> keys = new ArrayList<>(size);
List<byte[]> values = new ArrayList<>(size);
for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) {
byte[] key = e.getKey().bytes();
byte[] val = e.getValue();
assert key != null : "Key could not be null.";
assert val != null : "Value could not be null.";
keys.add(key);
values.add(val);
}
return commandsFactory.putAllCommand()
.keys(keys)
.values(values)
.initiatorTimeLong(ts.longValue())
.build();
}
/**
* Creates remove all command.
*
* @param commandsFactory Commands factory.
* @param keys The keys collection. Couldn't be {@code null}.
* @param ts Local time.
*/
private RemoveAllCommand removeAllCommand(MetaStorageCommandsFactory commandsFactory, Set<ByteArray> keys, HybridTimestamp ts) {
List<byte[]> list = new ArrayList<>(keys.size());
for (ByteArray key : keys) {
list.add(key.bytes());
}
return commandsFactory.removeAllCommand().keys(list).initiatorTimeLong(ts.longValue()).build();
}
}