blob: f03654648370ef01998538e7aaf7f6e59bf29af5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.metastorage;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetrics;
import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorageImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_AUX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
/**
* General purpose key-value local-only storage.
*/
public class MetaStorage implements CheckpointListener, ReadWriteMetastorage {
/** Name of the metastorage cache; also used as the prefix of the tree and free list names. */
public static final String METASTORAGE_CACHE_NAME = "MetaStorage";
/** Cache id computed from {@link #METASTORAGE_CACHE_NAME}. */
public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
/** Metastorage cache directory to store data. */
public static final String METASTORAGE_DIR_NAME = "metastorage";
/** Old special partition reserved for metastore space. */
public static final int OLD_METASTORE_PARTITION = 0x0;
/** Special partition reserved for metastore space. */
public static final int METASTORE_PARTITION = 0x1;
/** The set of all metastorage partitions. */
public static final Set<Integer> METASTORAGE_PARTITIONS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(OLD_METASTORE_PARTITION, METASTORE_PARTITION)));
/** This flag is used ONLY FOR TESTING the migration of a metastorage from Part 0 to Part 1. */
public static boolean PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
/** Marker for removed entry. It is recognized by reference comparison, not by content. */
private static final byte[] TOMBSTONE = new byte[0];
/** Temporary metastorage memory size. */
private static final int TEMPORARY_METASTORAGE_IN_MEMORY_SIZE = 128 * 1024 * 1024;
/** Temporary metastorage buffer size (file). */
private static final int TEMPORARY_METASTORAGE_BUFFER_SIZE = 1024 * 1024;
/** Write-ahead log manager taken from the shared context. */
private final IgniteWriteAheadLogManager wal;
/** Data region whose page memory holds the metastorage pages. */
private final DataRegion dataRegion;
/** Logger. */
private final IgniteLogger log;
/** Tree indexing the stored rows by key. Created during initialization unless the storage is empty. */
private MetastorageTree tree;
/** Global remove id: loaded from the partition meta page and written back by {@code saveStoreMetadata()}. */
private AtomicLong rmvId = new AtomicLong();
/** Read-only flag: when set, writes and removals are ignored and applied updates are buffered in memory. */
private final boolean readOnly;
/** Set when the storage is read-only and the partition meta page has not been initialized yet. */
private boolean empty;
/** Root page of the tree, as read from (or allocated for) the partition meta page. */
private RootPage treeRoot;
/** Root page of the reuse list, as read from (or allocated for) the partition meta page. */
private RootPage reuseListRoot;
/** Partition storage holding the row data. Created during initialization unless the storage is empty. */
private PartitionMetaStorageImpl<MetastorageRowStoreEntry> partStorage;
/** Updates applied while read-only, sorted by key; removals are stored as {@link #TOMBSTONE}. */
private SortedMap<String, byte[]> lastUpdates;
/** Marshaller used to convert values to and from bytes. */
private final Marshaller marshaller = JdkMarshaller.DEFAULT;
/** Partition id. */
private int partId;
/** Cctx. */
private final GridCacheSharedContext<?, ?> cctx;
/**
 * @param cctx Shared cache context; the WAL manager and the logger are taken from it.
 * @param dataRegion Data region whose page memory holds the metastorage pages.
 * @param readOnly If {@code true}, writes and removals are ignored and applied updates are only buffered.
 */
public MetaStorage(
    GridCacheSharedContext<?, ?> cctx,
    DataRegion dataRegion,
    boolean readOnly
) {
    this.cctx = cctx;
    this.dataRegion = dataRegion;
    this.readOnly = readOnly;

    this.wal = cctx.wal();
    this.log = cctx.logger(getClass());
}
/**
 * Initializes the storage and, unless the legacy partition is preserved for testing, takes part in moving
 * the data from the old partition to the new one:
 * <ul>
 * <li>if the storage ended up on the old partition, its content is copied to a temporary storage that is
 * handed over to {@code db};</li>
 * <li>otherwise, if {@code db} holds a temporary storage, its content is written into this storage and a
 * one-shot checkpoint listener is registered that invalidates and truncates the old partition and the
 * index partition.</li>
 * </ul>
 *
 * @param db Database manager.
 * @throws IgniteCheckedException If initialization or data transfer fails.
 */
