blob: 9b8d820cf4c065a4f9b325395c77e0c106dfe125 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage.basic;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.Storage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* Storage implementation based on {@link ConcurrentHashMap}.
*/
public class ConcurrentHashMapStorage implements Storage {
/** Storage content. */
private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public DataRow read(SearchRow key) throws StorageException {
byte[] keyBytes = key.keyBytes();
byte[] valueBytes = map.get(new ByteArray(keyBytes));
return new SimpleDataRow(keyBytes, valueBytes);
}
/** {@inheritDoc} */
@Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) {
return keys.stream()
.map(SearchRow::keyBytes)
.map(key -> new SimpleDataRow(key, map.get(new ByteArray(key))))
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override public void write(DataRow row) throws StorageException {
map.put(new ByteArray(row.keyBytes()), row.valueBytes());
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
rows.forEach(row -> map.put(new ByteArray(row.keyBytes()), row.valueBytes()));
}
/** {@inheritDoc} */
@Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
return rows.stream()
.map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override public void remove(SearchRow key) throws StorageException {
map.remove(new ByteArray(key.keyBytes()));
}
/** {@inheritDoc} */
@Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
return keys.stream()
.map(SearchRow::keyBytes)
.map(key -> new SimpleDataRow(key, map.remove(new ByteArray(key))))
.filter(SimpleDataRow::hasValueBytes)
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
return keyValues.stream()
.filter(kv -> {
ByteArray key = new ByteArray(kv.keyBytes());
byte[] currentValue = map.get(key);
if (Arrays.equals(currentValue, kv.valueBytes())) {
map.remove(key);
return true;
}
return false;
})
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Nullable
@Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
byte[] keyBytes = key.keyBytes();
ByteArray mapKey = new ByteArray(keyBytes);
byte[] existingDataBytes = map.get(mapKey);
clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
switch (clo.operationType()) {
case WRITE:
map.put(mapKey, clo.newRow().valueBytes());
break;
case REMOVE:
map.remove(mapKey);
break;
case NOOP:
break;
}
return clo.result();
}
/** {@inheritDoc} */
@Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
Iterator<SimpleDataRow> iter = map.entrySet().stream()
.map(e -> new SimpleDataRow(e.getKey().bytes(), e.getValue()))
.filter(filter)
.iterator();
return new Cursor<>() {
/** {@inheritDoc} */
@Override public boolean hasNext() {
return iter.hasNext();
}
/** {@inheritDoc} */
@Override public DataRow next() {
return iter.next();
}
/** {@inheritDoc} */
@NotNull @Override public Iterator<DataRow> iterator() {
return this;
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
// No-op.
}
};
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
// No-op.
}
}