blob: 0b2b9ace2018c8af69a66ec530c54d07a4776877 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.metadata.schemafile.RecordDuplicatedException;
import org.apache.iotdb.db.exception.metadata.schemafile.SegmentOverflowException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* This page stores measurement alias as key and name as record, aiming to provide a mapping from
* alias to name so that secondary index could be built upon efficient structure.
*
* <p>Since it is not necessary to shift size of it, extending a {@linkplain SchemaPage} might be
* more reasonable for this structure.
*
* <p>TODO: Another abstract class for this and InternalPage is expected
*/
public class AliasIndexPage extends SchemaPage implements ISegment<String, String> {
private static final int OFFSET_LEN = 2;
long nextPage;
String penultKey = null;
String lastKey = null;
/**
* <b>Page Header Structure: (19 bytes used, 13 bytes reserved)</b> As in {@linkplain
* ISchemaPage}.
*
* <p>Page Body Structure:
*
* <ul>
* <li>2 bytes * memberNum: offset of (alias, name) pair, to locate string content
* <li>... spare space...
* <li>var length(4 byte, var length, 4 bytes, var length): (alias, name) pairs, ordered by
* alias
* </ul>
*/
protected AliasIndexPage(ByteBuffer pageBuffer) {
super(pageBuffer);
nextPage = ReadWriteIOUtils.readLong(this.pageBuffer);
}
// region Interface Implementation
@Override
public int insertRecord(String alias, String name) throws RecordDuplicatedException {
if (spareSize < OFFSET_LEN + 8 + alias.getBytes().length + name.getBytes().length) {
return -1;
}
// check whether key already exists
int pos = getIndexByKey(alias);
if (memberNum > 0 && pos < memberNum && getKeyByIndex(pos).equals(alias)) {
// not insert duplicated alias
return spareSize;
}
if (SchemaFileConfig.PAGE_HEADER_SIZE
+ OFFSET_LEN * (memberNum + 1)
+ 8
+ alias.getBytes().length
+ name.getBytes().length
> spareOffset) {
compactKeys();
}
// append key
this.pageBuffer.clear();
this.spareOffset =
(short) (this.spareOffset - alias.getBytes().length - name.getBytes().length - 8);
this.pageBuffer.position(spareOffset);
ReadWriteIOUtils.write(alias, this.pageBuffer);
ReadWriteIOUtils.write(name, this.pageBuffer);
int migNum = memberNum - pos;
if (migNum > 0) {
// move compound pointers
ByteBuffer buf = ByteBuffer.allocate(migNum * OFFSET_LEN);
this.pageBuffer.limit(SchemaFileConfig.PAGE_HEADER_SIZE + OFFSET_LEN * memberNum);
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + OFFSET_LEN * pos);
buf.put(this.pageBuffer);
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + OFFSET_LEN * pos);
ReadWriteIOUtils.write(spareOffset, this.pageBuffer);
buf.clear();
this.pageBuffer.limit(this.pageBuffer.limit() + OFFSET_LEN);
this.pageBuffer.put(buf);
} else {
// append compound pointer
this.pageBuffer.limit(this.pageBuffer.capacity());
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + memberNum * OFFSET_LEN);
ReadWriteIOUtils.write(spareOffset, this.pageBuffer);
}
spareSize -= (alias.getBytes().length + name.getBytes().length + 8 + OFFSET_LEN);
memberNum++;
penultKey = lastKey;
lastKey = alias;
return spareSize;
}
@Override
public int updateRecord(String key, String buffer) throws MetadataException {
return -1;
}
@Override
public int removeRecord(String key) {
if (memberNum <= 0) {
return spareSize;
}
int pos = getIndexByKey(key);
if (!key.equals(getKeyByIndex(pos))) {
return spareSize;
}
int migNum = memberNum - pos - 1;
if (migNum > 0) {
ByteBuffer temBuf = ByteBuffer.allocate(migNum * OFFSET_LEN);
this.pageBuffer.clear().limit(SchemaFileConfig.PAGE_HEADER_SIZE + OFFSET_LEN * memberNum);
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + pos * OFFSET_LEN + OFFSET_LEN);
temBuf.put(this.pageBuffer).clear();
this.pageBuffer.clear().position(SchemaFileConfig.PAGE_HEADER_SIZE + pos * OFFSET_LEN);
this.pageBuffer.put(temBuf);
}
memberNum--;
spareSize += OFFSET_LEN + 8 + key.getBytes().length + getNameByIndex(pos).getBytes().length;
return spareSize;
}
@Override
public String getRecordByKey(String key) throws MetadataException {
int pos = getIndexByKey(key);
if (memberNum <= 0 || pos > memberNum) {
return null;
}
// to tell whether pos is position to insert
if (pos == 0 || pos == memberNum) {
pos = pos == memberNum ? pos - 1 : pos;
}
return key.equals(getKeyByIndex(pos)) ? getNameByIndex(pos) : null;
}
@Override
public String getRecordByAlias(String alias) throws MetadataException {
return getRecordByKey(alias);
}
@Override
public boolean hasRecordKey(String key) {
int idx = getIndexByKey(key);
return memberNum > 0 && idx < memberNum && key.equals(getKeyByIndex(idx));
}
@Override
public boolean hasRecordAlias(String alias) {
return hasRecordKey(alias);
}
@Override
public Queue<String> getAllRecords() throws MetadataException {
Queue<String> res = new ArrayDeque<>(memberNum);
for (int i = 0; i < memberNum; i++) {
res.add(getNameByIndex(i));
}
return res;
}
@Override
public void syncPageBuffer() {
super.syncPageBuffer();
ReadWriteIOUtils.write(this.nextPage, this.pageBuffer);
}
@Override
public void syncBuffer() {
super.syncPageBuffer();
ReadWriteIOUtils.write(this.nextPage, this.pageBuffer);
}
@Override
public short size() {
return (short) pageBuffer.capacity();
}
@Override
public short getSpareSize() {
return spareSize;
}
@Override
public void delete() {
// Do nothing
}
@Override
public long getNextSegAddress() {
return nextPage;
}
@Override
public void setNextSegAddress(long nextSegAddress) {
nextPage = nextSegAddress;
}
@Override
public void extendsTo(ByteBuffer newBuffer) throws MetadataException {
if (newBuffer.capacity() != this.pageBuffer.capacity()) {
throw new MetadataException("AliasIndexPage can only extend to buffer with same capacity.");
}
syncPageBuffer();
this.pageBuffer.clear();
newBuffer.clear();
newBuffer.put(this.pageBuffer);
}
@Override
public String splitByKey(String key, String entry, ByteBuffer dstBuffer, boolean inclineSplit)
throws MetadataException {
if (this.pageBuffer.capacity() != dstBuffer.capacity()) {
throw new MetadataException("Segments only splits with same capacity.");
}
if (memberNum == 0) {
throw new MetadataException("Segment can not be split with no records.");
}
if (key == null && memberNum <= 1) {
throw new MetadataException("Segment can not be split with only one record.");
}
// notice that key can be null here
boolean monotonic =
penultKey != null
&& key != null
&& lastKey != null
&& inclineSplit
&& (key.compareTo(lastKey)) * (lastKey.compareTo(penultKey)) > 0;
int n = memberNum;
// actual index of key just smaller than the insert, -2 for null key
int pos = key != null ? getIndexByKey(key) : -2;
int sp; // virtual index to split
if (monotonic) {
// New entry into part with more space
sp = key.compareTo(lastKey) > 0 ? Math.max(pos, n >> 1) : Math.min(pos, n >> 1);
} else {
sp = n >> 1;
}
// Little different from InternalSegment, only the front edge key can not split
sp = sp <= 0 ? 1 : sp;
// This method BREAKS envelop of the passing in buffer to be more efficient
// attributes for dstBuffer
short memberNum = 0;
short spareOffset = (short) dstBuffer.capacity();
short spareSize = (short) (spareOffset - SchemaFileConfig.PAGE_HEADER_SIZE);
dstBuffer.clear();
short offset;
int recSize;
String mKey, sKey = null;
int aix; // aix for actual index on keyAddressList
n = key == null ? n - 1 : n; // null key
// TODO: implement bulk split further
for (int ix = sp; ix <= n; ix++) {
if (ix == pos) {
// Migrate newly insert
mKey = key;
recSize = 8 + entry.getBytes().length + mKey.getBytes().length;
spareOffset -= recSize;
dstBuffer.position(spareOffset);
ReadWriteIOUtils.write(mKey, dstBuffer);
ReadWriteIOUtils.write(entry, dstBuffer);
} else {
// Pos equals -2 if key is null
aix = (ix > pos) && (pos != -2) ? ix - 1 : ix;
mKey = getKeyByIndex(aix);
offset = getOffsetByIndex(aix);
pageBuffer.clear().position(offset);
recSize = ReadWriteIOUtils.readInt(pageBuffer);
pageBuffer.position(pageBuffer.position() + recSize);
recSize += ReadWriteIOUtils.readInt(pageBuffer) + 8;
pageBuffer.position(offset);
pageBuffer.limit(offset + recSize);
spareOffset -= recSize;
dstBuffer.position(spareOffset);
dstBuffer.put(this.pageBuffer);
}
dstBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + memberNum * OFFSET_LEN);
ReadWriteIOUtils.write(spareOffset, dstBuffer);
if (ix == sp) {
// Search key is the first key in split segment
sKey = mKey;
}
memberNum++;
spareSize -= recSize + 2;
}
this.memberNum -= memberNum;
if (key != null && sp <= pos) {
// extra key has inserted into split buffer
this.memberNum++;
}
// compact and update status
compactKeys();
if (key != null && sp > pos) {
// new insert shall be in this
if (insertRecord(key, entry) < 0) {
throw new SegmentOverflowException(key);
}
}
// flush dstBuffer header
dstBuffer.clear();
ReadWriteIOUtils.write(SchemaFileConfig.ALIAS_PAGE, dstBuffer);
ReadWriteIOUtils.write(-1, dstBuffer);
ReadWriteIOUtils.write(spareOffset, dstBuffer);
ReadWriteIOUtils.write(spareSize, dstBuffer);
ReadWriteIOUtils.write(memberNum, dstBuffer);
ReadWriteIOUtils.write(nextPage, dstBuffer);
penultKey = null;
lastKey = null;
return sKey;
}
@Override
public ByteBuffer resetBuffer(int ptr) {
return null;
}
@Override
public String inspect() {
syncPageBuffer();
ByteBuffer bufferR = this.pageBuffer.asReadOnlyBuffer();
StringBuilder builder =
new StringBuilder(
String.format(
"page_id:%d, total_seg:%d, spare_before:%d%n", pageIndex, 1, spareOffset));
builder.append(
String.format(
"[AliasIndexSegment, total_alias: %d, spare_size:%d,", this.memberNum, this.spareSize));
bufferR.clear();
for (int i = 0; i < memberNum; i++) {
builder.append(String.format("(%s, %s),", getKeyByIndex(i), getNameByIndex(i)));
}
builder.append("]\n");
return builder.toString();
}
@Override
public String toString() {
return inspect();
}
@Override
public ISegment<String, String> getAsAliasIndexPage() {
return this;
}
// Endregion
private void compactKeys() {
ByteBuffer tempBuffer = ByteBuffer.allocate(this.pageBuffer.capacity() - this.spareOffset);
tempBuffer.position(tempBuffer.capacity());
this.spareOffset = (short) this.pageBuffer.capacity();
String key;
String name;
int accSiz = 0;
for (int i = 0; i < this.memberNum; i++) {
// this.pageBuffer will not be overridden immediately
key = getKeyByIndex(i);
name = getNameByIndex(i);
accSiz += key.getBytes().length + name.getBytes().length + 8;
this.spareOffset = (short) (this.pageBuffer.capacity() - accSiz);
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + OFFSET_LEN * i);
ReadWriteIOUtils.write(this.spareOffset, this.pageBuffer);
// write tempBuffer backward
tempBuffer.position(tempBuffer.capacity() - accSiz);
ReadWriteIOUtils.write(key, tempBuffer);
ReadWriteIOUtils.write(name, tempBuffer);
}
tempBuffer.position(tempBuffer.capacity() - accSiz);
this.pageBuffer.position(this.spareOffset);
this.pageBuffer.put(tempBuffer);
this.spareSize =
(short)
(this.spareOffset - SchemaFileConfig.PAGE_HEADER_SIZE - OFFSET_LEN * this.memberNum);
}
/**
* Binary search implementation.
*
* @param key to be searched or inserted.
* @return the position of the smallest key larger than or equal to the passing in.
*/
private int getIndexByKey(String key) {
if (memberNum == 0 || key.compareTo(getKeyByIndex(0)) <= 0) {
return 0;
} else if (key.compareTo(getKeyByIndex(memberNum - 1)) >= 0) {
return memberNum;
}
int head = 0;
int tail = memberNum - 1;
int pivot = 0;
String pk;
while (head != tail) {
pivot = (head + tail) / 2;
pk = getKeyByIndex(pivot);
if (key.compareTo(pk) == 0) {
return pivot;
}
if (key.compareTo(pk) > 0) {
head = pivot;
} else {
tail = pivot;
}
if (head == tail - 1) {
// notice that getKey(tail) always greater than key
return key.compareTo(getKeyByIndex(head)) <= 0 ? head : tail;
}
}
return pivot;
}
private short getOffsetByIndex(int index) {
if (index < 0 || index >= memberNum) {
// TODO: check whether reasonable to throw an unchecked
throw new IndexOutOfBoundsException();
}
synchronized (pageBuffer) {
this.pageBuffer.limit(this.pageBuffer.capacity());
this.pageBuffer.position(SchemaFileConfig.PAGE_HEADER_SIZE + index * OFFSET_LEN);
return ReadWriteIOUtils.readShort(this.pageBuffer);
}
}
private String getKeyByOffset(short offset) {
synchronized (this.pageBuffer) {
this.pageBuffer.limit(this.pageBuffer.capacity());
this.pageBuffer.position(offset);
return ReadWriteIOUtils.readString(this.pageBuffer);
}
}
private String getNameByOffset(short offset) {
synchronized (this.pageBuffer) {
this.pageBuffer.limit(this.pageBuffer.capacity());
this.pageBuffer.position(offset);
this.pageBuffer.position(offset + 4 + ReadWriteIOUtils.readInt(this.pageBuffer));
return ReadWriteIOUtils.readString(this.pageBuffer);
}
}
private String getKeyByIndex(int index) {
if (index < 0 || index > memberNum) {
throw new IndexOutOfBoundsException();
}
return getKeyByOffset(getOffsetByIndex(index));
}
private String getNameByIndex(int index) {
if (index < 0 || index >= memberNum) {
throw new IndexOutOfBoundsException();
}
return getNameByOffset(getOffsetByIndex(index));
}
}