blob: f5709e46982205b9681fd545a7327291c6f24ef7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaPageOverflowException;
import org.apache.iotdb.db.exception.metadata.schemafile.SegmentNotFoundException;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SegmentedPage extends SchemaPage implements ISegmentedPage {
// Segment offset table of this page: the list position is the segment index and the value is the
// segment's offset inside the page buffer. The offset is recorded even when the page holds only
// one full-page segment.
// TODO: entries are never removed because segments are 'sequential-indexed'; a deleted segment
// keeps its slot and is marked with a negative offset.
private final transient List<Short> segOffsetLst;
// Cache of segment instances living in this page, keyed by segment index. Instances are created
// lazily on first access.
private final transient Map<Short, ISegment<ByteBuffer, ICachedMNode>> segCacheMap;
/**
* This class is aimed to manage space inside one page.
*
* <p>A segment inside a page has 3 representation: index, offset and instance. <br>
*
* <ul>
* <li>Index is meant to decouple file-wide indexing with in-page compaction
* <li>Offset is meant for in-page indexing
* <li>Segment instance is meant for records manipulations
* </ul>
*
* <b>Page Header Structure as in {@linkplain ISchemaPage}.</b>
*
* <p>Page Body Structure:
*
* <ul>
* <li>var length * memberNum: {@linkplain WrappedSegment} contains serialized ICacheMNodes.
* <li>... spare space...
* <li>2 bytes * memberNum: offset of segments, using marking deletion as {@linkplain
* #deleteSegment} mentioned.
* </ul>
*/
public SegmentedPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock rwl) {
super(pageBuffer, ai, rwl);
segCacheMap = new ConcurrentHashMap<>();
segOffsetLst = new ArrayList<>();
pageBuffer.position(pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG * memberNum);
for (int idx = 0; idx < memberNum; idx++) {
segOffsetLst.add(ReadWriteIOUtils.readShort(pageBuffer));
}
}
/**
 * Builds a segmented page over {@code pageBuffer} with a fresh {@link AtomicInteger} and a fresh
 * {@link ReentrantReadWriteLock}.
 *
 * <p>Delegates to {@link #SegmentedPage(ByteBuffer, AtomicInteger, ReadWriteLock)} so that the
 * cache map, the offset list and the offset-table loading are set up in exactly one place.
 *
 * @param pageBuffer buffer backing the page; its tail holds the segment offset table
 */
public SegmentedPage(ByteBuffer pageBuffer) {
  this(pageBuffer, new AtomicInteger(), new ReentrantReadWriteLock());
}
// region Interface Implementation
/**
 * Inserts a record into the segment at {@code segIdx}, enlarging the segment inside this page
 * when the first attempt does not fit.
 *
 * @param segIdx index of the target segment within this page
 * @param key key of the record
 * @param buffer serialized record
 * @return {@code 0} when the record was inserted and the spare size did not grow, the amount by
 *     which the spare size grew when relocation freed space, or {@code -1} when the segment
 *     cannot be enlarged inside this page
 * @throws MetadataException if the segment cannot be resolved or the insert still fails after
 *     relocation
 */
@Override
public long write(short segIdx, String key, ByteBuffer buffer) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> target = getSegment(segIdx);
  if (target.insertRecord(key, buffer) >= 0) {
    return 0L;
  }

  // remember the spare size so that a change caused by relocation can be reported
  final short spareBefore = spareSize;
  final short enlargedSize = SchemaFile.reEstimateSegSize(target.size() + buffer.capacity());
  target = relocateSegment(target, segIdx, enlargedSize);
  if (target == null) {
    // this page has no room for the enlarged segment
    return -1;
  }

  // relocated but still not enough
  if (target.insertRecord(key, buffer) < 0) {
    throw new MetadataException("failed to insert buffer into relocated segment");
  }

  if (spareBefore >= spareSize) {
    return 0L;
  }
  return spareSize - spareBefore;
}
/**
 * Looks up a record by key in the segment at {@code segIdx}.
 *
 * @param segIdx index of the segment within this page
 * @param key key of the record
 * @return whatever the segment's {@code getRecordByKey} yields for {@code key}
 * @throws MetadataException if the segment cannot be resolved or the lookup fails
 */
