blob: babad024a26edf68ea36154a0635309af197d534 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.schemafile.SchemaFileNotExists;
import org.apache.iotdb.db.schemaengine.metric.SchemaRegionCachedMetric;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.ICachedMNode;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.mnode.container.ICachedMNodeContainer;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.BTreePageManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.IPageManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr.PageManager;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.loader.MNodeFactoryLoader;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;
/**
* This class manages the space of a whole PBTree file.
*
* <p>It opens a .pst (Persistent mTree) file and maintains the header of that file. It loads or
* writes one page length of bytes at a time, using a 32-bit int to index a page inside the file.
* Use SlottedFile to manipulate a segment (sp) inside a page (an array of bytes).
*/
public class SchemaFile implements ISchemaFile {
// Logger shared by all instances of this class.
private static final Logger logger = LoggerFactory.getLogger(SchemaFile.class);
// attributes for this pbtree file
// Path of the PBTree file backing this instance.
private final String filePath;
// Path of the companion log file; it is handed to the page manager when that is constructed.
private final String logPath;
// Name of the database this file belongs to; never assigned (stays null) when the instance is
// built from a bare File for sketching.
private String storageGroupName;
// Attributes of the database node; they are serialized into the file header.
private long dataTTL;
private boolean isEntity;
private int sgNodeTemplateIdWithState;
// In-memory buffer holding the serialized file header.
private ByteBuffer headerContent;
// Only assigned while the header is initialized or loaded; when the header is rewritten the
// value is taken from the page manager instead.
private int lastPageIndex; // last page index of the file, boundary to grow
private long lastSGAddr; // last segment of database node
// Page-level reader/writer; node operations of this class delegate to it.
private IPageManager pageManager;
// attributes for file
// Handle of the PBTree file on disk and the channel opened on it.
private File pmtFile;
private FileChannel channel;
// Factory used to create the database node returned by init().
private final IMNodeFactory<ICachedMNode> nodeFactory =
MNodeFactoryLoader.getInstance().getCachedMNodeIMNodeFactory();
/**
 * Builds the directory path of one schema region: schema folder, database name and schema region
 * id joined by the platform file separator.
 *
 * @param sgName name of the database
 * @param schemaRegionId id of the schema region
 * @return the directory path as a string
 */
private static String getDirPath(String sgName, int schemaRegionId) {
  StringBuilder dir = new StringBuilder();
  dir.append(SchemaFileConfig.SCHEMA_FOLDER);
  dir.append(File.separator).append(sgName);
  dir.append(File.separator).append(schemaRegionId);
  return dir.toString();
}
// region Constructors
// todo refactor constructor for schema file in Jan.
private SchemaFile(
String sgName, int schemaRegionId, boolean override, long ttl, boolean isEntity)
throws IOException, MetadataException {
String dirPath = getDirPath(sgName, schemaRegionId);
this.storageGroupName = sgName;
this.filePath = dirPath + File.separator + SchemaConstant.PBTREE_FILE_NAME;
this.logPath = dirPath + File.separator + SchemaConstant.PBTREE_LOG_FILE_NAME;
pmtFile = SystemFileFactory.INSTANCE.getFile(filePath);
if (!pmtFile.exists() && !override) {
throw new SchemaFileNotExists(filePath);
}
if (pmtFile.exists() && override) {
logger.warn("PBTree File [{}] will be overwritten since already exists.", filePath);
Files.delete(Paths.get(pmtFile.toURI()));
pmtFile.createNewFile();
}
if (!pmtFile.exists() || !pmtFile.isFile()) {
File dir = SystemFileFactory.INSTANCE.getFile(dirPath);
dir.mkdirs();
pmtFile.createNewFile();
}
this.channel = new RandomAccessFile(pmtFile, "rw").getChannel();
this.headerContent = ByteBuffer.allocate(SchemaFileConfig.FILE_HEADER_SIZE);
// will be overwritten if to init
this.dataTTL = ttl;
this.isEntity = isEntity;
this.sgNodeTemplateIdWithState = -1;
initFileHeader();
}
private SchemaFile(File file) throws IOException, MetadataException {
// only used to sketch a pbtree file so a file object is necessary while
// components of log manipulations are not.
pmtFile = file;
filePath = pmtFile.getPath();
logPath = file.getParent() + File.separator + SchemaConstant.PBTREE_LOG_FILE_NAME;
channel = new RandomAccessFile(file, "rw").getChannel();
headerContent = ByteBuffer.allocate(SchemaFileConfig.FILE_HEADER_SIZE);
if (channel.size() <= 0) {
channel.close();
throw new SchemaFileNotExists(file.getAbsolutePath());
}
initFileHeader();
}
// load or init
/**
 * Loads the PBTree file of a schema region, or initializes a new one.
 *
 * <p>The file is (re)created when it does not exist yet or when the schema region consensus
 * protocol is RATIS; otherwise the existing file is loaded. The default TTL from the common
 * configuration is used for a newly created file.
 *
 * @param sgName name of the database
 * @param schemaRegionId id of the schema region
 * @return the opened schema file
 * @throws IOException if the file cannot be created, opened or read
 * @throws MetadataException if the header cannot be initialized
 */
public static SchemaFile initSchemaFile(String sgName, int schemaRegionId)
    throws IOException, MetadataException {
  String pbtreePath =
      getDirPath(sgName, schemaRegionId) + File.separator + SchemaConstant.PBTREE_FILE_NAME;
  File existing = SystemFileFactory.INSTANCE.getFile(pbtreePath);

  boolean override =
      !existing.exists()
          || IoTDBDescriptor.getInstance()
              .getConfig()
              .getSchemaRegionConsensusProtocolClass()
              .equals(ConsensusFactory.RATIS_CONSENSUS);

  return new SchemaFile(
      sgName,
      schemaRegionId,
      override,
      CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
      false);
}
/**
 * Loads the existing PBTree file of a schema region; the file is never created or overwritten.
 *
 * @param sgName name of the database
 * @param schemaRegionId id of the schema region
 * @return the opened schema file
 * @throws IOException if the file cannot be opened or read
 * @throws MetadataException if the file does not exist or its header cannot be loaded
 */
public static SchemaFile loadSchemaFile(String sgName, int schemaRegionId)
    throws IOException, MetadataException {
  final boolean override = false;
  final long ttl = -1L;
  final boolean isEntity = false;
  return new SchemaFile(sgName, schemaRegionId, override, ttl, isEntity);
}

/**
 * Loads a PBTree file directly from a {@link File}; only be called to sketch a PBTree File.
 *
 * @param file the PBTree file to open
 * @return the opened schema file
 * @throws IOException if the file cannot be opened or read
 * @throws MetadataException if the file is empty or its header cannot be loaded
 */
public static SchemaFile loadSchemaFile(File file) throws IOException, MetadataException {
  SchemaFile sketched = new SchemaFile(file);
  return sketched;
}
// endregion
// region Interface Implementation
/**
 * Builds the database node from the attributes currently held by this schema file.
 *
 * <p>The node is named after the last path node of the database name ("noName" when no name is
 * set), gets segment address 0 and has its full path set to the database name. When the database
 * is an entity, a database-device node is created and its template id and use-template flag are
 * set from the stored template id.
 *
 * @return the newly created database node
 * @throws MetadataException if the database name cannot be split into path nodes
 */
@Override
public ICachedMNode init() throws MetadataException {
  String[] pathNodes;
  if (storageGroupName == null) {
    pathNodes = new String[] {"noName"};
  } else {
    pathNodes = PathUtils.splitPathToDetachedNodes(storageGroupName);
  }
  String nodeName = pathNodes[pathNodes.length - 1];

  ICachedMNode databaseNode;
  if (!isEntity) {
    databaseNode =
        setNodeAddress(
            nodeFactory.createDatabaseMNode(null, nodeName, dataTTL).getAsMNode(), 0L);
  } else {
    databaseNode =
        setNodeAddress(nodeFactory.createDatabaseDeviceMNode(null, nodeName, dataTTL), 0L);
    databaseNode.getAsDeviceMNode().setSchemaTemplateId(sgNodeTemplateIdWithState);
    databaseNode.getAsDeviceMNode().setUseTemplate(sgNodeTemplateIdWithState > -1);
  }

  databaseNode.setFullPath(storageGroupName);
  return databaseNode;
}
/**
 * Copies the attributes of the given database node into this schema file and writes the header
 * to the channel.
 *
 * <p>The stored template id is only replaced when the node is a device; otherwise the previous
 * value is kept.
 *
 * @param sgNode the database node to take TTL, entity flag and template id from
 * @return always {@code true}
 * @throws IOException if writing the header fails
 */
@Override
public boolean updateDatabaseNode(IDatabaseMNode<ICachedMNode> sgNode) throws IOException {
  dataTTL = sgNode.getDataTTL();
  isEntity = sgNode.isDevice();

  if (sgNode.isDevice()) {
    sgNodeTemplateIdWithState = sgNode.getAsDeviceMNode().getSchemaTemplateIdWithState();
  }

  updateHeaderBuffer();
  return true;
}
/**
 * Deletes a node from this schema file. Deleting the database node clears the whole file; any
 * other node is deleted through the page manager.
 *
 * @param node the node to delete
 * @throws IOException if the underlying file operation fails
 * @throws MetadataException if the deletion fails
 */
@Override
public void delete(ICachedMNode node) throws IOException, MetadataException {
  if (!node.isDatabase()) {
    pageManager.delete(node);
    return;
  }
  // should clear this file
  clear();
}
/**
 * Writes a node through the page manager and then rewrites the file header.
 *
 * <p>A database node updates the entity flag and is assigned the last database segment address
 * before being written. Any other node must already carry a non-negative segment address.
 *
 * @param node the node to write
 * @throws MetadataException if a non-database node has a negative segment address, or the page
 *     manager rejects the write
 * @throws IOException if the underlying file operation fails
 */
@Override
public void writeMNode(ICachedMNode node) throws MetadataException, IOException {
  final long segAddr = getNodeAddress(node);

  if (node.isDatabase()) {
    isEntity = node.isDevice();
    setNodeAddress(node, lastSGAddr);
  } else if (segAddr < 0L) {
    // now only 32 bits page index is allowed
    final String reason =
        node.isDevice() && node.getAsDeviceMNode().isUseTemplate()
            ? "Adding or updating children of device using template [%s] is NOT allowed."
            : "Cannot flush any node with negative address [%s] except for DatabaseNode.";
    throw new MetadataException(String.format(reason, node.getFullPath()));
  }

  pageManager.writeMNode(node);
  updateHeaderBuffer();
}
/**
 * Looks up a child of the given parent by name; delegates to the page manager.
 *
 * @param parent the parent node
 * @param childName name of the child to look up
 * @return whatever the page manager returns for this lookup
 * @throws MetadataException if the lookup fails
 * @throws IOException if the underlying file operation fails
 */
@Override
public ICachedMNode getChildNode(ICachedMNode parent, String childName)
    throws MetadataException, IOException {
  ICachedMNode child = pageManager.getChildNode(parent, childName);
  return child;
}
/**
 * Returns an iterator over the children of the given parent, as provided by the page manager.
 *
 * @param parent the parent node
 * @return iterator over the children of {@code parent}
 * @throws MetadataException if {@code parent} is a measurement or has a negative segment
 *     address, or the page manager fails
 * @throws IOException if the underlying file operation fails
 */
@Override
public Iterator<ICachedMNode> getChildren(ICachedMNode parent)
    throws MetadataException, IOException {
  final boolean childless = parent.isMeasurement() || getNodeAddress(parent) < 0;
  if (!childless) {
    return pageManager.getChildren(parent);
  }
  throw new MetadataException(
      String.format("Node [%s] has no child in pbtree file.", parent.getFullPath()));
}
/**
 * Writes the header, closes the page manager, forces the channel and finally closes the channel.
 *
 * <p>The channel is closed even when one of the preceding steps throws, so a failed flush does
 * not leave the file handle open.
 *
 * @throws IOException if writing the header, closing the page manager, forcing or closing the
 *     channel fails
 */
@Override
public void close() throws IOException {
  try {
    updateHeaderBuffer();
    pageManager.close();
    forceChannel();
  } finally {
    // Release the file handle regardless of whether the flush above succeeded.
    channel.close();
  }
}
/**
 * Writes the current header to the channel and forces the channel, including file metadata.
 *
 * @throws IOException if writing or forcing fails
 */
@Override
public void sync() throws IOException {
  updateHeaderBuffer();
  // Same call the forceChannel() helper makes.
  channel.force(true);
}
/**
 * Resets this schema file to an empty file: clears and closes the page manager, closes the
 * channel, deletes and recreates the file, then reopens it and initializes a fresh header.
 *
 * @throws IOException if closing, deleting, creating or reopening the file fails
 * @throws MetadataException if the header cannot be initialized
 */
@Override
public void clear() throws IOException, MetadataException {
  // Release everything that still refers to the old file content.
  this.pageManager.clear();
  this.pageManager.close();
  this.channel.close();

  // Replace the file on disk with an empty one.
  if (this.pmtFile.exists()) {
    Files.delete(Paths.get(this.pmtFile.toURI()));
  }
  this.pmtFile.createNewFile();

  // Reopen and initialize the header and the page manager again.
  RandomAccessFile reopened = new RandomAccessFile(this.pmtFile, "rw");
  this.channel = reopened.getChannel();
  this.headerContent = ByteBuffer.allocate(SchemaFileConfig.FILE_HEADER_SIZE);
  initFileHeader();
}
/**
 * Prints a sketch of this schema file to standard output.
 *
 * @return a short message naming the inspected file
 * @throws MetadataException if the page manager fails while inspecting
 * @throws IOException if the underlying file operation fails
 */
public String inspect() throws MetadataException, IOException {
  return inspect(null);
}

/**
 * Prints a sketch of this schema file: a fixed header followed by the output of the page
 * manager.
 *
 * @param pw writer receiving the sketch; when {@code null}, standard output is used and flushed
 *     before this method returns. A writer supplied by the caller is neither flushed nor closed
 *     here.
 * @return a short message naming the inspected file
 * @throws MetadataException if the page manager fails while inspecting
 * @throws IOException if the underlying file operation fails
 */
public String inspect(PrintWriter pw) throws MetadataException, IOException {
  String header =
      String.format(
          "=============================\n"
              + "== PBTree File Sketch Tool ==\n"
              + "=============================\n"
              + "== Notice: \n"
              + "==  Internal/Entity presents as (name, is_aligned, child_segment_address)\n"
              + "==  Measurement presents as (name, data_type, encoding, compressor, alias_if_exist)\n"
              + "=============================\n"
              + "Belong to StorageGroup: [%s], segment of SG:%s, total pages:%d\n",
          storageGroupName == null ? "NOT SPECIFIED" : storageGroupName,
          Long.toHexString(lastSGAddr),
          lastPageIndex + 1);

  final boolean ownsWriter = pw == null;
  if (ownsWriter) {
    pw = new PrintWriter(System.out);
  }

  try {
    pw.print(header);
    pageManager.inspect(pw);
  } finally {
    if (ownsWriter) {
      // A PrintWriter built from an OutputStream does not flush automatically; without this
      // the sketch could stay in the writer's buffer and never reach standard output.
      pw.flush();
    }
  }
  return String.format("SchemaFile[%s] had been inspected.", this.filePath);
}
// endregion
// region File Operations
/**
 * Initializes the file header buffer: builds a fresh header when the file is empty, otherwise
 * loads the header from the file. In both cases the page manager is created afterwards. <br>
 *
 * <p><b>File Header Structure:</b>
 *
 * <ul>
 *   <li>1 int (4 bytes): last page index {@link #lastPageIndex}
 *   <li>var length: root(SG) node info
 *       <ul>
 *         <li><s>a. var length string (less than 200 bytes): path to root(SG) node</s>
 *         <li>a. 1 long (8 bytes): dataTTL {@link #dataTTL}
 *         <li>b. 1 bool (1 byte): isEntityStorageGroup {@link #isEntity}
 *         <li>c. 1 int (4 bytes): schema template id with state {@link
 *             #sgNodeTemplateIdWithState}
 *         <li>d. 1 long (8 bytes): last segment address of database {@link #lastSGAddr}
 *         <li>e. 1 int (4 bytes): version of pbtree file {@linkplain
 *             SchemaFileConfig#SCHEMA_FILE_VERSION}
 *       </ul>
 * </ul>
 *
 * ... (Expected to extend for optimization) ...
 *
 * @throws IOException if the channel cannot be read
 * @throws MetadataException if the stored version does not match {@linkplain
 *     SchemaFileConfig#SCHEMA_FILE_VERSION}, or the page manager cannot be created
 */
private void initFileHeader() throws IOException, MetadataException {
  if (channel.size() == 0) {
    // new pbtree file
    lastPageIndex = 0;
    lastSGAddr = 0L;
    ReadWriteIOUtils.write(lastPageIndex, headerContent);
    ReadWriteIOUtils.write(dataTTL, headerContent);
    ReadWriteIOUtils.write(isEntity, headerContent);
    ReadWriteIOUtils.write(sgNodeTemplateIdWithState, headerContent);
    // Keep the layout identical to the one read below and written by updateHeaderBuffer():
    // the last database segment address precedes the version.
    ReadWriteIOUtils.write(lastSGAddr, headerContent);
    ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION, headerContent);
    pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
  } else {
    channel.read(headerContent);
    // Rewind the buffer so the fields can be decoded from its start.
    headerContent.clear();
    lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
    dataTTL = ReadWriteIOUtils.readLong(headerContent);
    isEntity = ReadWriteIOUtils.readBool(headerContent);
    sgNodeTemplateIdWithState = ReadWriteIOUtils.readInt(headerContent);
    lastSGAddr = ReadWriteIOUtils.readLong(headerContent);

    if (ReadWriteIOUtils.readInt(headerContent) != SchemaFileConfig.SCHEMA_FILE_VERSION) {
      channel.close();
      throw new MetadataException("SchemaFile with wrong version, please check or upgrade.");
    }

    pageManager = new BTreePageManager(channel, pmtFile, lastPageIndex, logPath);
  }
}
/**
 * Re-serializes the header fields into the header buffer and writes the buffer to the channel at
 * position 0. The last page index is taken from the page manager.
 *
 * @throws IOException if writing to the channel fails
 */