public void init(GridCacheDatabaseSharedManager db) throws IgniteCheckedException {
    dataRegion.metrics().clear();

    initInternal(db);

    if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID)
        return;

    if (partId == OLD_METASTORE_PARTITION) {
        db.temporaryMetaStorage(copyDataToTmpStorage());

        return;
    }

    TmpStorage stashed = db.temporaryMetaStorage();

    if (stashed == null)
        return;

    restoreDataFromTmpStorage(stashed);

    db.temporaryMetaStorage(null);

    db.addCheckpointListener(new CheckpointListener() {
        /** {@inheritDoc} */
        @Override public void beforeCheckpointBegin(Context ctx) {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void onMarkCheckpointBegin(Context ctx) {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
            assert cctx.pageStore() != null;

            PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory();

            int partTag = pageMem.invalidate(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION);

            cctx.pageStore().truncate(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION, partTag);

            int idxTag = pageMem.invalidate(METASTORAGE_CACHE_ID, PageIdAllocator.INDEX_PARTITION);

            PageStore idxStore = ((FilePageStoreManager)cctx.pageStore())
                .getStore(METASTORAGE_CACHE_ID, PageIdAllocator.INDEX_PARTITION);

            idxStore.truncate(idxTag);

            // The cleanup is needed only once.
            db.removeCheckpointListener(this);
        }
    }, dataRegion);
}
/** Frees the allocated resources: closes the tree and the partition storage if they have been created. */
public void close() {
if (tree != null)
tree.close();
if (partStorage != null)
partStorage.close();
}
/**
 * Copies every row found in the tree into a newly created temporary storage.
 *
 * @return Temporary storage filled with the key/value pairs read from the tree.
 * @throws IgniteCheckedException If reading a row or adding it to the temporary storage fails.
 */
private TmpStorage copyDataToTmpStorage() throws IgniteCheckedException {
    TmpStorage dst = new TmpStorage(TEMPORARY_METASTORAGE_IN_MEMORY_SIZE, log);

    for (GridCursor<MetastorageDataRow> rows = tree.find(null, null); rows.next(); ) {
        MetastorageDataRow row = rows.get();

        String key = row.key();
        byte[] val = partStorage.readRow(row.link());

        dst.add(key, val);
    }

    return dst;
}
/**
 * Writes every key/value pair of the temporary storage into this storage and then closes the temporary
 * storage.
 * <p>
 * The temporary storage is closed even when a write fails, so that its resources are not leaked.
 * A failure to close is logged and does not affect the result.
 *
 * @param tmpStorage Temporary storage to read from; it is closed by this method.
 * @throws IgniteCheckedException If writing a pair into this storage fails.
 */
