blob: 6627bf2948a6b3fdf83c3a9bb7ac4dbb5ddd7fb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.subscription.task.subtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.SUBSCRIPTION_SINK;
/**
 * Singleton registry of the connector subtask life cycles used by subscriptions.
 *
 * <p>Life cycles are keyed by a string derived from the sorted connector attributes, prefixed with
 * {@code "__subscription_"}. Registering the same attribute set twice reuses the existing life
 * cycle instead of constructing a second connector.
 *
 * <p>All mutating methods are {@code synchronized} on this instance.
 */
public class SubscriptionConnectorSubtaskManager {

  private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
      "Failed to deregister PipeConnectorSubtask. No such subtask: ";
  private static final String FAILED_TO_START_EXCEPTION_MESSAGE =
      "Failed to start PipeConnectorSubtask. No such subtask: ";
  private static final String FAILED_TO_STOP_EXCEPTION_MESSAGE =
      "Failed to stop PipeConnectorSubtask. No such subtask: ";
  private static final String FAILED_TO_GET_PENDING_QUEUE_EXCEPTION_MESSAGE =
      "Failed to get PendingQueue. No such subtask: ";

  // one SubscriptionConnectorSubtask for one subscription (one consumer group with one topic)
  private final Map<String, PipeConnectorSubtaskLifeCycle>
      attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();

  /**
   * Registers a pipe on the subscription connector subtask matching the given parameters,
   * creating the subtask (and its connector) if no subtask exists yet for these attributes.
   *
   * @param executor executor handed to a newly created subtask life cycle
   * @param pipeConnectorParameters connector parameters; the connector/sink key must resolve to
   *     the subscription sink
   * @param environment runtime environment providing pipe name, creation time and region id
   * @return the key under which the subtask life cycle is stored; pass it to {@link
   *     #deregister}, {@link #start}, {@link #stop} and {@link #getPipeConnectorPendingQueue}
   * @throws SubscriptionException if the connector is not the subscription sink, or the topic or
   *     consumer group parameter is missing
   * @throws PipeException if the connector fails to validate, customize or handshake
   */
  public synchronized String register(
      PipeConnectorSubtaskExecutor executor,
      PipeParameters pipeConnectorParameters,
      PipeTaskConnectorRuntimeEnvironment environment) {
    final String connectorKey =
        pipeConnectorParameters
            .getStringOrDefault(
                Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, PipeConnectorConstant.SINK_KEY),
                BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
            // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase for matching.
            // Locale.ROOT keeps the result independent of the JVM default locale (e.g. the
            // Turkish locale would otherwise map 'I' to a dotless 'ı' and break the comparison).
            .toLowerCase(Locale.ROOT);
    if (!SUBSCRIPTION_SINK.getPipePluginName().equals(connectorKey)) {
      throw new SubscriptionException(
          "The SubscriptionConnectorSubtaskManager only supports subscription-sink.");
    }

    // NOTE(review): this registration is not rolled back if the steps below throw — confirm
    // whether callers always follow a failed register() with a deregister().
    PipeEventCommitManager.getInstance()
        .register(
            environment.getPipeName(),
            environment.getCreationTime(),
            environment.getRegionId(),
            connectorKey);

    final String attributeSortedString =
        "__subscription_" + new TreeMap<>(pipeConnectorParameters.getAttribute());

    PipeConnectorSubtaskLifeCycle lifeCycle =
        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
    if (Objects.isNull(lifeCycle)) {
      lifeCycle =
          createSubtaskLifeCycle(
              executor, pipeConnectorParameters, environment, attributeSortedString);
      attributeSortedString2SubtaskLifeCycleMap.put(attributeSortedString, lifeCycle);
    }

    lifeCycle.register();
    return attributeSortedString;
  }

  /**
   * Deregisters a pipe from its subtask life cycle and from the event commit manager. The life
   * cycle is dropped from this manager when its own {@code deregister} call returns {@code true}.
   *
   * @param pipeName name of the pipe being deregistered
   * @param creationTime creation time of the pipe
   * @param dataRegionId id of the data region the pipe task belongs to
   * @param attributeSortedString key returned by {@link #register}
   * @throws PipeException if no subtask is stored under {@code attributeSortedString}
   */
  public synchronized void deregister(
      String pipeName, long creationTime, int dataRegionId, String attributeSortedString) {
    final PipeConnectorSubtaskLifeCycle lifeCycle =
        getLifeCycleOrThrow(attributeSortedString, FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE);

    if (lifeCycle.deregister(pipeName)) {
      attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
    }

    PipeEventCommitManager.getInstance().deregister(pipeName, creationTime, dataRegionId);
  }