private void updateHeaderBuffer() throws IOException {
  final ByteBuffer header = headerContent;
  header.clear();

  ReadWriteIOUtils.write(pageManager.getLastPageIndex(), header);
  ReadWriteIOUtils.write(dataTTL, header);
  ReadWriteIOUtils.write(isEntity, header);
  ReadWriteIOUtils.write(sgNodeTemplateIdWithState, header);
  ReadWriteIOUtils.write(lastSGAddr, header);
  ReadWriteIOUtils.write(SchemaFileConfig.SCHEMA_FILE_VERSION, header);

  // Switch the buffer from filling to draining before handing it to the channel.
  header.flip();
  channel.write(header, 0);
}
/**
 * Forces updates of the channel's file, content and metadata, to the storage device.
 *
 * @throws IOException if forcing fails
 */
private void forceChannel() throws IOException {
  final boolean includeMetaData = true;
  channel.force(includeMetaData);
}
// endregion
// region Utilities
/**
 * Packs a page index and a segment index into one global index: the masked page index is shifted
 * left by the segment-index width and combined with the masked segment index.
 *
 * @param pageIndex index of the page
 * @param segIndex index of the segment inside the page
 * @return the combined global index
 */
public static long getGlobalIndex(int pageIndex, short segIndex) {
  long pagePart =
      (SchemaFileConfig.PAGE_INDEX_MASK & pageIndex) << SchemaFileConfig.SEG_INDEX_DIGIT;
  long segPart = segIndex & SchemaFileConfig.SEG_INDEX_MASK;
  return pagePart | segPart;
}
/**
 * Extracts the page index from a global index.
 *
 * @param globalIndex a global index as produced by {@link #getGlobalIndex(int, short)}
 * @return the page index part
 */