private void restoreDataFromTmpStorage(TmpStorage tmpStorage) throws IgniteCheckedException {
    try {
        for (Iterator<IgniteBiTuple<String, byte[]>> it = tmpStorage.stream().iterator(); it.hasNext(); ) {
            IgniteBiTuple<String, byte[]> t = it.next();

            writeRaw(t.get1(), t.get2());
        }
    }
    finally {
        try {
            tmpStorage.close();
        }
        catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}
/**
 * Selects the partition to work with, reads or allocates its root pages and, unless the storage turned out
 * to be empty, creates the partition storage and the tree.
 * <p>
 * Partition selection: with the legacy flag set only the old partition is used. Otherwise a writable
 * storage goes straight to the new partition, while a read-only storage tries the old partition first and
 * falls back to the new one only if the old one is reported empty.
 *
 * @param db Database.
 * @throws IgniteCheckedException If a page operation or the creation of the tree fails.
 */
private void initInternal(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException {
    if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID) {
        partId = OLD_METASTORE_PARTITION;

        getOrAllocateMetas(partId);
    }
    else {
        boolean useNewPart = !readOnly;

        if (!useNewPart) {
            partId = OLD_METASTORE_PARTITION;

            useNewPart = getOrAllocateMetas(partId);
        }

        if (useNewPart) {
            partId = METASTORE_PARTITION;

            getOrAllocateMetas(partId);
        }
    }

    if (empty)
        return;

    partStorage = new PartitionMetaStorageImpl<MetastorageRowStoreEntry>(
        METASTORAGE_CACHE_ID,
        METASTORAGE_CACHE_NAME + "##FreeList",
        dataRegion,
        null,
        wal,
        reuseListRoot.pageId().pageId(),
        reuseListRoot.isAllocated(),
        cctx.diagnostic().pageLockTracker(),
        cctx.kernalContext(),
        null,
        FLAG_AUX
    ) {
        @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
            return pageMem.allocatePage(grpId, partId, FLAG_AUX);
        }
    };

    MetastorageRowStore rowStore = new MetastorageRowStore(partStorage, db);

    tree = new MetastorageTree(
        METASTORAGE_CACHE_ID,
        METASTORAGE_CACHE_NAME + "##Tree",
        dataRegion.pageMemory(),
        wal,
        rmvId,
        partStorage,
        rowStore,
        treeRoot.pageId().pageId(),
        treeRoot.isAllocated(),
        cctx.kernalContext().failure(),
        cctx.diagnostic().pageLockTracker(),
        partId
    );

    // Only a writable storage has metadata to save on checkpoints.
    if (!readOnly)
        ((GridCacheDatabaseSharedManager)db).addCheckpointListener(this, dataRegion);
}
/** {@inheritDoc} */
@Override public Serializable read(String key) throws IgniteCheckedException {
    byte[] raw = readRaw(key);

    // No stored bytes means no value.
    if (raw == null)
        return null;

    return marshaller.unmarshal(raw, U.gridClassLoader());
}
/**
 * {@inheritDoc}
 * <p>
 * In read-only mode the rows of the tree are merged, in key order, with the updates buffered by
 * {@link #applyUpdate(String, byte[])}: a buffered update replaces the tree row with the same key and a
 * buffered removal hides it. Buffered updates are reported even when the storage itself is empty, which
 * keeps this method consistent with {@link #readRaw(String)}.
 */
@Override public void iterate(
    String keyPrefix,
    BiConsumer<String, ? super Serializable> cb,
    boolean unmarshal
) throws IgniteCheckedException {
    Iterator<Map.Entry<String, byte[]>> updatesIter = null;

    if (readOnly && lastUpdates != null) {
        SortedMap<String, byte[]> prefixedSubmap = lastUpdates.subMap(keyPrefix, keyPrefix + "\uFFFF");

        if (!prefixedSubmap.isEmpty())
            updatesIter = prefixedSubmap.entrySet().iterator();
    }

    // The iterator is created only for a non-empty submap, so the first next() always succeeds.
    Map.Entry<String, byte[]> curUpdatesEntry = updatesIter != null ? updatesIter.next() : null;

    // An empty storage has no tree to scan; only the buffered updates (if any) are reported below.
    if (!empty) {
        MetastorageSearchRow lower = new MetastorageSearchRow(keyPrefix);
        MetastorageSearchRow upper = new MetastorageSearchRow(keyPrefix + "\uFFFF");

        GridCursor<MetastorageDataRow> cur = tree.find(lower, upper);

        while (cur.next()) {
            MetastorageDataRow row = cur.get();

            String key = row.key();
            byte[] valBytes = partStorage.readRow(row.link());

            int c = 0;

            // Report buffered updates whose keys sort before the current tree key.
            while (curUpdatesEntry != null && (c = curUpdatesEntry.getKey().compareTo(key)) < 0)
                curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);

            // Same key: the buffered update wins over the tree row.
            if (curUpdatesEntry != null && c == 0)
                curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
            else
                applyCallback(cb, unmarshal, key, valBytes);
        }
    }

    // Report the buffered updates that sort after the last tree key.
    while (curUpdatesEntry != null)
        curUpdatesEntry = advanceCurrentUpdatesEntry(cb, unmarshal, updatesIter, curUpdatesEntry);
}
/**
 * Reports the given buffered update to the callback and moves on to the next one.
 *
 * @param cb Callback.
 * @param unmarshal Whether the value must be unmarshalled before it is passed to the callback.
 * @param updatesIter Iterator over the buffered updates.
 * @param curUpdatesEntry Buffered update to report.
 * @return Next buffered update or {@code null} if the iterator is exhausted.
 * @throws IgniteCheckedException If unmarshalling fails.
 */
