blob: dd3dcc62a4d0231f5e94673ffa6f2572ecdb1b32 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samoa.streams;
import java.io.IOException;
import java.io.InputStream;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.moa.core.InstanceExample;
import org.apache.samoa.moa.core.ObjectRepository;
import org.apache.samoa.moa.tasks.TaskMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.javacliparser.FileOption;
import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;
/**
 * InstanceStream implementation that reads instances from Apache Avro files. Both JSON and binary
 * encoded Avro streams are supported; the encoding is selected with {@link #encodingFormatOption}.
 */
public class AvroFileStream extends FileStream {

  private static final long serialVersionUID = 1L;

  private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class);

  /** Error messages shared by all types of Avro file streams. */
  protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed.";
  protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty.";
  protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream.";
  protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet.";

  /**
   * Extension of Avro files. NOTE(review): currently not applied — the file source is initialised
   * with a {@code null} extension in {@link #prepareForUseImpl}.
   */
  private static final String AVRO_FILE_EXTENSION = "avro";

  /** Path of the Avro file(s) to load; its absolute path is handed to the file source. */
  public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false);

  /**
   * Encoding format of the Avro data (JSON or BINARY, default {@code "BINARY"}). The value is
   * passed unchanged to the {@code Instances} constructor for every opened file.
   */
  public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e',
      "Encoding format for Avro Files. Can be JSON/BINARY", "BINARY");

  /** The most recently read instance; {@code null} until a read succeeds after preparation. */
  protected InstanceExample lastInstanceRead;

  /**
   * Restarts the stream: closes the current input stream, rewinds the file source and opens the
   * next file it yields.
   *
   * @throws RuntimeException if closing/rewinding fails, or if the file source yields no file
   *     (in which case {@code hitEndOfStream} is set first)
   */
  @Override
  protected void reset() {
    try {
      if (this.inputStream != null) {
        this.inputStream.close();
        // Clear the reference so getNextFileStream() does not close the same stream again.
        this.inputStream = null;
      }
      this.fileSource.reset();
    } catch (IOException ioException) {
      logger.error(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
      throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
    }
    if (!getNextFileStream()) {
      this.hitEndOfStream = true;
      throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
    }
  }

  /**
   * Closes the current input stream (if any), opens the next one from the file source and builds
   * a new {@code Instances} reader for it using the configured encoding. The class index is then
   * adjusted from {@code classIndexOption}: a negative value selects the last attribute, a
   * positive value {@code k} selects attribute {@code k - 1}, and 0 leaves whatever the
   * {@code Instances} constructor chose.
   *
   * @return {@code true} if a new input stream was opened, {@code false} if the file source has
   *     no further streams
   * @throws RuntimeException if closing the previous input stream fails
   */
  @Override
  protected boolean getNextFileStream() {
    if (this.inputStream != null) {
      try {
        this.inputStream.close();
      } catch (IOException ioException) {
        logger.error(AVRO_STREAM_FAILED_READ_ERROR, ioException);
        throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
      }
    }
    this.inputStream = this.fileSource.getNextInputStream();
    if (this.inputStream == null) {
      return false;
    }
    int classIndex = this.classIndexOption.getValue();
    this.instances = new Instances(this.inputStream, classIndex, this.encodingFormatOption.getValue());
    if (classIndex < 0) {
      this.instances.setClassIndex(this.instances.numAttributes() - 1);
    } else if (classIndex > 0) {
      this.instances.setClassIndex(classIndex - 1);
    }
    return true;
  }

  /**
   * Reads the next instance of the current file into {@link #lastInstanceRead}. When the file has
   * no more instances, its input stream is closed and the reference cleared.
   *
   * @return {@code true} if an instance was read, {@code false} at the end of the current file
   * @throws RuntimeException wrapping any {@link IOException} raised while reading or closing
   */
  @Override
  protected boolean readNextInstanceFromFile() {
    try {
      if (this.instances.readInstance()) {
        this.lastInstanceRead = new InstanceExample(this.instances.instance(0));
        // NOTE(review): presumably clears the buffered instance so the next read is again at
        // index 0 — confirm against Instances.delete().
        this.instances.delete();
        return true;
      }
      if (this.inputStream != null) {
        this.inputStream.close();
        this.inputStream = null;
      }
      return false;
    } catch (IOException ioException) {
      logger.error(AVRO_STREAM_FAILED_READ_ERROR, ioException);
      throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
    }
  }

  /**
   * Prepares the stream for use: runs the base preparation, points the file source at the
   * configured Avro path and clears the last read instance.
   */
  @Override
  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
    super.prepareForUseImpl(monitor, repository);
    String filePath = this.avroFileOption.getFile().getAbsolutePath();
    // No extension filter is passed (null); AVRO_FILE_EXTENSION is deliberately not applied here.
    this.fileSource.init(filePath, null);
    this.lastInstanceRead = null;
  }

  /**
   * Returns the last instance read from the file.
   *
   * @return the most recently read instance, or {@code null} if none has been read yet
   */
  @Override
  protected InstanceExample getLastInstanceRead() {
    return this.lastInstanceRead;
  }

  /**
   * Not supported yet for Avro file streams.
   *
   * <p>NOTE(review): if the base class's {@code toString()} delegates to this method, calling
   * {@code toString()} on this stream will throw as well — confirm.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void getDescription(StringBuilder sb, int indent) {
    throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
  }
}