@Override
public ICachedMNode read(short segIdx, String key) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segIdx);
  return segment.getRecordByKey(key);
}
/**
 * Looks up a record by alias in the segment at {@code segIdx}.
 *
 * @param segIdx index of the segment within this page
 * @param alias alias of the record
 * @return whatever the segment's {@code getRecordByAlias} yields for {@code alias}
 * @throws MetadataException if the segment cannot be resolved or the lookup fails
 */
@Override
public ICachedMNode readByAlias(short segIdx, String alias) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segIdx);
  return segment.getRecordByAlias(alias);
}
/**
 * Updates a record in the segment at {@code segIdx}, enlarging the segment inside this page when
 * the first attempt does not fit.
 *
 * @param segIdx index of the target segment within this page
 * @param key key of the record
 * @param buffer serialized record
 * @return {@code 0} when the record was updated and the spare size did not grow, the amount by
 *     which the spare size grew when relocation freed space, or {@code -1} when the segment
 *     cannot be enlarged inside this page
 * @throws MetadataException if the segment cannot be resolved or the update still fails after
 *     relocation
 */
@Override
public long update(short segIdx, String key, ByteBuffer buffer) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> target = getSegment(segIdx);
  if (target.updateRecord(key, buffer) >= 0) {
    return 0L;
  }

  // remember the spare size so that a change caused by relocation can be reported
  final short spareBefore = spareSize;
  final short enlargedSize = SchemaFile.reEstimateSegSize(target.size() + buffer.capacity());
  target = relocateSegment(target, segIdx, enlargedSize);
  if (target == null) {
    // this page has no room for the enlarged segment
    return -1;
  }

  if (target.updateRecord(key, buffer) < 0) {
    throw new MetadataException("failed to update buffer upon relocated segment");
  }

  if (spareBefore >= spareSize) {
    return 0;
  }
  return spareSize - spareBefore;
}
/**
 * Returns all records of the segment at {@code segId}.
 *
 * @param segId index of the segment within this page
 * @return the queue produced by the segment's {@code getAllRecords}
 * @throws MetadataException if the segment cannot be resolved or reading its records fails
 */
@Override
public Queue<ICachedMNode> getChildren(short segId) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segId);
  return segment.getAllRecords();
}
/**
 * Removes the record with the given key from the segment at {@code segId}.
 *
 * @param segId index of the segment within this page
 * @param key key of the record to remove
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
@Override
public void removeRecord(short segId, String key) throws SegmentNotFoundException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segId);
  segment.removeRecord(key);
}
/**
 * Marks the segment at {@code segId} as deleted.
 *
 * <p>This is a marking deletion: the segment's size is added back to the spare size, its cached
 * instance is dropped and its slot in {@linkplain #segOffsetLst} is set to {@code -1}. Neither
 * {@linkplain #memberNum} is modified nor {@linkplain #segOffsetLst} truncated.
 *
 * @param segId index of the segment within this page
 * @throws SegmentNotFoundException if the segment index is out of range or already marked deleted
 */
@Override
public synchronized void deleteSegment(short segId) throws SegmentNotFoundException {
  final ISegment<ByteBuffer, ICachedMNode> doomed = getSegment(segId);
  // give the segment's bytes back to the accounting before it is wiped
  spareSize += doomed.size();
  doomed.delete();
  // a negative offset is the deletion mark; the slot itself is kept
  segOffsetLst.set(segId, (short) -1);
  segCacheMap.remove(segId);
}
/**
 * Drops every segment of this page: clears the offset table and the instance cache, resets the
 * member count to zero and declares the whole area behind the page header as spare.
 */