public static int getPageIndex(long globalIndex) {
  long pageBits = SchemaFileConfig.PAGE_INDEX_MASK << SchemaFileConfig.SEG_INDEX_DIGIT;
  long shifted = (globalIndex & pageBits) >> SchemaFileConfig.SEG_INDEX_DIGIT;
  return (int) shifted;
}
/**
 * Extracts the segment index from a global index.
 *
 * @param globalIndex a global index as produced by {@link #getGlobalIndex(int, short)}
 * @return the segment index part
 */
public static short getSegIndex(long globalIndex) {
  long segBits = globalIndex & SchemaFileConfig.SEG_INDEX_MASK;
  return (short) segBits;
}
/**
 * Picks the first configured segment size that is strictly greater than the given size, falling
 * back to the maximum segment size when none is.
 *
 * <p>TODO: shall merge with PageManager#reEstimateSegSize
 *
 * @param oldSize the current size
 * @return the re-estimated segment size
 */
static short reEstimateSegSize(int oldSize) {
  for (short candidate : SchemaFileConfig.SEG_SIZE_LST) {
    if (candidate > oldSize) {
      return candidate;
    }
  }
  return SchemaFileConfig.SEG_MAX_SIZ;
}
/**
 * Computes the byte offset of a page inside the file: the file header size plus the masked page
 * index times the page length.
 *
 * @param pageIndex index of the page
 * @return offset of the page in the file
 */
