/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.hive.filesystem;

import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.PartitionPolicy;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.SerializableConfiguration;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.conf.Configuration;

import javax.annotation.Nullable;

import java.io.IOException;

/**
 * Buckets builder to create {@link Buckets} whose part files are written with
 * {@link HadoopPathBasedPartFileWriter} and committed through a {@link HadoopFileCommitterFactory}.
 *
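 * <p>A minimal usage sketch, assuming a writer factory and bucket assigner already exist for the
 * row type; {@code myWriterFactory}, {@code myAssigner}, and the other lowercase variables are
 * hypothetical placeholders supplied by the caller:
 *
 * <pre>{@code
 * new HadoopPathBasedBulkFormatBuilder<>(
 *         new org.apache.hadoop.fs.Path("/warehouse/db/table"),
 *         myWriterFactory,      // a HadoopPathBasedBulkWriter.Factory<RowData>
 *         new Configuration(),
 *         myAssigner,           // a BucketAssigner<RowData, String>
 *         dirtyOptions, null, schemaUpdatePolicy, partitionPolicy,
 *         hiveShim, "3.1.2", true, // "3.1.2" = hiveVersion, true = sinkMultipleEnable
 *         inputFormat, outputFormat, serializationLib)
 *         .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("part").build());
 * }</pre>
 */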
public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>
        extends
        StreamingFileSink.BucketsBuilder<IN, BucketID, T> {

    private static final long serialVersionUID = 1L;

    private final Path basePath;
    private final HadoopPathBasedBulkWriter.Factory<IN> writerFactory;
    private final HadoopFileCommitterFactory fileCommitterFactory;

    private SerializableConfiguration serializableConfiguration;
    private BucketAssigner<IN, BucketID> bucketAssigner;
    private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
    private BucketFactory<IN, BucketID> bucketFactory;
    private OutputFileConfig outputFileConfig;

    /** Transient so it is not shipped with the serialized builder; attached via {@link #setMetricData}. */
    @Nullable
    private transient SinkTableMetricData metricData;

    private final DirtyOptions dirtyOptions;
    private final @Nullable DirtySink<Object> dirtySink;
    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
    private final PartitionPolicy partitionPolicy;

    private final HiveShim hiveShim;
    private final String hiveVersion;
    private final String inputFormat;
    private final String outputFormat;
    private final String serializationLib;
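
    /**
     * Convenience constructor applying this sink's defaults: a
     * {@link DefaultHadoopFileCommitterFactory}, rolling files on every checkpoint via
     * {@link OnCheckpointRollingPolicy}, a {@link DefaultBucketFactoryImpl}, and an empty
     * {@link OutputFileConfig}.
     */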
    public HadoopPathBasedBulkFormatBuilder(
            org.apache.hadoop.fs.Path basePath,
            HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
            Configuration configuration,
            BucketAssigner<IN, BucketID> assigner,
            DirtyOptions dirtyOptions,
            @Nullable DirtySink<Object> dirtySink,
            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
            PartitionPolicy partitionPolicy,
            HiveShim hiveShim,
            String hiveVersion,
            boolean sinkMultipleEnable,
            String inputFormat,
            String outputFormat,
            String serializationLib) {
        this(
                basePath,
                writerFactory,
                new DefaultHadoopFileCommitterFactory(sinkMultipleEnable),
                configuration,
                assigner,
                OnCheckpointRollingPolicy.build(),
                new DefaultBucketFactoryImpl<>(),
                OutputFileConfig.builder().build(),
                dirtyOptions,
                dirtySink,
                schemaUpdatePolicy,
                partitionPolicy,
                hiveShim,
                hiveVersion,
                inputFormat,
                outputFormat,
                serializationLib);
    }
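
    /**
     * Full constructor taking every collaborator explicitly; the convenience constructor above
     * delegates here.
     */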
    public HadoopPathBasedBulkFormatBuilder(
            org.apache.hadoop.fs.Path basePath,
            HadoopPathBasedBulkWriter.Factory<IN> writerFactory,
            HadoopFileCommitterFactory fileCommitterFactory,
            Configuration configuration,
            BucketAssigner<IN, BucketID> assigner,
            CheckpointRollingPolicy<IN, BucketID> policy,
            BucketFactory<IN, BucketID> bucketFactory,
            OutputFileConfig outputFileConfig,
            DirtyOptions dirtyOptions,
            @Nullable DirtySink<Object> dirtySink,
            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
            PartitionPolicy partitionPolicy,
            HiveShim hiveShim,
            String hiveVersion,
            String inputFormat,
            String outputFormat,
            String serializationLib) {
        // Bridge from the Hadoop path to Flink's own Path abstraction.
        this.basePath = new Path(Preconditions.checkNotNull(basePath).toString());
        this.writerFactory = writerFactory;
        this.fileCommitterFactory = fileCommitterFactory;
        // Hadoop's Configuration is not Serializable, so wrap it before shipping to tasks.
        this.serializableConfiguration = new SerializableConfiguration(configuration);
        this.bucketAssigner = Preconditions.checkNotNull(assigner);
        this.rollingPolicy = Preconditions.checkNotNull(policy);
        this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
        this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        this.dirtyOptions = dirtyOptions;
        this.dirtySink = dirtySink;
        this.schemaUpdatePolicy = schemaUpdatePolicy;
        this.partitionPolicy = partitionPolicy;
        this.hiveShim = hiveShim;
        this.hiveVersion = hiveVersion;
        this.inputFormat = inputFormat;
        this.outputFormat = outputFormat;
        this.serializationLib = serializationLib;
    }
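
    // Fluent setters: each returns self() so that a subclass of this builder keeps its
    // concrete type T while chaining calls.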
    public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
        this.bucketAssigner = Preconditions.checkNotNull(assigner);
        return self();
    }

    public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
        return self();
    }

    public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
        this.bucketFactory = Preconditions.checkNotNull(factory);
        return self();
    }

    public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
        this.outputFileConfig = outputFileConfig;
        return self();
    }

    public T withConfiguration(Configuration configuration) {
        this.serializableConfiguration = new SerializableConfiguration(configuration);
        return self();
    }
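
    /**
     * Builds the {@link BucketWriter} that writes part files through the configured
     * {@link HadoopPathBasedBulkWriter.Factory} and commits them with the
     * {@link HadoopFileCommitterFactory}, forwarding metric reporting and dirty-data handling
     * when they are configured.
     */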
    @Override
    public BucketWriter<IN, BucketID> createBucketWriter() {
        return new HadoopPathBasedPartFileWriter.HadoopPathBasedBucketWriter<>(
                serializableConfiguration.getConfiguration(),
                writerFactory,
                fileCommitterFactory,
                metricData,
                dirtyOptions,
                dirtySink,
                schemaUpdatePolicy,
                partitionPolicy,
                hiveShim,
                hiveVersion,
                inputFormat,
                outputFormat,
                serializationLib);
    }
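
    /** Assembles the per-subtask {@link Buckets}, wiring in a freshly created bucket writer. */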
    @Override
    public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
        return new Buckets<>(
                basePath,
                bucketAssigner,
                bucketFactory,
                createBucketWriter(),
                rollingPolicy,
                subtaskIndex,
                outputFileConfig);
    }
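
    /**
     * Attaches the sink metric reporter. The field is transient, so callers should re-attach it
     * on each task after the builder has been deserialized.
     */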
    public void setMetricData(@Nullable SinkTableMetricData metricData) {
        this.metricData = metricData;
    }
}