blob: da74389e1b8c8c32675e981b21b30118dd80be38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.trigger.service;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.TriggerExecutionException;
import org.apache.iotdb.db.exception.TriggerManagementException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_ATTRIBUTES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_CLASS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_EVENT;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_PATH;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_STATUS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_STATUS_STARTED;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TRIGGER_STATUS_STOPPED;
public class TriggerRegistrationService implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerRegistrationService.class);
private static final String LOG_FILE_DIR =
IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ File.separator
+ "trigger"
+ File.separator;
private static final String LOG_FILE_NAME = LOG_FILE_DIR + "tlog.bin";
private static final String TEMPORARY_LOG_FILE_NAME = LOG_FILE_NAME + ".tmp";
private static final String LIB_ROOT = IoTDBDescriptor.getInstance().getConfig().getTriggerDir();
private final ConcurrentHashMap<String, TriggerExecutor> executors;
private TriggerLogWriter logWriter;
private TriggerRegistrationService() {
executors = new ConcurrentHashMap<>();
}
public synchronized void register(CreateTriggerPlan plan)
throws TriggerManagementException, TriggerExecutionException {
checkIfRegistered(plan);
IMeasurementMNode measurementMNode = tryGetMeasurementMNode(plan);
tryAppendRegistrationLog(plan);
doRegister(plan, measurementMNode);
}
private void checkIfRegistered(CreateTriggerPlan plan) throws TriggerManagementException {
TriggerExecutor executor = executors.get(plan.getTriggerName());
if (executor == null) {
return;
}
TriggerRegistrationInformation information = executor.getRegistrationInformation();
throw new TriggerManagementException(
information.getClassName().equals(plan.getClassName())
? String.format(
"Failed to register trigger %s(%s), because a trigger with the same trigger name and the class name has already been registered.",
plan.getTriggerName(), plan.getClassName())
: String.format(
"Failed to register trigger %s(%s), because a trigger %s(%s) with the same trigger name but a different class name has already been registered.",
plan.getTriggerName(),
plan.getClassName(),
information.getTriggerName(),
information.getClassName()));
}
private IMeasurementMNode tryGetMeasurementMNode(CreateTriggerPlan plan)
throws TriggerManagementException {
try {
return (IMeasurementMNode) IoTDB.metaManager.getNodeByPath(plan.getFullPath());
} catch (MetadataException e) {
throw new TriggerManagementException(e.getMessage(), e);
} catch (ClassCastException e) {
throw new TriggerManagementException("Triggers can only be registered on MeasurementMNode.");
}
}
private void tryAppendRegistrationLog(CreateTriggerPlan plan) throws TriggerManagementException {
try {
logWriter.write(plan);
} catch (IOException e) {
throw new TriggerManagementException(
String.format(
"Failed to append trigger management operation log when registering trigger %s(%s), because %s",
plan.getTriggerName(), plan.getClassName(), e));
}
}
private void doRegister(CreateTriggerPlan plan, IMeasurementMNode measurementMNode)
throws TriggerManagementException, TriggerExecutionException {
TriggerRegistrationInformation information = new TriggerRegistrationInformation(plan);
TriggerClassLoader classLoader =
TriggerClassLoaderManager.getInstance().register(plan.getClassName());
TriggerExecutor executor;
try {
executor = new TriggerExecutor(information, classLoader, measurementMNode);
executor.onCreate();
} catch (TriggerManagementException | TriggerExecutionException e) {
TriggerClassLoaderManager.getInstance().deregister(plan.getClassName());
throw e;
}
executors.put(plan.getTriggerName(), executor);
measurementMNode.setTriggerExecutor(executor);
}
public synchronized void deregister(DropTriggerPlan plan) throws TriggerManagementException {
getTriggerExecutorWithExistenceCheck(plan.getTriggerName());
tryAppendDeregistrationLog(plan);
doDeregister(plan);
}
private TriggerExecutor getTriggerExecutorWithExistenceCheck(String triggerName)
throws TriggerManagementException {
TriggerExecutor executor = executors.get(triggerName);
if (executor == null) {
throw new TriggerManagementException(
String.format("Trigger %s does not exist.", triggerName));
}
return executor;
}
private void tryAppendDeregistrationLog(DropTriggerPlan plan) throws TriggerManagementException {
try {
logWriter.write(plan);
} catch (IOException e) {
throw new TriggerManagementException(
String.format(
"Failed to drop trigger %s because the operation plan was failed to log: %s",
plan.getTriggerName(), e));
}
}
private void doDeregister(DropTriggerPlan plan) {
TriggerExecutor executor = executors.remove(plan.getTriggerName());
executor.getMeasurementMNode().setTriggerExecutor(null);
try {
executor.onDrop();
} catch (TriggerExecutionException e) {
LOGGER.warn(e.getMessage(), e);
}
TriggerClassLoaderManager.getInstance()
.deregister(executor.getRegistrationInformation().getClassName());
}
public void activate(StartTriggerPlan plan)
throws TriggerManagementException, TriggerExecutionException {
TriggerExecutor executor = getTriggerExecutorWithExistenceCheck(plan.getTriggerName());
if (!executor.getRegistrationInformation().isStopped()) {
throw new TriggerManagementException(
String.format("Trigger %s has already been started.", plan.getTriggerName()));
}
try {
logWriter.write(plan);
} catch (IOException e) {
throw new TriggerManagementException(
String.format(
"Failed to append trigger management operation log when starting trigger %s, because %s",
plan.getTriggerName(), e));
}
executor.onStart();
}
public void inactivate(StopTriggerPlan plan) throws TriggerManagementException {
TriggerExecutor executor = getTriggerExecutorWithExistenceCheck(plan.getTriggerName());
if (executor.getRegistrationInformation().isStopped()) {
throw new TriggerManagementException(
String.format("Trigger %s has already been stopped.", plan.getTriggerName()));
}
try {
logWriter.write(plan);
} catch (IOException e) {
throw new TriggerManagementException(
String.format(
"Failed to append trigger management operation log when stopping trigger %s, because %s",
plan.getTriggerName(), e));
}
try {
executor.onStop();
} catch (TriggerExecutionException e) {
LOGGER.warn(
"Failed to stop the executor of trigger {}({})",
executor.getRegistrationInformation().getTriggerName(),
executor.getRegistrationInformation().getClassName(),
e);
}
}
public QueryDataSet show() {
ListDataSet dataSet =
new ListDataSet(
Arrays.asList(
new PartialPath(COLUMN_TRIGGER_NAME, false),
new PartialPath(COLUMN_TRIGGER_STATUS, false),
new PartialPath(COLUMN_TRIGGER_EVENT, false),
new PartialPath(COLUMN_TRIGGER_PATH, false),
new PartialPath(COLUMN_TRIGGER_CLASS, false),
new PartialPath(COLUMN_TRIGGER_ATTRIBUTES, false)),
Arrays.asList(
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT,
TSDataType.TEXT));
putTriggerRecords(dataSet);
return dataSet;
}
private void putTriggerRecords(ListDataSet dataSet) {
for (TriggerExecutor executor : executors.values().toArray(new TriggerExecutor[0])) {
TriggerRegistrationInformation information = executor.getRegistrationInformation();
RowRecord rowRecord = new RowRecord(0); // ignore timestamp
rowRecord.addField(Binary.valueOf(information.getTriggerName()), TSDataType.TEXT);
rowRecord.addField(
Binary.valueOf(
information.isStopped()
? COLUMN_TRIGGER_STATUS_STOPPED
: COLUMN_TRIGGER_STATUS_STARTED),
TSDataType.TEXT);
rowRecord.addField(Binary.valueOf(information.getEvent().toString()), TSDataType.TEXT);
rowRecord.addField(Binary.valueOf(information.getFullPath().getFullPath()), TSDataType.TEXT);
rowRecord.addField(Binary.valueOf(information.getClassName()), TSDataType.TEXT);
rowRecord.addField(Binary.valueOf(information.getAttributes().toString()), TSDataType.TEXT);
dataSet.putRecord(rowRecord);
}
}
@Override
public void start() throws StartupException {
try {
makeDirIfNecessary(LIB_ROOT);
makeDirIfNecessary(LOG_FILE_DIR);
doRecovery();
logWriter = new TriggerLogWriter(LOG_FILE_NAME);
} catch (Exception e) {
throw new StartupException(e);
}
}
private static void makeDirIfNecessary(String dir) throws IOException {
File file = SystemFileFactory.INSTANCE.getFile(dir);
if (file.exists() && file.isDirectory()) {
return;
}
FileUtils.forceMkdir(file);
}
private void doRecovery() throws IOException, TriggerManagementException {
File temporaryLogFile = SystemFileFactory.INSTANCE.getFile(TEMPORARY_LOG_FILE_NAME);
File logFile = SystemFileFactory.INSTANCE.getFile(LOG_FILE_NAME);
if (temporaryLogFile.exists()) {
if (logFile.exists()) {
doRecoveryFromLogFile(logFile);
FileUtils.deleteQuietly(temporaryLogFile);
} else {
doRecoveryFromLogFile(temporaryLogFile);
FSFactoryProducer.getFSFactory().moveFile(temporaryLogFile, logFile);
}
} else if (logFile.exists()) {
doRecoveryFromLogFile(logFile);
}
}
private void doRecoveryFromLogFile(File logFile) throws IOException, TriggerManagementException {
for (CreateTriggerPlan createTriggerPlan : recoverCreateTriggerPlans(logFile)) {
try {
doRegister(createTriggerPlan, tryGetMeasurementMNode(createTriggerPlan));
if (createTriggerPlan.isStopped()) {
executors.get(createTriggerPlan.getTriggerName()).onStop();
}
} catch (TriggerExecutionException | TriggerManagementException e) {
LOGGER.error(
"Failed to register the trigger {}({}) during recovering.",
createTriggerPlan.getTriggerName(),
createTriggerPlan.getClassName());
}
}
}
private Collection<CreateTriggerPlan> recoverCreateTriggerPlans(File logFile)
throws IOException, TriggerManagementException {
Map<String, CreateTriggerPlan> recoveredCreateTriggerPlans = new HashMap<>();
try (TriggerLogReader reader = new TriggerLogReader(logFile)) {
while (reader.hasNext()) {
PhysicalPlan plan = reader.next();
CreateTriggerPlan createTriggerPlan;
switch (plan.getOperatorType()) {
case CREATE_TRIGGER:
recoveredCreateTriggerPlans.put(
((CreateTriggerPlan) plan).getTriggerName(), (CreateTriggerPlan) plan);
break;
case DROP_TRIGGER:
recoveredCreateTriggerPlans.remove(((DropTriggerPlan) plan).getTriggerName());
break;
case START_TRIGGER:
createTriggerPlan =
recoveredCreateTriggerPlans.get(((StartTriggerPlan) plan).getTriggerName());
if (createTriggerPlan != null) {
createTriggerPlan.markAsStarted();
}
break;
case STOP_TRIGGER:
createTriggerPlan =
recoveredCreateTriggerPlans.get(((StopTriggerPlan) plan).getTriggerName());
if (createTriggerPlan != null) {
createTriggerPlan.markAsStopped();
}
break;
default:
throw new TriggerManagementException(
"Unrecognized trigger management operation plan is recovered.");
}
}
}
return recoveredCreateTriggerPlans.values();
}
@Override
public void stop() {
try {
writeTemporaryLogFile();
logWriter.close();
logWriter.deleteLogFile();
File temporaryLogFile = SystemFileFactory.INSTANCE.getFile(TEMPORARY_LOG_FILE_NAME);
File logFile = SystemFileFactory.INSTANCE.getFile(LOG_FILE_NAME);
FSFactoryProducer.getFSFactory().moveFile(temporaryLogFile, logFile);
} catch (IOException ignored) {
// ignored
}
}
private void writeTemporaryLogFile() throws IOException {
try (TriggerLogWriter temporaryLogWriter = new TriggerLogWriter(TEMPORARY_LOG_FILE_NAME)) {
for (TriggerExecutor executor : executors.values()) {
TriggerRegistrationInformation information = executor.getRegistrationInformation();
temporaryLogWriter.write(information.convertToCreateTriggerPlan());
if (information.isStopped()) {
temporaryLogWriter.write(new StopTriggerPlan(information.getTriggerName()));
}
}
}
}
@TestOnly
public void deregisterAll() throws TriggerManagementException {
for (TriggerExecutor executor : executors.values()) {
deregister(new DropTriggerPlan(executor.getRegistrationInformation().getTriggerName()));
}
}
@TestOnly
public Trigger getTriggerInstance(String triggerName) throws TriggerManagementException {
return getTriggerExecutorWithExistenceCheck(triggerName).getTrigger();
}
@TestOnly
public TriggerRegistrationInformation getRegistrationInformation(String triggerName)
throws TriggerManagementException {
return getTriggerExecutorWithExistenceCheck(triggerName).getRegistrationInformation();
}
@Override
public ServiceType getID() {
return ServiceType.TRIGGER_REGISTRATION_SERVICE;
}
public static TriggerRegistrationService getInstance() {
return TriggerRegistrationService.TriggerRegistrationServiceHelper.INSTANCE;
}
private static class TriggerRegistrationServiceHelper {
private static final TriggerRegistrationService INSTANCE = new TriggerRegistrationService();
private TriggerRegistrationServiceHelper() {}
}
}