/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.StreamingSink;
import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.AUTO_COMPACTION;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.COMPACTION_FILE_SIZE;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_ROLLING_POLICY_INACTIVITY_INTERVAL;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
import static org.apache.flink.connector.file.table.stream.compact.CompactOperator.convertToUncompacted;

/** File system {@link DynamicTableSink}. */
@Internal
public class FileSystemTableSink extends AbstractFileSystemTable
implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
// For compaction reading
@Nullable private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
@Nullable private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
// For writing
@Nullable private final EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat;
@Nullable private final EncodingFormat<SerializationSchema<RowData>> serializationFormat;
private boolean overwrite = false;
private boolean dynamicGrouping = false;
private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
@Nullable private Integer configuredParallelism;
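/**
* Creates a filesystem table sink. At least one of {@code bulkWriterFormat} and {@code
* serializationFormat} must be non-null; otherwise a {@link ValidationException} is thrown.
*/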
FileSystemTableSink(
ObjectIdentifier tableIdentifier,
DataType physicalRowDataType,
List<String> partitionKeys,
ReadableConfig tableOptions,
@Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
@Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
@Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat,
@Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
super(tableIdentifier, physicalRowDataType, partitionKeys, tableOptions);
this.bulkReaderFormat = bulkReaderFormat;
this.deserializationFormat = deserializationFormat;
if (Stream.of(bulkWriterFormat, serializationFormat).allMatch(Objects::isNull)) {
String identifier = tableOptions.get(FactoryUtil.FORMAT);
throw new ValidationException(
String.format(
"Could not find any format factory for identifier '%s' in the classpath.",
identifier));
}
this.bulkWriterFormat = bulkWriterFormat;
this.serializationFormat = serializationFormat;
this.configuredParallelism =
this.tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return consume(providerContext, dataStream, sinkContext);
}
};
}
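/**
* Dispatches to {@link #createBatchSink} when the input is bounded, or to {@link
* #createStreamingSink} otherwise. Overwrite is only supported in batch mode.
*/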
private DataStreamSink<?> consume(
ProviderContext providerContext, DataStream<RowData> dataStream, Context sinkContext) {
final int inputParallelism = dataStream.getParallelism();
final int parallelism = Optional.ofNullable(configuredParallelism).orElse(inputParallelism);
boolean parallelismConfigured = configuredParallelism != null;
if (sinkContext.isBounded()) {
return createBatchSink(dataStream, sinkContext, parallelism, parallelismConfigured);
} else {
if (overwrite) {
throw new IllegalStateException("Streaming mode does not support overwrite.");
}
return createStreamingSink(
providerContext, dataStream, sinkContext, parallelism, parallelismConfigured);
}
}
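/** Builds a {@link RowDataPartitionComputer} from the physical row type and partition keys. */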
private RowDataPartitionComputer partitionComputer() {
return new RowDataPartitionComputer(
defaultPartName,
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
partitionKeys.toArray(new String[0]));
}
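/**
* Builds a batch sink based on {@link FileSystemOutputFormat}, writing to a temporary staging
* path and committing partitions with the configured commit policies.
*/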
private DataStreamSink<RowData> createBatchSink(
DataStream<RowData> inputStream,
Context sinkContext,
final int parallelism,
boolean parallelismConfigured) {
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(partitionComputer())
.setDynamicGrouped(dynamicGrouping)
.setPartitionColumns(partitionKeys.toArray(new String[0]))
.setFormatFactory(createOutputFormatFactory(sinkContext))
.setMetaStoreFactory(new EmptyMetaStoreFactory(path))
.setOverwrite(overwrite)
.setStaticPartitions(staticPartitions)
.setTempPath(toStagingPath())
.setOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID())
.build())
.setPartitionCommitPolicyFactory(
new PartitionCommitPolicyFactory(
tableOptions.get(
FileSystemConnectorOptions
.SINK_PARTITION_COMMIT_POLICY_KIND),
tableOptions.get(
FileSystemConnectorOptions
.SINK_PARTITION_COMMIT_POLICY_CLASS),
tableOptions.get(
FileSystemConnectorOptions
.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
tableOptions.get(
FileSystemConnectorOptions
.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));
DataStreamSink<RowData> sink = inputStream.writeUsingOutputFormat(builder.build());
sink.getTransformation().setParallelism(parallelism, parallelismConfigured);
return sink.name("Filesystem");
}
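/**
* Builds a streaming sink on top of {@link StreamingFileSink} buckets, optionally with
* automatic compaction, followed by the partition-committing sink operator.
*/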
private DataStreamSink<?> createStreamingSink(
ProviderContext providerContext,
DataStream<RowData> dataStream,
Context sinkContext,
final int parallelism,
boolean parallelismConfigured) {
FileSystemFactory fsFactory = FileSystem::get;
RowDataPartitionComputer computer = partitionComputer();
boolean autoCompaction = tableOptions.getBoolean(AUTO_COMPACTION);
Object writer = createWriter(sinkContext);
boolean isEncoder = writer instanceof Encoder;
TableBucketAssigner assigner = new TableBucketAssigner(computer);
TableRollingPolicy rollingPolicy =
new TableRollingPolicy(
!isEncoder || autoCompaction,
tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis(),
tableOptions.get(SINK_ROLLING_POLICY_INACTIVITY_INTERVAL).toMillis());
String randomPrefix = "part-" + UUID.randomUUID();
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
fileNamingBuilder =
autoCompaction
? fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix))
: fileNamingBuilder.withPartPrefix(randomPrefix);
OutputFileConfig fileNamingConfig = fileNamingBuilder.build();
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
if (isEncoder) {
//noinspection unchecked
bucketsBuilder =
StreamingFileSink.forRowFormat(
path,
new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(fileNamingConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder =
StreamingFileSink.forBulkFormat(
path,
new ProjectionBulkFactory(
(BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(fileNamingConfig)
.withRollingPolicy(rollingPolicy);
}
long bucketCheckInterval = tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
DataStream<PartitionCommitInfo> writerStream;
if (autoCompaction) {
long compactionSize =
tableOptions
.getOptional(COMPACTION_FILE_SIZE)
.orElse(tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE))
.getBytes();
CompactReader.Factory<RowData> reader =
createCompactReaderFactory(sinkContext)
.orElseThrow(
() ->
new TableException(
"Please implement available reader for compaction:"
+ " BulkFormat, FileInputFormat."));
writerStream =
StreamingSink.compactionWriter(
providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
fsFactory,
path,
reader,
compactionSize,
parallelism,
parallelismConfigured);
} else {
writerStream =
StreamingSink.writer(
providerContext,
dataStream,
bucketCheckInterval,
bucketsBuilder,
parallelism,
partitionKeys,
tableOptions,
parallelismConfigured);
}
return StreamingSink.sink(
providerContext,
writerStream,
path,
tableIdentifier,
partitionKeys,
new EmptyMetaStoreFactory(path),
fsFactory,
tableOptions);
}
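/**
* Creates the reader used to re-read written files during compaction, preferring the bulk
* reader format and falling back to the deserialization format. Returns {@link
* Optional#empty()} if neither format is available.
*/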
private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Context context) {
// Compute producedDataType (including partition fields) and physicalDataType (excluding
// partition fields)
final DataType producedDataType = physicalRowDataType;
final DataType physicalDataType =
DataType.getFields(producedDataType).stream()
.filter(field -> !partitionKeys.contains(field.getName()))
.collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
if (bulkReaderFormat != null) {
final BulkFormat<RowData, FileSourceSplit> format =
new FileInfoExtractorBulkFormat(
bulkReaderFormat.createRuntimeDecoder(
createSourceContext(context), physicalDataType),
producedDataType,
context.createTypeInformation(producedDataType),
Collections.emptyMap(),
partitionKeys,
defaultPartName);
return Optional.of(CompactBulkReader.factory(format));
} else if (deserializationFormat != null) {
final DeserializationSchema<RowData> decoder =
deserializationFormat.createRuntimeDecoder(
createSourceContext(context), physicalDataType);
final BulkFormat<RowData, FileSourceSplit> format =
new FileInfoExtractorBulkFormat(
new DeserializationSchemaAdapter(decoder),
producedDataType,
context.createTypeInformation(producedDataType),
Collections.emptyMap(),
partitionKeys,
defaultPartName);
return Optional.of(CompactBulkReader.factory(format));
}
return Optional.empty();
}
private DynamicTableSource.Context createSourceContext(Context context) {
return new DynamicTableSource.Context() {
@Override
public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
return context.createTypeInformation(producedDataType);
}
@Override
public <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
return context.createTypeInformation(producedLogicalType);
}
@Override
public DynamicTableSource.DataStructureConverter createDataStructureConverter(
DataType producedDataType) {
// This method cannot be implemented without changing the
// DynamicTableSink.DataStructureConverter interface
throw new UnsupportedOperationException(
"Compaction reader not support DataStructure converter.");
}
};
}
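/** Creates (if necessary) and returns the temporary staging directory for batch writes. */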
private Path toStagingPath() {
Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
try {
FileSystem fs = stagingDir.getFileSystem();
Preconditions.checkState(
fs.exists(stagingDir) || fs.mkdirs(stagingDir),
"Failed to create staging dir " + stagingDir);
return stagingDir;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
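/** Wraps the runtime writer into an {@link OutputFormatFactory} for the batch sink. */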
@SuppressWarnings("unchecked")
private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
Object writer = createWriter(sinkContext);
return writer instanceof Encoder
? path -> createEncoderOutputFormat((Encoder<RowData>) writer, path)
: path -> createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
}
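/**
* Creates the runtime writer, either a {@link BulkWriter.Factory} or an {@link Encoder}
* (wrapped in a {@link SerializationSchemaAdapter}), for the non-partition columns.
*/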
private Object createWriter(Context sinkContext) {
DataType physicalDataTypeWithoutPartitionColumns =
DataType.getFields(physicalRowDataType).stream()
.filter(field -> !partitionKeys.contains(field.getName()))
.collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
if (bulkWriterFormat != null) {
return bulkWriterFormat.createRuntimeEncoder(
sinkContext, physicalDataTypeWithoutPartitionColumns);
} else if (serializationFormat != null) {
return new SerializationSchemaAdapter(
serializationFormat.createRuntimeEncoder(
sinkContext, physicalDataTypeWithoutPartitionColumns));
} else {
throw new TableException("Can not find format factory.");
}
}
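/**
* Rejects a user-configured sink parallelism when the requested changelog mode contains
* non-INSERT row kinds.
*/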
private void checkConfiguredParallelismAllowed(ChangelogMode requestChangelogMode) {
final Integer parallelism = this.configuredParallelism;
if (parallelism == null) {
return;
}
if (!requestChangelogMode.containsOnly(RowKind.INSERT)) {
throw new ValidationException(
String.format(
"Currently, filesystem sink doesn't support setting parallelism (%d) by '%s' "
+ "when the input stream is not INSERT only. The row kinds of input stream are [%s]",
parallelism,
FileSystemConnectorOptions.SINK_PARALLELISM.key(),
requestChangelogMode.getContainedKinds().stream()
.map(RowKind::shortString)
.collect(Collectors.joining(","))));
}
}
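/** Adapts a {@link BulkWriter.Factory} to the {@link OutputFormat} interface for batch writes. */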
private static OutputFormat<RowData> createBulkWriterOutputFormat(
BulkWriter.Factory<RowData> factory, Path path) {
return new OutputFormat<RowData>() {
private static final long serialVersionUID = 1L;
private transient BulkWriter<RowData> writer;
private transient FSDataOutputStream stream;
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
this.writer = factory.create(stream);
}
@Override
public void writeRecord(RowData record) throws IOException {
writer.addElement(record);
}
@Override
public void close() throws IOException {
writer.flush();
writer.finish();
stream.close();
}
};
}
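/** Adapts an {@link Encoder} to the {@link OutputFormat} interface for batch writes. */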
private static OutputFormat<RowData> createEncoderOutputFormat(
Encoder<RowData> encoder, Path path) {
return new OutputFormat<RowData>() {
private static final long serialVersionUID = 1L;
private transient FSDataOutputStream output;
@Override
public void configure(Configuration parameters) {}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.output = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
}
@Override
public void writeRecord(RowData record) throws IOException {
encoder.encode(record, output);
}
@Override
public void close() throws IOException {
this.output.flush();
this.output.close();
}
};
}
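/**
* Converts a static partition spec into a {@link LinkedHashMap} ordered by the table's
* partition keys, keeping only the keys that are actually present.
*/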
private LinkedHashMap<String, String> toPartialLinkedPartSpec(Map<String, String> part) {
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
for (String partitionKey : partitionKeys) {
if (part.containsKey(partitionKey)) {
partSpec.put(partitionKey, part.get(partitionKey));
}
}
return partSpec;
}
@Override
public boolean requiresPartitionGrouping(boolean supportsGrouping) {
this.dynamicGrouping = supportsGrouping;
return dynamicGrouping;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
checkConfiguredParallelismAllowed(requestedMode);
if (bulkWriterFormat != null) {
return bulkWriterFormat.getChangelogMode();
} else if (serializationFormat != null) {
return serializationFormat.getChangelogMode();
} else {
throw new TableException("Can not find format factory.");
}
}
@Override
public DynamicTableSink copy() {
FileSystemTableSink sink =
new FileSystemTableSink(
tableIdentifier,
physicalRowDataType,
partitionKeys,
tableOptions,
bulkReaderFormat,
deserializationFormat,
bulkWriterFormat,
serializationFormat);
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
sink.staticPartitions = staticPartitions;
return sink;
}
@Override
public String asSummaryString() {
return "Filesystem";
}
@Override
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
@Override
public void applyStaticPartition(Map<String, String> partition) {
this.staticPartitions = toPartialLinkedPartSpec(partition);
}
/** Table bucket assigner that wraps a {@link PartitionComputer}. */
public static class TableBucketAssigner implements BucketAssigner<RowData, String> {
private final PartitionComputer<RowData> computer;
public TableBucketAssigner(PartitionComputer<RowData> computer) {
this.computer = computer;
}
@Override
public String getBucketId(RowData element, Context context) {
try {
return PartitionPathUtils.generatePartitionPath(
computer.generatePartValues(element));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
/** Table {@link RollingPolicy} that extends {@link CheckpointRollingPolicy} for bulk writers. */
public static class TableRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
private final boolean rollOnCheckpoint;
private final long rollingFileSize;
private final long rollingTimeInterval;
private final long inactivityInterval;
public TableRollingPolicy(
boolean rollOnCheckpoint,
long rollingFileSize,
long rollingTimeInterval,
long inactivityInterval) {
this.rollOnCheckpoint = rollOnCheckpoint;
Preconditions.checkArgument(rollingFileSize > 0L);
Preconditions.checkArgument(rollingTimeInterval > 0L);
Preconditions.checkArgument(inactivityInterval > 0L);
this.rollingFileSize = rollingFileSize;
this.rollingTimeInterval = rollingTimeInterval;
this.inactivityInterval = inactivityInterval;
}
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
try {
return rollOnCheckpoint || partFileState.getSize() > rollingFileSize;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element)
throws IOException {
return partFileState.getSize() > rollingFileSize;
}
@Override
public boolean shouldRollOnProcessingTime(
PartFileInfo<String> partFileState, long currentTime) {
return currentTime - partFileState.getCreationTime() >= rollingTimeInterval
|| currentTime - partFileState.getLastUpdateTime() >= inactivityInterval;
}
}
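/** Encoder that projects away partition columns before delegating to the wrapped encoder. */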
private static class ProjectionEncoder implements Encoder<RowData> {
private final Encoder<RowData> encoder;
private final RowDataPartitionComputer computer;
private ProjectionEncoder(Encoder<RowData> encoder, RowDataPartitionComputer computer) {
this.encoder = encoder;
this.computer = computer;
}
@Override
public void encode(RowData element, OutputStream stream) throws IOException {
encoder.encode(computer.projectColumnsToWrite(element), stream);
}
}
/** Bulk writer factory that projects rows to their non-partition fields. */
public static class ProjectionBulkFactory implements BulkWriter.Factory<RowData> {
private final BulkWriter.Factory<RowData> factory;
private final RowDataPartitionComputer computer;
public ProjectionBulkFactory(
BulkWriter.Factory<RowData> factory, RowDataPartitionComputer computer) {
this.factory = factory;
this.computer = computer;
}
@Override
public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException {
BulkWriter<RowData> writer = factory.create(out);
return new BulkWriter<RowData>() {
@Override
public void addElement(RowData element) throws IOException {
writer.addElement(computer.projectColumnsToWrite(element));
}
@Override
public void flush() throws IOException {
writer.flush();
}
@Override
public void finish() throws IOException {
writer.finish();
}
};
}
}
}