@Override
public void purgeSegments() {
  segOffsetLst.clear();
  segCacheMap.clear();
  memberNum = 0;

  // everything after the page header is free again
  final int bodyLength = pageBuffer.capacity() - SchemaFileConfig.PAGE_HEADER_SIZE;
  spareSize = (short) bodyLength;
  spareOffset = SchemaFileConfig.PAGE_HEADER_SIZE;
}
/**
 * Returns the current member count of this page.
 *
 * <p>Note that {@link #deleteSegment} only marks a segment and does not decrease this count.
 *
 * @return the value of {@code memberNum}
 */
@Override
public int validSegments() {
  return this.memberNum;
}
/**
 * Returns the spare size currently accounted for this page.
 *
 * @return the value of {@code spareSize}
 */
@Override
public short getSpareSize() {
  return this.spareSize;
}
/**
 * Tells whether the spare size covers a new segment of {@code size} bytes plus one entry in the
 * segment offset table.
 *
 * @param size size of the segment to be placed
 * @return {@code true} if the spare size is at least {@code size + SEG_OFF_DIG}
 */
@Override
public boolean isCapableForSegSize(short size) {
  final int required = size + SchemaFileConfig.SEG_OFF_DIG;
  return required <= spareSize;
}
/**
 * Returns the size of the segment at {@code segId}.
 *
 * @param segId index of the segment within this page
 * @return the value reported by the segment's {@code size()}
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
@Override
public short getSegmentSize(short segId) throws SegmentNotFoundException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segId);
  return segment.size();
}
/**
 * Copies the whole page buffer into {@code dst}.
 *
 * <p>The page buffer's position and limit are reset first, so the copy always covers the full
 * capacity. The buffer is copied as it currently is; this method does not call {@link
 * #syncPageBuffer()} itself.
 *
 * @param dst destination buffer, written from its current position
 */
@Override
public void getPageBuffer(ByteBuffer dst) {
  // rewind to position 0 and widen the limit to capacity before the bulk copy
  pageBuffer.clear();
  dst.put(pageBuffer);
}
/**
 * Allocates and registers a new, empty segment of {@code size} bytes in this page.
 *
 * <p>If the space behind the last segment is insufficient, the page is compacted once and the
 * allocation is retried.
 *
 * @param size size of the new segment
 * @return index of the new segment within this page
 * @throws MetadataException a {@link SchemaPageOverflowException} if the segment does not fit
 *     even after compaction, or any failure raised while initializing or registering the segment
 */
@Override
public synchronized short allocNewSegment(short size) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> created =
      WrappedSegment.initAsSegment(allocSpareBufferSlice(size));
  if (created == null) {
    // no room at the tail: compact the page and try one more time
    compactSegments();
    created = WrappedSegment.initAsSegment(allocSpareBufferSlice(size));
    if (created == null) {
      throw new SchemaPageOverflowException(pageIndex);
    }
  }
  return registerNewSegment(created);
}
/**
 * Copies a segment from {@code srcPage} into this page, extending it to {@code newSegSize}, and
 * registers it as a new segment here.
 *
 * @param srcPage page that currently holds the segment
 * @param segId index of the segment inside {@code srcPage}
 * @param newSegSize size of the segment after transplanting
 * @return global index composed from this page's index and the new segment's index
 * @throws MetadataException a {@link SchemaPageOverflowException} if this page's spare size
 *     cannot take the segment, or any failure raised while extending, loading or registering it
 */