public static long getPageAddress(int pageIndex) {
  return SchemaFileConfig.FILE_HEADER_SIZE
      + (SchemaFileConfig.PAGE_INDEX_MASK & pageIndex) * SchemaFileConfig.PAGE_LENGTH;
}
/**
 * Reads the segment address stored in the cached container of the given node.
 *
 * @param node the node to inspect
 * @return the segment address held by the node's container
 */
public static long getNodeAddress(ICachedMNode node) {
  ICachedMNodeContainer container = ICachedMNodeContainer.getCachedMNodeContainer(node);
  return container.getSegmentAddress();
}
/**
 * Stores a segment address in the cached container of the given node.
 *
 * @param node the node to update
 * @param addr the segment address to store
 * @return the same {@code node}, to allow chaining
 */
public static ICachedMNode setNodeAddress(ICachedMNode node, long addr) {
  ICachedMNodeContainer container = ICachedMNodeContainer.getCachedMNodeContainer(node);
  container.setSegmentAddress(addr);
  return node;
}
/**
 * Test hook: fetches the page instance at the given index from the page manager, which must be a
 * {@link PageManager}.
 *
 * @param index index of the page
 * @return the page instance
 * @throws IOException if the underlying file operation fails
 * @throws MetadataException if the page cannot be fetched
 */
