/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.hive;

import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.sink.PartitionPolicy;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.hive.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.inlong.sort.hive.filesystem.StreamingSink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.hive.shaded.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import static org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
import static org.apache.flink.table.filesystem.stream.compact.CompactOperator.convertToUncompacted;
import static org.apache.inlong.sort.hive.HiveOptions.HIVE_IGNORE_ALL_CHANGELOG;

/**
 * Table sink for writing to Hive tables. Compared with the upstream Flink Hive sink, it adds
 * InLong metric and audit reporting, dirty-data handling, and a multi-table (sink-multiple) mode.
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);
private final ReadableConfig flinkConf;
private final JobConf jobConf;
private final CatalogTable catalogTable;
private final ObjectIdentifier identifier;
private final TableSchema tableSchema;
private final String hiveVersion;
private final HiveShim hiveShim;
@Nullable
private final Integer configuredParallelism;
private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
private boolean overwrite = false;
private boolean dynamicGrouping = false;
private final String inlongMetric;
private final String auditHostAndPorts;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private final PartitionPolicy partitionPolicy;
private final String partitionField;
private final String timePattern;
private final boolean sinkMultipleEnable;
private final String inputFormat;
private final String outputFormat;
private final String serializationLib;
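
    /**
     * Creates a Hive table sink. Besides the standard Flink/Hive configuration, the constructor
     * carries InLong-specific settings: metric labels and audit endpoints, dirty-data options and
     * sink, the schema-update policy, the partition policy/field/time pattern, and the storage
     * format (input/output format and serialization library) used when {@code sinkMultipleEnable}
     * is true.
     */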
public HiveTableSink(
ReadableConfig flinkConf,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism,
final String inlongMetric,
final String auditHostAndPorts,
DirtyOptions dirtyOptions,
@Nullable DirtySink<Object> dirtySink,
SchemaUpdateExceptionPolicy schemaUpdatePolicy,
PartitionPolicy partitionPolicy,
String partitionField,
String timePattern,
boolean sinkMultipleEnable,
String inputFormat,
String outputFormat,
String serializationLib) {
this.flinkConf = flinkConf;
this.jobConf = jobConf;
this.identifier = identifier;
this.catalogTable = table;
hiveVersion =
Preconditions.checkNotNull(
jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
"Hive version is not defined");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
this.configuredParallelism = configuredParallelism;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.partitionPolicy = partitionPolicy;
this.partitionField = partitionField;
this.timePattern = timePattern;
this.sinkMultipleEnable = sinkMultipleEnable;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serializationLib = serializationLib;
}
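
    /**
     * Returns a {@link DataStreamSinkProvider} that delegates to {@link #consume}, passing whether
     * the query is bounded and a converter used by the batch path to turn {@link RowData} back
     * into {@link Row}.
     */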
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
return (DataStreamSinkProvider) dataStream -> consume(dataStream, context.isBounded(), converter);
}
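
    /**
     * Builds the actual sink. Runs the ACID table check, resolves the target database (the WITH
     * properties take precedence over the identifier), prepares the Hive {@link StorageDescriptor}
     * and table properties (a synthetic descriptor is used in multi-table mode), creates a
     * {@link HiveWriterFactory}, and then dispatches to the batch or streaming sink depending on
     * whether the input is bounded.
     */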
private DataStreamSink<?> consume(
DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
checkAcidTable(catalogTable, identifier.toObjectPath());
        // Get dbName from the WITH properties first; fall back to the database of the identifier.
String dbName = jobConf.get(HiveValidator.CONNECTOR_HIVE_DATABASE);
if (dbName == null) {
dbName = identifier.getDatabaseName();
}
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
StorageDescriptor sd;
Properties tableProps = new Properties();
Class hiveOutputFormatClz;
boolean isCompressed =
jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
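            // In multi-table (sink-multiple) mode the target table is not known up front, so build
            // a synthetic StorageDescriptor with the configured serialization library and a
            // temporary location under the default filesystem; otherwise read the descriptor and
            // metadata of the concrete table from the metastore.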
if (sinkMultipleEnable) {
sd = new StorageDescriptor();
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(this.serializationLib);
sd.setSerdeInfo(serDeInfo);
String defaultFs = jobConf.get("fs.defaultFS", "");
sd.setLocation(defaultFs + "/tmp");
hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
} else {
Table table = client.getTable(dbName, identifier.getObjectName());
sd = table.getSd();
tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
}
HiveWriterFactory writerFactory = new HiveWriterFactory(
jobConf,
hiveOutputFormatClz,
sd,
tableSchema,
getPartitionKeyArray(),
tableProps,
hiveShim,
isCompressed,
sinkMultipleEnable);
String extension =
Utilities.getFileExtension(
jobConf,
isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.withPartSuffix(extension == null ? "" : extension);
final int parallelism =
Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
if (isBounded) {
OutputFileConfig fileNaming = fileNamingBuilder.build();
return createBatchSink(dataStream, converter, sd, writerFactory, fileNaming, parallelism);
} else {
if (overwrite) {
                    throw new IllegalStateException("Streaming mode does not support overwrite.");
}
return createStreamSink(
dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
}
} catch (TException e) {
            throw new CatalogException("Failed to query Hive metastore", e);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to create staging dir", e);
} catch (ClassNotFoundException e) {
throw new FlinkHiveException("Failed to get output format class", e);
} catch (IllegalAccessException | InstantiationException e) {
throw new FlinkHiveException("Failed to instantiate output format instance", e);
}
}
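
    /**
     * Builds the batch (bounded) sink: rows are converted back to {@link Row}, written through a
     * {@link FileSystemOutputFormat} into a staging directory under the table location, and
     * committed to the table when the job finishes.
     */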
private DataStreamSink<Row> createBatchSink(
DataStream<RowData> dataStream,
DataStructureConverter converter,
StorageDescriptor sd,
HiveWriterFactory recordWriterFactory,
OutputFileConfig fileNaming,
final int parallelism)
throws IOException {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(
new HiveRowPartitionComputer(
hiveShim,
JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
getPartitionKeyArray()));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(getPartitionKeyArray());
builder.setFileSystemFactory(fsFactory());
builder.setFormatFactory(new HiveOutputFormatFactory(recordWriterFactory));
builder.setMetaStoreFactory(msFactory());
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
builder.setTempPath(
new org.apache.flink.core.fs.Path(toStagingDir(sd.getLocation(), jobConf)));
builder.setOutputFileConfig(fileNaming);
return dataStream
.map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
.writeUsingOutputFormat(builder.build())
.setParallelism(parallelism);
}
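
    /**
     * Builds the streaming (unbounded) sink: a partition-aware bucket assigner and a
     * checkpoint-based rolling policy feed either Flink's native bulk writers or the Hive
     * MapReduce record writer, optionally followed by auto compaction, and finish with a
     * partition-committing sink operator.
     */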
private DataStreamSink<?> createStreamSink(
DataStream<RowData> dataStream,
StorageDescriptor sd,
Properties tableProps,
HiveWriterFactory recordWriterFactory,
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,
final int parallelism) {
org.apache.flink.configuration.Configuration conf =
new org.apache.flink.configuration.Configuration();
catalogTable.getOptions().forEach(conf::setString);
String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
throw new FlinkHiveException(
                    String.format(
                            "Streaming write to partitioned Hive table %s is not allowed without a commit policy. "
                                    + "Make sure to set a proper value for %s",
                            identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
}
HiveRowDataPartitionComputer partComputer;
partComputer = new HiveRowDataPartitionComputer(
jobConf,
hiveShim,
hiveVersion,
JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
getPartitionKeyArray(),
partitionPolicy,
partitionField,
timePattern,
inputFormat,
outputFormat,
serializationLib);
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
HiveRollingPolicy rollingPolicy =
new HiveRollingPolicy(
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
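        // With auto compaction enabled, in-progress files get the "uncompacted" prefix so the
        // compact operator can recognize and rewrite them later.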
boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
if (autoCompaction) {
fileNamingBuilder.withPartPrefix(
convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
}
OutputFileConfig outputFileConfig = fileNamingBuilder.build();
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
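        // Prefer Flink's native bulk writers (parquet/orc); fall back to the Hive MapReduce
        // RecordWriter if the fallback option is set or no bulk writer factory is available.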
if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
Optional<BulkWriter.Factory<RowData>> bulkFactory =
createBulkWriterFactory(getPartitionKeyArray(), sd);
if (bulkFactory.isPresent()) {
builder =
StreamingFileSink.forBulkFormat(
path,
new FileSystemTableSink.ProjectionBulkFactory(
bulkFactory.get(), partComputer))
.withBucketAssigner(assigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use native parquet&orc writer.");
} else {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
                LOG.info(
                        "Hive streaming sink: Use the Hive MapReduce RecordWriter "
                                + "because no BulkWriter factory is available.");
}
}
long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
DataStream<PartitionCommitInfo> writerStream;
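        // Either a compaction-aware writer (small files are merged up to the target size) or a
        // plain streaming writer; both report InLong metrics/audit and handle dirty data.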
if (autoCompaction) {
long compactionSize =
conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
.orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
.getBytes();
writerStream =
StreamingSink.compactionWriter(
dataStream,
bucketCheckInterval,
builder,
fsFactory(),
path,
createCompactReaderFactory(sd, tableProps),
compactionSize,
parallelism,
inlongMetric,
auditHostAndPorts,
dirtyOptions,
dirtySink);
} else {
writerStream =
StreamingSink.writer(
dataStream,
bucketCheckInterval,
builder,
parallelism,
inlongMetric,
auditHostAndPorts,
dirtyOptions,
dirtySink);
}
return StreamingSink.sink(
writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
}
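
    /** Creates the reader factory used by auto compaction to read back the files to be merged. */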
private CompactReader.Factory<RowData> createCompactReaderFactory(
StorageDescriptor sd, Properties properties) {
return new HiveCompactReaderFactory(
sd,
properties,
jobConf,
catalogTable,
hiveVersion,
(RowType) tableSchema.toRowDataType().getLogicalType(),
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
}
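
    /**
     * Creates the metastore factory used to commit partitions, honoring the database configured in
     * the WITH properties.
     */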
private HiveTableMetaStoreFactory msFactory() {
        // Get dbName from the WITH properties first; fall back to the database of the identifier.
String dbName = jobConf.get(HiveValidator.CONNECTOR_HIVE_DATABASE);
if (dbName == null) {
dbName = identifier.getDatabaseName();
}
return new HiveTableMetaStoreFactory(jobConf, hiveVersion, dbName, identifier.getObjectName());
}
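
    /** Creates the Hadoop file system factory backed by this sink's {@link JobConf}. */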
private HadoopFileSystemFactory fsFactory() {
return new HadoopFileSystemFactory(jobConf);
}
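
    /**
     * Builds a {@link BucketsBuilder} that writes through the Hive MapReduce record writer,
     * carrying the dirty-data, schema-update and partition policies needed for multi-table mode.
     */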
private BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(
HiveWriterFactory recordWriterFactory,
StorageDescriptor sd,
TableBucketAssigner assigner,
HiveRollingPolicy rollingPolicy,
OutputFileConfig outputFileConfig) {
HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
return new HadoopPathBasedBulkFormatBuilder<>(new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner,
dirtyOptions, dirtySink, schemaUpdatePolicy, partitionPolicy, hiveShim, hiveVersion,
sinkMultipleEnable, inputFormat, outputFormat, serializationLib)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
}
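
    /**
     * Creates a native {@link BulkWriter} factory for parquet or orc tables (derived from the
     * serialization library of the storage descriptor); returns {@link Optional#empty()} for other
     * formats so that the caller falls back to the MapReduce record writer.
     */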
private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(
String[] partitionColumns, StorageDescriptor sd) {
String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase();
int formatFieldCount = tableSchema.getFieldCount() - partitionColumns.length;
String[] formatNames = new String[formatFieldCount];
LogicalType[] formatTypes = new LogicalType[formatFieldCount];
for (int i = 0; i < formatFieldCount; i++) {
formatNames[i] = tableSchema.getFieldName(i).get();
formatTypes[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
}
RowType formatType = RowType.of(formatTypes, formatNames);
if (serLib.contains("parquet")) {
Configuration formatConf = new Configuration(jobConf);
sd.getSerdeInfo().getParameters().forEach(formatConf::set);
return Optional.of(
ParquetRowDataBuilder.createWriterFactory(
formatType, formatConf, hiveVersion.startsWith("3.")));
} else if (serLib.contains("orc")) {
Configuration formatConf = new ThreadLocalClassLoaderConfiguration(jobConf);
sd.getSerdeInfo().getParameters().forEach(formatConf::set);
TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
return Optional.of(
hiveShim.createOrcBulkWriterFactory(
formatConf, typeDescription.toString(), formatTypes));
} else {
return Optional.empty();
}
}
@Override
public boolean requiresPartitionGrouping(boolean supportsGrouping) {
this.dynamicGrouping = supportsGrouping;
return supportsGrouping;
}
// get a staging dir associated with a final dir
private String toStagingDir(String finalDir, Configuration conf) throws IOException {
String res = finalDir;
if (!finalDir.endsWith(Path.SEPARATOR)) {
res += Path.SEPARATOR;
}
// TODO: may append something more meaningful than a timestamp, like query ID
res += ".staging_" + System.currentTimeMillis();
Path path = new Path(res);
FileSystem fs = path.getFileSystem(conf);
Preconditions.checkState(
fs.exists(path) || fs.mkdirs(path), "Failed to create staging dir " + path);
fs.deleteOnExit(path);
return res;
}
private List<String> getPartitionKeys() {
/*
* if (table != null) { return
* table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); }
*/
return catalogTable.getPartitionKeys();
}
private String[] getPartitionKeyArray() {
return getPartitionKeys().toArray(new String[0]);
}
@Override
public void applyStaticPartition(Map<String, String> partition) {
// make it a LinkedHashMap to maintain partition column order
staticPartitionSpec = new LinkedHashMap<>();
for (String partitionCol : getPartitionKeys()) {
if (partition.containsKey(partitionCol)) {
staticPartitionSpec.put(partitionCol, partition.get(partitionCol));
}
}
}
@Override
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
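
    /**
     * Accepts only insert-only changelogs unless {@code HIVE_IGNORE_ALL_CHANGELOG} is enabled, in
     * which case all changelog records are accepted and treated as inserts.
     */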
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
if (org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
.get(HIVE_IGNORE_ALL_CHANGELOG)) {
LOG.warn("Hive sink receive all changelog record. "
+ "Regard any other record as insert-only record.");
return ChangelogMode.all();
}
return ChangelogMode.insertOnly();
}
@Override
public DynamicTableSink copy() {
HiveTableSink sink =
new HiveTableSink(
flinkConf,
jobConf,
identifier,
catalogTable,
configuredParallelism,
inlongMetric,
auditHostAndPorts,
dirtyOptions,
dirtySink,
schemaUpdatePolicy,
partitionPolicy,
partitionField,
timePattern,
sinkMultipleEnable,
inputFormat,
outputFormat,
serializationLib);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
return sink;
}
@Override
public String asSummaryString() {
return "HiveSink";
}
    /**
     * Getting the size of a file is expensive (see {@link HiveBulkWriterFactory#create}), so we
     * cannot check it for every element without putting heavy pressure on DFS. Therefore this
     * implementation only checks the file size in {@link #shouldRollOnProcessingTime}, which keeps
     * the DFS load low.
     */
private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {
private final long rollingFileSize;
private final long rollingTimeInterval;
private HiveRollingPolicy(long rollingFileSize, long rollingTimeInterval) {
Preconditions.checkArgument(rollingFileSize > 0L);
Preconditions.checkArgument(rollingTimeInterval > 0L);
this.rollingFileSize = rollingFileSize;
this.rollingTimeInterval = rollingTimeInterval;
}
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
return true;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(
PartFileInfo<String> partFileState, long currentTime) {
try {
return currentTime - partFileState.getCreationTime() >= rollingTimeInterval
|| partFileState.getSize() > rollingFileSize;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
}