blob: e0ecccab790d54c798035aba1d68bb0ef1f92f7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.database;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
/**
* Metadata storage.
*/
public class MetadataStorage implements MetaStore {
/** */
private PageMemory pageMem;
/**
* @param pageMem Page memory.
*/
public MetadataStorage(PageMemory pageMem) {
this.pageMem = pageMem;
}
/** {@inheritDoc} */
@Override public synchronized IgniteBiTuple<FullPageId, Boolean> getOrAllocateForIndex(int cacheId, String idxName)
throws IgniteCheckedException {
byte[] idxNameBytes = idxName.getBytes(StandardCharsets.UTF_8);
FullPageId metaId = metaPage(cacheId);
Page meta = pageMem.page(metaId);
try {
while (true) {
SearchState state = new SearchState();
FullPageId existingIdxRoot = tryFindIndexRoot(meta, idxNameBytes, cacheId, state);
if (existingIdxRoot != null)
return F.t(existingIdxRoot, false);
FullPageId allocatedRoot = allocateAndWriteIndexRoot(meta, cacheId, idxNameBytes, state);
if (allocatedRoot != null)
return F.t(allocatedRoot, true);
// else retry.
}
}
finally {
pageMem.releasePage(meta);
}
}
/**
* @param meta Cache metadata start page.
* @param idxNameBytes Index name to find.
* @return Non-zero root page iD if an index with the given name was found.
*/
private FullPageId tryFindIndexRoot(Page meta, byte[] idxNameBytes, int cacheId, SearchState state) throws IgniteCheckedException {
ByteBuffer buf = meta.getForRead();
try {
// Save version
state.ver = meta.version();
long nextPageId = buf.getLong();
state.writePage = meta.id();
FullPageId rootPageId = scanForIndexRoot(buf, idxNameBytes, cacheId);
if (rootPageId != null)
return rootPageId;
while (nextPageId > 0) {
// Meta page.
Page nextPage = pageMem.page(new FullPageId(nextPageId, 0));
try {
state.writePage = nextPageId;
buf = nextPage.getForRead();
try {
nextPageId = buf.getLong();
rootPageId = scanForIndexRoot(buf, idxNameBytes, cacheId);
if (rootPageId != null)
return rootPageId;
}
finally {
nextPage.releaseRead();
}
}
finally {
pageMem.releasePage(nextPage);
}
}
state.position = buf.position();
}
finally {
meta.releaseRead();
}
return null;
}
/**
* @param meta Cache metadata page.
* @param cacheId Cache ID.
* @param idxNameBytes Index name bytes.
* @param state Search state.
* @return Root page ID for the index.
*/
private FullPageId allocateAndWriteIndexRoot(
Page meta,
int cacheId,
byte[] idxNameBytes,
SearchState state
) throws IgniteCheckedException {
ByteBuffer buf = meta.getForWrite();
try {
// The only case 0 can be returned.
if (meta.version() != state.ver)
return null;
// Otherwise it is safe to allocate and write data or link new page directly to the saved page.
long writePageId = state.writePage;
Page writePage;
ByteBuffer writeBuf;
if (writePageId == meta.id()) {
writePage = meta;
writeBuf = buf;
}
else {
writePage = pageMem.page(new FullPageId(writePageId, 0));
writeBuf = writePage.getForWrite();
}
try {
long nextPageId = writeBuf.getLong();
assert nextPageId == 0: "[nextPageId=" + U.hexLong(nextPageId) + ", writePageId=" + U.hexLong(writePageId) + ']';
// Position buffer to the last record.
writeBuf.position(state.position);
FullPageId idxRoot = pageMem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_META);
if (writeBuf.remaining() < idxNameBytes.length + 9) {
// Link new meta page.
FullPageId newMeta = pageMem.allocatePage(0, 0, PageIdAllocator.FLAG_META);
writeBuf.putLong(0, newMeta.pageId());
// Release old write-locked page.
if (writePageId != meta.id()) {
writePage.incrementVersion();
writePage.releaseWrite(true);
pageMem.releasePage(writePage);
}
writePage = pageMem.page(newMeta);
writeBuf = writePage.getForWrite();
writePageId = newMeta.pageId();
// Next page, just need to make an offset.
writeBuf.putLong(0);
}
writeBuf.put((byte)idxNameBytes.length);
writeBuf.put(idxNameBytes);
writeBuf.putLong(idxRoot.pageId());
return idxRoot;
}
finally {
if (writePageId != meta.id()) {
writePage.incrementVersion();
writePage.releaseWrite(true);
pageMem.releasePage(writePage);
}
}
}
finally {
meta.releaseWrite(true);
}
}
/**
* @param buf Byte buffer to scan for the index.
* @param idxName Index name.
* @return Non-zero value if index with the given name was found.
*/
private FullPageId scanForIndexRoot(ByteBuffer buf, byte[] idxName, int cacheId) {
// 10 = 1 byte per size + 1 byte minimal index name + 8 bytes page id.
while (buf.remaining() >= 10) {
int nameSize = buf.get() & 0xFF;
if (nameSize == 0) {
// Rewind.
buf.position(buf.position() - 1);
return null;
}
byte[] name = new byte[nameSize];
buf.get(name);
long pageId = buf.getLong();
if (Arrays.equals(name, idxName))
return new FullPageId(pageId, cacheId);
}
return null;
}
/**
* @param cacheId Cache ID to get meta page for.
* @return Meta page.
*/
private FullPageId metaPage(int cacheId) throws IgniteCheckedException {
Page meta = pageMem.metaPage();
try {
boolean written = false;
ByteBuffer buf = meta.getForWrite();
try {
int cnt = buf.getShort() & 0xFFFF;
int cnt0 = cnt;
int writePos = 0;
while (cnt0 > 0) {
int readId = buf.getInt();
long pageId = buf.getLong();
if (readId != 0) {
if (readId == cacheId)
return new FullPageId(pageId, 0);
cnt0--;
}
else
writePos = buf.position() - 12;
}
if (writePos != 0)
buf.position(writePos);
FullPageId fullId = pageMem.allocatePage(0, 0, PageIdAllocator.FLAG_META);
assert !fullId.equals(meta.fullId()) : "Duplicate page allocated " +
"[metaId=" + meta.fullId() + ", allocated=" + fullId + ']';
buf.putInt(cacheId);
buf.putLong(fullId.pageId());
written = true;
buf.putShort(0, (short)(cnt + 1));
return fullId;
}
finally {
meta.incrementVersion();
meta.releaseWrite(written);
}
}
finally {
pageMem.releasePage(meta);
}
}
/**
* Search state.
*/
private static class SearchState {
/** */
private int ver;
/** */
private long writePage;
/** */
private int position;
}
}