@TestOnly
public ISchemaPage getPageOnTest(int index) throws IOException, MetadataException {
  PageManager concreteManager = (PageManager) pageManager;
  return concreteManager.getPageInstanceOnTest(index);
}
/**
 * Test hook: asks the page manager, which must be a {@link PageManager}, for the target segment
 * address of a key starting from a source segment.
 *
 * @param srcSegAddr address of the source segment
 * @param key the key to look up
 * @return the target segment address reported by the page manager
 * @throws IOException if the underlying file operation fails
 * @throws MetadataException if the lookup fails
 */
@TestOnly
public long getTargetSegmentOnTest(long srcSegAddr, String key)
    throws IOException, MetadataException {
  PageManager concreteManager = (PageManager) pageManager;
  return concreteManager.getTargetSegmentAddressOnTest(srcSegAddr, key);
}
/**
 * Hands the given metric to the page manager.
 *
 * @param metric the schema region metric to pass on
 */
public void setMetric(SchemaRegionCachedMetric metric) {
  this.pageManager.setMetric(metric);
}
// endregion
// region Snapshot
/**
 * Copies the PBTree file into the given snapshot directory after syncing it.
 *
 * <p>An already existing snapshot file is deleted first; if that deletion fails, or any I/O error
 * occurs, the failure is logged and {@code false} is returned. On an I/O error the snapshot file
 * is deleted as well.
 *
 * @param snapshotDir directory that receives the snapshot file
 * @return {@code true} if the snapshot was created, {@code false} otherwise
 */
