SAMOA-47: Integrate Avro Streams with SAMOA
diff --git a/pom.xml b/pom.xml
index 41fc5bd..71b131f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
<storm.version>0.9.4</storm.version>
<!-- storm 0.8.2 loads zookeeper classes with hardcoded names from 3.3 version-->
<zookeeper.storm.version>3.4.6</zookeeper.storm.version>
+ <avro.version>1.7.7</avro.version>
</properties>
<dependencies>
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
new file mode 100644
index 0000000..e684687
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
@@ -0,0 +1,171 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+import org.apache.samoa.streams.FileStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+
+/**
+ * InstanceStream implementation to handle Apache Avro Files.
+ * Handles both JSON & Binary encoded streams
+ *
+ * @author jayadeepj
+ *
+ */
+public class AvroFileStream extends FileStream {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class);
+
+ public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro File(s) to load.", null, null, false);
+ public IntOption classIndexOption = new IntOption("classIndex", 'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, Integer.MAX_VALUE);
+ public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY");
+
+ /** Represents the last read Instance **/
+ protected InstanceExample lastInstanceRead;
+
+ /** Represents the binary input stream of avro data **/
+ protected transient InputStream inputStream = null;
+
+ /** The extension to be considered for the files **/
+ private static final String AVRO_FILE_EXTENSION = "avro";
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.streams.FileStream#reset()
+ * Reset the BINARY encoded Avro Stream & Close the file source
+ */
+ @Override
+ protected void reset() {
+
+ try {
+ if (this.inputStream != null)
+ this.inputStream.close();
+
+ fileSource.reset();
+ } catch (IOException ioException) {
+ logger.error(AVRO_STREAM_FAILED_RESTART_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
+ }
+
+ if (!getNextFileStream()) {
+ hitEndOfStream = true;
+ throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
+ }
+ }
+
+
+ /**
+ * Get next File Stream & set the class index read from the command line option
+ * @return
+ */
+ protected boolean getNextFileStream() {
+ if (this.inputStream != null)
+ try {
+ this.inputStream.close();
+ } catch (IOException ioException) {
+ logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+ }
+
+ this.inputStream = this.fileSource.getNextInputStream();
+
+ if (this.inputStream == null)
+ return false;
+
+ this.instances = new Instances(this.inputStream, classIndexOption.getValue(),encodingFormatOption.getValue());
+
+ if (this.classIndexOption.getValue() < 0) {
+ this.instances.setClassIndex(this.instances.numAttributes() - 1);
+ } else if (this.classIndexOption.getValue() > 0) {
+ this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+ }
+ return true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
+ * Read next Instance from File. Return false if unable to read next Instance
+ */
+ @Override
+ protected boolean readNextInstanceFromFile() {
+ try {
+ if (this.instances.readInstance()) {
+ this.lastInstanceRead = new InstanceExample(this.instances.instance(0));
+ this.instances.delete();
+ return true;
+ }
+ if (this.inputStream != null) {
+ this.inputStream.close();
+ this.inputStream = null;
+ }
+ return false;
+ } catch (IOException ioException) {
+ logger.error(AVRO_STREAM_FAILED_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+ }
+
+ }
+
+ @Override
+ public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+ super.prepareForUseImpl(monitor, repository);
+ String filePath = this.avroFileOption.getFile().getAbsolutePath();
+ this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION);
+ this.lastInstanceRead = null;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
+ * Return the last read Instance
+ */
+ @Override
+ protected InstanceExample getLastInstanceRead() {
+ return this.lastInstanceRead;
+ }
+
+
+ @Override
+ public void getDescription(StringBuilder sb, int indent) {
+ throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
+ }
+
+ /** Error Messages to for all types of Avro File Streams */
+ protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed.";
+ protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty.";
+ protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream.";
+ protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet.";
+
+}
diff --git a/samoa-instances/pom.xml b/samoa-instances/pom.xml
index ed24597..64ffcd8 100644
--- a/samoa-instances/pom.xml
+++ b/samoa-instances/pom.xml
@@ -34,4 +34,11 @@
<artifactId>samoa</artifactId>
<version>0.4.0-incubating-SNAPSHOT</version>
</parent>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
index 3d314f0..325d1b8 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
@@ -23,7 +23,6 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
-import java.io.Serializable;
import java.io.StreamTokenizer;
import java.util.ArrayList;
import java.util.List;
@@ -33,7 +32,7 @@
/**
* @author abifet
*/
-public class ArffLoader implements Serializable {
+public class ArffLoader implements Loader {
protected InstanceInformation instanceInformation;
@@ -393,4 +392,9 @@
this.instanceInformation.setClassIndex(classAttribute - 1);
}
}
+
+ @Override
+ public Instance readInstance() {
+ return readInstance(this.reader);
+ }
}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
new file mode 100644
index 0000000..c3e32dc
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
@@ -0,0 +1,120 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from Binary Avro Stream and parse to corresponding Dense & Parse Instances
+ *
+ * @author jayadeepj
+ *
+ */
+public class AvroBinaryLoader extends AvroLoader {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class);
+
+ /** Avro Binary reader for an input stream **/
+ protected DataFileStream<GenericRecord> dataFileStream = null;
+
+ public AvroBinaryLoader(InputStream inputStream,int classAttribute) {
+ super(classAttribute);
+ initializeSchema(inputStream);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream)
+ */
+ @Override
+ public void initializeSchema(InputStream inputStream)
+ {
+ try {
+ this.datumReader = new GenericDatumReader<GenericRecord>();
+ this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader);
+ this.schema = dataFileStream.getSchema();
+
+ this.instanceInformation = getHeader();
+ this.isSparseData = isSparseData();
+
+ if (classAttribute < 0) {
+ this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1);
+ } else if (classAttribute > 0) {
+ this.instanceInformation.setClassIndex(classAttribute - 1);
+ }
+
+ } catch (IOException ioException) {
+ logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.instances.AvroLoader#readInstance()
+ */
+ @Override
+ public Instance readInstance() {
+
+ GenericRecord record = null;
+
+ try{
+ if (dataFileStream.hasNext()) {
+ record =(GenericRecord) dataFileStream.next();
+ }
+ } catch (Exception ioException) {
+ logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+ }
+
+ if(record==null)
+ {
+ //closeReader();
+ return null;
+ }
+
+ if(isSparseData)
+ return readInstanceSparse(record);
+
+ return readInstanceDense(record);
+ }
+
+ /**
+ * Close the Avro Data Stream
+ */
+ private void closeReader()
+ {
+ if(dataFileStream !=null)
+ try {
+ dataFileStream.close();
+ } catch (IOException ioException) {
+ logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+ }
+ }
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
new file mode 100644
index 0000000..827b507
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
@@ -0,0 +1,133 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from JSON Avro Stream and parse to corresponding Dense & Parse Instances
+ *
+ * @author jayadeepj
+ *
+ */
+public class AvroJsonLoader extends AvroLoader {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(AvroJsonLoader.class);
+
+ /** The Character reader for JSON read */
+ protected Reader reader = null;
+
+ public AvroJsonLoader(InputStream inputStream, int classAttribute) {
+ super(classAttribute);
+ initializeSchema(inputStream);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream)
+ */
+ @Override
+ public void initializeSchema(InputStream inputStream)
+ {
+ String schemaString = null;
+ try {
+ this.reader = new BufferedReader(new InputStreamReader(inputStream));
+ schemaString = ((BufferedReader)this.reader).readLine();
+ this.schema = new Schema.Parser().parse(schemaString);
+ this.datumReader = new GenericDatumReader<GenericRecord>(schema);
+ this.instanceInformation = getHeader();
+ this.isSparseData = isSparseData();
+
+ if (classAttribute < 0) {
+ this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1);
+ } else if (classAttribute > 0) {
+ this.instanceInformation.setClassIndex(classAttribute - 1);
+ }
+
+ } catch (IOException ioException) {
+ logger.error(AVRO_LOADER_SCHEMA_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR+" : " + ioException);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.samoa.instances.AvroLoader#readInstance()
+ */
+ @Override
+ public Instance readInstance() {
+
+ String line = null;
+ Decoder decoder = null;
+ GenericRecord record = null;
+
+ try{
+ while ((line = ((BufferedReader)reader).readLine()) != null) {
+ if(line==null || line.trim().length()<=0)
+ continue;
+
+ decoder = DecoderFactory.get().jsonDecoder(schema, line);
+ record = datumReader.read(null, decoder);
+ break;
+ }
+ } catch (IOException ioException) {
+ logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+ }
+
+ if(record==null)
+ {
+ closeReader();
+ return null;
+ }
+
+ if(isSparseData)
+ return readInstanceSparse(record);
+
+ return readInstanceDense(record);
+ }
+
+ /**
+ * Close the Avro Data Stream
+ */
+ private void closeReader()
+ {
+ if(reader !=null)
+ try {
+ reader.close();
+ } catch (IOException ioException) {
+ logger.error(AVRO_LOADER_INSTANCE_READ_ERROR+" : {}",ioException);
+ throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR+" : " + ioException);
+ }
+ }
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
new file mode 100644
index 0000000..09f410f
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
@@ -0,0 +1,286 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.EnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+/**
+ * Load Data from Avro Stream and parse to corresponding Dense & Parse Instances
+ * Abstract Class: Subclass this class for different types of Avro Encodings
+ *
+ * @author jayadeepj
+ *
+ */
+public abstract class AvroLoader implements Loader {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Representation of the Avro Schema for the Instances being read. Built from the first line in the data */
+ protected Schema schema = null;
+
+ /** Meta-data of the Instance */
+ protected InstanceInformation instanceInformation;
+
+ /** List of attributes in the data as read from the schema */
+ protected List<Attribute> attributes;
+
+ /** This variable is to check if the data stored is Sparse or Dense */
+ protected boolean isSparseData;
+
+ protected int classAttribute;
+
+ /** Datum Reader for Avro Data*/
+ public DatumReader<GenericRecord> datumReader = null;
+
+ public AvroLoader(int classAttribute) {
+ this.classAttribute = classAttribute;
+ this.isSparseData = false;
+ }
+
+ /** Intialize Avro Schema, Meta Data, InstanceInformation from Input Avro Stream */
+ public abstract void initializeSchema(InputStream inputStream);
+
+ /** Read a single SAMOA Instance from Input Avro Stream */
+ public abstract Instance readInstance();
+
+ /**
+ * Method to read Dense Instances from Avro File
+ * @return Instance
+ */
+ protected Instance readInstanceDense(GenericRecord record)
+ {
+ Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1);
+ int numAttribute = 0;
+
+ for (Attribute attribute : attributes) {
+ Object value = record.get(attribute.name);
+
+ boolean isNumeric = attributes.get(numAttribute).isNumeric();
+ boolean isNominal = attributes.get(numAttribute).isNominal();
+
+ if(isNumeric)
+ {
+ if(value instanceof Double)
+ this.setDenseValue(instance, numAttribute, (double)value);
+ else if (value instanceof Long)
+ this.setDenseValue(instance, numAttribute, (long)value);
+ else if (value instanceof Integer)
+ this.setDenseValue(instance, numAttribute, (int)value);
+ else
+ throw new RuntimeException("Invalid data type in the Avro data for Numeric Type : "+attribute.name);
+ }
+ else if(isNominal)
+ {
+ double valueAttribute;
+
+ if (!(value instanceof EnumSymbol))
+ throw new RuntimeException("Invalid data type in the Avro data for Nominal Type : "+attribute.name);
+
+ EnumSymbol enumSymbolalue = (EnumSymbol)value;
+
+ String stringValue = enumSymbolalue.toString();
+
+ if (("?".equals(stringValue))||(stringValue==null)) {
+ valueAttribute = Double.NaN;
+ } else {
+ valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+ }
+
+ this.setDenseValue(instance, numAttribute, valueAttribute);
+ }
+ numAttribute++;
+ }
+
+ return (numAttribute > 0) ? instance : null;
+
+ }
+
+
+ /**
+ * Sets a Dense Value in the corresponding attribute index
+ * @param instance is the Instance where values will be set
+ * @param numAttribute is the index of the attribute
+ * @param valueAttribute is the value of the attribute for this Instance
+ */
+
+ private void setDenseValue(Instance instance, int numAttribute, double valueAttribute) {
+
+ if (this.instanceInformation.classIndex() == numAttribute)
+ instance.setClassValue(valueAttribute);
+ else
+ instance.setValue(numAttribute, valueAttribute);
+ }
+
+ /**
+ * Method to read Sparse Instances from Avro File
+ * @return Instance
+ */
+ protected Instance readInstanceSparse(GenericRecord record) {
+
+ Instance instance = new SparseInstance(1.0, null);
+ int numAttribute = -1;
+ ArrayList<Double> attributeValues = new ArrayList<Double>();
+ List<Integer> indexValues = new ArrayList<Integer>();
+
+ for (Attribute attribute : attributes) {
+ numAttribute++;
+ Object value = record.get(attribute.name);
+
+ boolean isNumeric = attributes.get(numAttribute).isNumeric();
+ boolean isNominal = attributes.get(numAttribute).isNominal();
+
+ /** If value is empty/null iterate to the next attribute.**/
+ if(value==null)
+ continue;
+
+ if(isNumeric)
+ {
+ if(value instanceof Double)
+ this.setSparseValue(instance, indexValues, attributeValues, numAttribute, (double)value);
+ else if (value instanceof Long)
+ this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (long)value);
+ else if (value instanceof Integer)
+ this.setSparseValue(instance,indexValues, attributeValues, numAttribute, (int)value);
+ else
+ throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+ }
+ else if(isNominal)
+ {
+ double valueAttribute;
+
+ if (!(value instanceof EnumSymbol))
+ throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+
+ EnumSymbol enumSymbolalue = (EnumSymbol)value;
+
+ String stringValue = enumSymbolalue.toString();
+
+ if (("?".equals(stringValue))||(stringValue==null)) {
+ valueAttribute = Double.NaN;
+ } else {
+ valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+ }
+
+ this.setSparseValue(instance, indexValues, attributeValues, numAttribute, valueAttribute);
+ }
+ }
+
+ int[] arrayIndexValues = new int[attributeValues.size()];
+ double[] arrayAttributeValues = new double[attributeValues.size()];
+
+ for (int i = 0; i < arrayIndexValues.length; i++) {
+ arrayIndexValues[i] = indexValues.get(i).intValue();
+ arrayAttributeValues[i] = attributeValues.get(i).doubleValue();
+ }
+
+ instance.addSparseValues(arrayIndexValues, arrayAttributeValues,this.instanceInformation.numAttributes());
+ return instance;
+
+ }
+
+ /**
+ * Sets a Sparse Value in the corresponding attribute index
+ * @param instance is the Instance where values will be set
+ * @param indexValues is the list of Index values
+ * @param attributeValues is the list of Attribute values
+ * @param numAttribute is the index of the attribute
+ * @param valueAttribute is the value of the attribute for this Instance
+ */
+ private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, int numAttribute, double valueAttribute) {
+
+ if (this.instanceInformation.classIndex() == numAttribute) {
+ instance.setClassValue(valueAttribute);
+ } else {
+ indexValues.add(numAttribute);
+ attributeValues.add(valueAttribute);
+ }
+ }
+
+ /**
+ * Builds the Meta Data of from the Avro Schema
+ * @return
+ */
+ protected InstanceInformation getHeader() {
+
+ String relation = schema.getName();
+ attributes = new ArrayList<Attribute>();
+
+ /** By Definition, the returned list is in the order of their positions. **/
+ List<Schema.Field> fields = schema.getFields();
+
+ for (Field field : fields) {
+ Schema attributeSchema = field.schema();
+
+ /** Currently SAMOA supports only NOMINAL & Numeric Types.**/
+ if(attributeSchema.getType()==Schema.Type.ENUM)
+ {
+ List<String> attributeLabels = attributeSchema.getEnumSymbols();
+ attributes.add(new Attribute(field.name(), attributeLabels));
+ }
+ else
+ attributes.add(new Attribute(field.name()));
+ }
+ return new InstanceInformation(relation, attributes);
+ }
+
+ /**
+ * Identifies if the dataset is is Sparse or Dense
+ * @return boolean
+ */
+ protected boolean isSparseData()
+ {
+ List<Schema.Field> fields = schema.getFields();
+ for (Field field : fields) {
+ Schema attributeSchema = field.schema();
+
+ /** If even one attribute has a null union (nullable attribute) consider it as sparse data**/
+ if(attributeSchema.getType()==Schema.Type.UNION)
+ {
+ List<Schema> unionTypes = attributeSchema.getTypes();
+ for (Schema unionSchema : unionTypes) {
+ if(unionSchema.getType()==Schema.Type.NULL)
+ return true;
+ }
+ }
+
+ }
+ return false;
+ }
+
+ @Override
+ public InstanceInformation getStructure() {
+ return this.instanceInformation;
+ }
+
+ /** Error Messages to for all types of Avro Loaders */
+ protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid data type in the Avro data";
+ protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = "Exception while reading the schema from Avro File";
+ protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = "Exception while reading the Instance from Avro File.";
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
index 556caaa..707d7f2 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.io.InputStream;
/**
*
@@ -45,8 +46,11 @@
* The instances.
*/
protected List<Instance> instances;
+
+ transient protected Loader loader;
+
- transient protected ArffLoader arff;
+ protected static enum AVRO_ENCODING_FORMAT{JSON,BINARY}
protected int classAttribute;
@@ -69,12 +73,24 @@
}
public Instances(Reader reader, int size, int classAttribute) {
- this.classAttribute = classAttribute;
- arff = new ArffLoader(reader, 0, classAttribute);
- this.instanceInformation = arff.getStructure();
- this.instances = new ArrayList<>();
+ this.classAttribute = classAttribute;
+ loader = new ArffLoader(reader, 0, classAttribute);
+ this.instanceInformation = loader.getStructure();
+ this.instances = new ArrayList<>();
}
+ public Instances(InputStream inputStream, int classAttribute, String encodingFormat) {
+ this.classAttribute = classAttribute;
+
+ if(encodingFormat.equalsIgnoreCase(AVRO_ENCODING_FORMAT.BINARY.toString()))
+ loader = new AvroBinaryLoader(inputStream, classAttribute);
+ else
+ loader = new AvroJsonLoader(inputStream, classAttribute);
+
+ this.instanceInformation = loader.getStructure();
+ this.instances = new ArrayList<>();
+ }
+
public Instances(Instances chunk, int capacity) {
this(chunk);
}
@@ -175,18 +191,22 @@
public boolean readInstance(Reader fileReader) {
- // ArffReader arff = new ArffReader(reader, this, m_Lines, 1);
- if (arff == null) {
- arff = new ArffLoader(fileReader, 0, this.classAttribute);
- }
- Instance inst = arff.readInstance(fileReader);
- if (inst != null) {
- inst.setDataset(this);
- add(inst);
- return true;
- } else {
- return false;
- }
+ if (loader == null) {
+ loader = new ArffLoader(fileReader, 0, this.classAttribute);
+ }
+ return readInstance() ;
+ }
+
+ public boolean readInstance() {
+
+ Instance inst = loader.readInstance();
+ if (inst != null) {
+ inst.setDataset(this);
+ add(inst);
+ return true;
+ } else {
+ return false;
+ }
}
public void delete() {
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
new file mode 100644
index 0000000..f806bf5
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
@@ -0,0 +1,45 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.Serializable;
+
+/**
+ * Loads Instances from streams of different types of Input Formats e.g ARFF & AVRO
+ * @author jayadeepj
+ */
+
+public interface Loader extends Serializable{
+
+ /**
+ * Fetch the Meta-data from the data
+ * @return InstanceInformation
+ */
+ public InstanceInformation getStructure();
+
+ /**
+ * Read a single instance from the Stream
+ * @return Instance
+ */
+ public Instance readInstance();
+
+}