blob: e324c6a7c4db7ba208beb5e53071129b0d175f61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
import org.apache.iotdb.confignode.consensus.response.JarResp;
import org.apache.iotdb.confignode.consensus.response.trigger.TransferringTriggersResp;
import org.apache.iotdb.confignode.consensus.response.trigger.TriggerLocationResp;
import org.apache.iotdb.confignode.consensus.response.trigger.TriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
public class TriggerInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInfo.class);
private static final ConfigNodeConfig CONFIG_NODE_CONF =
ConfigNodeDescriptor.getInstance().getConf();
private final TriggerTable triggerTable;
private final Map<String, String> existedJarToMD5;
private final TriggerExecutableManager triggerExecutableManager;
private final ReentrantLock triggerTableLock = new ReentrantLock();
private static final String SNAPSHOT_FILENAME = "trigger_info.bin";
public TriggerInfo() throws IOException {
triggerTable = new TriggerTable();
existedJarToMD5 = new HashMap<>();
triggerExecutableManager =
TriggerExecutableManager.setupAndGetInstance(
CONFIG_NODE_CONF.getTriggerTemporaryLibDir(), CONFIG_NODE_CONF.getTriggerDir());
}
public void acquireTriggerTableLock() {
LOGGER.info("acquire TriggerTableLock");
triggerTableLock.lock();
}
public void releaseTriggerTableLock() {
LOGGER.info("release TriggerTableLock");
triggerTableLock.unlock();
}
/** Validate whether the trigger can be created. */
public void validate(String triggerName, String jarName, String jarMD5)
throws TriggerManagementException {
if (triggerTable.containsTrigger(triggerName)) {
throw new TriggerManagementException(
String.format(
"Failed to create trigger [%s], the same name trigger has been created",
triggerName));
}
if (existedJarToMD5.containsKey(jarName) && !existedJarToMD5.get(jarName).equals(jarMD5)) {
throw new TriggerManagementException(
String.format(
"Failed to create trigger [%s], the same name Jar [%s] but different MD5 [%s] has existed",
triggerName, jarName, jarMD5));
}
}
/** Validate whether the trigger can be dropped. */
public void validate(String triggerName) throws TriggerManagementException {
if (triggerTable.containsTrigger(triggerName)) {
return;
}
throw new TriggerManagementException(
String.format(
"Failed to drop trigger [%s], this trigger has not been created", triggerName));
}
public boolean needToSaveJar(String jarName) {
return !existedJarToMD5.containsKey(jarName);
}
public TSStatus addTriggerInTable(AddTriggerInTablePlan physicalPlan) {
try {
TriggerInformation triggerInformation = physicalPlan.getTriggerInformation();
triggerTable.addTriggerInformation(triggerInformation.getTriggerName(), triggerInformation);
if (triggerInformation.isUsingURI()) {
existedJarToMD5.put(triggerInformation.getJarName(), triggerInformation.getJarFileMD5());
if (physicalPlan.getJarFile() != null) {
triggerExecutableManager.saveToInstallDir(
ByteBuffer.wrap(physicalPlan.getJarFile().getValues()),
triggerInformation.getJarName());
}
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
final String errorMessage =
String.format(
"Failed to add trigger [%s] in TriggerTable on Config Nodes, because of %s",
physicalPlan.getTriggerInformation().getTriggerName(), e);
LOGGER.warn(errorMessage, e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(errorMessage);
}
}
public TSStatus deleteTriggerInTable(DeleteTriggerInTablePlan physicalPlan) {
String triggerName = physicalPlan.getTriggerName();
if (triggerTable.containsTrigger(triggerName)) {
existedJarToMD5.remove(triggerTable.getTriggerInformation(triggerName).getJarName());
triggerTable.deleteTriggerInformation(triggerName);
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TSStatus updateTriggerStateInTable(UpdateTriggerStateInTablePlan physicalPlan) {
triggerTable.setTriggerState(physicalPlan.getTriggerName(), physicalPlan.getTriggerState());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TriggerTableResp getTriggerTable(GetTriggerTablePlan req) {
if (req.isOnlyStateful()) {
return new TriggerTableResp(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
triggerTable.getAllStatefulTriggerInformation());
} else {
return new TriggerTableResp(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
triggerTable.getAllTriggerInformation());
}
}
public DataSet getTriggerLocation(GetTriggerLocationPlan req) {
TDataNodeLocation dataNodeLocation = triggerTable.getTriggerLocation(req.getTriggerName());
if (dataNodeLocation != null) {
return new TriggerLocationResp(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), dataNodeLocation);
} else {
return new TriggerLocationResp(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(String.format("Fail to get Location trigger[%s]", req.getTriggerName())),
null);
}
}
public JarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
List<ByteBuffer> jarList = new ArrayList<>();
try {
for (String jarName : physicalPlan.getJarNames()) {
jarList.add(
ExecutableManager.transferToBytebuffer(
TriggerExecutableManager.getInstance().getFileStringUnderInstallByName(jarName)));
}
} catch (Exception e) {
LOGGER.error("Get TriggerJar failed", e);
return new JarResp(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("Get TriggerJar failed, because " + e.getMessage()),
Collections.emptyList());
}
return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
}
public TransferringTriggersResp getTransferringTriggers() {
return new TransferringTriggersResp(triggerTable.getTransferringTriggers());
}
public TSStatus updateTriggersOnTransferNodes(UpdateTriggersOnTransferNodesPlan physicalPlan) {
triggerTable.updateTriggersOnTransferNodes(physicalPlan.getDataNodeLocations());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TSStatus updateTriggerLocation(UpdateTriggerLocationPlan physicalPlan) {
triggerTable.updateTriggerLocation(
physicalPlan.getTriggerName(), physicalPlan.getDataNodeLocation());
triggerTable.setTriggerState(physicalPlan.getTriggerName(), TTriggerState.ACTIVE);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@TestOnly
public Map<String, TriggerInformation> getRawTriggerTable() {
return triggerTable.getTable();
}
@TestOnly
public Map<String, String> getRawExistedJarToMD5() {
return existedJarToMD5;
}
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
if (snapshotFile.exists() && snapshotFile.isFile()) {
LOGGER.error(
"Failed to take snapshot, because snapshot file [{}] is already exist.",
snapshotFile.getAbsolutePath());
return false;
}
acquireTriggerTableLock();
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
serializeExistedJarToMD5(fileOutputStream);
triggerTable.serializeTriggerTable(fileOutputStream);
// fsync
fileOutputStream.getFD().sync();
return true;
} finally {
releaseTriggerTableLock();
}
}
@Override
public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
LOGGER.error(
"Failed to load snapshot,snapshot file [{}] is not exist.",
snapshotFile.getAbsolutePath());
return;
}
acquireTriggerTableLock();
try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
clear();
deserializeExistedJarToMD5(fileInputStream);
triggerTable.deserializeTriggerTable(fileInputStream);
} finally {
releaseTriggerTableLock();
}
}
public void serializeExistedJarToMD5(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(existedJarToMD5.size(), outputStream);
for (Map.Entry<String, String> entry : existedJarToMD5.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
}
public void deserializeExistedJarToMD5(InputStream inputStream) throws IOException {
int size = ReadWriteIOUtils.readInt(inputStream);
while (size > 0) {
existedJarToMD5.put(
ReadWriteIOUtils.readString(inputStream), ReadWriteIOUtils.readString(inputStream));
size--;
}
}
public void clear() {
existedJarToMD5.clear();
triggerTable.clear();
}
}