private Map.Entry<String, byte[]> advanceCurrentUpdatesEntry(
    BiConsumer<String, ? super Serializable> cb,
    boolean unmarshal,
    Iterator<Map.Entry<String, byte[]>> updatesIter,
    Map.Entry<String, byte[]> curUpdatesEntry
) throws IgniteCheckedException {
    String key = curUpdatesEntry.getKey();
    byte[] val = curUpdatesEntry.getValue();

    applyCallback(cb, unmarshal, key, val);

    if (updatesIter.hasNext())
        return updatesIter.next();

    return null;
}
/**
 * Passes a key/value pair to the callback unless the value is the removal marker.
 *
 * @param cb Callback.
 * @param unmarshal If {@code true} the value is unmarshalled first, otherwise the raw bytes are passed.
 * @param key Key.
 * @param valBytes Value bytes or the removal marker.
 * @throws IgniteCheckedException If unmarshalling fails.
 */
private void applyCallback(
    BiConsumer<String, ? super Serializable> cb,
    boolean unmarshal,
    String key,
    byte[] valBytes
) throws IgniteCheckedException {
    // Removed entries are not reported; the marker is recognized by reference.
    if (valBytes == TOMBSTONE)
        return;

    if (!unmarshal) {
        cb.accept(key, valBytes);

        return;
    }

    Serializable obj = marshaller.unmarshal(valBytes, U.gridClassLoader());

    cb.accept(key, obj);
}
/** {@inheritDoc} */
@Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
    assert val != null;

    // A read-only storage silently ignores writes.
    if (readOnly)
        return;

    byte[] valBytes = marshaller.marshal(val);

    writeRaw(key, valBytes);
}
/** {@inheritDoc} */
@Override public void remove(@NotNull String key) throws IgniteCheckedException {
// Delegates to removeData(), which does nothing when the storage is read-only.
removeData(key);
}
/** {@inheritDoc} */
@Override public void writeRaw(String key, byte[] data) throws IgniteCheckedException {
    // A read-only storage silently ignores writes.
    if (readOnly)
        return;

    WALPointer walPtr;

    synchronized (this) {
        // The update is logged before the tree is touched.
        walPtr = wal.log(new MetastoreDataRecord(key, data));

        MetastorageDataRow prev = tree.findOne(new MetastorageSearchRow(key));

        // NOTE(review): the key is encoded with the platform default charset here - confirm this is intended.
        byte[] rawKey = key.getBytes();

        long keyLink = 0L;

        if (prev != null)
            keyLink = prev.keyLink(); // An existing row keeps its key link.
        else if (rawKey.length > MetastorageTree.MAX_KEY_LEN)
            keyLink = tree.rowStore().addRow(rawKey); // A long key is stored as a separate row.

        long dataLink = tree.rowStore().addRow(data);

        tree.put(new MetastorageDataRow(dataLink, key, keyLink));

        // The previous value is released only after the new row is in the tree.
        if (prev != null)
            tree.rowStore().removeRow(prev.link());
    }

    wal.flush(walPtr, false);
}
/** {@inheritDoc} */
@Override public byte[] readRaw(String key) throws IgniteCheckedException {
    if (readOnly) {
        // Buffered updates take precedence over what is stored in the tree.
        byte[] pending = lastUpdates == null ? null : lastUpdates.get(key);

        if (pending != null)
            return pending == TOMBSTONE ? null : pending;

        // Nothing is stored, so there is no tree to look into.
        if (empty)
            return null;
    }

    MetastorageDataRow found = tree.findOne(new MetastorageSearchRow(key));

    return found == null ? null : partStorage.readRow(found.link());
}
/**
 * Removes the row stored under the given key, if there is one. Does nothing when the storage is read-only.
 *
 * @param key Key to remove.
 * @throws IgniteCheckedException If logging the removal or updating the tree fails.
 */
public void removeData(String key) throws IgniteCheckedException {
    if (readOnly)
        return;

    WALPointer walPtr;

    synchronized (this) {
        MetastorageDataRow existing = tree.findOne(new MetastorageSearchRow(key));

        // Nothing to remove, hence nothing to log.
        if (existing == null)
            return;

        // A record with a null value denotes the removal.
        walPtr = wal.log(new MetastoreDataRecord(key, null));

        tree.removex(existing);

        tree.rowStore().removeRow(existing.link());

        long keyLink = existing.keyLink();

        // A non-zero key link means the key occupies a row of its own.
        if (keyLink != 0L)
            tree.rowStore().removeRow(keyLink);
    }

    wal.flush(walPtr, false);
}
/**
 * Verifies that both root page ids carry either the {@code FLAG_AUX} or the {@code FLAG_DATA} flag.
 *
 * @param treeRoot Tree root page id.
 * @param reuseListRoot Reuse list root page id.
 * @throws StorageException If either page id carries another flag.
 */
