blob: c05bbe5005c1a9ef4b21f680682c64362913b182 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.flink.sql.function;
import org.apache.iotdb.flink.sql.client.IoTDBWebSocketClient;
import org.apache.iotdb.flink.sql.common.Options;
import org.apache.iotdb.flink.sql.common.Utils;
import org.apache.iotdb.flink.sql.exception.IllegalOptionException;
import org.apache.iotdb.flink.sql.exception.IllegalSchemaException;
import org.apache.iotdb.flink.sql.wrapper.SchemaWrapper;
import org.apache.iotdb.flink.sql.wrapper.TabletWrapper;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class IoTDBCDCSourceFunction extends RichSourceFunction<RowData> {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBCDCSourceFunction.class);
private final List<IoTDBWebSocketClient> socketClients = new ArrayList<>();
private final int cdcPort;
private final List<String> nodeUrls;
private final String taskName;
private final String pattern;
private final String user;
private final String password;
private final List<String> timeseriesList;
private final BlockingQueue<TabletWrapper> tabletWrappers;
private final List<Tuple2<String, DataType>> tableSchema;
private final String pipeName;
private final Options.CDCMode mode;
private transient ExecutorService consumeExecutor;
public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper schemaWrapper) {
tableSchema = schemaWrapper.getSchema();
pattern = options.get(Options.PATTERN);
cdcPort = options.get(Options.CDC_PORT);
nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
taskName = options.get(Options.CDC_TASK_NAME);
pipeName = String.format("flink_cdc_%s", taskName);
user = options.get(Options.USER);
password = options.get(Options.PASSWORD);
timeseriesList =
tableSchema.stream().map(field -> String.valueOf(field.f0)).collect(Collectors.toList());
mode = options.get(Options.CDC_MODE);
tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size() * 10);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Session session =
new Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
session.open(false);
try (SessionDataSet dataSet =
session.executeQueryStatement(String.format("show pipe %s", pipeName))) {
if (!dataSet.hasNext()) {
String createPipeCommand;
if (Options.CDCMode.REALTIME.equals(mode)) {
createPipeCommand =
String.format(
"CREATE PIPE %s\n"
+ "WITH EXTRACTOR (\n"
+ "'extractor' = 'iotdb-extractor',\n"
+ "'extractor.history.enable' = 'false',\n"
+ "'extractor.pattern' = '%s',\n"
+ ") WITH CONNECTOR (\n"
+ "'connector' = 'websocket-connector',\n"
+ "'connector.websocket.port' = '%d',\n"
// avoid to reuse the pipe's connector
+ "'connector.websocket.id' = '%d'"
+ ")",
pipeName, pattern, cdcPort, System.currentTimeMillis());
} else {
createPipeCommand =
String.format(
"CREATE PIPE %s\n"
+ "WITH EXTRACTOR (\n"
+ "'extractor' = 'iotdb-extractor',\n"
+ "'extractor.pattern' = '%s',\n"
+ ") WITH CONNECTOR (\n"
+ "'connector' = 'websocket-connector',\n"
+ "'connector.websocket.port' = '%d',\n"
// avoid to reuse the pipe's connector
+ "'connector.websocket.id' = '%d'"
+ ")",
pipeName, pattern, cdcPort, System.currentTimeMillis());
}
session.executeNonQueryStatement(createPipeCommand);
session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
} else {
RowRecord pipe = dataSet.next();
String pipePattern = pipe.getFields().get(3).getStringValue().split(",")[0].split("=")[1];
if (!pipePattern.equals(this.pattern)) {
throw new IllegalOptionException(
String.format(
"The CDC task `%s` has been created by pattern `%s`.",
this.taskName, this.pattern));
}
String status = pipe.getFields().get(2).getStringValue();
if ("STOPPED".equals(status)) {
session.executeNonQueryStatement(String.format("start pipe %s", pipeName));
}
}
}
session.close();
consumeExecutor = Executors.newFixedThreadPool(1);
for (String nodeUrl : nodeUrls) {
URI uri = new URI(String.format("ws://%s:%s", nodeUrl.split(":")[0], cdcPort));
socketClients.add(initAndGet(uri));
}
}
@Override
public void run(SourceContext<RowData> ctx) throws InterruptedException {
consumeExecutor.execute(new ConsumeRunnable(ctx));
consumeExecutor.shutdown();
while (true) {
if (consumeExecutor.isTerminated()) {
System.exit(1);
}
for (IoTDBWebSocketClient socketClient : socketClients) {
if (socketClient.getReadyState().equals(ReadyState.CLOSED)) {
while (!Utils.isURIAvailable(socketClient.getURI())) {
String log =
String.format(
"The URI %s:%d is not available now, sleep 5 seconds.",
socketClient.getURI().getHost(), socketClient.getURI().getPort());
LOGGER.warn(log);
Thread.sleep(5000);
}
socketClient.reconnect();
while (!socketClient.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
socketClient.send(String.format("BIND:%s", pipeName));
} else {
Thread.sleep(1000);
}
}
}
}
@Override
public void cancel() {
socketClients.forEach(WebSocketClient::close);
}
public void addTabletWrapper(TabletWrapper tabletWrapper) {
try {
this.tabletWrappers.put(tabletWrapper);
} catch (InterruptedException e) {
String host = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getHostName();
int port = tabletWrapper.getWebSocketClient().getRemoteSocketAddress().getPort();
String log =
String.format(
"The tablet from %s:%d can't be put into queue, because: %s",
host, port, e.getMessage());
LOGGER.warn(log);
Thread.currentThread().interrupt();
}
}
private IoTDBWebSocketClient initAndGet(URI uri) throws InterruptedException {
while (!Utils.isURIAvailable(uri)) {
String log =
String.format(
"The URI %s:%d is not available now, sleep 5 seconds.", uri.getHost(), uri.getPort());
LOGGER.warn(log);
Thread.sleep(5000);
}
IoTDBWebSocketClient client = new IoTDBWebSocketClient(uri, this);
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
client.send(String.format("BIND:%s", pipeName));
while (IoTDBWebSocketClient.Status.WAITING == client.getStatus()) {
Thread.sleep(1000);
}
if (IoTDBWebSocketClient.Status.ERROR == client.getStatus()) {
throw new IllegalOptionException(
"An exception occurred during binding. The CDC task is running. Please stop it first.");
}
return client;
}
public void collectTablet(Tablet tablet, SourceContext<RowData> ctx) {
List<MeasurementSchema> schemas = tablet.getSchemas();
int rowSize = tablet.rowSize;
HashMap<String, Pair<BitMap, List<Object>>> values = new HashMap<>();
for (MeasurementSchema schema : schemas) {
String timeseries = String.format("%s.%s", tablet.deviceId, schema.getMeasurementId());
TSDataType iotdbType = schema.getType();
int index = timeseriesList.indexOf(timeseries);
if (index == -1) {
return;
}
DataType flinkType = tableSchema.get(index).f1;
if (!Utils.isTypeEqual(iotdbType, flinkType)) {
throw new IllegalSchemaException(
String.format(
"The data type of column `%s` is different in IoTDB and Flink", timeseries));
}
values.put(
timeseries,
new Pair<>(
tablet.bitMaps[schemas.indexOf(schema)],
Utils.object2List(tablet.values[schemas.indexOf(schema)], iotdbType)));
}
for (int i = 0; i < rowSize; i++) {
ArrayList<Object> row = new ArrayList<>();
row.add(tablet.timestamps[i]);
for (String timeseries : timeseriesList) {
if (values.containsKey(timeseries)
&& (values.get(timeseries).getLeft() == null
|| !values.get(timeseries).getLeft().isMarked(i))) {
row.add(values.get(timeseries).getRight().get(i));
} else {
row.add(null);
}
}
RowData rowData = GenericRowData.of(row.toArray());
ctx.collect(rowData);
}
}
private class ConsumeRunnable implements Runnable {
SourceContext<RowData> context;
public ConsumeRunnable(SourceContext<RowData> context) {
this.context = context;
}
@Override
public void run() {
while (true) {
try {
TabletWrapper tabletWrapper = tabletWrappers.take();
collectTablet(tabletWrapper.getTablet(), context);
tabletWrapper
.getWebSocketClient()
.send(String.format("ACK:%d", tabletWrapper.getCommitId()));
} catch (InterruptedException e) {
LOGGER.warn("The tablet can't be taken from queue!");
Thread.currentThread().interrupt();
}
}
}
}
}