@Override
public boolean createSnapshot(File snapshotDir) {
  final File target =
      SystemFileFactory.INSTANCE.getFile(snapshotDir, SchemaConstant.PBTREE_SNAPSHOT);

  try {
    sync();

    final boolean staleSnapshotRemains = target.exists() && !target.delete();
    if (!staleSnapshotRemains) {
      Files.copy(Paths.get(filePath), target.toPath());
      return true;
    }

    logger.error(
        "Failed to delete old snapshot {} while creating pbtree file snapshot.",
        target.getName());
    return false;
  } catch (IOException e) {
    logger.error("Failed to create SchemaFile snapshot due to {}", e.getMessage(), e);
    // Best effort: do not leave a possibly partial snapshot behind.
    target.delete();
    return false;
  }
}
/**
 * Replaces the PBTree file of a schema region with the snapshot found in the given directory and
 * opens the result.
 *
 * <p>The existing PBTree file is deleted, and so is the PBTree log file unless the schema region
 * consensus protocol is RATIS. The snapshot is then copied into place and loaded.
 *
 * @param snapshotDir directory containing the snapshot file
 * @param sgName name of the database
 * @param schemaRegionId id of the schema region
 * @return the schema file opened on the restored PBTree file
 * @throws IOException if deleting, copying or opening a file fails
 * @throws MetadataException if the snapshot does not exist or the header cannot be loaded
 */