private void checkRootsPageIdFlag(long treeRoot, long reuseListRoot) throws StorageException {
    int treeFlag = PageIdUtils.flag(treeRoot);

    if (!(treeFlag == FLAG_AUX || treeFlag == FLAG_DATA)) {
        throw new StorageException("Wrong tree root page id flag: treeRoot="
            + U.hexLong(treeRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
    }

    int reuseFlag = PageIdUtils.flag(reuseListRoot);

    if (!(reuseFlag == FLAG_AUX || reuseFlag == FLAG_DATA)) {
        throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
            + U.hexLong(reuseListRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID);
    }
}
/**
 * Initializes the selected partition for use as MetaStorage: the tree and reuse list roots are read from
 * the partition meta page or, when the storage is writable and the page is not initialized yet, the page
 * is initialized and both roots are allocated.
 *
 * @param partId Partition id.
 * @return {@code true} only if the storage is read-only and the partition meta page is not initialized,
 *      i.e. the partition is empty; {@code false} in every other case.
 * @throws IgniteCheckedException If a page operation fails or a root page id carries an unexpected flag.
 */
private boolean getOrAllocateMetas(int partId) throws IgniteCheckedException {
    empty = false;

    PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory();

    long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId);
    long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId);

    try {
        if (readOnly) {
            long pageAddr = pageMem.readLock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);

            try {
                if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
                    empty = true;

                    return true;
                }

                PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);

                long treeRoot = io.getTreeRoot(pageAddr);
                long reuseListRoot = io.getReuseListRoot(pageAddr);

                checkRootsPageIdFlag(treeRoot, reuseListRoot);

                this.treeRoot = new RootPage(new FullPageId(treeRoot, METASTORAGE_CACHE_ID), false);
                this.reuseListRoot = new RootPage(new FullPageId(reuseListRoot, METASTORAGE_CACHE_ID), false);

                rmvId.set(io.getGlobalRemoveId(pageAddr));
            }
            finally {
                // Unlock with the same page id the lock was taken with (not with the partition id).
                pageMem.readUnlock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);
            }
        }
        else {
            boolean allocated = false;

            long pageAddr = pageMem.writeLock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);

            try {
                long treeRoot, reuseListRoot;

                if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
                    // Initialize new page.
                    PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();

                    PageMetrics metrics = pageMem.metrics().cacheGrpPageMetrics(METASTORAGE_CACHE_ID);

                    //MetaStorage never encrypted so realPageSize == pageSize.
                    io.initNewPage(pageAddr, partMetaId, pageMem.pageSize(), metrics);

                    treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_AUX);
                    reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, FLAG_AUX);

                    assert PageIdUtils.flag(treeRoot) == FLAG_AUX;
                    assert PageIdUtils.flag(reuseListRoot) == FLAG_AUX;

                    io.setTreeRoot(pageAddr, treeRoot);
                    io.setReuseListRoot(pageAddr, reuseListRoot);

                    if (PageHandler.isWalDeltaRecordNeeded(pageMem, METASTORAGE_CACHE_ID, partMetaId, partMetaPage, wal, null)) {
                        assert io.getType() == PageIO.T_PART_META;

                        wal.log(new MetaPageInitRecord(
                            METASTORAGE_CACHE_ID,
                            partMetaId,
                            io.getType(),
                            io.getVersion(),
                            treeRoot,
                            reuseListRoot
                        ));
                    }

                    allocated = true;
                }
                else {
                    PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);

                    treeRoot = io.getTreeRoot(pageAddr);
                    reuseListRoot = io.getReuseListRoot(pageAddr);

                    rmvId.set(io.getGlobalRemoveId(pageAddr));

                    checkRootsPageIdFlag(treeRoot, reuseListRoot);
                }

                this.treeRoot = new RootPage(new FullPageId(treeRoot, METASTORAGE_CACHE_ID), allocated);
                this.reuseListRoot = new RootPage(new FullPageId(reuseListRoot, METASTORAGE_CACHE_ID), allocated);
            }
            finally {
                // The page is marked dirty only if it has just been initialized.
                pageMem.writeUnlock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage, null, allocated);
            }
        }
    }
    finally {
        pageMem.releasePage(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);
    }

    return false;
}
/**
* @return Page memory of the data region this storage was created with.
*/
public PageMemory pageMemory() {
return dataRegion.pageMemory();
}
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
    Executor exec = ctx.executor();

    if (exec != null) {
        // Both steps are handed to the executor as separate tasks; checked failures are rethrown unchecked.
        exec.execute(() -> {
            try {
                partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        });

        exec.execute(() -> {
            try {
                saveStoreMetadata();
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        });

        return;
    }

    // No executor supplied: run both steps in the calling thread.
    partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);

    saveStoreMetadata();
}
/** {@inheritDoc} */
@Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
// Saves the partition storage metadata, passing the no-op I/O statistics holder.
partStorage.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
}
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
// Nothing is done at this stage: metadata is saved in beforeCheckpointBegin() and onMarkCheckpointBegin().
/* No-op. */
}
/**
 * Stores the current global remove id in the partition meta page. If the write lock on the page cannot be
 * taken, a warning is logged and nothing is saved.
 *
 * @throws IgniteCheckedException If failed.
 */
