/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.flink.table.sinks.orc;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.SafetyNetWrapperFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.sources.orc.OrcSchemaConverter;
import org.apache.flink.table.sources.orc.OrcSerializer;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.orc.CompressionKind;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import static org.apache.orc.OrcConf.COMPRESS;
import static org.apache.orc.OrcConf.MAPRED_OUTPUT_SCHEMA;
import static org.apache.orc.OrcConf.ROW_INDEX_STRIDE;
/**
 * An {@link OutputFormat} implementation that writes {@link BaseRow} records to ORC files.
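 *
 * <p>A minimal usage sketch (the {@code DataTypes} constants, path, and values below are
 * illustrative, not prescriptive):
 *
 * <pre>{@code
 * RowOrcOutputFormat format = new RowOrcOutputFormat(
 *     new InternalType[]{DataTypes.INT, DataTypes.STRING},
 *     new String[]{"id", "name"},
 *     "/tmp/orc-out",
 *     CompressionKind.ZLIB);
 *
 * format.open(0, 1);       // subtask 0 of 1
 * format.writeRecord(row); // row is a BaseRow matching the declared schema
 * format.close();
 * }</pre>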
*/
public class RowOrcOutputFormat implements OutputFormat<BaseRow> {
	private static final Logger LOG = LoggerFactory.getLogger(RowOrcOutputFormat.class);

	private static final String FILE_PREFIX_NAME = "orc-";
	private static final int DEFAULT_ROW_INDEX_STRIDE = 10000;

	private final InternalType[] fieldTypes;
	private final String[] fieldNames;
	private final String dir;
	private final CompressionKind compression;
	private final String filePrefixName;
	private final int rowIndexStride;
	private final OrcSerializer serializer;
	private final String typeDescription;

	// Created in open() on the task side; never part of the serialized state.
	private transient org.apache.hadoop.mapreduce.RecordWriter<Void, OrcStruct> realWriter;
	private transient TaskAttemptContext taskContext;
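	// Reusable ORC row container. OrcStruct is a Hadoop Writable rather than
	// java.io.Serializable, so it is persisted manually in writeObject()/readObject().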
	private transient OrcStruct struct;

	public RowOrcOutputFormat(
			InternalType[] fieldTypes, String[] fieldNames, String dir) {
		this(fieldTypes, fieldNames, dir, CompressionKind.NONE, FILE_PREFIX_NAME, DEFAULT_ROW_INDEX_STRIDE);
	}

	public RowOrcOutputFormat(
			InternalType[] fieldTypes, String[] fieldNames, String dir, CompressionKind compression) {
		this(fieldTypes, fieldNames, dir, compression, FILE_PREFIX_NAME, DEFAULT_ROW_INDEX_STRIDE);
	}

	public RowOrcOutputFormat(
			InternalType[] fieldTypes, String[] fieldNames, String dir,
			CompressionKind compression, String filePrefixName, int rowIndexStride) {
		Preconditions.checkArgument(fieldNames != null && fieldNames.length > 0);
		Preconditions.checkArgument(fieldTypes != null && fieldTypes.length == fieldNames.length);
		this.fieldTypes = fieldTypes;
		this.fieldNames = fieldNames;
		this.dir = dir;
		this.compression = compression;
		this.filePrefixName = filePrefixName;
		this.rowIndexStride = rowIndexStride;
		this.serializer = new OrcSerializer(fieldTypes, fieldNames);
		this.typeDescription = OrcSchemaConverter.convert(fieldTypes, fieldNames).toString();
		this.struct = (OrcStruct) OrcStruct.createValue(TypeDescription.fromString(this.typeDescription));
	}

	@Override
	public void configure(Configuration parameters) {
	}

	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		JobConf jobConf = new JobConf();

		// Initialize the target file system and build a unique per-subtask file name.
		String fileName = filePrefixName + numTasks + "-" +
			taskNumber + "-" + System.currentTimeMillis() + ".orc";
		org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(
			new org.apache.flink.core.fs.Path(dir, String.valueOf(taskNumber)), fileName);
		FileSystem fs = path.getFileSystem();
		if (fs instanceof SafetyNetWrapperFileSystem) {
			fs = ((SafetyNetWrapperFileSystem) fs).getWrappedDelegate();
		}
		if (fs instanceof HadoopFileSystem) {
			jobConf.addResource(((HadoopFileSystem) fs).getConfig());
		}
		if (!(fs instanceof LocalFileSystem || fs instanceof HadoopFileSystem)) {
			throw new RuntimeException("FileSystem: " + fs.getClass().getCanonicalName() + " is not supported.");
		}

		// Clean up a previously written output file in case of failover.
		fs.delete(path, true);
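		// Subclass the Hadoop output format so its work file points directly at our
		// per-subtask path instead of a FileOutputCommitter-managed directory.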
		OrcOutputFormat<OrcStruct> realOutputFormat = new OrcOutputFormat<OrcStruct>() {
			@Override
			public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
				return new Path(new Path(dir, String.valueOf(taskNumber)), fileName);
			}
		};
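		// Configure the ORC writer: output schema, compression codec, and how many
		// rows lie between entries of the row-group index.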
		jobConf.set(MAPRED_OUTPUT_SCHEMA.getAttribute(), typeDescription);
		jobConf.set(COMPRESS.getAttribute(), compression.name());
		jobConf.set(ROW_INDEX_STRIDE.getAttribute(), String.valueOf(rowIndexStride));
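		// The mapreduce RecordWriter requires a TaskAttemptContext; fabricate one,
		// since this code runs outside of a real Hadoop job.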
		TaskAttemptID taskAttemptID = new TaskAttemptID();
		taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
		realWriter = realOutputFormat.getRecordWriter(taskContext);
	}

	@Override
	public void writeRecord(BaseRow record) throws IOException {
		try {
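			// ORC output is key-less: pass a null key and reuse the struct buffer.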
			realWriter.write(null, serializer.serialize(record, struct));
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}

	@Override
	public void close() throws IOException {
		try {
			if (realWriter != null) {
				realWriter.close(taskContext);
			}
		} catch (InterruptedException e) {
			throw new IOException(e);
		}
	}

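	// Custom Java serialization: the reusable OrcStruct is a Hadoop Writable and
	// cannot be handled by defaultWriteObject(), so write it out explicitly.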
	private void writeObject(ObjectOutputStream out) throws IOException {
		out.defaultWriteObject();
		this.struct.write(out);
	}

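	// Rebuild the struct from the schema string if needed, then restore its fields.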
	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject();
		if (this.struct == null) {
			this.struct = new OrcStruct(TypeDescription.fromString(this.typeDescription));
		}
		this.struct.readFields(in);
	}
}