blob: 111ea7bc988ff6f7ab078e6f1d350061e3dbde80 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.fs;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.lib.converter.Converter;
import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;
/**
 * This class is responsible for writing tuples to HDFS. All tuples are written
 * to the same file. Rolling the file over based on size, no. of tuples, idle
 * windows, elapsed windows is supported. The user can configure how tuples are
 * serialized to bytes through the {@link #setConverter(Converter)} property.
 *
 * @since 3.4.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class GenericFileOutputOperator<INPUT> extends AbstractSingleFileOutputOperator<INPUT>
{
  /**
   * Flag to mark if new data arrived in the current application window
   */
  private transient boolean isNewDataInCurrentWindow;

  /**
   * Separator between the tuples
   */
  private String tupleSeparator;

  /**
   * byte[] representation of tupleSeparator; derived from tupleSeparator,
   * hence transient and re-computed in {@link #setup(OperatorContext)}
   */
  private transient byte[] tupleSeparatorBytes;

  /**
   * No. of bytes received in current application window
   */
  @AutoMetric
  private long byteCount;

  /**
   * No. of tuples present in the current part file
   */
  private long currentPartTupleCount;

  /**
   * Max. number of tuples allowed per part. Part file will be finalized after
   * these many tuples
   */
  private long maxTupleCount = Long.MAX_VALUE;

  /**
   * No. of windows since last new data received
   */
  private long currentPartIdleWindows;

  /**
   * Converter for conversion of input tuples to byte[]
   */
  @NotNull
  private Converter<INPUT, byte[]> converter;

  /**
   * Max number of idle windows for which no new data is added to current part
   * file. Part file will be finalized after these many idle windows after last
   * new data.
   */
  private long maxIdleWindows = Long.MAX_VALUE;

  /**
   * Stream codec for string input port
   */
  protected StreamCodec<String> stringStreamCodec;

  /**
   * Default value for stream expiry: 1 hour
   */
  private static final long DEFAULT_STREAM_EXPIRY_ACCESS_MILL = 60 * 60 * 1000L;

  /**
   * Default value for rotation windows: 10 min
   */
  private static final int DEFAULT_ROTATION_WINDOWS = 2 * 60 * 10;

  /**
   * Initializing default values for tuple separator, stream expiry, rotation
   * windows
   */
  public GenericFileOutputOperator()
  {
    setTupleSeparator(System.getProperty("line.separator"));
    setExpireStreamAfterAccessMillis(DEFAULT_STREAM_EXPIRY_ACCESS_MILL);
    setRotationWindows(DEFAULT_ROTATION_WINDOWS);
  }

  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    // Re-derive the transient byte form of the separator after deserialization.
    // NOTE(review): String.getBytes() uses the platform default charset here;
    // confirm this matches the charset expected by readers of the output files.
    this.tupleSeparatorBytes = tupleSeparator.getBytes();
  }

  /**
   * {@inheritDoc}
   *
   * Also accumulates {@link #byteCount} for the current window.
   *
   * @return byte[] produced by the configured converter for the given tuple,
   *         followed by the tuple separator bytes
   */
  @Override
  protected byte[] getBytesForTuple(INPUT tuple)
  {
    // Plain array concatenation: ByteArrayOutputStream.close() is documented
    // as a no-op, and the previous close()-in-finally could mask an exception
    // already propagating from the try body by throwing its own.
    byte[] payload = converter.convert(tuple);
    byte[] result = new byte[payload.length + tupleSeparatorBytes.length];
    System.arraycopy(payload, 0, result, 0, payload.length);
    System.arraycopy(tupleSeparatorBytes, 0, result, payload.length, tupleSeparatorBytes.length);
    byteCount += result.length;
    return result;
  }

  /**
   * Initializing per window level fields {@inheritDoc}
   */
  @Override
  public void beginWindow(long windowId)
  {
    super.beginWindow(windowId);
    byteCount = 0;
    isNewDataInCurrentWindow = false;
  }

  /**
   * {@inheritDoc} Does additional state maintenance for rollover
   */
  @Override
  protected void processTuple(INPUT tuple)
  {
    super.processTuple(tuple);
    isNewDataInCurrentWindow = true;
    // >= rather than ==: if maxTupleCount is lowered at runtime below the
    // current count, the part is still finalized instead of growing forever.
    if (++currentPartTupleCount >= maxTupleCount) {
      rotateCall(getPartitionedFileName());
    }
  }

  /**
   * {@inheritDoc} Does additional checks if file should be rolled over for this
   * window.
   */
  @Override
  public void endWindow()
  {
    super.endWindow();
    if (!isNewDataInCurrentWindow) {
      ++currentPartIdleWindows;
    } else {
      currentPartIdleWindows = 0;
    }
    if (checkEndWindowFinalization()) {
      rotateCall(getPartitionedFileName());
    }
  }

  /**
   * Rollover check at the endWindow
   *
   * @return true if the current part file has been idle long enough to be
   *         finalized
   */
  private boolean checkEndWindowFinalization()
  {
    // >= rather than ==: robust against maxIdleWindows being decreased at
    // runtime below the already-accumulated idle-window count.
    return currentPartIdleWindows >= maxIdleWindows;
  }

  /**
   * Handles file rotation along with exception handling. Resets the per-part
   * counters even when there was nothing to rotate.
   *
   * @param lastFile name of the part file to finalize
   */
  protected void rotateCall(String lastFile)
  {
    try {
      // Skip the actual rotation when the current part is empty; only the
      // bookkeeping counters need to be reset in that case.
      if (currentPartTupleCount != 0) {
        this.rotate(lastFile);
      }
      currentPartIdleWindows = 0;
      currentPartTupleCount = 0;
    } catch (IOException | ExecutionException ex) {
      LOG.error("Exception in file rotation", ex);
      DTThrowable.rethrow(ex);
    }
  }

  /**
   * @return Separator between the tuples
   */
  public String getTupleSeparator()
  {
    return tupleSeparator;
  }

  /**
   * @param separator
   *          Separator between the tuples
   */
  public void setTupleSeparator(String separator)
  {
    this.tupleSeparator = separator;
    // NOTE(review): platform default charset; see note in setup().
    this.tupleSeparatorBytes = separator.getBytes();
  }

  /**
   * @return max tuples in a part file
   */
  public long getMaxTupleCount()
  {
    return maxTupleCount;
  }

  /**
   * @param maxTupleCount
   *          max tuples in a part file
   */
  public void setMaxTupleCount(long maxTupleCount)
  {
    this.maxTupleCount = maxTupleCount;
  }

  /**
   * @return max number of idle windows for rollover
   */
  public long getMaxIdleWindows()
  {
    return maxIdleWindows;
  }

  /**
   * @param maxIdleWindows
   *          max number of idle windows for rollover
   */
  public void setMaxIdleWindows(long maxIdleWindows)
  {
    this.maxIdleWindows = maxIdleWindows;
  }

  /**
   * Converter for conversion of input tuples to byte[]
   *
   * @return converter
   */
  public Converter<INPUT, byte[]> getConverter()
  {
    return converter;
  }

  /**
   * Converter for conversion of input tuples to byte[]
   *
   * @param converter
   *          converter to use for serializing tuples
   */
  public void setConverter(Converter<INPUT, byte[]> converter)
  {
    this.converter = converter;
  }

  /**
   * Converter returning input tuples as byte[] without any conversion
   */
  public static class NoOpConverter implements Converter<byte[], byte[]>
  {
    @Override
    public byte[] convert(byte[] tuple)
    {
      return tuple;
    }
  }

  /**
   * Concrete operator writing raw byte[] tuples as-is.
   */
  public static class BytesFileOutputOperator extends GenericFileOutputOperator<byte[]>
  {
    public BytesFileOutputOperator()
    {
      setConverter(new NoOpConverter());
    }
  }

  /**
   * Converter returning byte[] conversion of the input String.
   */
  public static class StringToBytesConverter implements Converter<String, byte[]>
  {
    @Override
    public byte[] convert(String tuple)
    {
      // NOTE(review): platform default charset, consistent with the separator
      // handling above.
      return tuple.getBytes();
    }
  }

  /**
   * Concrete operator writing String tuples.
   */
  public static class StringFileOutputOperator extends GenericFileOutputOperator<String>
  {
    public StringFileOutputOperator()
    {
      setConverter(new StringToBytesConverter());
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(GenericFileOutputOperator.class);
}