@Override
public long transplantSegment(ISegmentedPage srcPage, short segId, short newSegSize)
    throws MetadataException {
  if (!isCapableForSegSize(newSegSize)) {
    throw new SchemaPageOverflowException(pageIndex);
  }

  // enough spare bytes overall, but not contiguous at the tail: compact first
  final boolean fitsAtTail = maxAppendableSegmentSize() >= newSegSize;
  if (!fitsAtTail) {
    compactSegments();
  }

  // let the source segment extend itself into a staging buffer of the target size
  ByteBuffer staging = ByteBuffer.allocate(newSegSize);
  srcPage.extendsSegmentTo(staging, segId);
  staging.clear();

  // copy the staged bytes to the spare area, then expose exactly that area as a slice
  final int start = spareOffset;
  pageBuffer.clear();
  pageBuffer.position(start);
  pageBuffer.put(staging);
  pageBuffer.position(start);
  pageBuffer.limit(start + newSegSize);
  ISegment<ByteBuffer, ICachedMNode> transplanted =
      WrappedSegment.loadAsSegment(pageBuffer.slice());

  // registerNewSegment will modify page status considering the new segment
  final short newIndex = registerNewSegment(transplanted);
  return SchemaFile.getGlobalIndex(pageIndex, newIndex);
}
/**
 * Asks the segment at {@code segId} to extend itself into {@code dstBuffer}.
 *
 * @param dstBuffer buffer that receives the extended segment
 * @param segId index of the segment within this page
 * @throws MetadataException if the segment cannot be resolved or the extension fails
 */
@Override
public void extendsSegmentTo(ByteBuffer dstBuffer, short segId) throws MetadataException {
  ISegment<ByteBuffer, ICachedMNode> source = getSegment(segId);
  source.extendsTo(dstBuffer);
}
/**
 * Sets the next-segment address of the segment at {@code segId}.
 *
 * @param segId index of the segment within this page
 * @param address address to store as the segment's next segment
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
@Override
public void setNextSegAddress(short segId, long address) throws SegmentNotFoundException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segId);
  segment.setNextSegAddress(address);
}
/**
 * Returns the next-segment address of the segment at {@code segId}.
 *
 * @param segId index of the segment within this page
 * @return the address reported by the segment's {@code getNextSegAddress()}
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
@Override
public long getNextSegAddress(short segId) throws SegmentNotFoundException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(segId);
  return segment.getNextSegAddress();
}
/**
 * Splits the single full-page segment of this page into {@code dstPage} while inserting a
 * record.
 *
 * <p>Only allowed when this page holds exactly one segment that starts right after the page
 * header and has size {@code SEG_MAX_SIZ}.
 *
 * @param key key of the record being inserted
 * @param recBuf serialized record being inserted
 * @param dstPage page whose entire segment slice receives the split-off part
 * @param inclineSplit forwarded unchanged to the segment's {@code splitByKey}
 * @return the value returned by the segment's {@code splitByKey}
 * @throws MetadataException a {@link SegmentNotFoundException} if this page is not in the
 *     splittable state, or any failure raised by the split itself
 */
@Override
public String splitWrappedSegment(
    String key, ByteBuffer recBuf, ISchemaPage dstPage, boolean inclineSplit)
    throws MetadataException {
  // only full page leaf segment can be split
  final boolean splittable =
      segOffsetLst.size() == 1
          && segOffsetLst.get(0) == SchemaFileConfig.PAGE_HEADER_SIZE
          && getSegment((short) 0).size() == SchemaFileConfig.SEG_MAX_SIZ;
  if (!splittable) {
    throw new SegmentNotFoundException(pageIndex);
  }

  ISegment<ByteBuffer, ICachedMNode> fullSegment = getSegment((short) 0);
  return fullSegment.splitByKey(key, recBuf, dstPage.getEntireSegmentSlice(), inclineSplit);
}
/**
 * Renders a human-readable description of this page: one header line followed by one line per
 * slot of the segment offset table (deleted slots included).
 *
 * <p>The page buffer is synchronized before rendering.
 *
 * @return the description text
 * @throws SegmentNotFoundException if a non-deleted segment cannot be resolved
 */
