blob: 22a4859aa5d4d1ee2c9684108da17c4eabd3bc79 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
/**
 * Spark DataSource V2 entry point that hands a {@link SeaTunnelSource} to Spark, either as a
 * batch reader ({@link ReadSupport}) or as a micro-batch reader ({@link MicroBatchReadSupport}).
 *
 * <p>The source instance itself is not constructed here: it is deserialized from the string
 * stored under the {@link Constants#SOURCE_SERIALIZATION} data source option.
 */
public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, DataSourceRegister {
    private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelSourceSupport.class);

    /** Short name under which this data source is registered (see {@link #shortName()}). */
    public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnelSource";

    /**
     * Checkpoint interval used when {@link Constants#CHECKPOINT_INTERVAL} is not set.
     * NOTE(review): the unit is not visible in this file (presumably milliseconds) - confirm
     * against the micro-batch reader.
     */
    public static final Integer CHECKPOINT_INTERVAL_DEFAULT = 10000;

    /**
     * Returns the alias of this data source.
     *
     * @return {@link #SEA_TUNNEL_SOURCE_NAME}
     */
    @Override
    public String shortName() {
        return SEA_TUNNEL_SOURCE_NAME;
    }

    /**
     * Creates the batch reader. The supplied schema is ignored; this overload simply delegates to
     * {@link #createReader(DataSourceOptions)}.
     *
     * @param rowType schema passed in by Spark; not used
     * @param options data source options
     * @return a batch reader wrapping the deserialized source
     */
    @Override
    public DataSourceReader createReader(StructType rowType, DataSourceOptions options) {
        return createReader(options);
    }

    /**
     * Creates the batch reader for the source described by {@code options}.
     *
     * @param options data source options; must contain {@link Constants#SOURCE_SERIALIZATION},
     *                may contain {@link Constants#SOURCE_PARALLELISM} (defaults to 1)
     * @return a batch reader wrapping the deserialized source
     * @throws UnsupportedOperationException if the serialized source is missing from the options
     */
    @Override
    public DataSourceReader createReader(DataSourceOptions options) {
        SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
        int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
        return new BatchSourceReader(seaTunnelSource, parallelism);
    }

    /**
     * Creates the micro-batch (streaming) reader for the source described by {@code options}.
     *
     * <p>Options read, with their fallbacks:
     * <ul>
     *   <li>{@link Constants#SOURCE_PARALLELISM} - defaults to 1</li>
     *   <li>{@link Constants#CHECKPOINT_INTERVAL} - defaults to {@link #CHECKPOINT_INTERVAL_DEFAULT}</li>
     *   <li>{@link Constants#HDFS_ROOT} - defaults to the default file system URI of the active
     *       Spark session's Hadoop configuration</li>
     *   <li>{@link Constants#HDFS_USER} - defaults to the empty string</li>
     *   <li>{@link Constants#CHECKPOINT_ID} - defaults to 1</li>
     * </ul>
     *
     * @param rowTypeOptional    schema passed in by Spark; not used
     * @param checkpointLocation checkpoint location supplied by Spark; every {@code sources/<digits>}
     *                           occurrence in it is rewritten to {@code sources-state}
     * @param options            data source options; must contain {@link Constants#SOURCE_SERIALIZATION}
     * @return a micro-batch reader wrapping the deserialized source
     * @throws UnsupportedOperationException if the serialized source is missing from the options
     */
    @Override
    public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
        SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
        int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
        int checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
        String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
        // Resolve the default lazily: the active Spark session and its Hadoop configuration are
        // only needed (and only looked up) when the caller did not provide an explicit HDFS root.
        String hdfsRoot = options.get(Constants.HDFS_ROOT).orElseGet(SeaTunnelSourceSupport::defaultHdfsRoot);
        String hdfsUser = options.get(Constants.HDFS_USER).orElse("");
        int checkpointId = options.getInt(Constants.CHECKPOINT_ID, 1);
        return new MicroBatchSourceReader(seaTunnelSource, parallelism, checkpointId, checkpointInterval, checkpointPath, hdfsRoot, hdfsUser);
    }

    /**
     * Looks up the default file system URI from the Hadoop configuration of the Spark session
     * that is active on the calling thread.
     *
     * <p>Fails (via {@code Option.get()}) if no Spark session is active on this thread.
     *
     * @return the default file system URI as a string
     */
    private static String defaultHdfsRoot() {
        Configuration hadoopConfiguration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
        return FileSystem.getDefaultUri(hadoopConfiguration).toString();
    }

    /**
     * Deserializes the SeaTunnel source carried in the {@link Constants#SOURCE_SERIALIZATION} option.
     *
     * @param options data source options
     * @return the deserialized source
     * @throws UnsupportedOperationException if the option is absent
     */
    private SeaTunnelSource<SeaTunnelRow, ?, ?> getSeaTunnelSource(DataSourceOptions options) {
        return SerializationUtils.stringToObject(options.get(Constants.SOURCE_SERIALIZATION)
            .orElseThrow(() -> new UnsupportedOperationException("Serialization information for the SeaTunnelSource is required")));
    }
}