blob: e2731f4af79945285b231b871a80a4c064e55292 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.logfile;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.Map;
public class TagLogFile implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TagLogFile.class);
private FileChannel fileChannel;
private static final String LENGTH_EXCEED_MSG =
"Tag/Attribute exceeds the max length limit. "
+ "Please enlarge tag_attribute_total_size in iotdb-engine.properties";
private static final int MAX_LENGTH =
IoTDBDescriptor.getInstance().getConfig().getTagAttributeTotalSize();
private static final byte FILL_BYTE = 0;
public TagLogFile(String schemaDir, String logFileName) throws IOException {
File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir);
if (!metadataDir.exists()) {
if (metadataDir.mkdirs()) {
logger.info("create schema folder {}.", metadataDir);
} else {
logger.info("create schema folder {} failed.", metadataDir);
}
}
File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
this.fileChannel =
FileChannel.open(
logFile.toPath(),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.DSYNC);
// move the current position to the tail of the file
this.fileChannel.position(fileChannel.size());
}
/** @return tags map, attributes map */
public Pair<Map<String, String>, Map<String, String>> read(int size, long position)
throws IOException {
if (position < 0) {
return new Pair<>(Collections.emptyMap(), Collections.emptyMap());
}
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
fileChannel.read(byteBuffer, position);
byteBuffer.flip();
return new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer));
}
public Map<String, String> readTag(int size, long position) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
fileChannel.read(byteBuffer, position);
byteBuffer.flip();
return ReadWriteIOUtils.readMap(byteBuffer);
}
public long write(Map<String, String> tagMap, Map<String, String> attributeMap)
throws IOException, MetadataException {
long offset = fileChannel.position();
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
fileChannel.write(byteBuffer);
return offset;
}
/** This method does not modify this file's current position. */
public void write(Map<String, String> tagMap, Map<String, String> attributeMap, long position)
throws IOException, MetadataException {
ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap);
fileChannel.write(byteBuffer, position);
}
private ByteBuffer convertMapToByteBuffer(
Map<String, String> tagMap, Map<String, String> attributeMap) throws MetadataException {
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
int length = serializeMap(tagMap, byteBuffer, 0);
length = serializeMap(attributeMap, byteBuffer, length);
// fill the remaining space
for (int i = length + 1; i <= MAX_LENGTH; i++) {
byteBuffer.put(FILL_BYTE);
}
// persist to the disk
byteBuffer.flip();
return byteBuffer;
}
private int serializeMap(Map<String, String> map, ByteBuffer byteBuffer, int length)
throws MetadataException {
if (map == null) {
length += Integer.BYTES;
if (length > MAX_LENGTH) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
ReadWriteIOUtils.write(0, byteBuffer);
return length;
}
length += Integer.BYTES;
if (length > MAX_LENGTH) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
ReadWriteIOUtils.write(map.size(), byteBuffer);
byte[] bytes;
for (Map.Entry<String, String> entry : map.entrySet()) {
// serialize key
bytes = entry.getKey().getBytes();
length += (4 + bytes.length);
if (length > MAX_LENGTH) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
ReadWriteIOUtils.write(bytes.length, byteBuffer);
byteBuffer.put(bytes);
// serialize value
bytes = entry.getValue().getBytes();
length += (4 + bytes.length);
if (length > MAX_LENGTH) {
throw new MetadataException(LENGTH_EXCEED_MSG);
}
ReadWriteIOUtils.write(bytes.length, byteBuffer);
byteBuffer.put(bytes);
}
return length;
}
@Override
public void close() throws IOException {
fileChannel.force(true);
fileChannel.close();
fileChannel = null;
}
}