blob: 298d0b5c50900854b3ecc74b478efdb311bc542a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.metastorage.persistence;
import java.io.Serializable;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.cleanupGuardKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.globalKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemKey;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.localKeyPrefix;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.versionKey;
/** */
class InMemoryCachedDistributedMetaStorageBridge {
/** */
private final JdkMarshaller marshaller;
/** */
private final SortedMap<String, byte[]> cache = new TreeMap<>();
/** */
public InMemoryCachedDistributedMetaStorageBridge(JdkMarshaller marshaller) {
this.marshaller = marshaller;
}
/**
* Get unmarshalled data by key.
*
* @param globalKey The key.
* @return Value associated with the key.
* @throws IgniteCheckedException If unmarshalling failed.
*/
public Serializable read(String globalKey) throws IgniteCheckedException {
return unmarshal(marshaller, readMarshalled(globalKey));
}
/**
* Get raw data by key.
*
* @param globalKey The key.
* @return Value associated with the key.
*/
public byte[] readMarshalled(String globalKey) {
return cache.get(globalKey);
}
/**
* Iterate over all values corresponding to the keys with given prefix. It is guaranteed that iteration will be
* executed in ascending keys order.
*
* @param globalKeyPrefix Prefix for the keys that will be iterated.
* @param cb Callback that will be applied to all {@code <key, value>} pairs.
* @throws IgniteCheckedException If unmarshalling failed.
*/
public void iterate(
String globalKeyPrefix,
BiConsumer<String, ? super Serializable> cb
) throws IgniteCheckedException {
for (Map.Entry<String, byte[]> entry : cache.tailMap(globalKeyPrefix).entrySet()) {
if (!entry.getKey().startsWith(globalKeyPrefix))
break;
cb.accept(entry.getKey(), unmarshal(marshaller, entry.getValue()));
}
}
/**
* Write data into storage.
*
* @param globalKey The key.
* @param valBytes Value bytes.
*/
public void write(String globalKey, @Nullable byte[] valBytes) {
if (valBytes == null)
cache.remove(globalKey);
else
cache.put(globalKey, valBytes);
}
/**
* Returns all {@code <key, value>} pairs currently stored in distributed metastorage. Values are not unmarshalled.
* All keys are sorted in ascending order.
*
* @return Array of all keys and values.
*/
public DistributedMetaStorageKeyValuePair[] localFullData() {
return cache.entrySet().stream().map(
entry -> new DistributedMetaStorageKeyValuePair(entry.getKey(), entry.getValue())
).toArray(DistributedMetaStorageKeyValuePair[]::new);
}
/** */
public void writeFullNodeData(DistributedMetaStorageClusterNodeData fullNodeData) {
assert fullNodeData.fullData != null;
cache.clear();
for (DistributedMetaStorageKeyValuePair item : fullNodeData.fullData)
cache.put(item.key, item.valBytes);
}
/** */
public DistributedMetaStorageVersion readInitialData(
ReadOnlyMetastorage metastorage
) throws IgniteCheckedException {
if (metastorage.readRaw(cleanupGuardKey()) != null)
return DistributedMetaStorageVersion.INITIAL_VERSION;
DistributedMetaStorageVersion storedVer =
(DistributedMetaStorageVersion)metastorage.read(versionKey());
if (storedVer == null)
return DistributedMetaStorageVersion.INITIAL_VERSION;
else {
DistributedMetaStorageVersion ver = storedVer;
DistributedMetaStorageHistoryItem lastHistItem;
DistributedMetaStorageHistoryItem histItem =
(DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id() + 1));
if (histItem != null) {
lastHistItem = histItem;
ver = storedVer.nextVersion(histItem);
}
else
lastHistItem = (DistributedMetaStorageHistoryItem)metastorage.read(historyItemKey(storedVer.id()));
metastorage.iterate(
localKeyPrefix(),
(key, val) -> cache.put(globalKey(key), (byte[])val),
false
);
// Last item rollover.
if (lastHistItem != null) {
for (int i = 0, len = lastHistItem.keys().length; i < len; i++) {
String key = lastHistItem.keys()[i];
byte[] valBytes = lastHistItem.valuesBytesArray()[i];
if (valBytes == null)
cache.remove(key);
else
cache.put(key, valBytes);
}
}
return ver;
}
}
}