blob: 8038290add1f13dd4f781d325bbc5fd6042f1643 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
public class ProcedureInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureInfo.class);
private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
private final String OLD_PROCEDURE_WAL_DIR =
CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
new ConcurrentHashMap<>();
private final AtomicLong lastProcId = new AtomicLong(-1);
private final ProcedureFactory procedureFactory = ProcedureFactory.getInstance();
private final ConfigManager configManager;
public ProcedureInfo(ConfigManager configManager) {
this.configManager = configManager;
}
public boolean isOldVersion() {
return new File(OLD_PROCEDURE_WAL_DIR).exists();
}
public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
s.filter(path -> path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
.sorted(
(p1, p2) ->
Long.compareUnsigned(
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
.forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
} catch (IOException e) {
LOGGER.error("Load procedure wal failed.", e);
}
procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), procedure));
procedureList.forEach(
procedure -> lastProcId.set(Math.max(lastProcId.get(), procedure.getProcId())));
return procedureList;
}
public void upgrade() {
if (isOldVersion()) {
try {
LOGGER.info("Old procedure files have been loaded successfully, taking snapshot...");
configManager.getConsensusManager().manuallyTakeSnapshot();
} catch (ConsensusException e) {
LOGGER.warn("Taking snapshot fail, procedure upgrade fail", e);
return;
}
try {
FileUtils.recursivelyDeleteFolder(OLD_PROCEDURE_WAL_DIR);
} catch (IOException e) {
LOGGER.error("Delete useless procedure wal dir fail.", e);
LOGGER.error(
"You should manually delete the procedure wal dir before ConfigNode restart. {}",
OLD_PROCEDURE_WAL_DIR);
}
LOGGER.info(
"The Procedure framework has been successfully upgraded. Now it uses the consensus layer's services instead of maintaining the WAL itself.");
}
}
public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
Procedure<ConfigNodeProcedureEnv> procedure = updateProcedurePlan.getProcedure();
procedureMap.put(procedure.getProcId(), procedure);
lastProcId.set(Math.max(lastProcId.get(), procedure.getProcId()));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@TestOnly
public TSStatus oldUpdateProcedure(UpdateProcedurePlan updateProcedurePlan) {
Procedure procedure = updateProcedurePlan.getProcedure();
long procId = procedure.getProcId();
Path path = Paths.get(OLD_PROCEDURE_WAL_DIR, procId + PROCEDURE_WAL_SUFFIX);
ProcedureWAL procedureWAL = new ProcedureWAL(path, procedureFactory);
try {
procedureWAL.save(procedure);
} catch (IOException e) {
LOGGER.error("Update Procedure (pid={}) wal failed", procedure.getProcId(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TSStatus deleteProcedure(DeleteProcedurePlan deleteProcedurePlan) {
procedureMap.remove(deleteProcedurePlan.getProcId());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
try (FileInputStream fis = new FileInputStream(procedureFilePath.toFile())) {
Procedure procedure = null;
try (FileChannel channel = fis.getChannel()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
if (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
procedure = ProcedureFactory.getInstance().create(byteBuffer);
byteBuffer.clear();
}
return Optional.ofNullable(procedure);
}
} catch (IOException e) {
LOGGER.error("Load {} failed, it will be deleted.", procedureFilePath, e);
if (!procedureFilePath.toFile().delete()) {
LOGGER.error("{} deleted failed; take appropriate action.", procedureFilePath, e);
}
}
return Optional.empty();
}
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
if (procedureSnapshotDir.exists()) {
LOGGER.error(
"Failed to take snapshot, because snapshot dir [{}] is already exist.",
procedureSnapshotDir.getAbsolutePath());
return false;
}
File tmpDir = new File(procedureSnapshotDir.getAbsolutePath() + "-" + UUID.randomUUID());
if (!tmpDir.mkdir()) {
LOGGER.error("Failed to take snapshot, because create tmp dir [{}] fail.", tmpDir);
return false;
}
// save lastProcId
File mainFile = new File(tmpDir.getAbsolutePath() + File.separator + MAIN_SNAPSHOT_FILENAME);
try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileOutputStream)) {
ReadWriteIOUtils.write(lastProcId.get(), fileOutputStream);
tioStreamTransport.flush();
fileOutputStream.getFD().sync();
}
// save all procedures
AtomicBoolean snapshotAllSuccess = new AtomicBoolean(true);
procedureMap
.values()
.forEach(
procedure -> {
try {
new ProcedureWAL(
Paths.get(
tmpDir.getAbsolutePath()
+ File.separator
+ procedure.getProcId()
+ PROCEDURE_SNAPSHOT_FILE_SUFFIX),
procedureFactory)
.save(procedure);
} catch (IOException e) {
snapshotAllSuccess.set(false);
LOGGER.warn(
"{} id {} took snapshot fail", procedure.getClass(), procedure.getProcId(), e);
}
});
if (!snapshotAllSuccess.get()) {
return false;
}
return tmpDir.renameTo(procedureSnapshotDir);
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
if (!procedureSnapshotDir.exists() || !procedureSnapshotDir.isDirectory()) {
LOGGER.error(
"Failed to load snapshot, because snapshot dir [{}] not exists.",
procedureSnapshotDir.getAbsolutePath());
return;
}
File mainFile =
new File(procedureSnapshotDir.getAbsolutePath() + File.separator + MAIN_SNAPSHOT_FILENAME);
try (FileInputStream fileInputStream = new FileInputStream(mainFile)) {
lastProcId.set(ReadWriteIOUtils.readLong(fileInputStream));
}
Arrays.stream(Objects.requireNonNull(procedureSnapshotDir.listFiles()))
.forEach(
procedureSnapshotFile -> {
if (!procedureSnapshotFile.getName().equals(MAIN_SNAPSHOT_FILENAME)) {
loadProcedure(procedureSnapshotFile.toPath())
.ifPresent(procedure -> procedureMap.put(procedure.getProcId(), procedure));
}
});
}
public List<Procedure<ConfigNodeProcedureEnv>> getProcedures() {
return new ArrayList<>(procedureMap.values());
}
public long getNextProcId() {
return this.lastProcId.incrementAndGet();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcedureInfo procedureInfo = (ProcedureInfo) o;
return lastProcId.get() == procedureInfo.lastProcId.get()
&& procedureMap.equals(procedureInfo.procedureMap);
}
}