/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.schemaengine.schemaregion.tag;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
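
/**
 * Persistent log of tag/attribute maps. Each record occupies one or more fixed-size blocks
 * of {@code tag_attribute_total_size} bytes. A record that fits in a single block stores the
 * serialized tag map followed by the serialized attribute map; a record spanning
 * {@code n > 1} blocks stores {@code -n} as its first int, then {@code n - 1} longs holding
 * the file offsets of the remaining blocks, then the serialized maps. Rewrites reuse the
 * blocks already owned by the record and append new blocks only when the new data needs
 * more space.
 *
 * <p>A minimal usage sketch (the directory and file names below are illustrative only):
 *
 * <pre>{@code
 * Map<String, String> tagMap = ...;
 * Map<String, String> attributeMap = ...;
 * try (TagLogFile tagLogFile = new TagLogFile("data/system/schema", "tlog.txt")) {
 *   long offset = tagLogFile.write(tagMap, attributeMap);
 *   Pair<Map<String, String>, Map<String, String>> maps = tagLogFile.read(offset);
 * }
 * }</pre>
 */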
public class TagLogFile implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TagLogFile.class);
private File tagFile;
private FileChannel fileChannel;
private static final String LENGTH_EXCEED_MSG =
"Tag/Attribute exceeds the max length limit. "
+ "Please enlarge tag_attribute_total_size in iotdb-common.properties";
private static final int MAX_LENGTH =
CommonDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
private static final int MAX_ENTRY_NUM =
CommonDescriptor.getInstance().getConfig().getTagAttributeMaxNum();
  private static final int MAX_ENTRY_SIZE =
      CommonDescriptor.getInstance().getConfig().getTagAttributeEntryMaxSize();
private static final int RECORD_FLUSH_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval();
private int unFlushedRecordNum = 0;
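
  /**
   * Open the tag log file under the given schema directory, creating the directory and the
   * file if they do not exist, and leave the channel positioned at the end of the file so
   * that new records are appended.
   */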
public TagLogFile(String schemaDir, String logFileName) throws IOException {
File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
if (!metadataDir.exists()) {
if (metadataDir.mkdirs()) {
logger.info("create schema folder {}.", metadataDir);
} else {
logger.info("create schema folder {} failed.", metadataDir);
}
}
tagFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
this.fileChannel =
FileChannel.open(
tagFile.toPath(),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
// move the current position to the tail of the file
try {
this.fileChannel.position(fileChannel.size());
    } catch (ClosedByInterruptException e) {
      // ignore: the thread was interrupted and the channel closed while positioning
    }
}
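
  /** Force buffered writes to disk, then copy the tag file to the target file. */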
public synchronized void copyTo(File targetFile) throws IOException {
// flush os buffer
fileChannel.force(true);
FileUtils.copyFile(tagFile, targetFile);
  }

/**
* Read tags and attributes from tag file.
*
* @return tags map, attributes map
* @throws IOException error occurred when reading disk
*/
public Pair<Map<String, String>, Map<String, String>> read(long position) throws IOException {
if (position < 0) {
return new Pair<>(Collections.emptyMap(), Collections.emptyMap());
}
ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
return new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
}
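
  /** Read only the tag map of the record at the given position. */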
public Map<String, String> readTag(long position) throws IOException {
ByteBuffer byteBuffer = parseByteBuffer(fileChannel, position);
return ReadWriteIOUtils.readMap(byteBuffer);
}
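
  /**
   * Read the whole record starting at the given position into a single buffer. If the first
   * int of the first block is {@code -n} with {@code n > 1}, the record spans {@code n}
   * blocks and the remaining blocks are fetched through the offsets stored after the block
   * count; the returned buffer is positioned at the start of the serialized maps.
   */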
public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
throws IOException {
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
byteBuffer.flip();
    if (byteBuffer.limit() > 0) { // there is data at this position
      int firstInt = ReadWriteIOUtils.readInt(byteBuffer);
      byteBuffer.position(0);
      if (firstInt < -1) { // the first int stores -blockNum: the record spans multiple blocks
        int blockNum = -firstInt;
        ByteBuffer byteBuffers = ByteBuffer.allocate(blockNum * MAX_LENGTH);
        byteBuffers.put(byteBuffer);
        byteBuffers.position(4); // skip blockNum
        for (int i = 1; i < blockNum; i++) {
          // read the offset of the next block, then fetch that block from the file
          long nextPosition = ReadWriteIOUtils.readLong(byteBuffers);
          byteBuffers.position(MAX_LENGTH * i);
          byteBuffers.limit(MAX_LENGTH * (i + 1));
          fileChannel.read(byteBuffers, nextPosition);
          byteBuffers.position(4 + i * Long.BYTES);
        }
        byteBuffers.limit(byteBuffers.capacity());
        return byteBuffers;
      }
    }
return byteBuffer;
}
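
  /**
   * Collect the file offsets of every block occupied by the record starting at the given
   * position. For a single-block record this is just {@code position}; for a multi-block
   * record the remaining offsets are read from the record's header.
   */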
private List<Long> parseOffsetList(long position) throws IOException {
List<Long> blockOffset = new ArrayList<>();
blockOffset.add(position);
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
byteBuffer.flip();
    if (byteBuffer.limit() > 0) { // there is data at this position
      int firstInt = ReadWriteIOUtils.readInt(byteBuffer);
      byteBuffer.position(0);
      if (firstInt < -1) { // the first int stores -blockNum: the record spans multiple blocks
        int blockNum = -firstInt;
        // length (rounded up to whole blocks) of the prefix that holds the offset header
        int blockOffsetStoreLen =
            (((blockNum - 1) * Long.BYTES + 4) / MAX_LENGTH + 1) * MAX_LENGTH;
        ByteBuffer blockBuffer = ByteBuffer.allocate(blockOffsetStoreLen);
        blockBuffer.put(byteBuffer);
        blockBuffer.position(4); // skip blockNum
        for (int i = 1; i < blockNum; i++) {
          blockOffset.add(ReadWriteIOUtils.readLong(blockBuffer));
          // only fetch the blocks that still contain part of the offset header; compared with
          // reading the whole record, this skips the blocks that hold payload only
          if (MAX_LENGTH * (i + 1) <= blockOffsetStoreLen) {
            blockBuffer.position(MAX_LENGTH * i);
            blockBuffer.limit(MAX_LENGTH * (i + 1));
            fileChannel.read(blockBuffer, blockOffset.get(i));
            blockBuffer.position(4 + i * Long.BYTES);
          }
        }
      }
    }
return blockOffset;
}
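
  /**
   * Append a new record to the end of the file.
   *
   * @return beginning position of the record in tagFile
   * @throws IOException error occurred when writing disk
   * @throws MetadataException if a map exceeds the configured entry number or entry size limit
   */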
public long write(Map<String, String> tagMap, Map<String, String> attributeMap)
throws IOException, MetadataException {
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
return write(byteBuffer, -1);
  }

/**
* This method does not modify this file's current position.
*
* @throws IOException IOException
* @throws MetadataException metadata exception
*/
public void write(Map<String, String> tagMap, Map<String, String> attributeMap, long position)
throws IOException, MetadataException {
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
write(byteBuffer, position);
  }

/**
* @param byteBuffer the data of record to be persisted
* @param position the target position to store the record in tagFile
* @return beginning position of the record in tagFile
*/
private synchronized long write(ByteBuffer byteBuffer, long position) throws IOException {
// Correct the initial offset of the write
if (position < 0) {
// append the record to file tail
position = fileChannel.size();
}
    // read the existing record (if any) to learn which blocks it already occupies
    List<Long> blockOffset = parseOffsetList(position);
    // write the new data, reusing the blocks of the old record where possible
int blockNumReal = byteBuffer.capacity() / MAX_LENGTH;
if (blockNumReal < 1) {
throw new RuntimeException(
"ByteBuffer capacity is smaller than tagAttributeTotalSize, which is not allowed.");
}
if (blockNumReal == 1 && blockOffset.size() == 1) {
// If the original data occupies only one block and the new data occupies only one block, the
// original space is used
fileChannel.write(byteBuffer, blockOffset.get(0));
} else {
// if the original space is larger than the new space, the original space is used
if (blockOffset.size() >= blockNumReal) {
ByteBuffer byteBufferFinal = ByteBuffer.allocate(blockOffset.size() * MAX_LENGTH);
byteBufferFinal.putInt(-blockOffset.size());
for (int i = 1; i < blockOffset.size(); i++) {
byteBufferFinal.putLong(blockOffset.get(i));
}
if (blockNumReal > 1) { // real data occupies multiple blocks
byteBuffer.position((blockNumReal - 1) * Long.BYTES + 4);
} else { // real data occupies only one block
byteBuffer.position(0);
}
byteBufferFinal.put(byteBuffer);
for (int i = 0; i < blockOffset.size(); i++) {
byteBufferFinal.position(i * MAX_LENGTH);
byteBufferFinal.limit((i + 1) * MAX_LENGTH);
fileChannel.write(byteBufferFinal, blockOffset.get(i));
}
} else {
// if the original space is smaller than the new space, the new space is used
// prepare the bytebuffer to store the offset of each block
int blockOffsetStoreLen = (blockNumReal - 1) * Long.BYTES + 4;
ByteBuffer byteBufferOffset = ByteBuffer.allocate(blockOffsetStoreLen);
byteBufferOffset.putInt(-blockNumReal);
// write the data to the file
for (int i = 0; i < blockNumReal; i++) {
byteBuffer.position(i * MAX_LENGTH);
byteBuffer.limit((i + 1) * MAX_LENGTH);
          // this block fits into one of the record's original blocks
if (i < blockOffset.size()) {
if (i > 0) { // first block has been written
byteBufferOffset.putLong(blockOffset.get(i));
}
fileChannel.write(byteBuffer, blockOffset.get(i));
} else {
            // this block goes into newly appended space at the end of the file
// TODO
byteBufferOffset.putLong(fileChannel.size());
blockOffset.add(fileChannel.size());
fileChannel.write(byteBuffer, fileChannel.size());
}
}
// write the offset of each block to the file
byteBufferOffset.flip();
for (int i = 0; i < blockOffset.size(); i++) {
byteBufferOffset.position(i * MAX_LENGTH);
if ((i + 1) * MAX_LENGTH > byteBufferOffset.capacity()) {
byteBufferOffset.limit(byteBufferOffset.capacity());
fileChannel.write(byteBufferOffset, blockOffset.get(i));
break;
} else {
byteBufferOffset.limit((i + 1) * MAX_LENGTH);
fileChannel.write(byteBufferOffset, blockOffset.get(i));
}
}
}
}
unFlushedRecordNum++;
if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) {
fileChannel.force(true);
unFlushedRecordNum = 0;
}
return position;
}
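
  /**
   * Serialize the two maps into a buffer whose capacity is a whole multiple of the block
   * size. For a record that needs {@code n > 1} blocks, space is reserved at the head of
   * the buffer for the block count and the {@code n - 1} block offsets, which are filled in
   * later by {@link #write(ByteBuffer, long)}.
   */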
private ByteBuffer convertMapToByteBuffer(
Map<String, String> tagMap, Map<String, String> attributeMap) throws MetadataException {
int totalMapSize = calculateMapSize(tagMap) + calculateMapSize(attributeMap);
ByteBuffer byteBuffer;
if (totalMapSize <= MAX_LENGTH) {
byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
} else {
      // find the smallest num satisfying
      // num * MAX_LENGTH < totalMapSize + 4 + Long.BYTES * num <= MAX_LENGTH * (num + 1),
      // i.e. the payload plus the header no longer fits in num blocks but fits in num + 1
      double numMinLimit = (totalMapSize + 4 - MAX_LENGTH) / (double) (MAX_LENGTH - Long.BYTES);
      // blockNum = num + 1
      int blockNum = (int) Math.ceil(numMinLimit) + 1;
      byteBuffer = ByteBuffer.allocate(blockNum * MAX_LENGTH);
      // 4 bytes for the block count, (blockNum - 1) * Long.BYTES for the block offsets
      byteBuffer.position(4 + (blockNum - 1) * Long.BYTES);
}
serializeMap(tagMap, byteBuffer);
serializeMap(attributeMap, byteBuffer);
    // reset position to 0 so the buffer content can be read from the beginning
byteBuffer.position(0);
return byteBuffer;
}
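
  /**
   * Calculate the number of bytes the map will occupy after serialization, enforcing the
   * configured limits on the number of entries and on the size of a single entry.
   */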
public static int calculateMapSize(Map<String, String> map) throws MetadataException {
    int length = 0;
    if (map != null) {
      if (map.size() > MAX_ENTRY_NUM) {
        throw new MetadataException("the entry num of the map is over the tagAttributeMaxNum");
      }
      length += 4; // mapSize is 4 bytes
      // when the map is empty, this loop body is never executed
      for (Map.Entry<String, String> entry : map.entrySet()) {
        int entryLength = 0;
        String key = entry.getKey();
        String value = entry.getValue();
        entryLength += 4; // keySize is 4 bytes
        if (key != null) {
          // only a non-null key adds its byte length; a null key stores just the keySize
          // marker, which is -1 (4 bytes); UTF-8 matches the charset used by ReadWriteIOUtils
          entryLength += key.getBytes(StandardCharsets.UTF_8).length;
        }
        entryLength += 4; // valueSize is 4 bytes
        if (value != null) {
          // only a non-null value adds its byte length; a null value stores just the
          // valueSize marker, which is -1 (4 bytes)
          entryLength += value.getBytes(StandardCharsets.UTF_8).length;
        }
        if (entryLength > MAX_ENTRY_SIZE) {
          throw new MetadataException(
              "An entry of the map has a size which is over the tagAttributeEntryMaxSize");
        }
        length += entryLength;
      }
    } else {
      length += 4; // when the map is null, only the 4-byte size marker is written
    }
    return length;
}
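
  /** Serialize the map into the buffer; a null map is written as an empty map (size 0). */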
private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer)
throws MetadataException {
try {
if (map == null) {
ReadWriteIOUtils.write(0, byteBuffer);
} else {
ReadWriteIOUtils.write(map, byteBuffer);
}
} catch (BufferOverflowException e) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
  }

@Override
public void close() throws IOException {
fileChannel.force(true);
fileChannel.close();
fileChannel = null;
}
}