  /**
   * Starts the subtask life cycle stored under the given key.
   *
   * @param attributeSortedString key returned by {@link #register}
   * @throws PipeException if no subtask is stored under {@code attributeSortedString}
   */
  public synchronized void start(String attributeSortedString) {
    getLifeCycleOrThrow(attributeSortedString, FAILED_TO_START_EXCEPTION_MESSAGE).start();
  }

  /**
   * Stops the subtask life cycle stored under the given key.
   *
   * @param attributeSortedString key returned by {@link #register}
   * @throws PipeException if no subtask is stored under {@code attributeSortedString}
   */
  public synchronized void stop(String attributeSortedString) {
    getLifeCycleOrThrow(attributeSortedString, FAILED_TO_STOP_EXCEPTION_MESSAGE).stop();
  }

  /**
   * Returns the pending queue of the subtask life cycle stored under the given key.
   *
   * <p>NOTE(review): unlike the other accessors this method is not {@code synchronized}, so it
   * reads the backing {@code HashMap} without holding the lock the writers use — confirm whether
   * callers guarantee no concurrent register/deregister, or whether it should be synchronized.
   *
   * @param attributeSortedString key returned by {@link #register}
   * @return the pending queue of the matching subtask life cycle
   * @throws PipeException if no subtask is stored under {@code attributeSortedString}
   */
  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
      String attributeSortedString) {
    return getLifeCycleOrThrow(
            attributeSortedString, FAILED_TO_GET_PENDING_QUEUE_EXCEPTION_MESSAGE)
        .getPendingQueue();
  }

  /** Looks up a life cycle with a single map access, failing with the given message prefix. */
  private PipeConnectorSubtaskLifeCycle getLifeCycleOrThrow(
      String attributeSortedString, String exceptionMessagePrefix) {
    final PipeConnectorSubtaskLifeCycle lifeCycle =
        attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
    if (Objects.isNull(lifeCycle)) {
      throw new PipeException(exceptionMessagePrefix + attributeSortedString);
    }
    return lifeCycle;
  }

  /** Builds the connector, subtask and life cycle for a not-yet-registered attribute set. */
  private PipeConnectorSubtaskLifeCycle createSubtaskLifeCycle(
      PipeConnectorSubtaskExecutor executor,
      PipeParameters pipeConnectorParameters,
      PipeTaskConnectorRuntimeEnvironment environment,
      String attributeSortedString) {
    // 1. Fetch topic and consumer group id from connector parameters. This is checked before the
    // connector is constructed so that a missing parameter cannot leave an open connector behind.
    final String topicName =
        pipeConnectorParameters.getString(PipeConnectorConstant.SINK_TOPIC_KEY);
    final String consumerGroupId =
        pipeConnectorParameters.getString(PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY);
    if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
      throw new SubscriptionException(
          String.format(
              "Failed to construct subscription connector, because of %s or %s does not exist in pipe connector parameters",
              PipeConnectorConstant.SINK_TOPIC_KEY,
              PipeConnectorConstant.SINK_CONSUMER_GROUP_KEY));
    }

    // 2. Construct, validate and customize PipeConnector, and then handshake (create connection)
    // with the target
    final PipeConnector pipeConnector =
        PipeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
    try {
      pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
      pipeConnector.customize(
          pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));
      pipeConnector.handshake();
    } catch (Exception e) {
      // The connector never reaches a subtask, so nobody else would close it.
      try {
        pipeConnector.close();
      } catch (Exception closeException) {
        e.addSuppressed(closeException);
      }
      throw new PipeException(
          "Failed to construct PipeConnector, because of " + e.getMessage(), e);
    }

    // 3. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
    final BoundedBlockingPendingQueue<Event> pendingQueue =
        new BoundedBlockingPendingQueue<>(
            PipeConfig.getInstance().getPipeConnectorPendingQueueSize(),
            new PipeDataRegionEventCounter());
    final PipeConnectorSubtask subtask =
        new SubscriptionConnectorSubtask(
            String.format("%s_%s", attributeSortedString, environment.getCreationTime()),
            environment.getCreationTime(),
            attributeSortedString,
            0,
            pendingQueue,
            pipeConnector,
            topicName,
            consumerGroupId);
    return new SubscriptionConnectorSubtaskLifeCycle(executor, subtask, pendingQueue);
  }

  ///////////////////////// Singleton Instance Holder /////////////////////////

  private SubscriptionConnectorSubtaskManager() {
    // Do nothing
  }

  private static class SubscriptionConnectorSubtaskManagerHolder {
    private static final SubscriptionConnectorSubtaskManager INSTANCE =
        new SubscriptionConnectorSubtaskManager();
  }

  /** Returns the process-wide manager instance, created lazily on first access. */
  public static SubscriptionConnectorSubtaskManager instance() {
    return SubscriptionConnectorSubtaskManager.SubscriptionConnectorSubtaskManagerHolder.INSTANCE;
  }
}