blob: ea5fffd536610fc16ccc9423d1e75999044e278a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.chm;
import static java.util.stream.Collectors.toList;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
/**
* Storage implementation based on {@link ConcurrentHashMap}.
*/
public class TestConcurrentHashMapPartitionStorage implements PartitionStorage {
/** Name of the snapshot file. */
private static final String SNAPSHOT_FILE = "snapshot_file";
/** Storage content. */
private final ConcurrentSkipListMap<ByteArray, byte[]> map = new ConcurrentSkipListMap<>();
private final int partId;
/**
* Constructor.
*
* @param partId Partition id.
*/
public TestConcurrentHashMapPartitionStorage(int partId) {
assert partId >= 0 : partId;
this.partId = partId;
}
/** {@inheritDoc} */
@Override
public int partitionId() {
return partId;
}
/** {@inheritDoc} */
@Override
@Nullable
public DataRow read(SearchRow key) throws StorageException {
byte[] keyBytes = key.keyBytes();
byte[] valueBytes = map.get(new ByteArray(keyBytes));
return valueBytes == null ? null : new SimpleDataRow(keyBytes, valueBytes);
}
/** {@inheritDoc} */
@Override
public Collection<DataRow> readAll(List<? extends SearchRow> keys) {
return keys.stream()
.map(this::read)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override
public void write(DataRow row) throws StorageException {
map.put(new ByteArray(row.keyBytes()), row.valueBytes());
}
/** {@inheritDoc} */
@Override
public void writeAll(List<? extends DataRow> rows) throws StorageException {
rows.forEach(this::write);
}
/** {@inheritDoc} */
@Override
public Collection<DataRow> insertAll(List<? extends DataRow> rows) throws StorageException {
return rows.stream()
.map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override
public void remove(SearchRow key) throws StorageException {
map.remove(new ByteArray(key.keyBytes()));
}
/** {@inheritDoc} */
@Override
public Collection<SearchRow> removeAll(List<? extends SearchRow> keys) {
var skippedRows = new ArrayList<SearchRow>(keys.size());
for (SearchRow key : keys) {
byte[] keyBytes = key.keyBytes();
byte[] removedValueBytes = map.remove(new ByteArray(keyBytes));
if (removedValueBytes == null) {
skippedRows.add(key);
}
}
return skippedRows;
}
/** {@inheritDoc} */
@Override
public Collection<DataRow> removeAllExact(List<? extends DataRow> keyValues) {
var skippedRows = new ArrayList<DataRow>(keyValues.size());
for (DataRow row : keyValues) {
var key = new ByteArray(row.keyBytes());
byte[] existingValueBytes = map.get(key);
if (Arrays.equals(existingValueBytes, row.valueBytes())) {
map.remove(key);
} else {
skippedRows.add(row);
}
}
return skippedRows;
}
/** {@inheritDoc} */
@Nullable
@Override
public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
byte[] keyBytes = key.keyBytes();
ByteArray mapKey = new ByteArray(keyBytes);
byte[] existingDataBytes = map.get(mapKey);
clo.call(existingDataBytes == null ? null : new SimpleDataRow(keyBytes, existingDataBytes));
switch (clo.operationType()) {
case WRITE:
DataRow newRow = clo.newRow();
assert newRow != null;
map.put(mapKey, newRow.valueBytes());
break;
case REMOVE:
map.remove(mapKey);
break;
case NOOP:
break;
default:
throw new UnsupportedOperationException(String.valueOf(clo.operationType()));
}
return clo.result();
}
/** {@inheritDoc} */
@Override
public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
Iterator<SimpleDataRow> iter = map.entrySet().stream()
.map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
.filter(filter)
.iterator();
return Cursor.fromIterator(iter);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
return CompletableFuture.runAsync(() -> {
try (
OutputStream out = Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE));
ObjectOutputStream objOut = new ObjectOutputStream(out)
) {
objOut.writeObject(map.keySet().stream().map(ByteArray::bytes).collect(toList()));
objOut.writeObject(new ArrayList<>(map.values()));
} catch (Exception e) {
throw new IgniteInternalException(e);
}
});
}
/** {@inheritDoc} */
@Override
public void restoreSnapshot(Path snapshotPath) {
try (
InputStream in = Files.newInputStream(snapshotPath.resolve(SNAPSHOT_FILE));
ObjectInputStream objIn = new ObjectInputStream(in)
) {
var keys = (List<byte[]>) objIn.readObject();
var values = (List<byte[]>) objIn.readObject();
map.clear();
for (int i = 0; i < keys.size(); i++) {
map.put(new ByteArray(keys.get(i)), values.get(i));
}
} catch (Exception e) {
throw new IgniteInternalException(e);
}
}
/** {@inheritDoc} */
@Override
public void close() throws Exception {
// No-op.
}
@Override
public void destroy() {
map.clear();
}
@Override
public long rowsCount() {
return map.size();
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestConcurrentHashMapPartitionStorage that = (TestConcurrentHashMapPartitionStorage) o;
if (!map.equals(that.map)) {
return false;
}
return true;
}
/** {@inheritDoc} */
@Override
public int hashCode() {
int hash = 0;
for (Map.Entry<ByteArray, byte[]> entry : map.entrySet()) {
hash += entry.getKey().hashCode() ^ Arrays.hashCode(entry.getValue());
}
return hash;
}
}