// blob: e2d8eb52c97eb8b40007947daed807383ad02744 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.flink.compiler;
import org.apache.flink.api.common.io.BlockInfo;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.URI;
/**
 * A {@link FileOutputFormat} that stores every record as a Java-serialized {@link BytesWritable}
 * value (paired with a {@link NullWritable} key) in a Hadoop {@link SequenceFile}.
 *
 * <p>The target location is taken from the constructor, {@link #setOutputFilePath(Path)} or, as a
 * last resort, from the {@link #FILE_PARAMETER_KEY} entry of the configuration handed to
 * {@link #configure(Configuration)}.
 *
 * <p>NOTE(review): {@link #open(int, int)} always writes to the configured path itself, regardless of
 * the task number, and removes whatever exists there first. With a parallelism greater than one the
 * tasks therefore appear to compete for the same file - confirm that this is intended.
 *
 * @param <IT> type of the records to write; instances must be serializable with Java serialization
 */
public class WayangFileOutputFormat<IT> extends FileOutputFormat<IT> implements InitializeOnMaster, CleanupWhenUnsuccessful {

    private static final long serialVersionUID = 1L;

    // --------------------------------------------------------------------------------------------

    private static FileSystem.WriteMode DEFAULT_WRITE_MODE;

    private static FileOutputFormat.OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;

    static {
        initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
    }

