| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samoa.streams; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| |
| import org.apache.samoa.instances.Instances; |
| import org.apache.samoa.moa.core.InstanceExample; |
| import org.apache.samoa.moa.core.ObjectRepository; |
| import org.apache.samoa.moa.tasks.TaskMonitor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.github.javacliparser.FileOption; |
| import com.github.javacliparser.IntOption; |
| import com.github.javacliparser.StringOption; |
| |
| /** |
| * InstanceStream implementation to handle Apache Avro Files. Handles both JSON & Binary encoded streams |
| * |
| * |
| */ |
| public class AvroFileStream extends FileStream { |
| |
| private static final long serialVersionUID = 1L; |
| private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); |
| |
| public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false); |
| /*public IntOption classIndexOption = new IntOption("classIndex", 'c', |
| "Class index of data. 0 for none or -1 for last attribute in file.", -1, -1, Integer.MAX_VALUE);*/ |
| public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', |
| "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); |
| |
| /** Represents the last read Instance **/ |
| protected InstanceExample lastInstanceRead; |
| |
| /** Represents the binary input stream of avro data **/ |
| //protected transient InputStream inputStream = null; |
| |
| /** The extension to be considered for the files **/ |
| private static final String AVRO_FILE_EXTENSION = "avro"; |
| |
| /* (non-Javadoc) |
| * @see org.apache.samoa.streams.FileStream#reset() |
| * Reset the BINARY encoded Avro Stream & Close the file source |
| */ |
| @Override |
| protected void reset() { |
| |
| try { |
| if (this.inputStream != null) |
| this.inputStream.close(); |
| |
| fileSource.reset(); |
| } catch (IOException ioException) { |
| logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException); |
| throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); |
| } |
| |
| if (!getNextFileStream()) { |
| hitEndOfStream = true; |
| throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); |
| } |
| } |
| |
| /** |
| * Get next File Stream & set the class index read from the command line option |
| * |
| * @return |
| */ |
| @Override |
| protected boolean getNextFileStream() { |
| if (this.inputStream != null) |
| try { |
| this.inputStream.close(); |
| } catch (IOException ioException) { |
| logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); |
| throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); |
| } |
| |
| this.inputStream = this.fileSource.getNextInputStream(); |
| if (inputStream == null) |
| return false; |
| |
| this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue()); |
| |
| if (this.classIndexOption.getValue() < 0) { |
| this.instances.setClassIndex(this.instances.numAttributes() - 1); |
| } else if (this.classIndexOption.getValue() > 0) { |
| this.instances.setClassIndex(this.classIndexOption.getValue() - 1); |
| } |
| return true; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() |
| * Read next Instance from File. Return false if unable to read next Instance |
| */ |
| @Override |
| protected boolean readNextInstanceFromFile() { |
| try { |
| if (this.instances.readInstance()) { |
| this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); |
| this.instances.delete(); |
| return true; |
| } |
| if (this.inputStream != null) { |
| this.inputStream.close(); |
| this.inputStream = null; |
| } |
| return false; |
| } catch (IOException ioException) { |
| logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); |
| throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); |
| } |
| |
| } |
| |
| @Override |
| public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { |
| super.prepareForUseImpl(monitor, repository); |
| String filePath = this.avroFileOption.getFile().getAbsolutePath(); |
| //this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); |
| this.fileSource.init(filePath, null); |
| this.lastInstanceRead = null; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() |
| * Return the last read Instance |
| */ |
| @Override |
| protected InstanceExample getLastInstanceRead() { |
| return this.lastInstanceRead; |
| } |
| |
| @Override |
| public void getDescription(StringBuilder sb, int indent) { |
| throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); |
| } |
| |
| /** Error Messages to for all types of Avro File Streams */ |
| protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; |
| protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; |
| protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; |
| protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; |
| |
| } |