blob: 4f34d0d1259ae66f4004679f4c40d684551f65d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.raft;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.NotNull;
/**
 * Partition command handler.
 *
 * <p>Applies the table read and write commands delivered through the {@link RaftGroupListener} callbacks
 * to an in-memory map of key bytes to rows.
 */
public class PartitionListener implements RaftGroupListener {
    /**
     * Storage.
     * This is a temporary solution, it will be used until the persistence layer is implemented.
     * TODO: IGNITE-14790.
     */
    private final ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();

    /** {@inheritDoc} */
    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
        while (iterator.hasNext()) {
            CommandClosure<ReadCommand> clo = iterator.next();

            ReadCommand command = clo.command();

            if (command instanceof GetCommand) {
                BinaryRow keyRow = ((GetCommand)command).getKeyRow();

                // The stored row is passed to the response as is; an unknown key produces a response wrapping null.
                clo.result(new SingleRowResponse(storage.get(extractAndWrapKey(keyRow))));
            }
            else if (command instanceof GetAllCommand) {
                Set<BinaryRow> keyRows = ((GetAllCommand)command).getKeyRows();

                assert keyRows != null && !keyRows.isEmpty();

                // Keys without a mapping (and stored rows without a value) are left out of the response.
                final Set<BinaryRow> res = keyRows.stream()
                    .map(PartitionListener::extractAndWrapKey)
                    .map(storage::get)
                    .filter(Objects::nonNull)
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toSet());

                clo.result(new MultiRowsResponse(res));
            }
            else
                assert false : "Command was not found [cmd=" + command + ']';
        }
    }

    /** {@inheritDoc} */
    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
        while (iterator.hasNext()) {
            CommandClosure<WriteCommand> clo = iterator.next();

            WriteCommand command = clo.command();

            if (command instanceof InsertCommand) {
                BinaryRow row = ((InsertCommand)command).getRow();

                assert row.hasValue() : "Insert command should have a value.";

                BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);

                // True only when the key was absent and the row has been stored.
                clo.result(previous == null);
            }
            else if (command instanceof DeleteCommand) {
                BinaryRow deleted = storage.remove(
                    extractAndWrapKey(((DeleteCommand)command).getKeyRow())
                );

                clo.result(deleted != null);
            }
            else if (command instanceof ReplaceCommand) {
                ReplaceCommand replaceCmd = (ReplaceCommand)command;

                BinaryRow expected = replaceCmd.getOldRow();

                KeyWrapper key = extractAndWrapKey(expected);

                BinaryRow current = storage.get(key);

                // The new row is written either when nothing is stored and the expected row carries no value,
                // or when the stored value matches the expected one.
                if ((current == null && !expected.hasValue()) ||
                    equalValues(current, expected)) {
                    storage.put(key, replaceCmd.getRow());

                    clo.result(true);
                }
                else
                    clo.result(false);
            }
            else if (command instanceof UpsertCommand) {
                BinaryRow row = ((UpsertCommand)command).getRow();

                assert row.hasValue() : "Upsert command should have a value.";

                storage.put(extractAndWrapKey(row), row);

                clo.result(null);
            }
            else if (command instanceof InsertAllCommand) {
                Set<BinaryRow> rows = ((InsertAllCommand)command).getRows();

                assert rows != null && !rows.isEmpty();

                // The response holds the input rows whose keys were already mapped, i.e. the rows that were NOT inserted.
                final Set<BinaryRow> res = rows.stream()
                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
                    .filter(Objects::nonNull)
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toSet());

                clo.result(new MultiRowsResponse(res));
            }
            else if (command instanceof UpsertAllCommand) {
                Set<BinaryRow> rows = ((UpsertAllCommand)command).getRows();

                assert rows != null && !rows.isEmpty();

                rows.forEach(k -> storage.put(extractAndWrapKey(k), k));

                clo.result(null);
            }
            else if (command instanceof DeleteAllCommand) {
                Set<BinaryRow> rows = ((DeleteAllCommand)command).getRows();

                assert rows != null && !rows.isEmpty();

                // Input rows that carry a value are skipped; only value-less rows trigger a removal.
                // The response holds the rows that were actually removed.
                final Set<BinaryRow> res = rows.stream()
                    .map(k -> {
                        if (k.hasValue())
                            return null;
                        else
                            return storage.remove(extractAndWrapKey(k));
                    })
                    .filter(Objects::nonNull)
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toSet());

                clo.result(new MultiRowsResponse(res));
            }
            else if (command instanceof DeleteExactCommand) {
                BinaryRow row = ((DeleteExactCommand)command).getRow();

                assert row != null;
                assert row.hasValue();

                final KeyWrapper key = extractAndWrapKey(row);

                final BinaryRow old = storage.get(key);

                if (old == null || !old.hasValue())
                    clo.result(false);
                else {
                    // Conditional removal: the entry is dropped only while it still maps to the row that was compared.
                    clo.result(equalValues(row, old) && storage.remove(key, old));
                }
            }
            else if (command instanceof DeleteExactAllCommand) {
                Set<BinaryRow> rows = ((DeleteExactAllCommand)command).getRows();

                assert rows != null && !rows.isEmpty();

                // The response holds the stored rows that matched the given values and were removed.
                final Set<BinaryRow> res = rows.stream()
                    .map(k -> {
                        final KeyWrapper key = extractAndWrapKey(k);

                        final BinaryRow old = storage.get(key);

                        if (old == null || !old.hasValue() || !equalValues(k, old))
                            return null;

                        // Conditional removal keeps the check and the delete bound to the same stored row.
                        return storage.remove(key, old) ? old : null;
                    })
                    .filter(Objects::nonNull)
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toSet());

                clo.result(new MultiRowsResponse(res));
            }
            else if (command instanceof ReplaceIfExistCommand) {
                BinaryRow row = ((ReplaceIfExistCommand)command).getRow();

                assert row != null;

                final KeyWrapper key = extractAndWrapKey(row);

                final BinaryRow oldRow = storage.get(key);

                if (oldRow == null || !oldRow.hasValue())
                    clo.result(false);
                else {
                    // Conditional replace: succeeds only while the key still maps to the row that was just read.
                    clo.result(storage.replace(key, oldRow, row));
                }
            }
            else if (command instanceof GetAndDeleteCommand) {
                BinaryRow row = ((GetAndDeleteCommand)command).getKeyRow();

                assert row != null;

                clo.result(singleRowResponse(storage.remove(extractAndWrapKey(row))));
            }
            else if (command instanceof GetAndReplaceCommand) {
                BinaryRow row = ((GetAndReplaceCommand)command).getRow();

                assert row != null && row.hasValue();

                // replace(key, value) stores the row only when the key is already mapped and returns the previous row,
                // or null when there was no mapping.
                clo.result(singleRowResponse(storage.replace(extractAndWrapKey(row), row)));
            }
            else if (command instanceof GetAndUpsertCommand) {
                BinaryRow row = ((GetAndUpsertCommand)command).getKeyRow();

                assert row != null && row.hasValue();

                clo.result(singleRowResponse(storage.put(extractAndWrapKey(row), row)));
            }
            else
                assert false : "Command was not found [cmd=" + command + ']';
        }
    }

    /** {@inheritDoc} */
    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
        // Not implemented yet.
        // NOTE(review): nothing is written and doneClo is never invoked here - confirm this is what the caller expects.
    }

    /** {@inheritDoc} */
    @Override public boolean onSnapshotLoad(Path path) {
        // Not implemented yet.
        return false;
    }

    /** {@inheritDoc} */
    @Override public void onShutdown() {
        // No-op.
    }

    /**
     * Wrapper provides correct byte[] comparison.
     */
    private static class KeyWrapper {
        /** Data. */
        private final byte[] data;

        /** Hash. */
        private final int hash;

        /**
         * Constructor.
         *
         * @param data Wrapped data.
         * @param hash Hash code reported by {@link #hashCode()}.
         */
        KeyWrapper(byte[] data, int hash) {
            assert data != null;

            this.data = data;
            this.hash = hash;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            KeyWrapper wrapper = (KeyWrapper)o;

            // Equality is by array content only; the hash is not part of the comparison.
            return Arrays.equals(data, wrapper.data);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return hash;
        }
    }

    /**
     * Builds a single row response for a row previously held in the storage.
     *
     * @param row Row taken from the storage, may be {@code null}.
     * @return Response wrapping the row, or wrapping {@code null} if the row is absent or has no value.
     */
    private static SingleRowResponse singleRowResponse(BinaryRow row) {
        return new SingleRowResponse(row == null || !row.hasValue() ? null : row);
    }

    /**
     * Compares the values of two rows.
     *
     * @param row Row to compare.
     * @param row2 Row to compare.
     * @return True if these rows are equivalent, false otherwise.
     */
    private static boolean equalValues(BinaryRow row, BinaryRow row2) {
        if (row == row2)
            return true;

        if (row == null || row2 == null)
            return false;

        // Exactly one of the rows has a value.
        if (row.hasValue() != row2.hasValue())
            return false;

        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
    }

    /**
     * Makes a wrapped key from a table row.
     *
     * @param row Row.
     * @return Extracted key.
     */
    @NotNull private static KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
        // The key bytes are copied out of the row's key slice, sized by the slice capacity.
        final byte[] bytes = new byte[row.keySlice().capacity()];

        row.keySlice().get(bytes);

        return new KeyWrapper(bytes, row.hash());
    }
}