private void saveStoreMetadata() throws IgniteCheckedException {
    PageMemoryEx pageMem = (PageMemoryEx)pageMemory();

    long metaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId);
    long metaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, metaId);

    try {
        long metaAddr = pageMem.writeLock(METASTORAGE_CACHE_ID, metaId, metaPage);

        if (metaAddr == 0L) {
            U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + metaPage + ']');

            return;
        }

        boolean dirty = false;

        try {
            PagePartitionMetaIO io = PageIO.getPageIO(metaAddr);

            dirty = io.setGlobalRemoveId(metaAddr, rmvId.get());
        }
        finally {
            // The page is marked dirty only if the stored value has actually changed.
            pageMem.writeUnlock(METASTORAGE_CACHE_ID, metaId, metaPage, null, dirty);
        }
    }
    finally {
        pageMem.releasePage(METASTORAGE_CACHE_ID, metaId, metaPage);
    }
}
/**
 * Applies an update: a writable storage writes or removes the entry right away, a read-only storage only
 * remembers the update in memory.
 *
 * @param key Key.
 * @param value New value or {@code null} if the entry has been removed.
 * @throws IgniteCheckedException If writing or removing the entry fails.
 */
public void applyUpdate(String key, byte[] value) throws IgniteCheckedException {
    if (!readOnly) {
        if (value == null)
            removeData(key);
        else
            writeRaw(key, value);

        return;
    }

    if (lastUpdates == null)
        lastUpdates = new TreeMap<>();

    // A removal is remembered as the tombstone marker.
    lastUpdates.put(key, value == null ? TOMBSTONE : value);
}
/**
* @return Marshaller this storage uses to convert values to and from bytes.
*/
public Marshaller marshaller() {
return marshaller;
}
/**
* Contract of a single internal temporary storage: key/value pairs are appended and later read back.
*/
private interface TmpStorageInternal extends Closeable {
/**
* Appends a key/value pair to the storage.
*
* @param key Key.
* @param val Value.
* @return {@code true} if the pair has been stored, {@code false} if this storage cannot accept it.
* @throws IOException If an I/O error occurs.
*/
boolean add(String key, byte[] val) throws IOException;
/**
* Reads the data back from the storage.
*
* @return Stream of the stored key/value pairs.
* @throws IOException If an I/O error occurs.
*/
Stream<IgniteBiTuple<String, byte[]>> stream() throws IOException;
}
/**
 * Temporary storage kept in a direct byte buffer of a fixed size.
 * <p>
 * Each entry is laid out as: key length (int), value length (int), key bytes in UTF-8, value bytes.
 */
private static class MemoryTmpStorage implements TmpStorageInternal {
    /** Buffer. */
    final ByteBuffer buf;

    /** Number of stored entries. */
    int size;

