blob: dd37cb95a1dadb122ddae347d982195bc36b9cd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.raft;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.NotNull;
/**
 * Partition command handler.
 *
 * <p>Applies read and write commands to the partition {@link Storage} and hands the outcome of each command
 * to the {@link CommandClosure} it arrived with.
 */
public class PartitionListener implements RaftGroupListener {
    /** Partition storage. */
    private final Storage storage;

    /**
     * Constructor.
     *
     * @param storage Storage.
     */
    public PartitionListener(Storage storage) {
        this.storage = storage;
    }

    /** {@inheritDoc} */
    @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
        while (iterator.hasNext()) {
            CommandClosure<ReadCommand> clo = iterator.next();

            // Look the command up once instead of on every type check and cast.
            var cmd = clo.command();

            if (cmd instanceof GetCommand) {
                DataRow readValue = storage.read(extractAndWrapKey(((GetCommand)cmd).getKeyRow()));

                // The response carries null when the row that was read has no value bytes.
                clo.result(new SingleRowResponse(toBinaryRowOrNull(readValue)));
            }
            else if (cmd instanceof GetAllCommand) {
                Set<BinaryRow> keyRows = ((GetAllCommand)cmd).getKeyRows();

                assert keyRows != null && !keyRows.isEmpty();

                List<SearchRow> keys = keyRows.stream()
                    .map(PartitionListener::extractAndWrapKey)
                    .collect(Collectors.toList());

                // Rows without value bytes are left out of the response.
                List<BinaryRow> res = storage.readAll(keys).stream()
                    .filter(DataRow::hasValueBytes)
                    .map(read -> new ByteBufferRow(read.valueBytes()))
                    .collect(Collectors.toList());

                clo.result(new MultiRowsResponse(res));
            }
            else {
                // NOTE(review): with assertions disabled this branch does nothing, so the closure of an
                // unrecognized command never receives a result.
                assert false : "Command was not found [cmd=" + cmd + ']';
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
        while (iterator.hasNext()) {
            CommandClosure<WriteCommand> clo = iterator.next();

            // Look the command up once instead of on every type check and cast.
            var cmd = clo.command();

            // The order of the type checks below is kept exactly as it was.
            if (cmd instanceof InsertCommand) {
                BinaryRow row = ((InsertCommand)cmd).getRow();

                assert row.hasValue() : "Insert command should have a value.";

                DataRow newRow = extractAndWrapKeyValue(row);

                InsertInvokeClosure writeIfAbsent = new InsertInvokeClosure(newRow);

                storage.invoke(newRow, writeIfAbsent);

                clo.result(writeIfAbsent.result());
            }
            else if (cmd instanceof DeleteCommand) {
                SearchRow keyRow = extractAndWrapKey(((DeleteCommand)cmd).getKeyRow());

                var getAndRemoveClosure = new GetAndRemoveInvokeClosure();

                storage.invoke(keyRow, getAndRemoveClosure);

                clo.result(getAndRemoveClosure.result());
            }
            else if (cmd instanceof ReplaceCommand) {
                ReplaceCommand replaceCmd = (ReplaceCommand)cmd;

                DataRow expected = extractAndWrapKeyValue(replaceCmd.getOldRow());
                DataRow newRow = extractAndWrapKeyValue(replaceCmd.getRow());

                var replaceClosure = new ReplaceExactInvokeClosure(expected, newRow);

                storage.invoke(expected, replaceClosure);

                clo.result(replaceClosure.result());
            }
            else if (cmd instanceof UpsertCommand) {
                BinaryRow row = ((UpsertCommand)cmd).getRow();

                assert row.hasValue() : "Upsert command should have a value.";

                storage.write(extractAndWrapKeyValue(row));

                clo.result(null);
            }
            else if (cmd instanceof InsertAllCommand) {
                Set<BinaryRow> rows = ((InsertAllCommand)cmd).getRows();

                assert rows != null && !rows.isEmpty();

                List<DataRow> keyValues = rows.stream()
                    .map(PartitionListener::extractAndWrapKeyValue)
                    .collect(Collectors.toList());

                List<BinaryRow> res = storage.insertAll(keyValues).stream()
                    .filter(DataRow::hasValueBytes)
                    .map(returned -> new ByteBufferRow(returned.valueBytes()))
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toList());

                clo.result(new MultiRowsResponse(res));
            }
            else if (cmd instanceof UpsertAllCommand) {
                Set<BinaryRow> rows = ((UpsertAllCommand)cmd).getRows();

                assert rows != null && !rows.isEmpty();

                storage.writeAll(rows.stream().map(PartitionListener::extractAndWrapKeyValue).collect(Collectors.toList()));

                clo.result(null);
            }
            else if (cmd instanceof DeleteAllCommand) {
                Set<BinaryRow> rows = ((DeleteAllCommand)cmd).getRows();

                assert rows != null && !rows.isEmpty();

                List<SearchRow> keys = rows.stream()
                    .map(PartitionListener::extractAndWrapKey)
                    .collect(Collectors.toList());

                List<BinaryRow> res = storage.removeAll(keys).stream()
                    .filter(DataRow::hasValueBytes)
                    .map(returned -> new ByteBufferRow(returned.valueBytes()))
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toList());

                clo.result(new MultiRowsResponse(res));
            }
            else if (cmd instanceof DeleteExactCommand) {
                BinaryRow row = ((DeleteExactCommand)cmd).getRow();

                assert row != null;
                assert row.hasValue();

                DataRow keyValue = extractAndWrapKeyValue(row);

                var deleteExact = new DeleteExactInvokeClosure(keyValue);

                storage.invoke(keyValue, deleteExact);

                clo.result(deleteExact.result());
            }
            else if (cmd instanceof DeleteExactAllCommand) {
                Set<BinaryRow> rows = ((DeleteExactAllCommand)cmd).getRows();

                assert rows != null && !rows.isEmpty();

                List<DataRow> keyValues = rows.stream()
                    .map(PartitionListener::extractAndWrapKeyValue)
                    .collect(Collectors.toList());

                List<BinaryRow> res = storage.removeAllExact(keyValues).stream()
                    .filter(DataRow::hasValueBytes)
                    .map(returned -> new ByteBufferRow(returned.valueBytes()))
                    .filter(BinaryRow::hasValue)
                    .collect(Collectors.toList());

                clo.result(new MultiRowsResponse(res));
            }
            else if (cmd instanceof ReplaceIfExistCommand) {
                BinaryRow row = ((ReplaceIfExistCommand)cmd).getRow();

                assert row != null;

                DataRow keyValue = extractAndWrapKeyValue(row);

                var replaceIfExists = new GetAndReplaceInvokeClosure(keyValue, true);

                storage.invoke(keyValue, replaceIfExists);

                clo.result(replaceIfExists.result());
            }
            else if (cmd instanceof GetAndDeleteCommand) {
                BinaryRow row = ((GetAndDeleteCommand)cmd).getKeyRow();

                assert row != null;

                SearchRow keyRow = extractAndWrapKey(row);

                var getAndRemoveClosure = new GetAndRemoveInvokeClosure();

                storage.invoke(keyRow, getAndRemoveClosure);

                // The old row is read only when the closure reports success; otherwise the response is empty.
                BinaryRow removed = getAndRemoveClosure.result()
                    ? new ByteBufferRow(getAndRemoveClosure.oldRow().valueBytes())
                    : null;

                clo.result(new SingleRowResponse(removed));
            }
            else if (cmd instanceof GetAndReplaceCommand) {
                BinaryRow row = ((GetAndReplaceCommand)cmd).getRow();

                assert row != null && row.hasValue();

                DataRow keyValue = extractAndWrapKeyValue(row);

                var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, true);

                storage.invoke(keyValue, getAndReplace);

                clo.result(new SingleRowResponse(toBinaryRowOrNull(getAndReplace.oldRow())));
            }
            else if (cmd instanceof GetAndUpsertCommand) {
                // This command exposes its full row through getKeyRow(); the value is asserted below.
                BinaryRow row = ((GetAndUpsertCommand)cmd).getKeyRow();

                assert row != null && row.hasValue();

                DataRow keyValue = extractAndWrapKeyValue(row);

                var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, false);

