blob: 48a0d166a496053d75e586e533c7fb2b2f6825a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.trigger.executor;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEvent> {
// Class-wide SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
// Client manager used by updateLocationOfStatefulTrigger to borrow a ConfigNodeClient when the
// location of a stateful trigger has to be looked up again.
private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
/**
* Upper bound on the iterations of the attempt loop in {@code fire}, i.e. the total number of
* attempts (including the first one) made to fire a trigger when it has to be fired on another
* DataNode. The value is read once, when this class is initialized, from the configuration via
* {@code getRetryNumToFindStatefulTrigger()}.
*/
private static final int FIRE_RETRY_NUM =
IoTDBDescriptor.getInstance().getConfig().getRetryNumToFindStatefulTrigger();
/**
 * Entry point of the visitor.
 *
 * <p>When the trigger table is empty there is nothing to fire, so the node is not visited at all.
 * Otherwise the node is handed this visitor through {@code accept}.
 *
 * @param node the plan node to inspect
 * @param context the trigger event the caller is firing for
 * @return {@link TriggerFireResult#SUCCESS} when the trigger table is empty, otherwise whatever
 *     visiting {@code node} returns
 */
@Override
public TriggerFireResult process(PlanNode node, TriggerEvent context) {
  final boolean noTriggerRegistered =
      TriggerManagementService.getInstance().isTriggerTableEmpty();
  return noTriggerRegistered ? TriggerFireResult.SUCCESS : node.accept(this, context);
}
/**
* Generic plan-node handler: fires nothing and always reports success.
*
* <p>NOTE(review): presumably this is the fallback the visitor framework uses for node types
* that have no dedicated {@code visitXxx} override in this class - confirm against PlanVisitor.
*
* @param node the visited plan node (unused)
* @param context the trigger event (unused)
* @return always {@link TriggerFireResult#SUCCESS}
*/
@Override
public TriggerFireResult visitPlan(PlanNode node, TriggerEvent context) {
return TriggerFireResult.SUCCESS;
}
/**
 * Fires every trigger that matches a single-row insertion.
 *
 * <p>For each matched trigger a one-row {@link Tablet} is built that contains only the
 * measurements this trigger was matched for, and that tablet is passed to {@code fire}.
 *
 * @param node the single-row insertion
 * @param context the trigger event to fire for
 * @return {@link TriggerFireResult#TERMINATION} as soon as one trigger reports it (remaining
 *     triggers are not fired); otherwise {@link TriggerFireResult#FAILED_NO_TERMINATION} if at
 *     least one trigger reported it; otherwise {@link TriggerFireResult#SUCCESS}
 */
@Override
public TriggerFireResult visitInsertRow(InsertRowNode node, TriggerEvent context) {
  final Map<String, List<String>> measurementsByTrigger =
      constructTriggerNameToMeasurementListMap(node, context);
  if (measurementsByTrigger.isEmpty()) {
    // No trigger listens to any measurement of this row.
    return TriggerFireResult.SUCCESS;
  }

  final MeasurementSchema[] allSchemas = node.getMeasurementSchemas();
  final Map<String, Integer> schemaIndexOf =
      constructMeasurementToSchemaIndexMap(node.getMeasurements(), allSchemas);
  final Object[] rowValues = node.getValues();
  final long timestamp = node.getTime();

  boolean anyFailed = false;
  for (Map.Entry<String, List<String>> triggerEntry : measurementsByTrigger.entrySet()) {
    final List<String> listened = triggerEntry.getValue();

    // Schemas of the listened measurements, in the same order as the measurement list.
    final List<MeasurementSchema> listenedSchemas = new ArrayList<>(listened.size());
    for (String measurement : listened) {
      listenedSchemas.add(allSchemas[schemaIndexOf.get(measurement)]);
    }

    // A tablet with capacity for exactly one row, filled with that row at index 0.
    final Tablet singleRow =
        new Tablet(node.getDevicePath().getFullPath(), listenedSchemas, 1);
    singleRow.rowSize++;
    singleRow.addTimestamp(0, timestamp);
    for (String measurement : listened) {
      singleRow.addValue(measurement, 0, rowValues[schemaIndexOf.get(measurement)]);
    }

    final TriggerFireResult outcome = fire(triggerEntry.getKey(), singleRow, context);
    if (outcome.equals(TriggerFireResult.TERMINATION)) {
      // A terminating failure stops the remaining triggers from being fired.
      return outcome;
    }
    anyFailed |= outcome.equals(TriggerFireResult.FAILED_NO_TERMINATION);
  }

  if (anyFailed) {
    return TriggerFireResult.FAILED_NO_TERMINATION;
  }
  return TriggerFireResult.SUCCESS;
}
/**
 * Fires every trigger that matches a tablet insertion.
 *
 * <p>When a trigger was matched for as many measurements as the node has schemas, the node's own
 * schema, column and bitmap arrays are passed on as they are. Otherwise a tablet restricted to
 * the matched measurements is assembled; timestamps and row count are shared in both cases.
 *
 * @param node the tablet insertion
 * @param context the trigger event to fire for
 * @return {@link TriggerFireResult#TERMINATION} as soon as one trigger reports it (remaining
 *     triggers are not fired); otherwise {@link TriggerFireResult#FAILED_NO_TERMINATION} if at
 *     least one trigger reported it; otherwise {@link TriggerFireResult#SUCCESS}
 */
@Override
public TriggerFireResult visitInsertTablet(InsertTabletNode node, TriggerEvent context) {
  final Map<String, List<String>> measurementsByTrigger =
      constructTriggerNameToMeasurementListMap(node, context);
  if (measurementsByTrigger.isEmpty()) {
    // No trigger listens to any measurement of this tablet.
    return TriggerFireResult.SUCCESS;
  }

  final MeasurementSchema[] allSchemas = node.getMeasurementSchemas();
  final Map<String, Integer> schemaIndexOf =
      constructMeasurementToSchemaIndexMap(node.getMeasurements(), allSchemas);
  final Object[] allColumns = node.getColumns();
  final BitMap[] allBitMaps = node.getBitMaps();
  final long[] times = node.getTimes();
  final int rows = node.getRowCount();

  boolean anyFailed = false;
  for (Map.Entry<String, List<String>> triggerEntry : measurementsByTrigger.entrySet()) {
    final List<String> listened = triggerEntry.getValue();
    final int listenedCount = listened.size();

    final Tablet tablet;
    if (listenedCount != allSchemas.length) {
      // Only some columns are listened to: project schemas, columns and bitmaps onto them.
      final List<MeasurementSchema> pickedSchemas = new ArrayList<>(listenedCount);
      final Object[] pickedColumns = new Object[listenedCount];
      // Entries stay null when the node carries no bitmaps at all.
      final BitMap[] pickedBitMaps = new BitMap[listenedCount];
      for (int k = 0; k < listenedCount; k++) {
        final int sourceIndex = schemaIndexOf.get(listened.get(k));
        pickedSchemas.add(allSchemas[sourceIndex]);
        pickedColumns[k] = allColumns[sourceIndex];
        if (allBitMaps != null) {
          pickedBitMaps[k] = allBitMaps[sourceIndex];
        }
      }
      tablet =
          new Tablet(
              node.getDevicePath().getFullPath(),
              pickedSchemas,
              times,
              pickedColumns,
              pickedBitMaps,
              rows);
    } else {
      // Every column is listened to: reuse the node's arrays directly.
      tablet =
          new Tablet(
              node.getDevicePath().getFullPath(),
              Arrays.asList(allSchemas),
              times,
              allColumns,
              allBitMaps,
              rows);
    }

    final TriggerFireResult outcome = fire(triggerEntry.getKey(), tablet, context);
    if (outcome.equals(TriggerFireResult.TERMINATION)) {
      // A terminating failure stops the remaining triggers from being fired.
      return outcome;
    }
    anyFailed |= outcome.equals(TriggerFireResult.FAILED_NO_TERMINATION);
  }

  if (anyFailed) {
    return TriggerFireResult.FAILED_NO_TERMINATION;
  }
  return TriggerFireResult.SUCCESS;
}
/**
 * Fires triggers for a batch of rows by visiting each contained row in list order.
 *
 * @param node the multi-row insertion
 * @param context the trigger event to fire for
 * @return {@link TriggerFireResult#TERMINATION} as soon as one row reports it (later rows are
 *     not visited); otherwise {@link TriggerFireResult#FAILED_NO_TERMINATION} if at least one
 *     row reported it; otherwise {@link TriggerFireResult#SUCCESS}
 */
@Override
public TriggerFireResult visitInsertRows(InsertRowsNode node, TriggerEvent context) {
  TriggerFireResult aggregate = TriggerFireResult.SUCCESS;
  for (InsertRowNode row : node.getInsertRowNodeList()) {
    final TriggerFireResult rowOutcome = visitInsertRow(row, context);
    if (rowOutcome.equals(TriggerFireResult.TERMINATION)) {
      return rowOutcome;
    }
    if (rowOutcome.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
      // Remember the failure but keep processing the remaining rows.
      aggregate = TriggerFireResult.FAILED_NO_TERMINATION;
    }
  }
  return aggregate;
}
/**
 * Fires triggers for a batch of tablets by visiting each contained tablet in list order.
 *
 * @param node the multi-tablet insertion
 * @param context the trigger event to fire for
 * @return {@link TriggerFireResult#TERMINATION} as soon as one tablet reports it (later tablets
 *     are not visited); otherwise {@link TriggerFireResult#FAILED_NO_TERMINATION} if at least
 *     one tablet reported it; otherwise {@link TriggerFireResult#SUCCESS}
 */
@Override
public TriggerFireResult visitInsertMultiTablets(
    InsertMultiTabletsNode node, TriggerEvent context) {
  TriggerFireResult aggregate = TriggerFireResult.SUCCESS;
  for (InsertTabletNode tabletNode : node.getInsertTabletNodeList()) {
    final TriggerFireResult tabletOutcome = visitInsertTablet(tabletNode, context);
    if (tabletOutcome.equals(TriggerFireResult.TERMINATION)) {
      return tabletOutcome;
    }
    if (tabletOutcome.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
      // Remember the failure but keep processing the remaining tablets.
      aggregate = TriggerFireResult.FAILED_NO_TERMINATION;
    }
  }
  return aggregate;
}
/**
 * Fires triggers for several rows of one device by visiting each contained row in list order.
 *
 * @param node the insertion holding the rows of a single device
 * @param context the trigger event to fire for
 * @return {@link TriggerFireResult#TERMINATION} as soon as one row reports it (later rows are
 *     not visited); otherwise {@link TriggerFireResult#FAILED_NO_TERMINATION} if at least one
 *     row reported it; otherwise {@link TriggerFireResult#SUCCESS}
 */
@Override
public TriggerFireResult visitInsertRowsOfOneDevice(
    InsertRowsOfOneDeviceNode node, TriggerEvent context) {
  TriggerFireResult aggregate = TriggerFireResult.SUCCESS;
  for (InsertRowNode row : node.getInsertRowNodeList()) {
    final TriggerFireResult rowOutcome = visitInsertRow(row, context);
    if (rowOutcome.equals(TriggerFireResult.TERMINATION)) {
      return rowOutcome;
    }
    if (rowOutcome.equals(TriggerFireResult.FAILED_NO_TERMINATION)) {
      // Remember the failure but keep processing the remaining rows.
      aggregate = TriggerFireResult.FAILED_NO_TERMINATION;
    }
  }
  return aggregate;
}
/**
* Handles a pipe-enriched insertion by delegating to the insert node it wraps.
*
* @param node the wrapper whose inner insert node is visited
* @param context the trigger event to fire for
* @return whatever visiting the wrapped insert node returns
*/
@Override
public TriggerFireResult visitPipeEnrichedInsertNode(
PipeEnrichedInsertNode node, TriggerEvent context) {
// The wrapped node accepts this same visitor; presumably that dispatches to the visitInsertXxx
// method matching its concrete type - confirm against the node classes.
return node.getInsertNode().accept(this, context);
}
/**
 * Maps each non-null measurement name to the index of the schema carrying the same measurement
 * id.
 *
 * <p>The schema at the measurement's own position is checked first. Only if that one is null or
 * has a different id is the whole schema array scanned for the first match. A measurement
 * without any matching schema gets no entry in the returned map.
 *
 * @param measurements measurement names; null entries are skipped
 * @param schemas schemas to look the measurements up in; null entries are ignored
 * @return measurement name to schema index
 */
private Map<String, Integer> constructMeasurementToSchemaIndexMap(
    String[] measurements, MeasurementSchema[] schemas) {
  final Map<String, Integer> schemaIndexByMeasurement = new HashMap<>();
  for (int pos = 0; pos < measurements.length; pos++) {
    final String name = measurements[pos];
    if (name == null) {
      continue;
    }

    int resolved = -1;
    if (schemas[pos] != null && schemas[pos].getMeasurementId().equals(name)) {
      // Fast path: measurement and schema share the same position.
      resolved = pos;
    } else {
      // Fallback in case the two arrays are ordered differently: take the first match.
      for (int candidate = 0; candidate < schemas.length && resolved < 0; candidate++) {
        if (schemas[candidate] != null
            && schemas[candidate].getMeasurementId().equals(name)) {
          resolved = candidate;
        }
      }
    }

    if (resolved >= 0) {
      schemaIndexByMeasurement.put(name, resolved);
    }
  }
  return schemaIndexByMeasurement;
}
/**
 * Groups the measurements of an insertion by the triggers that have to be fired for them.
 *
 * <p>The non-null measurements of {@code node} are matched against the registered triggers. A
 * trigger is kept only if its event equals {@code event} and its state is
 * {@link TTriggerState#ACTIVE}.
 *
 * @param node the insertion whose device and measurements are matched
 * @param event the trigger event being fired
 * @return trigger name to the measurements it was matched for; an empty map when no trigger
 *     matched at all or none of the matched triggers qualified
 */
private Map<String, List<String>> constructTriggerNameToMeasurementListMap(
    InsertNode node, TriggerEvent event) {
  final PartialPath device = node.getDevicePath();
  final List<String> presentMeasurements =
      Arrays.stream(node.getMeasurements())
          .filter(measurement -> measurement != null)
          .collect(Collectors.toCollection(ArrayList::new));

  // One list of trigger names per entry of presentMeasurements (accessed by the same index).
  final List<List<String>> matchedTriggers =
      TriggerManagementService.getInstance()
          .getMatchedTriggerListForPath(device, presentMeasurements);
  if (matchedTriggers.stream().allMatch(List::isEmpty)) {
    return Collections.emptyMap();
  }

  final Map<String, List<String>> measurementsByTrigger = new HashMap<>();
  final TriggerTable triggerTable = TriggerManagementService.getInstance().getTriggerTable();
  for (int idx = 0; idx < presentMeasurements.size(); idx++) {
    final String measurement = presentMeasurements.get(idx);
    for (String triggerName : matchedTriggers.get(idx)) {
      final TriggerInformation info = triggerTable.getTriggerInformation(triggerName);
      if (!info.getEvent().equals(event)
          || !info.getTriggerState().equals(TTriggerState.ACTIVE)) {
        // Registered for another event, or not active.
        continue;
      }
      measurementsByTrigger.computeIfAbsent(triggerName, k -> new ArrayList<>()).add(measurement);
    }
  }
  return measurementsByTrigger;
}
/**
 * Fires the trigger named {@code triggerName} with {@code tablet}.
 *
 * <p>Up to {@code FIRE_RETRY_NUM} attempts are made. Each attempt first checks whether the
 * trigger has to be fired on another DataNode:
 *
 * <ul>
 *   <li>If not, the trigger is fired through the local executor and the result is returned
 *       immediately (see {@code fireLocally}).
 *   <li>If so, a fire request is sent to the DataNode currently recorded for the trigger. When
 *       that node has an executor, its result is returned. When it has none, or the call fails
 *       with a client or Thrift error, the recorded location is refreshed through the config
 *       node and the next attempt starts. Any other exception ends the attempts with a failure
 *       result.
 * </ul>
 *
 * <p>If no attempt produced a result (all attempts used up, or the thread was interrupted while
 * waiting), the trigger was not fired, so a failure result derived from the trigger's failure
 * strategy is returned instead of reporting success.
 *
 * @param triggerName name of the trigger to fire
 * @param tablet data passed to the trigger
 * @param event the trigger event being fired
 * @return the fire result; on failure {@link TriggerFireResult#TERMINATION} for a
 *     {@link FailureStrategy#PESSIMISTIC} trigger, otherwise
 *     {@link TriggerFireResult#FAILED_NO_TERMINATION}
 */
private TriggerFireResult fire(String triggerName, Tablet tablet, TriggerEvent event) {
  for (int attempt = 0; attempt < FIRE_RETRY_NUM; attempt++) {
    if (!TriggerManagementService.getInstance().needToFireOnAnotherDataNode(triggerName)) {
      return fireLocally(triggerName, tablet, event);
    }

    TDataNodeLocation tDataNodeLocation =
        TriggerManagementService.getInstance()
            .getDataNodeLocationOfStatefulTrigger(triggerName);
    try (SyncDataNodeInternalServiceClient client =
        Coordinator.getInstance()
            .getInternalServiceClientManager()
            .borrowClient(tDataNodeLocation.getInternalEndPoint())) {
      TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
      TFireTriggerResp resp = client.fireTrigger(req);
      if (resp.foundExecutor) {
        // we successfully found an executor on another data node
        return TriggerFireResult.construct(resp.getFireResult());
      }
      // No executor on that node: update TDataNodeLocation of the stateful trigger through the
      // config node.
      if (!updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId())) {
        // The location is still the same: wait 4 seconds before the next attempt.
        Thread.sleep(4000);
      }
    } catch (ClientManagerException | TException e) {
      // ClientManagerException means that we failed to borrow a client, possibly because the
      // corresponding DataNode is down.
      // TException means there's a timeout or broken connection.
      // We need to update local TriggerTable with the new TDataNodeLocation of the stateful
      // trigger.
      LOGGER.warn(
          "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
          triggerName,
          tDataNodeLocation.getInternalEndPoint(),
          e);
      // update TDataNodeLocation of stateful trigger through config node
      updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId());
    } catch (InterruptedException e) {
      LOGGER.warn("{} interrupted when sleep", triggerName);
      Thread.currentThread().interrupt();
      // Stop retrying: with the interrupt flag restored every later sleep would throw at once,
      // and an interrupted thread is expected to wind down promptly.
      return failureResultOf(triggerName);
    } catch (Exception e) {
      LOGGER.warn(
          "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
          triggerName,
          tDataNodeLocation.getInternalEndPoint(),
          e);
      // do not retry if it is not due to bad network or no executor found
      return failureResultOf(triggerName);
    }
  }

  // Every attempt ended without an executor being reached, so the trigger was never fired.
  LOGGER.warn(
      "Failed to fire trigger({}): no executor was reached within {} attempt(s)",
      triggerName,
      FIRE_RETRY_NUM);
  return failureResultOf(triggerName);
}