@Override
public String inspect() throws SegmentNotFoundException {
  syncPageBuffer();

  StringBuilder report = new StringBuilder();
  report.append(
      String.format(
          "page_id:%d, total_seg:%d, spare_from:%d, spare_size:%d%n",
          pageIndex, memberNum, spareOffset, spareSize));

  for (short segId = 0; segId < segOffsetLst.size(); segId++) {
    short segOffset = segOffsetLst.get(segId);
    if (segOffset < 0) {
      // negative offset marks a deleted slot
      report.append(String.format("seg_id:%d deleted, offset:%d%n", segId, segOffset));
      continue;
    }

    ISegment<?, ?> seg = getSegment(segId);
    // -1 is printed as is; any other address is printed in hex
    long nextAddr = seg.getNextSegAddress();
    Object nextSeg;
    if (nextAddr == -1) {
      nextSeg = -1;
    } else {
      nextSeg = Long.toHexString(nextAddr);
    }
    report.append(
        String.format(
            "seg_id:%d, offset:%d, address:%s, next_seg:%s, %s%n",
            segId,
            segOffset,
            Long.toHexString(SchemaFile.getGlobalIndex(pageIndex, segId)),
            nextSeg,
            seg.inspect()));
  }
  return report.toString();
}
/**
 * Flushes in-memory state into the page buffer: first the superclass state, then every cached
 * segment instance, and finally the segment offset table at the tail of the page.
 */
@Override
public synchronized void syncPageBuffer() {
  super.syncPageBuffer();

  for (ISegment<ByteBuffer, ICachedMNode> cached : segCacheMap.values()) {
    cached.syncBuffer();
  }

  // the offset table sits in the last memberNum * SEG_OFF_DIG bytes of the page
  final int tableStart =
      SchemaFileConfig.PAGE_LENGTH - memberNum * SchemaFileConfig.SEG_OFF_DIG;
  pageBuffer.position(tableStart);
  for (short segOffset : segOffsetLst) {
    ReadWriteIOUtils.write(segOffset, pageBuffer);
  }
}
/**
 * Returns this page viewed as an {@link ISegmentedPage}.
 *
 * @return this instance
 */
@Override
public ISegmentedPage getAsSegmentedPage() {
  final ISegmentedPage self = this;
  return self;
}
/**
 * Exposes the area between the page header and the single offset-table entry as one buffer
 * slice.
 *
 * <p>Permitted only when the page holds exactly one segment starting right after the header, has
 * no spare size, and its spare offset points at the offset-table entry. All cached segment
 * instances are dropped because the caller may modify the returned buffer.
 *
 * @return slice of the page buffer covering the whole segment area
 * @throws MetadataException if the page is not in the state described above
 */
@Override
public ByteBuffer getEntireSegmentSlice() throws MetadataException {
  final boolean holdsSingleFullSegment =
      segOffsetLst.size() == 1
          && segOffsetLst.get(0) == SchemaFileConfig.PAGE_HEADER_SIZE
          && spareSize == 0
          && spareOffset == this.pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG;
  if (!holdsSingleFullSegment) {
    throw new MetadataException(
        "SegmentedPage can share entire buffer slice only when it contains one MAX SIZE segment.");
  }

  // buffer may be modified, segment instance shall be abolished
  segCacheMap.clear();

  synchronized (this.pageBuffer) {
    final int sliceStart = SchemaFileConfig.PAGE_HEADER_SIZE;
    final int sliceEnd = this.pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG;
    this.pageBuffer.position(sliceStart);
    this.pageBuffer.limit(sliceEnd);
    return this.pageBuffer.slice();
  }
}
// endregion
// region Segment Getters
/**
 * Retrieves the segment instance at {@code index}, loading it from the page buffer and adding it
 * to the cache map on first access.
 *
 * <p>This method never returns {@code null}: an unknown or deleted index is reported with an
 * exception.
 *
 * @param index index rather than offset of the segment
 * @return the cached or freshly loaded segment instance
 * @throws SegmentNotFoundException if {@code index} is beyond the offset table, the slot is
 *     marked deleted (negative offset), or loading the segment reports duplicated records
 */