                storage.invoke(keyValue, getAndReplace);

                clo.result(new SingleRowResponse(toBinaryRowOrNull(getAndReplace.oldRow())));
            }
            else {
                // NOTE(review): with assertions disabled this branch does nothing, so the closure of an
                // unrecognized command never receives a result.
                assert false : "Command was not found [cmd=" + cmd + ']';
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
        // Not implemented yet.
        // NOTE(review): doneClo is never invoked here - confirm that no caller waits on it.
    }

    /** {@inheritDoc} */
    @Override public boolean onSnapshotLoad(Path path) {
        // Not implemented yet.
        return false;
    }

    /** {@inheritDoc} */
    @Override public void onShutdown() {
        try {
            storage.close();
        }
        catch (Exception e) {
            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
        }
    }

    /**
     * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
     *
     * @param row Binary row.
     * @return Data row.
     */
    @NotNull private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
        return new SimpleDataRow(extractKeyBytes(row), row.bytes());
    }

    /**
     * Extracts a key from the {@link BinaryRow} and wraps it in a {@link SearchRow}.
     *
     * @param row Binary row.
     * @return Search row.
     */
    @NotNull private static SearchRow extractAndWrapKey(@NotNull BinaryRow row) {
        return new SimpleDataRow(extractKeyBytes(row), null);
    }

    /**
     * Copies the key part of the {@link BinaryRow} into a new byte array.
     *
     * @param row Binary row.
     * @return Key bytes.
     */
    private static byte[] extractKeyBytes(BinaryRow row) {
        // Take the slice once and size the array by the bytes that can actually be read from it.
        var keySlice = row.keySlice();

        byte[] key = new byte[keySlice.remaining()];

        keySlice.get(key);

        return key;
    }

    /**
     * Converts a {@link DataRow} into a {@link BinaryRow} built from its value bytes.
     *
     * @param row Data row.
     * @return Binary row, or {@code null} if the data row has no value bytes.
     */
    private static BinaryRow toBinaryRowOrNull(DataRow row) {
        return row.hasValueBytes() ? new ByteBufferRow(row.valueBytes()) : null;
    }
}