/**
 * Fires the trigger through the executor registered on this DataNode.
 *
 * @return {@link TriggerFireResult#SUCCESS} when the executor reports success; otherwise the
 *     failure result matching the trigger's failure strategy (also when no executor is
 *     registered or the executor throws)
 */
private TriggerFireResult fireLocally(String triggerName, Tablet tablet, TriggerEvent event) {
  TriggerExecutor executor = TriggerManagementService.getInstance().getExecutor(triggerName);
  if (executor == null) {
    return failureResultOf(triggerName);
  }
  try {
    if (executor.fire(tablet, event)) {
      return TriggerFireResult.SUCCESS;
    }
  } catch (TriggerExecutionException e) {
    // Keep a trace of the failure instead of silently turning it into a result code.
    LOGGER.warn("Error occurred when firing trigger({}) on local executor", triggerName, e);
  }
  return toFailureResult(executor.getFailureStrategy());
}

/** Looks up the failure strategy registered for the trigger and maps it to a failure result. */
private TriggerFireResult failureResultOf(String triggerName) {
  return toFailureResult(
      TriggerManagementService.getInstance()
          .getTriggerInformation(triggerName)
          .getFailureStrategy());
}

/** PESSIMISTIC triggers terminate the operation on failure; any other strategy does not. */
private static TriggerFireResult toFailureResult(FailureStrategy strategy) {
  return strategy.equals(FailureStrategy.PESSIMISTIC)
      ? TriggerFireResult.TERMINATION
      : TriggerFireResult.FAILED_NO_TERMINATION;
}
/**
 * Asks the config node where the stateful trigger currently lives and, if that differs from
 * {@code currentDataNodeId}, stores the new location in {@link TriggerManagementService}.
 *
 * @param triggerName name of the stateful trigger
 * @param currentDataNodeId id of the DataNode the caller currently has on record
 * @return true only if the config node returned a location with a different DataNode id; false
 *     if the location is missing or unchanged, or if the lookup failed (the failure is logged)
 */
private boolean updateLocationOfStatefulTrigger(String triggerName, int currentDataNodeId) {
  try (ConfigNodeClient client =
      CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
    final TDataNodeLocation latest =
        client.getLocationOfStatefulTrigger(triggerName).getDataNodeLocation();
    final boolean unchanged = latest == null || latest.getDataNodeId() == currentDataNodeId;
    if (unchanged) {
      return false;
    }
    // The stateful trigger has moved: record its new location locally.
    TriggerManagementService.getInstance().updateLocationOfStatefulTrigger(triggerName, latest);
    return true;
  } catch (ClientManagerException | TException | IOException e) {
    LOGGER.error(
        "Failed to update location of stateful trigger({}) through config node. The cause is {}.",
        triggerName,
        e);
    return false;
  }
}
}