private ISegment<ByteBuffer, ICachedMNode> getSegment(short index)
    throws SegmentNotFoundException {
  if (segOffsetLst.size() <= index || segOffsetLst.get(index) < 0) {
    throw new SegmentNotFoundException(pageIndex, index);
  }

  // A single get() replaces containsKey() + get(): the cache may be cleared by callers that do
  // not hold this monitor, so the two-step form could hand out null between the two calls.
  synchronized (segCacheMap) {
    ISegment<ByteBuffer, ICachedMNode> cached = segCacheMap.get(index);
    if (cached != null) {
      return cached;
    }
  }

  // duplicate for concurrent event
  ByteBuffer bufferR = this.pageBuffer.duplicate();
  bufferR.clear();
  bufferR.position(getSegmentOffset(index));
  bufferR.limit(bufferR.position() + WrappedSegment.getSegBufLen(bufferR));

  ISegment<ByteBuffer, ICachedMNode> res;
  try {
    res = WrappedSegment.loadAsSegment(bufferR.slice());
  } catch (RecordDuplicatedException e) {
    // NOTE(review): only the message is propagated; the original exception is not chained
    throw new SegmentNotFoundException(e.getMessage());
  }

  synchronized (segCacheMap) {
    // another caller may have loaded the same segment meanwhile; keep the first instance
    ISegment<ByteBuffer, ICachedMNode> raced = segCacheMap.get(index);
    if (raced != null) {
      return raced;
    }
    segCacheMap.put(index, res);
    return res;
  }
}
/**
 * Returns the in-page offset of the segment at {@code index}.
 *
 * @param index index of the segment within this page
 * @return offset recorded in the segment offset table
 * @throws SegmentNotFoundException if {@code index} is beyond the table or the slot is marked
 *     deleted (negative offset)
 */
private short getSegmentOffset(short index) throws SegmentNotFoundException {
  if (index < segOffsetLst.size()) {
    short recorded = segOffsetLst.get(index);
    if (recorded >= 0) {
      return recorded;
    }
  }
  throw new SegmentNotFoundException(pageIndex, index);
}
// endregion
// region Page Space Management
/**
 * Gives the specified segment a larger area inside this page and updates the cache map and the
 * offset list accordingly.
 *
 * <p><b>The larger area is taken from the space behind the last segment when possible; otherwise
 * the page is rearranged and the segment is extended in place.</b>
 *
 * @param seg original segment instance
 * @param segIdx original segment index
 * @param newSize target segment size
 * @return reallocated segment instance, null if the segment already has the maximum size or the
 *     spare size cannot cover the growth
 * @throws MetadataException if extending or reloading the segment fails
 */
private ISegment<ByteBuffer, ICachedMNode> relocateSegment(
    ISegment<?, ?> seg, short segIdx, short newSize) throws MetadataException {
  final boolean alreadyMaxSized = seg.size() == SchemaFileConfig.SEG_MAX_SIZ;
  if (alreadyMaxSized || getSpareSize() + seg.size() < newSize) {
    return null;
  }

  ByteBuffer tailSlice = allocSpareBufferSlice(newSize);
  if (tailSlice != null) {
    // enough contiguous room behind the last segment: move the content there
    seg.extendsTo(tailSlice);
    ISegment<ByteBuffer, ICachedMNode> moved = WrappedSegment.loadAsSegment(tailSlice);
    // the slice starts at spareOffset, so the new spare offset simply advances by its size
    segOffsetLst.set(segIdx, spareOffset);
    segCacheMap.put(segIdx, moved);
    spareOffset += moved.size();
    spareSize -= (short) (newSize - seg.size());
    return moved;
  }

  // no contiguous room: compact with this segment placed last, then grow it where it is
  rearrangeSegments(segIdx);
  return extendSegmentInPlace(segIdx, seg.size(), newSize);
}
/**
 * Computes the largest segment that can be appended directly at {@code spareOffset} without
 * compaction, leaving room for the offset table including one more entry.
 *
 * <p>The computation is based on the buffer's capacity rather than its limit: other methods of
 * this class leave the limit narrowed after slicing, which would under-report the available tail
 * space and trigger needless compaction.
 *
 * @return number of bytes between {@code spareOffset} and the enlarged offset table
 */