    /**
     * @param size Buffer size in bytes.
     */
    MemoryTmpStorage(int size) {
        buf = ByteBuffer.allocateDirect(size);
    }

    /** {@inheritDoc} */
    @Override public boolean add(String key, byte[] val) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);

        // Two length fields of four bytes each precede the payload.
        int required = val.length + keyBytes.length + 8;

        if (required > buf.remaining())
            return false;

        buf.putInt(keyBytes.length);
        buf.putInt(val.length);
        buf.put(keyBytes);
        buf.put(val);

        size++;

        return true;
    }

    /** {@inheritDoc} */
    @Override public Stream<IgniteBiTuple<String, byte[]>> stream() {
        // Switches the buffer from writing to reading.
        buf.flip();

        return Stream.generate(this::readNext).limit(size);
    }

    /**
     * Reads the entry at the current position of the buffer.
     *
     * @return Key/value pair.
     */
    private IgniteBiTuple<String, byte[]> readNext() {
        int keyLen = buf.getInt();
        int valLen = buf.getInt();

        byte[] keyBytes = new byte[keyLen];

        buf.get(keyBytes);

        byte[] val = new byte[valLen];

        buf.get(val);

        return new IgniteBiTuple<>(new String(keyBytes, StandardCharsets.UTF_8), val);
    }

    /** {@inheritDoc} */
    @Override public void close() throws IOException {
        // Nothing to release explicitly.
    }
}
/**
 * Temporary storage backed by a temporary file, with a fixed-size direct buffer in front of it.
 * <p>
 * Each entry is laid out as: key length (int), value length (int), key bytes in UTF-8, value bytes.
 * Entries are streamed through the buffer in pieces, so an entry may be larger than the buffer.
 * The file is created lazily on the first {@link #add(String, byte[])} and deleted on {@link #close()}.
 */
private static class FileTmpStorage implements TmpStorageInternal {
    /** Size of the entry header: two ints. */
    private static final int HEADER_SIZE = 8;

    /** Cache. */
    final ByteBuffer cache = ByteBuffer.allocateDirect(TEMPORARY_METASTORAGE_BUFFER_SIZE);

    /** File. */
    RandomAccessFile file;

    /** Location of the backing file, remembered so that the file can be deleted on close. */
    private File tmpFile;

    /** Number of stored entries. */
    long size;

    /** {@inheritDoc} */
    @Override public boolean add(String key, byte[] val) throws IOException {
        if (file == null) {
            tmpFile = File.createTempFile("m_storage", "bin");

            file = new RandomAccessFile(tmpFile, "rw");
        }

        byte[] keyData = key.getBytes(StandardCharsets.UTF_8);

        // Make room for the header, which is written in one piece.
        if (cache.remaining() < HEADER_SIZE)
            flushCache(false);

        cache.putInt(keyData.length).putInt(val.length);

        append(keyData);
        append(val);

        size++;

        return true;
    }

    /** {@inheritDoc} */
    @Override public Stream<IgniteBiTuple<String, byte[]>> stream() throws IOException {
        if (file == null)
            return Stream.empty();

        // Push everything to the file, then start reading from its beginning.
        flushCache(true);

        file.getChannel().position(0);

        readToCache();

        return Stream.generate(() -> {
            try {
                return readEntry();
            }
            catch (IOException e) {
                throw new IgniteException(e);
            }
        }).limit(size);
    }

    /**
     * Closes the backing file, if it has been created, and deletes it.
     *
     * @throws IOException If closing the file fails.
     */
    @Override public void close() throws IOException {
        try {
            if (file != null)
                file.close();
        }
        finally {
            // If the file cannot be deleted right now, leave the deletion to the JVM shutdown.
            if (tmpFile != null && !tmpFile.delete())
                tmpFile.deleteOnExit();
        }
    }

    /**
     * Appends the bytes to the cache, flushing the cache to the file every time it fills up.
     *
     * @param src Bytes to append.
     * @throws IOException If writing to the file fails.
     */
    private void append(byte[] src) throws IOException {
        int off = 0;

        while (off < src.length) {
            if (!cache.hasRemaining())
                flushCache(false);

            int len = Math.min(cache.remaining(), src.length - off);

            cache.put(src, off, len);

            off += len;
        }
    }