public static SchemaFile loadSnapshot(File snapshotDir, String sgName, int schemaRegionId)
    throws IOException, MetadataException {
  final File snapshotFile =
      SystemFileFactory.INSTANCE.getFile(snapshotDir, SchemaConstant.PBTREE_SNAPSHOT);
  if (!snapshotFile.exists()) {
    throw new SchemaFileNotExists(snapshotFile.getPath());
  }

  final String regionDir = getDirPath(sgName, schemaRegionId);
  final File restored =
      SystemFileFactory.INSTANCE.getFile(regionDir, SchemaConstant.PBTREE_FILE_NAME);
  Files.deleteIfExists(restored.toPath());

  final boolean ratisConsensus =
      IoTDBDescriptor.getInstance()
          .getConfig()
          .getSchemaRegionConsensusProtocolClass()
          .equals(ConsensusFactory.RATIS_CONSENSUS);
  if (!ratisConsensus) {
    // schemaFileLog disabled with RATIS consensus
    final File staleLog =
        SystemFileFactory.INSTANCE.getFile(regionDir, SchemaConstant.PBTREE_LOG_FILE_NAME);
    Files.deleteIfExists(staleLog.toPath());
  }

  Files.copy(snapshotFile.toPath(), restored.toPath());

  return new SchemaFile(
      sgName,
      schemaRegionId,
      false,
      CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
      false);
}
// endregion
}