blob: 067669a339cb0748de4975b8a2f7fa64f87082b5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* This class extends {@link AbstractNodeProcedure} to make sure that when a {@link
* CreatePipePluginProcedure} is executed, the {@link AddConfigNodeProcedure}, {@link
* RemoveConfigNodeProcedure} or {@link RemoveDataNodeProcedure} will not be executed at the same
* time.
*/
public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipePluginState> {
private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipePluginProcedure.class);
private static final int RETRY_THRESHOLD = 5;
private PipePluginMeta pipePluginMeta;
private byte[] jarFile;
public CreatePipePluginProcedure() {
super();
}
public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[] jarFile) {
super();
this.pipePluginMeta = pipePluginMeta;
this.jarFile = jarFile;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
if (pipePluginMeta == null) {
return Flow.NO_MORE_STATE;
}
try {
switch (state) {
case LOCK:
return executeFromLock(env);
case CREATE_ON_CONFIG_NODES:
return executeFromCreateOnConfigNodes(env);
case CREATE_ON_DATA_NODES:
return executeFromCreateOnDataNodes(env);
case UNLOCK:
return executeFromUnlock(env);
default:
throw new UnsupportedOperationException(
String.format("Unknown state during executing createPipePluginProcedure, %s", state));
}
} catch (Exception e) {
if (isRollbackSupported(state)) {
LOGGER.error("CreatePipePluginProcedure failed in state {}, will rollback", state, e);
setFailure(new ProcedureException(e.getMessage()));
} else {
LOGGER.error(
"Retrievable error trying to create pipe plugin [{}], state: {}",
pipePluginMeta.getPluginName(),
state,
e);
if (getCycles() > RETRY_THRESHOLD) {
LOGGER.error(
"Fail to create pipe plugin [{}] after {} retries",
pipePluginMeta.getPluginName(),
getCycles());
setFailure(new ProcedureException(e.getMessage()));
}
}
}
return Flow.HAS_MORE_STATE;
}
private Flow executeFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: executeFromLock({})", pipePluginMeta.getPluginName());
final PipePluginCoordinator pipePluginCoordinator =
env.getConfigManager().getPipeManager().getPipePluginCoordinator();
pipePluginCoordinator.lock();
try {
pipePluginCoordinator
.getPipePluginInfo()
.validateBeforeCreatingPipePlugin(
pipePluginMeta.getPluginName(),
pipePluginMeta.getJarName(),
pipePluginMeta.getJarMD5());
} catch (PipeException e) {
// The pipe plugin has already created, we should end the procedure
LOGGER.warn(
"Pipe plugin {} is already created, end the CreatePipePluginProcedure({})",
pipePluginMeta.getPluginName(),
pipePluginMeta.getPluginName());
setFailure(new ProcedureException(e.getMessage()));
pipePluginCoordinator.unlock();
return Flow.NO_MORE_STATE;
}
setNextState(CreatePipePluginState.CREATE_ON_CONFIG_NODES);
return Flow.HAS_MORE_STATE;
}
private Flow executeFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info(
"CreatePipePluginProcedure: executeFromCreateOnConfigNodes({})",
pipePluginMeta.getPluginName());
final ConfigManager configNodeManager = env.getConfigManager();
final boolean needToSaveJar =
configNodeManager
.getPipeManager()
.getPipePluginCoordinator()
.getPipePluginInfo()
.isJarNeededToBeSavedWhenCreatingPipePlugin(pipePluginMeta.getJarName());
final CreatePipePluginPlan createPluginPlan =
new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null);
TSStatus response;
try {
response = configNodeManager.getConsensusManager().write(createPluginPlan);
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(response.getMessage());
}
setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES);
return Flow.HAS_MORE_STATE;
}
private Flow executeFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
LOGGER.info(
"CreatePipePluginProcedure: executeFromCreateOnDataNodes({})",
pipePluginMeta.getPluginName());
if (RpcUtils.squashResponseStatusList(env.createPipePluginOnDataNodes(pipePluginMeta, jarFile))
.getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
setNextState(CreatePipePluginState.UNLOCK);
return Flow.HAS_MORE_STATE;
}
throw new PipeException(
String.format(
"Failed to create pipe plugin instance [%s] on data nodes",
pipePluginMeta.getPluginName()));
}
private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: executeFromUnlock({})", pipePluginMeta.getPluginName());
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
return Flow.NO_MORE_STATE;
}
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
throws IOException, InterruptedException, ProcedureException {
switch (state) {
case LOCK:
rollbackFromLock(env);
break;
case CREATE_ON_CONFIG_NODES:
rollbackFromCreateOnConfigNodes(env);
break;
case CREATE_ON_DATA_NODES:
rollbackFromCreateOnDataNodes(env);
break;
}
}
private void rollbackFromLock(ConfigNodeProcedureEnv env) {
LOGGER.info("CreatePipePluginProcedure: rollbackFromLock({})", pipePluginMeta.getPluginName());
env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
}
private void rollbackFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info(
"CreatePipePluginProcedure: rollbackFromCreateOnConfigNodes({})",
pipePluginMeta.getPluginName());
try {
env.getConfigManager()
.getConsensusManager()
.write(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
}
}
private void rollbackFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws ProcedureException {
LOGGER.info(
"CreatePipePluginProcedure: rollbackFromCreateOnDataNodes({})",
pipePluginMeta.getPluginName());
if (RpcUtils.squashResponseStatusList(
env.dropPipePluginOnDataNodes(pipePluginMeta.getPluginName(), false))
.getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException(
String.format(
"Failed to rollback pipe plugin [%s] on data nodes", pipePluginMeta.getPluginName()));
}
}
@Override
protected boolean isRollbackSupported(CreatePipePluginState state) {
switch (state) {
case LOCK:
case CREATE_ON_CONFIG_NODES:
case CREATE_ON_DATA_NODES:
return true;
default:
return false;
}
}
@Override
protected CreatePipePluginState getState(int stateId) {
return CreatePipePluginState.values()[stateId];
}
@Override
protected int getStateId(CreatePipePluginState createPipePluginState) {
return createPipePluginState.ordinal();
}
@Override
protected CreatePipePluginState getInitialState() {
return CreatePipePluginState.LOCK;
}
@Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE.getTypeCode());
super.serialize(stream);
pipePluginMeta.serialize(stream);
ReadWriteIOUtils.write(ByteBuffer.wrap(jarFile), stream);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
pipePluginMeta = PipePluginMeta.deserialize(byteBuffer);
jarFile = ReadWriteIOUtils.readBinary(byteBuffer).getValues();
}
@Override
public boolean equals(Object that) {
if (that instanceof CreatePipePluginProcedure) {
CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that;
return thatProcedure.getProcId() == getProcId()
&& thatProcedure.getCurrentState().equals(getCurrentState())
&& thatProcedure.getCycles() == getCycles()
&& thatProcedure.pipePluginMeta.equals(pipePluginMeta);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(getProcId(), getCurrentState(), getCycles(), pipePluginMeta);
}
@TestOnly
public byte[] getJarFile() {
return jarFile;
}
}