blob: ef40a692a2cdcd2ae8b42b535cb5b38aa7c7a0fe [file] [log] [blame]
package org.apache.apex.examples.kafka.kafka2hdfs;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import javax.validation.constraints.NotNull;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
/**
 * File output operator that turns each {@code byte[]} tuple into one line of output:
 * the tuple's bytes followed by the platform line separator. Every tuple is directed
 * to the single file named by {@link #getBaseName() baseName}.
 */
public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Line separator encoded once, so it is not re-encoded for every tuple.
  private static final byte[] NL_BYTES = NL.getBytes(CS);

  /** Name of the output file returned for every tuple; declared {@code @NotNull}. */
  @NotNull
  private String baseName;

  /**
   * Returns the bytes to write for a tuple: the tuple followed by the line separator.
   *
   * <p>The tuple's bytes are copied through untouched rather than being decoded to a
   * {@code String} and re-encoded; a decode/encode round trip would replace any byte
   * sequence that is not valid UTF-8 with the replacement character and so corrupt
   * the payload.
   *
   * @param t the tuple's raw bytes; must not be {@code null}
   * @return a new array holding {@code t} followed by the UTF-8 encoded line separator
   * @throws NullPointerException if {@code t} is {@code null}
   */
  @Override
  public byte[] getBytesForTuple(byte[] t)
  {
    final byte[] line = new byte[t.length + NL_BYTES.length];
    System.arraycopy(t, 0, line, 0, t.length);
    System.arraycopy(NL_BYTES, 0, line, t.length, NL_BYTES.length);
    return line;
  }

  /**
   * Returns the output file name for a tuple.
   *
   * @param tuple the tuple about to be written (not consulted)
   * @return the configured base name, the same for every tuple
   */
  @Override
  protected String getFileName(byte[] tuple)
  {
    return baseName;
  }

  /**
   * @return the configured output file name
   */
  public String getBaseName()
  {
    return baseName;
  }

  /**
   * @param v the output file name to use for all tuples
   */
  public void setBaseName(String v)
  {
    baseName = v;
  }
}