/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.sort.hive.filesystem;

import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.compact.CompactBucketWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactCoordinator;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorOutput;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
import org.apache.flink.util.function.SupplierWithException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
/**
 * Helper for creating a streaming file sink: writer operators, optional compaction, and an
 * operator that commits partitions.
 */
public class StreamingSink {
private StreamingSink() {
    }

    /**
     * Creates a file writer from the input stream. This is similar to {@link StreamingFileSink};
     * in addition, it emits {@link PartitionCommitInfo} records downstream so that finished
     * partitions can be committed.
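     *
     * <p>A minimal wiring sketch; {@code rowStream}, {@code builder} and the other arguments are
     * hypothetical placeholders supplied by the caller:
     *
     * <pre>{@code
     * DataStream<PartitionCommitInfo> commitInfo =
     *         StreamingSink.writer(
     *                 rowStream,          // DataStream<RowData> of records to write
     *                 60_000L,            // bucket check interval, assumed milliseconds
     *                 builder,            // StreamingFileSink.BucketsBuilder for the table
     *                 4,                  // writer parallelism
     *                 inlongMetric,       // InLong metric labels
     *                 auditHostAndPorts,  // InLong audit endpoints
     *                 dirtyOptions,       // dirty-data handling options
     *                 dirtySink);         // may be null when no dirty sink is configured
     * }</pre>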
*/
public static <T> DataStream<PartitionCommitInfo> writer(
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
int parallelism,
String inlongMetric,
String auditHostAndPorts,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink) {
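        // The InLong StreamingFileWriter adds metric/audit reporting and dirty-data handling
        // on top of the plain file writing performed through the bucketsBuilder.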
StreamingFileWriter<T> fileWriter =
new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder,
inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
return inputStream
.transform(
StreamingFileWriter.class.getSimpleName(),
TypeInformation.of(PartitionCommitInfo.class),
fileWriter)
.setParallelism(parallelism);
    }

    /**
     * Creates a file writer with compaction operators from the input stream. In addition, it
     * emits {@link PartitionCommitInfo} records downstream.
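     *
     * <p>The assembled chain is {@code streaming-writer -> compact-coordinator ->
     * compact-operator}: the coordinator runs as a single instance so it sees every written
     * file and groups them into compaction units based on {@code targetFileSize}; its plan is
     * then broadcast to the parallel compact operators that rewrite the files.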
*/
public static <T> DataStream<PartitionCommitInfo> compactionWriter(
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
FileSystemFactory fsFactory,
Path path,
CompactReader.Factory<T> readFactory,
long targetFileSize,
int parallelism,
String inlongMetric,
String auditHostAndPorts,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink) {
CompactFileWriter<T> writer = new CompactFileWriter<>(
bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
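        // Cast to Serializable so the lambda can be shipped to the cluster as part of the
        // operators; the FileSystem itself is created lazily on the task side.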
SupplierWithException<FileSystem, IOException> fsSupplier =
(SupplierWithException<FileSystem, IOException> & Serializable) () -> fsFactory.create(path.toUri());
CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
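        // The writer emits CoordinatorInput records describing the files it has written; the
        // coordinator below runs with parallelism 1 so that a single instance sees all of them
        // when planning compaction.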
SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
inputStream
.transform(
"streaming-writer",
TypeInformation.of(CoordinatorInput.class),
writer)
.setParallelism(parallelism)
.transform(
"compact-coordinator",
TypeInformation.of(CoordinatorOutput.class),
coordinator)
.setParallelism(1)
.setMaxParallelism(1);
CompactWriter.Factory<T> writerFactory =
CompactBucketWriter.factory(
(SupplierWithException<BucketWriter<T, String>, IOException> & Serializable) bucketsBuilder::createBucketWriter);
CompactOperator<T> compacter =
new CompactOperator<>(fsSupplier, readFactory, writerFactory);
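        // The coordinator's plan is broadcast so that every compact-operator subtask receives
        // it and rewrites the compaction units assigned to it.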
return coordinatorOp
.broadcast()
.transform(
"compact-operator",
TypeInformation.of(PartitionCommitInfo.class),
compacter)
.setParallelism(parallelism);
    }

    /**
     * Creates the final sink from the file writer stream. A {@link PartitionCommitter} is added
     * only when a partition commit policy is configured in the options.
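     *
     * <p>An end-to-end sketch (all identifiers below are hypothetical placeholders for
     * caller-provided values):
     *
     * <pre>{@code
     * DataStream<PartitionCommitInfo> commitInfo =
     *         StreamingSink.writer(rowStream, 60_000L, builder, 4,
     *                 inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
     * StreamingSink.sink(commitInfo, locationPath, identifier, partitionKeys,
     *         msFactory, fsFactory, tableOptions);
     * }</pre>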
*/
public static DataStreamSink<?> sink(
DataStream<PartitionCommitInfo> writer,
Path locationPath,
ObjectIdentifier identifier,
List<String> partitionKeys,
TableMetaStoreFactory msFactory,
FileSystemFactory fsFactory,
Configuration options) {
DataStream<?> stream = writer;
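        // Add a partition committer only when a commit policy (such as "metastore" or
        // "success-file") is configured; it runs as a single non-rescalable instance.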
if (options.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
PartitionCommitter committer = new PartitionCommitter(locationPath, identifier, partitionKeys, msFactory,
fsFactory, options);
stream =
writer.transform(
PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
.setParallelism(1)
.setMaxParallelism(1);
}
return stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
}