| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile; |
| |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException; |
| import org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException; |
| |
| import org.apache.tsfile.utils.ReadWriteIOUtils; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.Queue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.ReadWriteLock; |
| |
| /** |
| * A page which acts like a segment, manages index entry of the b+ tree constructed by MNode with |
| * massive children. <br> |
| * Notice that {@link #spareOffset} in this class means start offset of keys. |
| */ |
| public class InternalPage extends SchemaPage implements ISegment<Integer, Integer> { |
| |
| protected static int COMPOUND_POINT_LENGTH = 8; |
| |
| private long firstLeaf; |
| private int subIndexPage; |
| |
| private String penultKey, lastKey; |
| |
| /** |
| * <b>Compound pointers will not be deserialized as any Java Objects since it may contains massive |
| * entries (maybe more than 500), and binary search and insert will be conducted DIRECTLY on |
| * {@link #pageBuffer}.</b> |
| * |
| * <p>Logically, pointers and keys are like: <br> |
| * P_0, K_1, P_1, K_2, P_2, ... K_n-1, P_n-1<br> |
| * Obviously Pi and Ki share the index, and P0 has no corresponding key for which it should be |
| * initiated when constructed. By addition, n above is denoted by {@link #memberNum}. |
| * |
| * <p><b>Page Header Structure as in {@linkplain ISchemaPage}.</b> |
| * |
| * <p>Page Body Structure: |
| * |
| * <ul> |
| * <li>8 bytes * memberNum: compound pointer as {@linkplain #compoundPointer} mentioned. |
| * <li>... spare space... |
| * <li>var length * (memberNum-1): keys corresponding to the pointers |
| * </ul> |
| */ |
| public InternalPage(ByteBuffer pageBuffer) { |
| super(pageBuffer); |
| firstLeaf = ReadWriteIOUtils.readLong(pageBuffer); |
| subIndexPage = ReadWriteIOUtils.readInt(pageBuffer); |
| } |
| |
| /** compatible constructor for replacement */ |
| public InternalPage(ByteBuffer pageBuffer, AtomicInteger ai, ReadWriteLock rwl) { |
| super(pageBuffer, ai, rwl); |
| firstLeaf = ReadWriteIOUtils.readLong(pageBuffer); |
| subIndexPage = ReadWriteIOUtils.readInt(pageBuffer); |
| } |
| |
| @Override |
| public int insertRecord(String key, Integer pointer) throws RecordDuplicatedException { |
| // TODO: remove debug parameter INTERNAL_SPLIT_VALVE |
| if (spareSize |
| < COMPOUND_POINT_LENGTH |
| + 4 |
| + key.getBytes().length |
| + SchemaFileConfig.INTERNAL_SPLIT_VALVE) { |
| return -1; |
| } |
| |
| // check whether key already exists |
| int pos = getIndexByKey(key); |
| if (pos != 0 && getKeyByIndex(pos).equals(key)) { |
| return spareSize; |
| } |
| |
| if (SchemaFileConfig.PAGE_HEADER_SIZE |
| + COMPOUND_POINT_LENGTH * (memberNum + 1) |
| + 4 |
| + key.getBytes().length |
| > spareOffset) { |
| compactKeys(); |
| } |
| |
| // append key |
| this.pageBuffer.clear(); |
| this.spareOffset = (short) (this.spareOffset - key.getBytes().length - 4); |
| this.pageBuffer.position(spareOffset); |
| ReadWriteIOUtils.write(key, this.pageBuffer); |
| |
| int migNum = memberNum - pos - 1; |
| if (migNum > 0) { |
| // move compound pointers |
| ByteBuffer buf = ByteBuffer.allocate(migNum * COMPOUND_POINT_LENGTH); |
| this.pageBuffer.limit(SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH * memberNum); |
| this.pageBuffer.position( |
| SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH * (pos + 1)); |
| buf.put(this.pageBuffer); |
| |
| this.pageBuffer.position( |
| SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH * (pos + 1)); |
| ReadWriteIOUtils.write(compoundPointer(pointer, spareOffset), this.pageBuffer); |
| |
| buf.clear(); |
| this.pageBuffer.limit(this.pageBuffer.limit() + COMPOUND_POINT_LENGTH); |
| this.pageBuffer.put(buf); |
| } else { |
| // append compound pointer |
| this.pageBuffer.limit(this.pageBuffer.capacity()); |
| this.pageBuffer.position( |
| SchemaFileConfig.PAGE_HEADER_SIZE + memberNum * COMPOUND_POINT_LENGTH); |
| ReadWriteIOUtils.write(compoundPointer(pointer, spareOffset), this.pageBuffer); |
| } |
| |
| spareSize -= (key.getBytes().length + 4 + COMPOUND_POINT_LENGTH); |
| memberNum++; |
| |
| penultKey = lastKey; |
| lastKey = key; |
| return spareSize; |
| } |
| |
| @Override |
| public int updateRecord(String key, Integer buffer) throws MetadataException { |
| return 0; |
| } |
| |
| @Override |
| public int removeRecord(String key) { |
| return 0; |
| } |
| |
| @Override |
| public Integer getRecordByKey(String key) { |
| return pageIndex(getPointerByIndex(getIndexByKey(key))); |
| } |
| |
| @Override |
| public Integer getRecordByAlias(String alias) { |
| return null; |
| } |
| |
| @Override |
| public boolean hasRecordKey(String key) { |
| int pos = getIndexByKey(key); |
| return (pos != 0) && key.equals(getKeyByIndex(pos)); |
| } |
| |
| @Override |
| public boolean hasRecordAlias(String alias) { |
| return false; |
| } |
| |
| @Override |
| public Queue<Integer> getAllRecords() throws MetadataException { |
| Queue<Integer> res = new ArrayDeque<>(this.memberNum); |
| for (int i = 0; i < this.memberNum; i++) { |
| res.add(pageIndex(getPointerByIndex(i))); |
| } |
| return res; |
| } |
| |
| @Override |
| public void syncBuffer() { |
| syncPageBuffer(); |
| } |
| |
| @Override |
| public short size() { |
| return (short) this.pageBuffer.capacity(); |
| } |
| |
| @Override |
| public short getSpareSize() { |
| return spareSize; |
| } |
| |
| @Override |
| public void delete() {} |
| |
| @Override |
| public long getNextSegAddress() { |
| return firstLeaf; |
| } |
| |
| @Override |
| public void setNextSegAddress(long nextSegAddress) { |
| firstLeaf = nextSegAddress; |
| } |
| |
| @Override |
| public int getSubIndex() { |
| return subIndexPage; |
| } |
| |
| @Override |
| public void setSubIndex(int pid) { |
| this.subIndexPage = pid; |
| syncPageBuffer(); |
| } |
| |
| @Override |
| public void extendsTo(ByteBuffer newBuffer) throws MetadataException { |
| if (newBuffer.capacity() != this.pageBuffer.capacity()) { |
| throw new MetadataException("InternalPage can only extend to buffer with same capacity."); |
| } |
| |
| syncPageBuffer(); |
| this.pageBuffer.clear(); |
| newBuffer.clear(); |
| |
| newBuffer.put(this.pageBuffer); |
| } |
| |
| @Override |
| public String splitByKey(String key, Integer tPk, ByteBuffer dstBuffer, boolean inclineSplit) |
| throws MetadataException { |
| // TODO: initiation and registration methods in SchemaFile |
| if (dstBuffer.capacity() != this.pageBuffer.capacity()) { |
| throw new MetadataException("Segments only split with same capacity."); |
| } |
| |
| if (key == null || tPk == null) { |
| throw new MetadataException("Internal Segment cannot split without insert key"); |
| } |
| |
| if (this.memberNum < 2) { |
| throw new MetadataException("Segment has less than 2 pointers can not be split."); |
| } |
| |
| int pk = tPk; |
| // whether to implement inclined split |
| boolean monotonic = |
| inclineSplit |
| && (lastKey != null) |
| && (penultKey != null) |
| && ((key.compareTo(lastKey)) * (lastKey.compareTo(penultKey)) > 0); |
| |
| // search key for split segment |
| String searchKey = null; |
| |
| // this method BREAKS envelop of the passing in buffer to be more efficient |
| // attributes for dstBuffer |
| short spareOffset, memberNum, spareSize; |
| long firstLeaf = this.firstLeaf; |
| int subIdx = this.subIndexPage; |
| |
| int pos = getIndexByKey(key); |
| |
| dstBuffer.clear(); |
| this.pageBuffer.clear(); |
| // bulk split will not compact buffer immediately, thus save some time |
| if (SchemaFileConfig.BULK_SPLIT && pos == 0 && monotonic) { |
| // insert key is the smallest key, migrate all existed keys |
| spareOffset = this.spareOffset; |
| dstBuffer.position(this.spareOffset); |
| this.pageBuffer.position(this.spareOffset); |
| dstBuffer.put(this.pageBuffer); |
| |
| // migrate p1 to p_n-1, offset of each pointer shall not be modified |
| dstBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE); |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH); |
| this.pageBuffer.limit( |
| SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH * this.memberNum); |
| dstBuffer.put(this.pageBuffer); |
| |
| // k1 is the search key for split segment, and not valid key in split segment |
| searchKey = getKeyByIndex(1); |
| memberNum = (short) (this.memberNum - 1); |
| spareSize = |
| (short) |
| (spareOffset |
| - SchemaFileConfig.PAGE_HEADER_SIZE |
| - COMPOUND_POINT_LENGTH * memberNum |
| + searchKey.getBytes().length |
| + 4); |
| |
| // only key in this.pageBuffer |
| this.memberNum = 2; |
| this.spareOffset = (short) (this.pageBuffer.capacity() - key.getBytes().length - 4); |
| this.spareSize = |
| (short) |
| (this.spareOffset |
| - SchemaFileConfig.PAGE_HEADER_SIZE |
| - this.memberNum * COMPOUND_POINT_LENGTH); |
| |
| this.pageBuffer.clear(); |
| this.pageBuffer.position(this.spareOffset); |
| ReadWriteIOUtils.write(key, this.pageBuffer); |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH); |
| ReadWriteIOUtils.write(compoundPointer(pk, this.spareOffset), this.pageBuffer); |
| } else if (SchemaFileConfig.BULK_SPLIT && pos == this.memberNum - 1 && monotonic) { |
| // only p_n-1 and key will be written into split segment |
| spareOffset = (short) (dstBuffer.capacity() - key.getBytes().length - 4); |
| dstBuffer.position(spareOffset); |
| ReadWriteIOUtils.write(key, dstBuffer); |
| dstBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE); |
| ReadWriteIOUtils.write(getPointerByIndex(this.memberNum - 1), dstBuffer); |
| ReadWriteIOUtils.write(compoundPointer(pk, spareOffset), dstBuffer); |
| |
| memberNum = 2; |
| spareSize = |
| (short) |
| (spareOffset - SchemaFileConfig.PAGE_HEADER_SIZE - memberNum * COMPOUND_POINT_LENGTH); |
| |
| // remove k_n-1 and p_n-1 from this.pageBuffer |
| String removedKey = getKeyByIndex(this.memberNum - 1); |
| searchKey = removedKey; |
| this.memberNum -= 1; |
| this.spareSize += (short) (removedKey.getBytes().length + 4 + COMPOUND_POINT_LENGTH); |
| } else { |
| // supposing splitPos is an index of a virtual array of ordered keys |
| // the virtual array includes the insert, indexed from 1 to n (n for this.memberNum) |
| |
| int splitPos; |
| // insert key always belongs to the bigger part |
| if (monotonic) { |
| splitPos = |
| key.compareTo(lastKey) > 0 |
| ? Math.max(pos, (this.memberNum + 1) / 2) |
| : Math.min(pos + 1, (this.memberNum + 1) / 2); |
| } else { |
| splitPos = (this.memberNum + 1) / 2; |
| } |
| |
| // since an edge key cannot be split, it shall not be 1 or n |
| if (splitPos <= 1 || splitPos == this.memberNum) { |
| splitPos = splitPos <= 1 ? 2 : this.memberNum - 1; |
| } |
| |
| // prepare to migrate split segment |
| ByteBuffer tempPtrBuffer = ByteBuffer.allocate(COMPOUND_POINT_LENGTH * (this.memberNum + 1)); |
| spareOffset = (short) dstBuffer.capacity(); |
| memberNum = 0; |
| |
| // ptr and key to be migrated |
| long mPtr; |
| String mKey; |
| int ai; |
| for (int vi = splitPos; vi <= this.memberNum; vi++) { |
| if (vi == pos + 1) { |
| // directly points to the new key, do nothing |
| // offset of mPtr always be corrected below, MIN_VALUE as placeholder |
| mPtr = compoundPointer(pk, Short.MIN_VALUE); |
| mKey = key; |
| } else { |
| // vi for virtual index of the above virtual array, ai for actual index of existed keys |
| ai = vi > pos ? vi - 1 : vi; |
| mPtr = getPointerByIndex(ai); |
| mKey = getKeyByIndex(ai); |
| // this.spareSize and this.memNumber will always be corrected below during compaction, |
| // therefore unnecessary to count here |
| } |
| |
| memberNum++; |
| // mPtr has an invalid offset, needs correction except that stores as first ptr |
| if (vi == splitPos) { |
| // split key will not be migrated |
| searchKey = mKey; |
| ReadWriteIOUtils.write(mPtr, tempPtrBuffer); |
| } else { |
| spareOffset -= mKey.getBytes().length + 4; |
| dstBuffer.position(spareOffset); |
| ReadWriteIOUtils.write(mKey, dstBuffer); |
| ReadWriteIOUtils.write(compoundPointer(pageIndex(mPtr), spareOffset), tempPtrBuffer); |
| } |
| } |
| tempPtrBuffer.flip(); |
| dstBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE); |
| dstBuffer.put(tempPtrBuffer); |
| spareSize = |
| (short) |
| (spareOffset - SchemaFileConfig.PAGE_HEADER_SIZE - COMPOUND_POINT_LENGTH * memberNum); |
| |
| // compact this buffer |
| if (pos < splitPos - 1) { |
| this.memberNum -= memberNum; |
| compactKeys(); |
| // need to be inserted |
| if (insertRecord(key, pk) < 0) { |
| throw new SegmentOverflowException(key); |
| } |
| } else { |
| // one of split segment ptr comes from new key |
| this.memberNum -= memberNum - 1; |
| compactKeys(); |
| } |
| } |
| |
| dstBuffer.clear(); |
| ReadWriteIOUtils.write(SchemaFileConfig.INTERNAL_PAGE, dstBuffer); |
| ReadWriteIOUtils.write(-1, dstBuffer); |
| ReadWriteIOUtils.write(spareOffset, dstBuffer); |
| ReadWriteIOUtils.write(spareSize, dstBuffer); |
| ReadWriteIOUtils.write(memberNum, dstBuffer); |
| ReadWriteIOUtils.write(firstLeaf, dstBuffer); |
| ReadWriteIOUtils.write(subIdx, dstBuffer); |
| |
| this.syncPageBuffer(); |
| |
| penultKey = null; |
| lastKey = null; |
| return searchKey; |
| } |
| |
| @Override |
| public synchronized void syncPageBuffer() { |
| super.syncPageBuffer(); |
| ReadWriteIOUtils.write(firstLeaf, pageBuffer); |
| ReadWriteIOUtils.write(subIndexPage, pageBuffer); |
| } |
| |
| @Override |
| public String inspect() { |
| ByteBuffer bufferR = this.pageBuffer.asReadOnlyBuffer(); |
| StringBuilder builder = |
| new StringBuilder( |
| String.format( |
| "page_id:%d, spare_offset:%d, spare_size:%d%n", pageIndex, spareOffset, spareSize)); |
| builder.append( |
| String.format( |
| "[IndexEntrySegment, total_ptrs:%d, spare_size:%d, sub_index:%d, ", |
| this.memberNum, this.spareSize, this.subIndexPage)); |
| bufferR.clear(); |
| builder.append(String.format("(MIN_POINT, %s),", pageIndex(getPointerByIndex(0)))); |
| |
| for (int i = 1; i < memberNum; i++) { |
| builder.append( |
| String.format( |
| "(%s, %s, %s),", |
| getKeyByIndex(i), keyOffset(getPointerByIndex(i)), pageIndex(getPointerByIndex(i)))); |
| } |
| builder.append("]\n"); |
| return builder.toString(); |
| } |
| |
| @Override |
| public String toString() { |
| return inspect(); |
| } |
| |
| @Override |
| public ISegment<Integer, Integer> getAsInternalPage() { |
| return this; |
| } |
| |
| @Override |
| public ByteBuffer resetBuffer(int ptr) { |
| memberNum = 1; |
| spareSize = |
| (short) |
| (this.pageBuffer.capacity() |
| - SchemaFileConfig.PAGE_HEADER_SIZE |
| - COMPOUND_POINT_LENGTH); |
| spareOffset = (short) this.pageBuffer.capacity(); |
| firstLeaf = ptr; |
| |
| this.pageBuffer.clear(); |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE); |
| ReadWriteIOUtils.write(compoundPointer(ptr, (short) 0), this.pageBuffer); |
| syncPageBuffer(); |
| this.pageBuffer.clear(); |
| |
| return this.pageBuffer.slice(); |
| } |
| |
| // region Compound Pointer Utility |
| |
| private void compactKeys() { |
| ByteBuffer tempBuffer = ByteBuffer.allocate(this.pageBuffer.capacity() - this.spareOffset); |
| tempBuffer.position(tempBuffer.capacity()); |
| this.spareOffset = (short) this.pageBuffer.capacity(); |
| String key; |
| int accSiz = 0; |
| for (int i = 1; i < this.memberNum; i++) { |
| // this.pageBuffer will not be overridden immediately |
| key = getKeyByIndex(i); |
| accSiz += key.getBytes().length + 4; |
| this.spareOffset = (short) (this.pageBuffer.capacity() - accSiz); |
| |
| // for lowest 2 bytes denote key offset, FIXME: '+6' is dependent on encoding of |
| // ReadWriteIOUtils |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + COMPOUND_POINT_LENGTH * i + 6); |
| ReadWriteIOUtils.write(this.spareOffset, this.pageBuffer); |
| |
| // write tempBuffer backward |
| tempBuffer.position(tempBuffer.capacity() - accSiz); |
| ReadWriteIOUtils.write(key, tempBuffer); |
| } |
| tempBuffer.position(tempBuffer.capacity() - accSiz); |
| this.pageBuffer.position(this.spareOffset); |
| this.pageBuffer.put(tempBuffer); |
| this.spareSize = |
| (short) |
| (this.spareOffset |
| - SchemaFileConfig.PAGE_HEADER_SIZE |
| - COMPOUND_POINT_LENGTH * this.memberNum); |
| } |
| |
| /** |
| * Find suitable position to find or insert key. Notice that index ranges from 0 to memberNum-1. |
| * |
| * @param key to be searched or inserted. |
| * @return position where the key is the biggest one smaller or equals to parameter. |
| */ |
| private int getIndexByKey(String key) { |
| // TODO: before leaf node implement cascade delete, |
| // RecordDuplicatedException will only be thrown from leaf node |
| |
| // notice that memberNum always bigger than 2 in a valid Internal Segment |
| if (memberNum == 1 || key.compareTo(getKeyByIndex(1)) < 0) { |
| return 0; |
| } else if (key.compareTo(getKeyByIndex(memberNum - 1)) >= 0) { |
| return memberNum - 1; |
| } |
| |
| int head = 1; |
| int tail = memberNum - 1; |
| int pivot = (head + tail) / 2; |
| |
| // breaking condition: pivot smaller than key, but (pivot+1) bigger than key |
| while (!((key.compareTo(getKeyByIndex(pivot)) >= 0) |
| && (key.compareTo(getKeyByIndex(pivot + 1)) < 0))) { |
| if (key.compareTo(getKeyByIndex(pivot)) < 0) { |
| tail = pivot; |
| } else if (key.compareTo(getKeyByIndex(pivot + 1)) == 0) { |
| return pivot + 1; |
| } else if (key.compareTo(getKeyByIndex(pivot + 1)) > 0) { |
| head = pivot; |
| } |
| |
| // it can be proved that pivot <= n-2 |
| pivot = (head + tail) / 2; |
| } |
| return pivot; |
| } |
| |
| /** |
| * CompoundPointer structure (from high bits to low): |
| * |
| * <ul> |
| * <li>16 bits: reserved |
| * <li>32 bits: page index, which points to target content |
| * <li>16 bits: key offset, which denotes where key is in this page/segment. |
| * </ul> |
| */ |
| private long compoundPointer(int pageIndex, short offset) { |
| return (((SchemaFileConfig.PAGE_INDEX_MASK & pageIndex) |
| << SchemaFileConfig.COMP_POINTER_OFFSET_DIGIT) |
| | (offset & SchemaFileConfig.SEG_INDEX_MASK)); |
| } |
| |
| private int pageIndex(long pointer) { |
| return (int) |
| ((pointer |
| & (SchemaFileConfig.PAGE_INDEX_MASK << SchemaFileConfig.COMP_POINTER_OFFSET_DIGIT)) |
| >> SchemaFileConfig.COMP_POINTER_OFFSET_DIGIT); |
| } |
| |
| private short keyOffset(long pointer) { |
| return (short) (pointer & SchemaFileConfig.COMP_PTR_OFFSET_MASK); |
| } |
| |
| private long getPointerByIndex(int index) { |
| if (index < 0 || index >= memberNum) { |
| // TODO: check whether reasonable to throw an unchecked |
| throw new IndexOutOfBoundsException(); |
| } |
| synchronized (pageBuffer) { |
| this.pageBuffer.limit(this.pageBuffer.capacity()); |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + index * COMPOUND_POINT_LENGTH); |
| return ReadWriteIOUtils.readLong(this.pageBuffer); |
| } |
| } |
| |
| private String getKeyByIndex(int index) { |
| if (index <= 0 || index >= memberNum) { |
| throw new IndexOutOfBoundsException(); |
| } |
| synchronized (pageBuffer) { |
| this.pageBuffer.limit(this.pageBuffer.capacity()); |
| this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + index * COMPOUND_POINT_LENGTH); |
| short ofs = (short) (this.pageBuffer.getLong() & SchemaFileConfig.COMP_PTR_OFFSET_MASK); |
| this.pageBuffer.position(ofs); |
| return ReadWriteIOUtils.readString(this.pageBuffer); |
| } |
| } |
| // endregion |
| |
| } |