    /**
     * Initializes the default write mode and output directory mode of this format.
     *
     * <p>The given configuration is currently not consulted: the defaults are always
     * {@link FileSystem.WriteMode#OVERWRITE} and {@link FileOutputFormat.OutputDirectoryMode#PARONLY}.
     *
     * @param configuration the configuration to load defaults from (unused at the moment)
     */
    public static void initDefaultsFromConfiguration(Configuration configuration) {
        DEFAULT_WRITE_MODE = FileSystem.WriteMode.OVERWRITE;
        DEFAULT_OUTPUT_DIRECTORY_MODE = FileOutputFormat.OutputDirectoryMode.PARONLY;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * The LOG for logging messages in this class.
     * NOTE(review): the logger is registered under the category of {@link FileOutputFormat}, not of this
     * class; kept as is so that existing logging configurations continue to match.
     */
    private static final Logger LOG = LogManager.getLogger(FileOutputFormat.class);

    /**
     * The key under which the name of the target path is stored in the configuration.
     */
    public static final String FILE_PARAMETER_KEY = "flink.output.file";

    /**
     * The path of the file to be written.
     */
    protected Path outputFilePath;

    /**
     * The write mode of the output.
     */
    private FileSystem.WriteMode writeMode;

    /**
     * The output directory mode
     */
    private FileOutputFormat.OutputDirectoryMode outputDirectoryMode;

    /**
     * The sequence file writer created by {@link #open(int, int)}; {@code null} while the format is not open.
     * Transient because it is a runtime handle that must never travel with the serialized format.
     */
    transient SequenceFile.Writer writer;

    // --------------------------------------------------------------------------------------------

    /** The stream to which the data is written; never assigned by this class itself. */
    protected transient FSDataOutputStream stream;

    /** The path that is actually written to; recorded by {@link #open(int, int)} and used for cleanup. */
    private transient Path actualFilePath;

    /** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
    private transient boolean fileCreated;

    /** The configuration key under which the block size is looked up. */
    public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size";

    /** Marker value meaning that no explicit block size has been configured. */
    public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;

    /** The block size to use. */
    private long blockSize = NATIVE_BLOCK_SIZE;

    private transient WayangFileOutputFormat.BlockBasedOutput blockBasedOutput;

    private transient DataOutputViewStreamWrapper outView;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a format without an output path; the path must be supplied through
     * {@link #setOutputFilePath(Path)} or through the configuration.
     */
    public WayangFileOutputFormat() {}

    /**
     * Creates a format that writes to the given location.
     *
     * @param path the output location, parsed with {@link URI#create(String)}
     */
    public WayangFileOutputFormat(String path) {
        this(new Path(URI.create(path)));
    }

    /**
     * Creates a format that writes to the given path.
     *
     * @param outputPath the output path
     */
    public WayangFileOutputFormat(Path outputPath) {
        this.outputFilePath = outputPath;
    }

    /**
     * Sets the path to write to.
     *
     * @param path the output path
     * @throws IllegalArgumentException if {@code path} is {@code null}
     */
    public void setOutputFilePath(Path path) {
        if (path == null) {
            throw new IllegalArgumentException("Output file path may not be null.");
        }
        this.outputFilePath = path;
    }

    /**
     * @return the configured output path, or {@code null} if none has been set yet
     */
    public Path getOutputFilePath() {
        return this.outputFilePath;
    }

    /**
     * Sets the write mode.
     *
     * @param mode the write mode
     * @throws NullPointerException if {@code mode} is {@code null}
     */
    public void setWriteMode(FileSystem.WriteMode mode) {
        if (mode == null) {
            throw new NullPointerException("Write mode may not be null.");
        }
        this.writeMode = mode;
    }

    /**
     * @return the write mode, or {@code null} if neither a setter nor {@link #configure(Configuration)} set it
     */
    public FileSystem.WriteMode getWriteMode() {
        return this.writeMode;
    }

    /**
     * Sets the output directory mode.
     *
     * @param mode the output directory mode
     * @throws NullPointerException if {@code mode} is {@code null}
     */
    public void setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode mode) {
        if (mode == null) {
            throw new NullPointerException("Output directory mode may not be null.");
        }
        this.outputDirectoryMode = mode;
    }

    /**
     * @return the output directory mode, or {@code null} if neither a setter nor
     *         {@link #configure(Configuration)} set it
     */
    public FileOutputFormat.OutputDirectoryMode getOutputDirectoryMode() {
        return this.outputDirectoryMode;
    }

    // ----------------------------------------------------------------

    /**
     * Resolves the output path (if not set yet), falls back to the default write and directory modes
     * where none were set, and reads the block size.
     *
     * @param parameters the configuration to read {@link #FILE_PARAMETER_KEY} and
     *                   {@link #BLOCK_SIZE_PARAMETER_KEY} from
     * @throws WayangException wrapping any failure, e.g. a missing output path or an invalid block size
     */
    @Override
    public void configure(Configuration parameters) {
        try {
            // get the output file path, if it was not yet set
            if (this.outputFilePath == null) {
                final String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
                if (filePath == null) {
                    throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
                            ", nor via the Configuration.");
                }
                try {
                    this.outputFilePath = new Path(filePath);
                } catch (RuntimeException rex) {
                    // keep the original failure as cause so that its stack trace is not lost
                    throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage(), rex);
                }
            }

            // use the defaults for everything that has not been set explicitly
            if (this.writeMode == null) {
                this.writeMode = DEFAULT_WRITE_MODE;
            }
            if (this.outputDirectoryMode == null) {
                this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
            }

            // read own parameters
            this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
            if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) {
                throw new IllegalArgumentException("The block size parameter must be set and larger than 0.");
            }
            if (this.blockSize > Integer.MAX_VALUE) {
                throw new UnsupportedOperationException("Currently only block size up to Integer.MAX_VALUE are supported");
            }
        } catch (Exception e) {
            throw new WayangException(e);
        }
    }

    /**
     * Removes whatever exists at the output path and creates the sequence file writer for it.
     *
     * <p>Failures are propagated to the caller. They used to be printed and ignored, which left the
     * writer unset and made every subsequent record vanish silently.
     *
     * @param taskNumber the number of the parallel task; must not be negative
     * @param numTasks   the number of parallel tasks; must be at least 1
     * @throws IllegalArgumentException if {@code taskNumber} or {@code numTasks} is out of range
     * @throws IOException              if no output path is set or the file cannot be prepared or created
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0 || numTasks < 1) {
            throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
        }

        LOG.debug("Opening stream for output ({}/{}). WriteMode={}, OutputDirectoryMode={}",
                taskNumber + 1, numTasks, writeMode, outputDirectoryMode);

        final Path target = this.outputFilePath;
        if (target == null) {
            throw new IOException("The file path is null.");
        }

        final FileSystem fs = target.getFileSystem();
        if (fs.exists(target)) {
            fs.delete(target, true);
        }

        // Remember the target before creating the writer, so that tryCleanupOnError() can also remove
        // a file that was only partially created.
        this.actualFilePath = target;
        this.fileCreated = true;

        this.writer = SequenceFile.createWriter(
                new org.apache.hadoop.conf.Configuration(true),
                SequenceFile.Writer.file(new org.apache.hadoop.fs.Path(target.toString())),
                SequenceFile.Writer.keyClass(NullWritable.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
    }

    /**
     * Serializes the record with Java serialization and appends the bytes to the sequence file.
     *
     * @param record the record to write
     * @throws IOException if the format is not open, the record cannot be serialized or the append fails
     */
    @Override
    public void writeRecord(IT record) throws IOException {
        final SequenceFile.Writer sink = this.writer;
        if (sink == null) {
            throw new IOException("The output format has not been opened.");
        }

        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // closing the object stream flushes all of its internal buffers into 'buffer'
        try (ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
            objectStream.writeObject(record);
        }
        sink.append(NullWritable.get(), new BytesWritable(buffer.toByteArray()));
    }

    /**
     * @param taskNumber the zero-based number of the parallel task
     * @return the one-based task number as a string
     */
    protected String getDirectoryFileName(int taskNumber) {
        return Integer.toString(taskNumber + 1);
    }

    /**
     * Closes the sequence file writer, the output view and the stream, in that order. Each resource is
     * closed even if closing an earlier one fails, and calling this method more than once is harmless.
     *
     * @throws IOException if closing one of the resources fails
     */
    @Override
    public void close() throws IOException {
        try {
            final SequenceFile.Writer w = this.writer;
            if (w != null) {
                this.writer = null;
                w.close();
            }
        } finally {
            try {
                final DataOutputViewStreamWrapper view = this.outView;
                if (view != null) {
                    this.outView = null;
                    view.close();
                }
            } finally {
                final FSDataOutputStream s = this.stream;
                if (s != null) {
                    this.stream = null;
                    s.close();
                }
            }
        }
    }

    /**
     * Initialization of the distributed file system if it is used.
     *
     * @param parallelism The task parallelism.
     * @throws WayangException wrapping any failure, including an output path that could not be initialized
     */
    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        try {
            final Path path = getOutputFilePath();
            final FileSystem fs = path.getFileSystem();

            // only distributed file systems can be initialized at start-up time.
            if (!fs.isDistributedFS()) {
                return;
            }

            final FileSystem.WriteMode mode = getWriteMode();
            final FileOutputFormat.OutputDirectoryMode directoryMode = getOutputDirectoryMode();

            if (parallelism == 1 && directoryMode == FileOutputFormat.OutputDirectoryMode.PARONLY) {
                // output is not written in parallel and should be written to a single file.
                if (!fs.initOutPathDistFS(path, mode, false)) {
                    // output preparation failed! Cancel task.
                    throw new IOException("Output path could not be initialized.");
                }
            } else {
                // output should be written to a directory
                if (!fs.initOutPathDistFS(path, mode, true)) {
                    throw new IOException("Output directory could not be created.");
                }
            }
        } catch (Exception e) {
            throw new WayangException(e);
        }
    }

    /**
     * Closes this format and removes the file recorded by {@link #open(int, int)}, if there is one.
     * Failures are logged and never propagated.
     */
    @Override
    public void tryCleanupOnError() {
        if (!this.fileCreated) {
            return;
        }
        this.fileCreated = false;

        try {
            close();
        } catch (IOException e) {
            LOG.error("Could not properly close FileOutputFormat.", e);
        }

        final Path incomplete = this.actualFilePath;
        if (incomplete == null) {
            // nothing was recorded by open(), hence there is nothing to remove
            return;
        }

        try {
            FileSystem.get(incomplete.toUri()).delete(incomplete, false);
        } catch (FileNotFoundException e) {
            // ignore, may not be visible yet or may be already removed
        } catch (Throwable t) {
            LOG.error("Could not remove the incomplete file " + incomplete + '.', t);
        }
    }

    /**
     * Writes a block info at the end of the blocks.<br>
     * Current implementation uses only int and not long.
     *
     * <p>NOTE(review): the payload size is {@code blockSize - infoSize}; the write loop presumably
     * relies on it being positive - confirm before using this class with small block sizes.
     */
    protected class BlockBasedOutput extends FilterOutputStream {

        private static final int NO_RECORD = -1;

        /** Number of payload bytes per block, i.e. the block size minus the size of the block info. */
        private final int maxPayloadSize;

        /** Number of payload bytes written to the current block. */
        private int blockPos;

        /** Records started in the current block and since this stream was created, respectively. */
        private int blockCount, totalCount;

        /** Position in the current block at which its first record starts, or {@link #NO_RECORD}. */
        private long firstRecordStartPos = NO_RECORD;

        private BlockInfo blockInfo = WayangFileOutputFormat.this.createBlockInfo();

        private DataOutputView headerStream;

        /**
         * @param out       the stream to write payload and block infos to
         * @param blockSize the total size of a block, including its block info
         */
        public BlockBasedOutput(OutputStream out, int blockSize) {
            super(out);
            this.headerStream = new DataOutputViewStreamWrapper(out);
            this.maxPayloadSize = blockSize - this.blockInfo.getInfoSize();
        }

        /**
         * Writes the block info of a partially filled block, then flushes and closes the wrapped stream.
         */
        @Override
        public void close() throws IOException {
            if (this.blockPos > 0) {
                this.writeInfo();
            }
            super.flush();
            super.close();
        }

        /**
         * Marks the beginning of a new record and updates the record counters.
         */
        public void startRecord() {
            if (this.firstRecordStartPos == NO_RECORD) {
                this.firstRecordStartPos = this.blockPos;
            }
            this.blockCount++;
            this.totalCount++;
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }

        /**
         * Writes the given bytes, splitting them at block boundaries and emitting a block info whenever
         * a block is full.
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            for (int remainingLength = len, offset = off; remainingLength > 0;) {
                int blockLen = Math.min(remainingLength, this.maxPayloadSize - this.blockPos);
                this.out.write(b, offset, blockLen);

                this.blockPos += blockLen;
                if (this.blockPos >= this.maxPayloadSize) {
                    this.writeInfo();
                }
                remainingLength -= blockLen;
                offset += blockLen;
            }
        }

        @Override
        public void write(int b) throws IOException {
            super.write(b);
            if (++this.blockPos >= this.maxPayloadSize) {
                this.writeInfo();
            }
        }

        /**
         * Fills in and writes the block info of the current block, then resets the per-block state.
         */
        private void writeInfo() throws IOException {
            this.blockInfo.setRecordCount(this.blockCount);
            this.blockInfo.setAccumulatedRecordCount(this.totalCount);
            this.blockInfo.setFirstRecordStart(this.firstRecordStartPos == NO_RECORD ? 0 : this.firstRecordStartPos);
            WayangFileOutputFormat.this.complementBlockInfo(this.blockInfo);
            this.blockInfo.write(this.headerStream);

            this.blockPos = 0;
            this.blockCount = 0;
            this.firstRecordStartPos = NO_RECORD;
        }
    }

    /**
     * @return a new, empty {@link BlockInfo}
     */
    protected BlockInfo createBlockInfo() {
        return new BlockInfo();
    }

    /**
     * Hook that is invoked right before a block info is written; does nothing by default.
     *
     * @param blockInfo the block info that is about to be written
     */
    protected void complementBlockInfo(BlockInfo blockInfo) {}
}