private short maxAppendableSegmentSize() {
  return (short)
      (pageBuffer.capacity() - SchemaFileConfig.SEG_OFF_DIG * (memberNum + 1) - spareOffset);
}
/**
 * Allocates DIRECTLY from {@link #spareOffset} and returns the corresponding ByteBuffer slice.
 * Neither {@link #segOffsetLst} nor {@link #segCacheMap} nor the spare counters are updated here,
 * since no segment is initiated by this method.
 *
 * @param size target size of the ByteBuffer
 * @return slice of {@code size} bytes starting at the spare offset, or null when the space
 *     behind the last segment is insufficient (null instead of {@linkplain
 *     SchemaPageOverflowException} because this case may occur frequently)
 */
private ByteBuffer allocSpareBufferSlice(short size) {
  // bytes between the spare offset and the current offset table
  final int contiguous =
      this.pageBuffer.capacity() - spareOffset - memberNum * SchemaFileConfig.SEG_OFF_DIG;
  // the slice itself plus one more offset-table entry
  final int required = size + SchemaFileConfig.SEG_OFF_DIG;
  if (contiguous < required) {
    return null;
  }

  pageBuffer.clear();
  pageBuffer.position(spareOffset);
  pageBuffer.limit(spareOffset + size);
  return pageBuffer.slice();
}
/**
 * Compacts all valid segments towards the page header without singling out any segment. Spare
 * offset and spare size are recomputed by {@link #rearrangeSegments(short)}.
 */
private void compactSegments() {
  // a negative index means "compaction only"
  final short noTarget = (short) -1;
  this.rearrangeSegments(noTarget);
}
/**
 * Compacts segments and moves the target segment (id at idx) to the tail of the segments.<br>
 * Since this method overwrites segment areas of the page buffer, all existing segment instances
 * are dropped from the cache after being synchronized.
 *
 * @param idx index of the segment to place last; a negative value means compaction only
 */
private synchronized void rearrangeSegments(short idx) {
  // persist pending changes, then abolish all segment instances
  syncPageBuffer();
  segCacheMap.clear();

  // work from a snapshot so that overwriting the page cannot corrupt pending sources
  ByteBuffer snapshot = ByteBuffer.allocate(this.pageBuffer.capacity());
  this.pageBuffer.clear();
  snapshot.put(this.pageBuffer);
  this.pageBuffer.clear();

  spareOffset = SchemaFileConfig.PAGE_HEADER_SIZE;
  short srcOffset;
  short segLen;
  for (short segId = 0; segId < segOffsetLst.size(); segId++) {
    // deleted slots are skipped; the target segment is handled after the loop
    if (segOffsetLst.get(segId) < 0 || segId == idx) {
      continue;
    }
    srcOffset = segOffsetLst.get(segId);
    snapshot.clear();
    this.pageBuffer.clear();
    snapshot.position(srcOffset);
    segLen = WrappedSegment.getSegBufLen(snapshot);
    snapshot.limit(srcOffset + segLen);
    this.segOffsetLst.set(segId, spareOffset);
    this.pageBuffer.position(spareOffset);
    this.pageBuffer.put(snapshot);
    spareOffset += segLen;
  }

  // a negative idx meant for only compaction
  if (idx >= 0) {
    this.pageBuffer.clear();
    this.pageBuffer.position(spareOffset);
    snapshot.clear();
    snapshot.position(segOffsetLst.get(idx));
    segLen = WrappedSegment.getSegBufLen(snapshot);
    snapshot.limit(snapshot.position() + segLen);
    this.pageBuffer.put(snapshot);
    segOffsetLst.set(idx, spareOffset);
    spareOffset += segLen;
  }

  // everything between the compacted segments and the offset table is spare
  final int offsetTableLength = memberNum * SchemaFileConfig.SEG_OFF_DIG;
  spareSize = (short) (this.pageBuffer.capacity() - spareOffset - offsetTableLength);
}
/**
 * Checks that the segment is the last one in the page and extends it to a designated size
 * without moving its start.
 *
 * @param segId segment id
 * @param oriSegSize size of the target segment
 * @param newSize extended size
 * @return extended segment based on page buffer
 * @throws MetadataException a {@link SegmentNotFoundException} if the segment does not end
 *     exactly at the spare offset, or any failure raised while extending or reloading it
 */
