| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.schemaengine.schemaregion.tag; |
| |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.file.SystemFileFactory; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.utils.ReadWriteIOUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.BufferOverflowException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedByInterruptException; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.StandardOpenOption; |
| import java.util.Collections; |
| import java.util.Map; |
| |
| public class TagLogFile implements AutoCloseable { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TagLogFile.class); |
| private File tagFile; |
| private FileChannel fileChannel; |
| private static final String LENGTH_EXCEED_MSG = |
| "Tag/Attribute exceeds the max length limit. " |
| + "Please enlarge tag_attribute_total_size in iotdb-common.properties"; |
| |
| private static final int MAX_LENGTH = |
| CommonDescriptor.getInstance().getConfig().getTagAttributeTotalSize(); |
| |
| private static final int RECORD_FLUSH_INTERVAL = |
| IoTDBDescriptor.getInstance().getConfig().getTagAttributeFlushInterval(); |
| private int unFlushedRecordNum = 0; |
| |
| public TagLogFile(String schemaDir, String logFileName) throws IOException { |
| |
| File metadataDir = SystemFileFactory.INSTANCE.getFile(schemaDir); |
| if (!metadataDir.exists()) { |
| if (metadataDir.mkdirs()) { |
| logger.info("create schema folder {}.", metadataDir); |
| } else { |
| logger.info("create schema folder {} failed.", metadataDir); |
| } |
| } |
| |
| tagFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName); |
| |
| this.fileChannel = |
| FileChannel.open( |
| tagFile.toPath(), |
| StandardOpenOption.READ, |
| StandardOpenOption.WRITE, |
| StandardOpenOption.CREATE); |
| // move the current position to the tail of the file |
| try { |
| this.fileChannel.position(fileChannel.size()); |
| } catch (ClosedByInterruptException e) { |
| // ignore |
| } |
| } |
| |
| public synchronized void copyTo(File targetFile) throws IOException { |
| // flush os buffer |
| fileChannel.force(true); |
| FileUtils.copyFile(tagFile, targetFile); |
| } |
| |
| /** |
| * Read tags and attributes from tag file. |
| * |
| * @return tags map, attributes map |
| * @throws IOException error occurred when reading disk |
| */ |
| public Pair<Map<String, String>, Map<String, String>> read(int size, long position) |
| throws IOException { |
| if (position < 0) { |
| return new Pair<>(Collections.emptyMap(), Collections.emptyMap()); |
| } |
| ByteBuffer byteBuffer = ByteBuffer.allocate(size); |
| fileChannel.read(byteBuffer, position); |
| byteBuffer.flip(); |
| return new Pair<>(ReadWriteIOUtils.readMap(byteBuffer), ReadWriteIOUtils.readMap(byteBuffer)); |
| } |
| |
| public Map<String, String> readTag(int size, long position) throws IOException { |
| ByteBuffer byteBuffer = ByteBuffer.allocate(size); |
| fileChannel.read(byteBuffer, position); |
| byteBuffer.flip(); |
| return ReadWriteIOUtils.readMap(byteBuffer); |
| } |
| |
| public long write(Map<String, String> tagMap, Map<String, String> attributeMap) |
| throws IOException, MetadataException { |
| ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap); |
| return write(byteBuffer, -1); |
| } |
| |
| /** |
| * This method does not modify this file's current position. |
| * |
| * @throws IOException IOException |
| * @throws MetadataException metadata exception |
| */ |
| public void write(Map<String, String> tagMap, Map<String, String> attributeMap, long position) |
| throws IOException, MetadataException { |
| ByteBuffer byteBuffer = convertMapToByteBuffer(tagMap, attributeMap); |
| write(byteBuffer, position); |
| } |
| |
| /** |
| * @param byteBuffer the data of record to be persisted |
| * @param position the target position to store the record in tagFile |
| * @return beginning position of the record in tagFile |
| */ |
| private synchronized long write(ByteBuffer byteBuffer, long position) throws IOException { |
| if (position < 0) { |
| // append the record to file tail |
| position = fileChannel.size(); |
| } |
| fileChannel.write(byteBuffer, position); |
| unFlushedRecordNum++; |
| if (unFlushedRecordNum >= RECORD_FLUSH_INTERVAL) { |
| fileChannel.force(true); |
| unFlushedRecordNum = 0; |
| } |
| return position; |
| } |
| |
| private ByteBuffer convertMapToByteBuffer( |
| Map<String, String> tagMap, Map<String, String> attributeMap) throws MetadataException { |
| ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH); |
| serializeMap(tagMap, byteBuffer); |
| serializeMap(attributeMap, byteBuffer); |
| |
| // set position to 0 and the content in this buffer could be read |
| byteBuffer.position(0); |
| return byteBuffer; |
| } |
| |
| private void serializeMap(Map<String, String> map, ByteBuffer byteBuffer) |
| throws MetadataException { |
| try { |
| if (map == null) { |
| ReadWriteIOUtils.write(0, byteBuffer); |
| } else { |
| ReadWriteIOUtils.write(map, byteBuffer); |
| } |
| } catch (BufferOverflowException e) { |
| throw new MetadataException(LENGTH_EXCEED_MSG); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| fileChannel.force(true); |
| fileChannel.close(); |
| fileChannel = null; |
| } |
| } |