blob: 98652bbb87bca057bd4e44c9eb67b5cffe25b29e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.server.raft;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.ignite.internal.metastorage.common.ConditionType;
import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.GetCommand;
import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.PutCommand;
import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorsCloseCommand;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.Entry;
import org.apache.ignite.internal.metastorage.server.EntryEvent;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.Operation;
import org.apache.ignite.internal.metastorage.server.RevisionCondition;
import org.apache.ignite.internal.metastorage.server.TombstoneCondition;
import org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.WatchEvent;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.TestOnly;
/**
* Meta storage listener.
* TODO: IGNITE-14693 Implement Meta storage exception handling logic.
*/
public class MetaStorageListener implements RaftGroupListener {
/** Storage. */
private final KeyValueStorage storage;
/** Cursors map. */
private final Map<IgniteUuid, CursorMeta> cursors;
/**
* @param storage Storage.
*/
public MetaStorageListener(KeyValueStorage storage) {
this.storage = storage;
this.cursors = new ConcurrentHashMap<>();
}
/** {@inheritDoc} */
@Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
while (iter.hasNext()) {
CommandClosure<ReadCommand> clo = iter.next();
if (clo.command() instanceof GetCommand) {
GetCommand getCmd = (GetCommand) clo.command();
Entry e;
if (getCmd.revision() != 0)
e = storage.get(getCmd.key(), getCmd.revision());
else
e = storage.get(getCmd.key());
SingleEntryResponse resp = new SingleEntryResponse(
e.key(), e.value(), e.revision(), e.updateCounter()
);
clo.result(resp);
}
else if (clo.command() instanceof GetAllCommand) {
GetAllCommand getAllCmd = (GetAllCommand) clo.command();
Collection<Entry> entries;
if (getAllCmd.revision() != 0)
entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
else
entries = storage.getAll(getAllCmd.keys());
List<SingleEntryResponse> res = new ArrayList<>(entries.size());
for (Entry e : entries)
res.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
clo.result(new MultipleEntryResponse(res));
}
else if (clo.command() instanceof CursorHasNextCommand) {
CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand) clo.command();
CursorMeta cursorDesc = cursors.get(cursorHasNextCmd.cursorId());
clo.result(!(cursorDesc == null) && cursorDesc.cursor().hasNext());
}
else
assert false : "Command was not found [cmd=" + clo.command() + ']';
}
}
/** {@inheritDoc} */
@Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
while (iter.hasNext()) {
CommandClosure<WriteCommand> clo = iter.next();
if (clo.command() instanceof PutCommand) {
PutCommand putCmd = (PutCommand) clo.command();
storage.put(putCmd.key(), putCmd.value());
clo.result(null);
}
else if (clo.command() instanceof GetAndPutCommand) {
GetAndPutCommand getAndPutCmd = (GetAndPutCommand) clo.command();
Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
}
else if (clo.command() instanceof PutAllCommand) {
PutAllCommand putAllCmd = (PutAllCommand) clo.command();
storage.putAll(putAllCmd.keys(), putAllCmd.values());
clo.result(null);
}
else if (clo.command() instanceof GetAndPutAllCommand) {
GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand) clo.command();
Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.vals());
List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
for (Entry e : entries)
resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
clo.result(new MultipleEntryResponse(resp));
}
else if (clo.command() instanceof RemoveCommand) {
RemoveCommand rmvCmd = (RemoveCommand) clo.command();
storage.remove(rmvCmd.key());
clo.result(null);
}
else if (clo.command() instanceof GetAndRemoveCommand) {
GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand) clo.command();
Entry e = storage.getAndRemove(getAndRmvCmd.key());
clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
}
else if (clo.command() instanceof RemoveAllCommand) {
RemoveAllCommand rmvAllCmd = (RemoveAllCommand) clo.command();
storage.removeAll(rmvAllCmd.keys());
clo.result(null);
}
else if (clo.command() instanceof GetAndRemoveAllCommand) {
GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand) clo.command();
Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
for (Entry e : entries)
resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
clo.result(new MultipleEntryResponse(resp));
}
else if (clo.command() instanceof InvokeCommand) {
InvokeCommand cmd = (InvokeCommand) clo.command();
boolean res = storage.invoke(
toCondition(cmd.condition()),
toOperations(cmd.success()),
toOperations(cmd.failure())
);
clo.result(res);
}
else if (clo.command() instanceof RangeCommand) {
RangeCommand rangeCmd = (RangeCommand) clo.command();
IgniteUuid cursorId = rangeCmd.getCursorId();
Cursor<Entry> cursor = (rangeCmd.revUpperBound() != -1) ?
storage.range(
rangeCmd.keyFrom(),
rangeCmd.keyTo(),
rangeCmd.revUpperBound()) :
storage.range(
rangeCmd.keyFrom(),
rangeCmd.keyTo());
cursors.put(
cursorId,
new CursorMeta(
cursor,
CursorType.RANGE,
rangeCmd.requesterNodeId()
)
);
clo.result(cursorId);
}
else if (clo.command() instanceof CursorNextCommand) {
CursorNextCommand cursorNextCmd = (CursorNextCommand) clo.command();
CursorMeta cursorDesc = cursors.get(cursorNextCmd.cursorId());
if (cursorDesc == null) {
clo.result(new NoSuchElementException("Corresponding cursor on server side not found."));
return;
}
if (cursorDesc.type() == CursorType.RANGE) {
Entry e = (Entry) cursorDesc.cursor().next();
clo.result(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
}
else if (cursorDesc.type() == CursorType.WATCH) {
WatchEvent evt = (WatchEvent) cursorDesc.cursor().next();
List<SingleEntryResponse> resp = new ArrayList<>(evt.entryEvents().size() * 2);
for (EntryEvent e : evt.entryEvents()) {
Entry o = e.oldEntry();
Entry n = e.entry();
resp.add(new SingleEntryResponse(o.key(), o.value(), o.revision(), o.updateCounter()));
resp.add(new SingleEntryResponse(n.key(), n.value(), n.revision(), n.updateCounter()));
}
clo.result(new MultipleEntryResponse(resp));
}
}
else if (clo.command() instanceof CursorCloseCommand) {
CursorCloseCommand cursorCloseCmd = (CursorCloseCommand) clo.command();
CursorMeta cursorDesc = cursors.get(cursorCloseCmd.cursorId());
if (cursorDesc == null) {
clo.result(null);
return;
}
try {
cursorDesc.cursor().close();
}
catch (Exception e) {
throw new IgniteInternalException(e);
}
clo.result(null);
}
else if (clo.command() instanceof WatchRangeKeysCommand) {
WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
IgniteUuid cursorId = watchCmd.getCursorId();
Cursor<WatchEvent> cursor =
storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
cursors.put(
cursorId,
new CursorMeta(
cursor,
CursorType.WATCH,
watchCmd.requesterNodeId()
)
);
clo.result(cursorId);
}
else if (clo.command() instanceof WatchExactKeysCommand) {
WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
IgniteUuid cursorId = watchCmd.getCursorId();
Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());
cursors.put(
cursorId,
new CursorMeta(
cursor,
CursorType.WATCH,
watchCmd.requesterNodeId()
)
);
clo.result(cursorId);
}
else if (clo.command() instanceof CursorsCloseCommand) {
CursorsCloseCommand cursorsCloseCmd = (CursorsCloseCommand) clo.command();
Iterator<CursorMeta> cursorsIter = cursors.values().iterator();
while (cursorsIter.hasNext()) {
CursorMeta cursorDesc = cursorsIter.next();
if (cursorDesc.requesterNodeId().equals(cursorsCloseCmd.nodeId())) {
try {
cursorDesc.cursor().close();
}
catch (Exception e) {
throw new IgniteInternalException(e);
}
cursorsIter.remove();
}
}
clo.result(null);
}
else
assert false : "Command was not found [cmd=" + clo.command() + ']';
}
}
/** {@inheritDoc} */
@Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
storage.snapshot(path).whenComplete((unused, throwable) -> {
doneClo.accept(throwable);
});
}
/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(Path path) {
storage.restoreSnapshot(path);
return true;
}
/** {@inheritDoc} */
@Override public void onShutdown() {
try {
storage.close();
}
catch (Exception e) {
throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
}
}
/**
* @return {@link KeyValueStorage} that is backing this listener.
*/
@TestOnly
public KeyValueStorage getStorage() {
return storage;
}
/** */
private static Condition toCondition(ConditionInfo info) {
byte[] key = info.key();
ConditionType type = info.type();
if (type == ConditionType.KEY_EXISTS)
return new ExistenceCondition(ExistenceCondition.Type.EXISTS, key);
else if (type == ConditionType.KEY_NOT_EXISTS)
return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key);
else if (type == ConditionType.TOMBSTONE)
return new TombstoneCondition(key);
else if (type == ConditionType.VAL_EQUAL)
return new ValueCondition(ValueCondition.Type.EQUAL, key, info.value());
else if (type == ConditionType.VAL_NOT_EQUAL)
return new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, info.value());
else if (type == ConditionType.REV_EQUAL)
return new RevisionCondition(RevisionCondition.Type.EQUAL, key, info.revision());
else if (type == ConditionType.REV_NOT_EQUAL)
return new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, info.revision());
else if (type == ConditionType.REV_GREATER)
return new RevisionCondition(RevisionCondition.Type.GREATER, key, info.revision());
else if (type == ConditionType.REV_GREATER_OR_EQUAL)
return new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, info.revision());
else if (type == ConditionType.REV_LESS)
return new RevisionCondition(RevisionCondition.Type.LESS, key, info.revision());
else if (type == ConditionType.REV_LESS_OR_EQUAL)
return new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, info.revision());
else
throw new IllegalArgumentException("Unknown condition type: " + type);
}
/** */
private static List<Operation> toOperations(List<OperationInfo> infos) {
List<Operation> ops = new ArrayList<>(infos.size());
for (OperationInfo info : infos)
ops.add(new Operation(info.type(), info.key(), info.value()));
return ops;
}
/**
* Cursor meta information: origin node id and type.
*/
private class CursorMeta {
/** Cursor. */
private final Cursor<?> cursor;
/** Cursor type. */
private final CursorType type;
/** Id of the node that creates cursor. */
private final String requesterNodeId;
/**
* The constructor.
*
* @param cursor Cursor.
* @param type Cursor type.
* @param requesterNodeId Id of the node that creates cursor.
*/
CursorMeta(Cursor<?> cursor,
CursorType type,
String requesterNodeId
) {
this.cursor = cursor;
this.type = type;
this.requesterNodeId = requesterNodeId;
}
/**
* @return Cursor.
*/
public Cursor<?> cursor() {
return cursor;
}
/**
* @return Cursor type.
*/
public CursorType type() {
return type;
}
/**
* @return Id of the node that creates cursor.
*/
public String requesterNodeId() {
return requesterNodeId;
}
}
/** Cursor type. */
private enum CursorType {
RANGE,
WATCH;
}
}