private ISegment<ByteBuffer, ICachedMNode> extendSegmentInPlace(
    short segId, short oriSegSize, short newSize) throws MetadataException {
  final short offset = getSegmentOffset(segId);

  // only last segment could extend in-place
  final boolean isLastSegment = offset + oriSegSize == spareOffset;
  if (!isLastSegment) {
    throw new SegmentNotFoundException(segId);
  }

  // extend to a temporary buffer
  ByteBuffer enlarged = ByteBuffer.allocate(newSize);
  getSegment(segId).extendsTo(enlarged);
  enlarged.clear();

  // write back the buffer content at the segment's original offset
  pageBuffer.clear();
  pageBuffer.position(offset);
  pageBuffer.put(enlarged);

  // pass page buffer slice to instantiate segment
  pageBuffer.position(offset);
  pageBuffer.limit(offset + newSize);
  ISegment<ByteBuffer, ICachedMNode> extended =
      WrappedSegment.loadAsSegment(pageBuffer.slice());

  // modify status
  segCacheMap.put(segId, extended);
  segOffsetLst.set(segId, offset);
  spareOffset = (short) (offset + extended.size());
  spareSize -= (newSize - oriSegSize);
  return extended;
}
/**
 * Updates the segment address stored with the record {@code key} in the segment at {@code
 * segId}.
 *
 * <p>The segment is cast to {@link WrappedSegment} because the operation is not part of the
 * segment interface yet.
 *
 * @param segId index of the segment within this page
 * @param key key of the record
 * @param newSegAddr address to store
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
public void updateRecordSegAddr(short segId, String key, long newSegAddr)
    throws SegmentNotFoundException {
  // TODO: add to interface
  WrappedSegment wrapped = (WrappedSegment) getSegment(segId);
  wrapped.updateRecordSegAddr(key, newSegAddr);
}
/**
 * Registers a segment instance that was placed at the current spare offset: adds it to
 * segCacheMap and segOffsetLst, then advances the spare offset, shrinks the spare size and
 * increments the member count.
 *
 * @param seg the segment to register
 * @return index of the segment
 * @throws MetadataException if the cache map already holds an entry for the index about to be
 *     assigned, i.e. the cache map and the offset list are inconsistent
 */
private synchronized short registerNewSegment(ISegment<ByteBuffer, ICachedMNode> seg)
    throws MetadataException {
  // new segments always take the next slot of the offset table
  short thisIndex = (short) segOffsetLst.size();
  if (segCacheMap.containsKey(thisIndex)) {
    throw new MetadataException(
        String.format("Segment cache map inconsistent with segment list in page %d.", pageIndex));
  }
  segCacheMap.put(thisIndex, seg);
  segOffsetLst.add(spareOffset);
  spareOffset += seg.size();
  // the segment body plus its entry in the offset table at the page tail
  spareSize -= seg.size() + SchemaFileConfig.SEG_OFF_DIG;
  memberNum += 1;
  return thisIndex;
}
// endregion
// region Test Only Methods
/**
 * Test hook exposing the segment at {@code idx} as a {@link WrappedSegment}.
 *
 * @param idx index of the segment within this page
 * @return the segment instance cast to {@link WrappedSegment}
 * @throws SegmentNotFoundException if the segment index is out of range or marked deleted
 */
@TestOnly
@Override
public WrappedSegment getSegmentOnTest(short idx) throws SegmentNotFoundException {
  ISegment<ByteBuffer, ICachedMNode> segment = getSegment(idx);
  return (WrappedSegment) segment;
}
// endregion
}