    /**
     * Reads the next entry, refilling the cache from the file as needed.
     *
     * @return Key/value pair.
     * @throws IOException If reading from the file fails.
     */
    private IgniteBiTuple<String, byte[]> readEntry() throws IOException {
        // Both length fields must be in the cache before they are decoded.
        if (cache.remaining() < HEADER_SIZE) {
            cache.compact();

            readToCache();
        }

        int keyLen = cache.getInt();
        int dataLen = cache.getInt();

        byte[] keyData = new byte[keyLen];

        readFully(keyData);

        byte[] val = new byte[dataLen];

        readFully(val);

        return new IgniteBiTuple<>(new String(keyData, StandardCharsets.UTF_8), val);
    }

    /**
     * Fills the whole array with bytes taken from the cache, refilling the cache from the file as needed.
     *
     * @param dst Array to fill.
     * @throws IOException If reading from the file fails or the file ends before the array is filled.
     */
    private void readFully(byte[] dst) throws IOException {
        int off = 0;

        while (off < dst.length) {
            if (!cache.hasRemaining()) {
                cache.clear();

                readToCache();

                if (!cache.hasRemaining())
                    throw new IOException("Unexpected end of temporary metastorage file.");
            }

            int len = Math.min(cache.remaining(), dst.length - off);

            cache.get(dst, off, len);

            off += len;
        }
    }

    /**
     * Reads data from the file into the free space of the cache and switches the cache to reading.
     *
     * @throws IOException If reading from the file fails.
     */
    private void readToCache() throws IOException {
        int len = (int)Math.min(file.length() - file.getChannel().position(), cache.remaining());

        while (len > 0) {
            int read = file.getChannel().read(cache);

            // Guards against an endless loop should the file turn out shorter than expected.
            if (read < 0)
                throw new IOException("Unexpected end of temporary metastorage file.");

            len -= read;
        }

        cache.flip();
    }

    /**
     * Write cache to file.
     *
     * @param force force metadata.
     * @throws IOException If writing to the file fails.
     */
    private void flushCache(boolean force) throws IOException {
        if (cache.position() > 0) {
            cache.flip();

            while (cache.hasRemaining())
                file.getChannel().write(cache);

            cache.clear();
        }

        file.getChannel().force(force);
    }
}
/**
 * Temporary storage made of a chain of internal storages: it starts with an in-memory one and switches to
 * a new file-based one whenever the current storage refuses an entry.
 */
public static class TmpStorage implements Closeable {
    /** Chain of internal storages. */
    final List<TmpStorageInternal> chain = new ArrayList<>(2);

    /** Current internal storage. */
    TmpStorageInternal current;

    /** Logger. */
    final IgniteLogger log;

    /**
     * @param memBufSize Memory buffer size.
     * @param log Logger.
     */
    TmpStorage(int memBufSize, IgniteLogger log) {
        this.log = log;

        current = new MemoryTmpStorage(memBufSize);

        chain.add(current);
    }

    /**
     * Stores a key/value pair, extending the chain with a file-based storage if the current one is full.
     *
     * @param key Key.
     * @param val Value.
     * @throws IgniteCheckedException If an I/O error occurs.
     */
    public void add(String key, byte[] val) throws IgniteCheckedException {
        try {
            boolean stored = current.add(key, val);

            while (!stored) {
                current = new FileTmpStorage();

                chain.add(current);

                stored = current.add(key, val);
            }
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
    }

    /**
     * Reads the data back, one internal storage after another, in the order the storages were created.
     *
     * @return Stream of the stored key/value pairs.
     */
    public Stream<IgniteBiTuple<String, byte[]>> stream() {
        return chain.stream().flatMap(this::openStream);
    }

    /**
     * Opens the stream of a single internal storage, rethrowing an I/O failure as an unchecked exception.
     *
     * @param part Internal storage.
     * @return Stream of the pairs stored in the given internal storage.
     */
    private Stream<IgniteBiTuple<String, byte[]>> openStream(TmpStorageInternal part) {
        try {
            return part.stream();
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void close() throws IOException {
        // A failure to close one storage is logged and must not prevent closing the rest.
        for (TmpStorageInternal part : chain) {
            try {
                part.close();
            }
            catch (IOException err) {
                log.error(err.getMessage(), err);
            }
        }
    }
}
}