blob: ce60cdba0d2a544fd2a548457d3ad48cf5687917 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.modification;
import org.apache.iotdb.db.storageengine.dataregion.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.storageengine.dataregion.modification.io.ModificationReader;
import org.apache.iotdb.db.storageengine.dataregion.modification.io.ModificationWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
* directory. Methods in this class are highly synchronized for concurrency safety.
*/
public class ModificationFile implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ModificationFile.class);
public static final String FILE_SUFFIX = ".mods";
public static final String COMPACT_SUFFIX = ".settle";
public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
// whether to verify the last line, it may be incomplete in extreme cases
private boolean needVerify = true;
private final ModificationWriter writer;
private final ModificationReader reader;
private String filePath;
private final SecureRandom random = new SecureRandom();
private static final long COMPACT_THRESHOLD = 1024 * 1024;
private boolean hasCompacted = false;
/**
* Construct a ModificationFile using a file as its storage.
*
* @param filePath the path of the storage file.
*/
public ModificationFile(String filePath) {
LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(filePath);
this.writer = accessor;
this.reader = accessor;
this.filePath = filePath;
}
/** Release resources such as streams and caches. */
@Override
public void close() throws IOException {
synchronized (this) {
writer.close();
}
}
/**
* Write a modification in this file. The modification will first be written to the persistent
* store then the memory cache.
*
* @param mod the modification to be written.
* @throws IOException if IOException is thrown when writing the modification to the store.
*/
public void write(Modification mod) throws IOException {
synchronized (this) {
if (needVerify && new File(filePath).exists()) {
writer.mayTruncateLastLine();
needVerify = false;
}
writer.write(mod);
}
}
/**
* Write a modification in this file. The modification will first be written to the persistent
* store then the memory cache. Notice that this method does not synchronize to physical disk
* after
*
* @param mod the modification to be written.
* @throws IOException if IOException is thrown when writing the modification to the store.
*/
public void writeWithoutSync(Modification mod) throws IOException {
synchronized (this) {
if (needVerify && new File(filePath).exists()) {
writer.mayTruncateLastLine();
needVerify = false;
}
writer.writeWithOutSync(mod);
}
}
@GuardedBy("TsFileResource-WriteLock")
public void truncate(long size) {
writer.truncate(size);
}
/**
* Get all modifications stored in this file.
*
* @return an ArrayList of modifications.
*/
public Collection<Modification> getModifications() {
synchronized (this) {
return reader.read();
}
}
public Iterable<Modification> getModificationsIter() {
return reader::getModificationIterator;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
public void remove() throws IOException {
close();
boolean deleted = FSFactoryProducer.getFSFactory().getFile(filePath).delete();
if (!deleted) {
logger.warn("Delete ModificationFile {} failed.", filePath);
}
}
public boolean exists() {
return new File(filePath).exists();
}
/**
* Create a hardlink for the modification file. The hardlink with have a suffix like
* ".{sysTime}_{randomLong}"
*
* @return a new ModificationFile with its path changed to the hardlink, or null if the origin
* file does not exist or the hardlink cannot be created.
*/
public ModificationFile createHardlink() {
if (!exists()) {
return null;
}
while (true) {
String hardlinkSuffix =
TsFileConstant.PATH_SEPARATOR + System.currentTimeMillis() + "_" + random.nextLong();
File hardlink = new File(filePath + hardlinkSuffix);
try {
Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(filePath));
return new ModificationFile(hardlink.getAbsolutePath());
} catch (FileAlreadyExistsException e) {
// retry a different name if the file is already created
} catch (IOException e) {
logger.error("Cannot create hardlink for {}", filePath, e);
return null;
}
}
}
public static ModificationFile getNormalMods(TsFileResource tsFileResource) {
return new ModificationFile(tsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX);
}
public static ModificationFile getCompactionMods(TsFileResource tsFileResource) {
return new ModificationFile(
tsFileResource.getTsFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
}
public long getSize() {
File file = new File(filePath);
if (file.exists()) {
return file.length();
} else {
return 0;
}
}
public void compact() {
long originFileSize = getSize();
if (originFileSize > COMPACT_THRESHOLD && !hasCompacted) {
Map<String, List<Modification>> pathModificationMap =
getModifications().stream().collect(Collectors.groupingBy(Modification::getPathString));
String newModsFileName = filePath + COMPACT_SUFFIX;
List<Modification> allSettledModifications = new ArrayList<>();
try (ModificationFile compactedModificationFile = new ModificationFile(newModsFileName)) {
Set<Map.Entry<String, List<Modification>>> modificationsEntrySet =
pathModificationMap.entrySet();
for (Map.Entry<String, List<Modification>> modificationEntry : modificationsEntrySet) {
List<Modification> settledModifications = sortAndMerge(modificationEntry.getValue());
for (Modification settledModification : settledModifications) {
compactedModificationFile.write(settledModification);
}
allSettledModifications.addAll(settledModifications);
}
} catch (IOException e) {
logger.error("compact mods file exception of {}", filePath, e);
}
try {
// remove origin mods file
this.remove();
// rename new mods file to origin name
Files.move(new File(newModsFileName).toPath(), new File(filePath).toPath());
logger.info("{} settle successful", filePath);
if (getSize() > COMPACT_THRESHOLD) {
logger.warn(
"After the mod file is settled, the file size is still greater than 1M,the size of the file before settle is {},after settled the file size is {}",
originFileSize,
getSize());
}
} catch (IOException e) {
logger.error("remove origin file or rename new mods file error.", e);
}
hasCompacted = true;
}
}
public static List<Modification> sortAndMerge(List<Modification> modifications) {
modifications.sort(
(o1, o2) -> {
if (!o1.getType().equals(o2.getType())) {
return o1.getType().compareTo(o2.getType());
} else if (!o1.getPath().equals(o2.getPath())) {
return o1.getPath().compareTo(o2.getPath());
} else if (o1.getFileOffset() != o2.getFileOffset()) {
return (int) (o1.getFileOffset() - o2.getFileOffset());
} else {
if (o1.getType() == Modification.Type.DELETION) {
Deletion del1 = (Deletion) o1;
Deletion del2 = (Deletion) o2;
return del1.getTimeRange().compareTo(del2.getTimeRange());
}
throw new IllegalArgumentException();
}
});
List<Modification> result = new ArrayList<>();
if (!modifications.isEmpty()) {
Deletion current = ((Deletion) modifications.get(0)).clone();
for (int i = 1; i < modifications.size(); i++) {
Deletion del = (Deletion) modifications.get(i);
if (current.intersects(del)) {
current.merge(del);
} else {
result.add(current);
current = del.clone();
}
}
result.add(current);
}
return result;
}
}