blob: f863d41fe072af69e8c782f059ae80577cf28626 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.avro;
import java.io.IOException;
import java.io.InputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
* <p>
* Avro File Input Operator
* </p>
* A specific implementation of the AbstractFileInputOperator to read Avro
* container files.<br>
* No need to provide schema,its inferred from the file<br>
* This operator emits a GenericRecord based on the schema derived from the
* input file<br>
* Users can add the {@link FSWindowDataManager}
* to ensure exactly once semantics with a HDFS backed WAL.
*
* @displayName AvroFileInputOperator
* @category Input
* @tags fs, file,avro, input operator
*
* @since 3.4.0
*/
@InterfaceStability.Evolving
public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord>
{
private transient long offset = 0L;
@AutoMetric
@VisibleForTesting
int recordCount = 0;
@AutoMetric
@VisibleForTesting
int errorCount = 0;
private transient DataFileStream<GenericRecord> avroDataStream;
public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>();
public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>();
/**
* Returns a input stream given a file path
*
* @param path
* @return InputStream
* @throws IOException
*/
@Override
protected InputStream openFile(Path path) throws IOException
{
InputStream is = super.openFile(path);
if (is != null) {
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
avroDataStream = new DataFileStream<GenericRecord>(is, datumReader);
datumReader.setSchema(avroDataStream.getSchema());
}
return is;
}
/**
* Reads a GenericRecord from the given input stream<br>
* Emits the FileName,Offset,Exception on the error port if its connected
*
* @return GenericRecord
*/
@Override
protected GenericRecord readEntity() throws IOException
{
GenericRecord record = null;
record = null;
try {
if (avroDataStream != null && avroDataStream.hasNext()) {
offset++;
record = avroDataStream.next();
recordCount++;
return record;
}
} catch (AvroRuntimeException are) {
LOG.error("Exception in parsing record for file - " + super.currentFile + " at offset - " + offset, are);
if (errorRecordsPort.isConnected()) {
errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset);
}
errorCount++;
throw new AvroRuntimeException(are);
}
return record;
}
/**
* Closes the input stream If the completed files port is connected, the
* completed file is emitted from this port
*/
@Override
protected void closeFile(InputStream is) throws IOException
{
String fileName = super.currentFile;
if (avroDataStream != null) {
avroDataStream.close();
}
super.closeFile(is);
if (completedFilesPort.isConnected()) {
completedFilesPort.emit(fileName);
}
offset = 0;
}
@Override
protected void emit(GenericRecord tuple)
{
if (tuple != null) {
output.emit(tuple);
}
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
errorCount = 0;
recordCount = 0;
}
private static final Logger LOG = LoggerFactory.getLogger(AvroFileInputOperator.class);
}