SAMOA-47: Integrate Avro Streams with SAMOA
diff --git a/pom.xml b/pom.xml
index 41fc5bd..71b131f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@
         <storm.version>0.9.4</storm.version>
         <!-- storm 0.8.2 loads zookeeper classes with hardcoded names from 3.3 version-->
         <zookeeper.storm.version>3.4.6</zookeeper.storm.version>
+        <avro.version>1.7.7</avro.version>
     </properties>
 
     <dependencies>
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
new file mode 100644
index 0000000..e684687
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
@@ -0,0 +1,171 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+import org.apache.samoa.streams.FileStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+
+/**
+ *  InstanceStream implementation to handle Apache Avro Files.
+ *  Handles both JSON & Binary encoded streams
+ *  
+ *  @author jayadeepj
+ *
+ */
+public class AvroFileStream extends FileStream {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class);
+
+	public FileOption avroFileOption = new FileOption("avroFile", 'f',"Avro File(s) to load.", null, null, false);
+	public IntOption classIndexOption = new IntOption("classIndex", 'c',"Class index of data. 0 for none or -1 for last attribute in file.",-1, -1, Integer.MAX_VALUE);
+	public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', "Encoding format for Avro Files. Can be JSON/BINARY", "BINARY"); // anything other than BINARY is read as JSON
+
+	/** Represents the last read Instance **/
+	protected InstanceExample lastInstanceRead;
+
+	/** Represents the binary input stream of avro data **/
+	protected transient InputStream inputStream = null;
+
+	/** The extension to be considered for the files **/
+	private static final String AVRO_FILE_EXTENSION = "avro";
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.streams.FileStream#reset()
+	 * Closes the open Avro input (if any), resets the file source and opens the next stream it provides
+	 */
+	@Override
+	protected void reset() {
+		try {
+			if (this.inputStream != null) {
+				this.inputStream.close();
+				// Drop the closed stream so that getNextFileStream() does not close it a second time
+				this.inputStream = null;
+			}
+			fileSource.reset();
+		} catch (IOException ioException) {
+			logger.error(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
+			throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException);
+		}
+		if (!getNextFileStream()) {
+			hitEndOfStream = true;
+			throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
+		}
+	}
+
+
+	/**
+	 * Closes the current input (if any) and wraps the next stream of the file source in a new Instances.
+	 * @return true if a stream was opened, false if the file source returned no further stream
+	 */
+	protected boolean getNextFileStream() {
+		if (this.inputStream != null) {
+			try {
+				this.inputStream.close();
+			} catch (IOException ioException) {
+				logger.error(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+				throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+			}
+		}
+		this.inputStream = this.fileSource.getNextInputStream();
+		if (this.inputStream == null) {
+			return false;
+		}
+		int classIndex = this.classIndexOption.getValue();
+		this.instances = new Instances(this.inputStream, classIndex, encodingFormatOption.getValue());
+		// classIndex option: negative = last attribute, n > 0 = n-th attribute (1-based), 0 = left untouched
+		if (classIndex < 0) {
+			this.instances.setClassIndex(this.instances.numAttributes() - 1);
+		} else if (classIndex > 0) {
+			this.instances.setClassIndex(classIndex - 1);
+		}
+		return true;
+	}
+
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
+	 * Reads the next Instance of the current file into lastInstanceRead.
+	 * Returns false, after closing the input stream, when no further Instance could be read.
+	 */
+	@Override
+	protected boolean readNextInstanceFromFile() {
+		try {
+			if (this.instances.readInstance()) {
+				this.lastInstanceRead = new InstanceExample(this.instances.instance(0));
+				this.instances.delete();
+				return true;
+			}
+			if (this.inputStream != null) {
+				this.inputStream.close();
+				this.inputStream = null;
+			}
+			return false;
+		} catch (IOException ioException) {
+			logger.error(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+			throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+		}
+	}
+
+	@Override
+	public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+		super.prepareForUseImpl(monitor, repository);
+		// Initialise the file source with the configured location and the Avro file extension
+		fileSource.init(avroFileOption.getFile().getAbsolutePath(), AVRO_FILE_EXTENSION);
+		lastInstanceRead = null;
+	}
+
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
+	 * Returns the Instance stored by the last successful readNextInstanceFromFile(); null until then
+	 */
+	@Override
+	protected InstanceExample getLastInstanceRead() {
+		return this.lastInstanceRead;
+	}
+
+
+	@Override
+	public void getDescription(StringBuilder sb, int indent) {
+		throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); // no textual description is produced for Avro streams yet
+	}
+
+	/** Error messages shared by all types of Avro file streams */
+	protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; 
+	protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; 
+	protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream.";
+	protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet.";	
+	
+}
diff --git a/samoa-instances/pom.xml b/samoa-instances/pom.xml
index ed24597..64ffcd8 100644
--- a/samoa-instances/pom.xml
+++ b/samoa-instances/pom.xml
@@ -34,4 +34,11 @@
     <artifactId>samoa</artifactId>
     <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>${avro.version}</version>
+		</dependency>
+	</dependencies>
 </project>
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
index 3d314f0..325d1b8 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/ArffLoader.java
@@ -23,7 +23,6 @@
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.Reader;
-import java.io.Serializable;
 import java.io.StreamTokenizer;
 import java.util.ArrayList;
 import java.util.List;
@@ -33,7 +32,7 @@
 /**
  * @author abifet
  */
-public class ArffLoader implements Serializable {
+public class ArffLoader implements Loader {
 
   protected InstanceInformation instanceInformation;
 
@@ -393,4 +392,9 @@
       this.instanceInformation.setClassIndex(classAttribute - 1);
     }
   }
+
+  @Override
+  public Instance readInstance() {
+	  return readInstance(this.reader); // Loader entry point: delegates to the Reader-based overload using this loader's own reader field
+  }
 }
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
new file mode 100644
index 0000000..c3e32dc
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroBinaryLoader.java
@@ -0,0 +1,120 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from Binary Avro Stream and parse to corresponding Dense & Sparse Instances
+ * 
+ * @author jayadeepj
+ *
+ */
+public class AvroBinaryLoader extends AvroLoader {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = LoggerFactory.getLogger(AvroBinaryLoader.class);
+
+	/** Avro Binary reader for an input stream **/
+	protected DataFileStream<GenericRecord> dataFileStream   = null;
+
+	public AvroBinaryLoader(InputStream inputStream,int classAttribute) {
+		super(classAttribute);
+		initializeSchema(inputStream); // opens the Avro data stream and derives the schema and instance meta-data from it
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream)
+	 * Opens the binary Avro stream, takes the schema it carries and derives the instance meta-data.
+	 * Throws a RuntimeException carrying the IOException as cause if the stream cannot be opened.
+	 */
+	@Override
+	public void initializeSchema(InputStream inputStream) {
+		try {
+			this.datumReader = new GenericDatumReader<GenericRecord>();
+			this.dataFileStream = new DataFileStream<GenericRecord>(inputStream, datumReader);
+			this.schema = dataFileStream.getSchema();
+			this.instanceInformation = getHeader();
+			this.isSparseData = isSparseData();
+
+			// classAttribute: negative = last attribute, n > 0 = n-th attribute (1-based), 0 = left untouched
+			if (classAttribute < 0) {
+				this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1);
+			} else if (classAttribute > 0) {
+				this.instanceInformation.setClassIndex(classAttribute - 1);
+			}
+		} catch (IOException ioException) {
+			logger.error(AVRO_LOADER_SCHEMA_READ_ERROR, ioException);
+			throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR, ioException);
+		}
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.instances.AvroLoader#readInstance()
+	 * Decodes the next record of the binary stream into a sparse or dense Instance.
+	 * Returns null when the stream has no more records; read failures surface as RuntimeException.
+	 */
+	@Override
+	public Instance readInstance() {
+		GenericRecord record = null;
+		try {
+			if (dataFileStream.hasNext()) {
+				record = dataFileStream.next();
+			}
+		} catch (RuntimeException readException) {
+			// hasNext()/next() declare no checked exceptions, so only runtime failures can reach here
+			logger.error(AVRO_LOADER_INSTANCE_READ_ERROR, readException);
+			throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR, readException);
+		}
+
+		if (record == null) {
+			// closeReader() is not invoked at end of stream (unlike AvroJsonLoader)
+			// NOTE(review): presumably the owner of the InputStream closes it - confirm
+			return null;
+		}
+		if (isSparseData) {
+			return readInstanceSparse(record);
+		}
+		return readInstanceDense(record);
+	}
+
+	/**
+	 * Closes the Avro data stream if one was opened; a failure is rethrown as RuntimeException with its cause.
+	 */
+	private void closeReader() {
+		if (dataFileStream != null) {
+			try {
+				dataFileStream.close();
+			} catch (IOException ioException) {
+				logger.error(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+				throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+			}
+		}
+	}
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
new file mode 100644
index 0000000..827b507
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroJsonLoader.java
@@ -0,0 +1,133 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Load Data from JSON Avro Stream and parse to corresponding Dense & Sparse Instances
+ * 
+ * @author jayadeepj
+ *
+ */
+public class AvroJsonLoader extends AvroLoader {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger logger = LoggerFactory.getLogger(AvroJsonLoader.class);
+
+	/** The Character reader for JSON read */
+	protected Reader reader  = null;
+
+	public AvroJsonLoader(InputStream inputStream, int classAttribute) {
+		super(classAttribute);
+		initializeSchema(inputStream); // parses the schema from the first line of the stream and builds the instance meta-data
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.instances.AvroLoader#initializeSchema(java.io.InputStream)
+	 * Reads the stream as UTF-8 text, parses the Avro schema from its first line and derives the instance meta-data; an empty stream is reported as a schema read error.
+	 */
+	@Override
+	public void initializeSchema(InputStream inputStream) {
+		try {
+			this.reader = new BufferedReader(new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8));
+			String schemaString = ((BufferedReader) this.reader).readLine();
+			if (schemaString == null) {
+				throw new IOException("Avro JSON stream is empty; expected the schema on the first line");
+			}
+			this.schema = new Schema.Parser().parse(schemaString);
+			this.datumReader = new GenericDatumReader<GenericRecord>(schema);
+			this.instanceInformation = getHeader();
+			this.isSparseData = isSparseData();
+			if (classAttribute < 0) {
+				this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1);
+			} else if (classAttribute > 0) {
+				this.instanceInformation.setClassIndex(classAttribute - 1);
+			}
+		} catch (IOException ioException) {
+			logger.error(AVRO_LOADER_SCHEMA_READ_ERROR, ioException);
+			throw new RuntimeException(AVRO_LOADER_SCHEMA_READ_ERROR, ioException);
+		}
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.samoa.instances.AvroLoader#readInstance()
+	 * Decodes the next non-blank line of the stream (one JSON record per line) into a sparse or
+	 * dense Instance. At end of stream the reader is closed and null is returned.
+	 */
+	@Override
+	public Instance readInstance() {
+		GenericRecord record = null;
+		try {
+			String line;
+			// Skip blank lines; stop at the first line that carries a record or at end of stream
+			while ((line = ((BufferedReader) reader).readLine()) != null) {
+				if (line.trim().isEmpty()) {
+					continue;
+				}
+				Decoder decoder = DecoderFactory.get().jsonDecoder(schema, line);
+				record = datumReader.read(null, decoder);
+				break;
+			}
+		} catch (IOException ioException) {
+			logger.error(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+			// Keep the IOException as cause instead of flattening it into the message
+			throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+		}
+
+		if (record == null) {
+			// No further records: release the reader before signalling end of data
+			closeReader();
+			return null;
+		}
+		if (isSparseData) {
+			return readInstanceSparse(record);
+		}
+		return readInstanceDense(record);
+	}
+
+	/**
+	 * Closes the character reader if one was opened; a failure is rethrown as RuntimeException with its cause.
+	 */
+	private void closeReader() {
+		if (reader != null) {
+			try {
+				reader.close();
+			} catch (IOException ioException) {
+				logger.error(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+				throw new RuntimeException(AVRO_LOADER_INSTANCE_READ_ERROR, ioException);
+			}
+		}
+	}
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
new file mode 100644
index 0000000..09f410f
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/AvroLoader.java
@@ -0,0 +1,286 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.EnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+/**
+ * Load Data from Avro Stream and parse to corresponding Dense & Sparse Instances
+ * Abstract Class: Subclass this class for different types of Avro Encodings
+ * 
+ * @author jayadeepj
+ *
+ */
+public abstract class AvroLoader implements Loader {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Representation of the Avro Schema for the Instances being read. Built from the first line in the data  */
+	protected Schema schema = null;
+
+	/**  Meta-data of the Instance */
+	protected InstanceInformation instanceInformation;
+
+	/** List of attributes in the data as read from the schema */
+	protected List<Attribute> attributes;
+
+	/** This variable is to check if the data stored is Sparse or Dense */
+	protected boolean isSparseData;
+
+	protected int classAttribute;
+
+	/** Datum Reader for Avro Data*/
+	public DatumReader<GenericRecord> datumReader = null;
+
+	public AvroLoader(int classAttribute) {
+		this.classAttribute = classAttribute; // applied by the subclasses in initializeSchema(): <0 last attribute, >0 1-based index
+		this.isSparseData = false; // dense until a subclass inspects the schema in initializeSchema()
+	}
+
+	/** Initialize Avro Schema, Meta Data, InstanceInformation from Input Avro Stream */
+	public abstract void initializeSchema(InputStream inputStream);
+
+	/** Read a single SAMOA Instance from Input Avro Stream */
+	public abstract Instance readInstance();
+	
+	/**
+	 * Converts one Avro record into a dense Instance, visiting the attributes in schema order.
+	 * Numeric attributes accept any Number (Integer, Long, Float or Double). Nominal attributes
+	 * must be enum symbols and are stored as the index of the symbol; the symbol "?" is stored
+	 * as NaN. Attributes that are neither numeric nor nominal are skipped.
+	 *
+	 * @param record the decoded Avro record to convert
+	 * @return the populated Instance, or null if the loader has no attributes
+	 * @throws RuntimeException if a value does not match the type of its attribute
+	 */
+	protected Instance readInstanceDense(GenericRecord record) {
+		// NOTE(review): one slot more than numAttributes() is allocated; the reason is not visible here
+		Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1);
+		int numAttribute = 0;
+
+		// The attribute list was built by getHeader() in the order of the schema fields
+		for (Attribute attribute : attributes) {
+			Object value = record.get(attribute.name);
+
+			if (attribute.isNumeric()) {
+				if (!(value instanceof Number)) {
+					// Also reached for null: a dense record must carry every numeric attribute
+					throw new RuntimeException("Invalid data type in the Avro data for Numeric Type : "+attribute.name);
+				}
+
+				// doubleValue() covers Integer, Long and Double as before and additionally Float
+				this.setDenseValue(instance, numAttribute, ((Number) value).doubleValue());
+			} else if (attribute.isNominal()) {
+				if (!(value instanceof EnumSymbol)) {
+					throw new RuntimeException("Invalid data type in the Avro data for Nominal Type : "+attribute.name);
+				}
+
+				String stringValue = value.toString();
+				double valueAttribute;
+
+				if ("?".equals(stringValue)) {
+					// "?" is mapped to NaN (presumably the missing-value marker)
+					valueAttribute = Double.NaN;
+				} else {
+					valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+				}
+
+				this.setDenseValue(instance, numAttribute, valueAttribute);
+			}
+
+			// Keep the positional index in step with the iteration, whatever the attribute type
+			numAttribute++;
+		}
+
+		return (numAttribute > 0) ? instance : null;
+	}
+
+
+	/**
+	 * Stores a value in a dense Instance: as the class value for the class index, as an attribute value otherwise.
+	 * @param instance the Instance being filled
+	 * @param numAttribute zero-based position of the attribute
+	 * @param valueAttribute value to store for that attribute
+	 */
+	private void setDenseValue(Instance instance, int numAttribute, double valueAttribute) {
+		boolean isClassAttribute = numAttribute == this.instanceInformation.classIndex();
+		if (!isClassAttribute) {
+			instance.setValue(numAttribute, valueAttribute);
+			return;
+		}
+		instance.setClassValue(valueAttribute);
+	}
+
+	/**
+	 * Converts one Avro record into a sparse Instance. Attributes whose value is null are left
+	 * out; the class attribute is written through setClassValue instead of the sparse arrays.
+	 * Numeric attributes accept any Number (Integer, Long, Float or Double). Nominal attributes
+	 * must be enum symbols and are stored as the index of the symbol; the symbol "?" is stored
+	 * as NaN. Attributes that are neither numeric nor nominal are skipped.
+	 *
+	 * @param record the decoded Avro record to convert
+	 * @return the populated sparse Instance
+	 * @throws RuntimeException if a non-null value does not match the type of its attribute
+	 */
+	protected Instance readInstanceSparse(GenericRecord record) {
+		Instance instance = new SparseInstance(1.0, null);
+		List<Integer> indexValues = new ArrayList<Integer>();
+		List<Double> attributeValues = new ArrayList<Double>();
+		int numAttribute = -1;
+
+		for (Attribute attribute : attributes) {
+			// Advance first so that the 'continue' below cannot leave the index behind
+			numAttribute++;
+			Object value = record.get(attribute.name);
+
+			/** If value is empty/null iterate to the next attribute.**/
+			if (value == null) {
+				continue;
+			}
+
+			if (attribute.isNumeric()) {
+				if (!(value instanceof Number)) {
+					throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+				}
+
+				// doubleValue() covers Integer, Long and Double as before and additionally Float
+				this.setSparseValue(instance, indexValues, attributeValues, numAttribute, ((Number) value).doubleValue());
+			} else if (attribute.isNominal()) {
+				if (!(value instanceof EnumSymbol)) {
+					throw new RuntimeException(AVRO_LOADER_INVALID_TYPE_ERROR+" : "+attribute.name);
+				}
+
+				// A nominal value is stored as indexOfValue() of its symbol
+				String stringValue = value.toString();
+				double valueAttribute;
+
+				if ("?".equals(stringValue)) {
+					// "?" is mapped to NaN (presumably the missing-value marker)
+					valueAttribute = Double.NaN;
+				} else {
+					valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(stringValue);
+				}
+
+				this.setSparseValue(instance, indexValues, attributeValues, numAttribute, valueAttribute);
+			}
+		}
+
+		// Unbox the collected index/value pairs into the parallel arrays passed to addSparseValues
+		int[] arrayIndexValues = new int[attributeValues.size()];
+		double[] arrayAttributeValues = new double[attributeValues.size()];
+
+		for (int i = 0; i < arrayIndexValues.length; i++) {
+			arrayIndexValues[i] = indexValues.get(i).intValue();
+			arrayAttributeValues[i] = attributeValues.get(i).doubleValue();
+		}
+
+		instance.addSparseValues(arrayIndexValues, arrayAttributeValues, this.instanceInformation.numAttributes());
+		return instance;
+	}
+
+	/**
+	 * Records one sparse value: the class value goes to the Instance, anything else to the lists.
+	 * @param instance Instance that receives the class value
+	 * @param indexValues collects the indices of the non-class attributes seen so far
+	 * @param attributeValues collects the values matching indexValues, position by position
+	 * @param numAttribute zero-based position of the attribute
+	 * @param valueAttribute value of the attribute for this Instance
+	 */
+	private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, int numAttribute, double valueAttribute) {
+		boolean isClassAttribute = numAttribute == this.instanceInformation.classIndex();
+		if (isClassAttribute) {
+			instance.setClassValue(valueAttribute);
+			return;
+		}
+		indexValues.add(numAttribute);
+		attributeValues.add(valueAttribute);
+	}
+
+	/**
+	 * Builds the instance meta-data (relation name and attribute list) from the Avro schema.
+	 * ENUM fields, including the ENUM branch of a union such as ["null", enum], become nominal attributes.
+	 * @return header whose attributes follow the positional order of the schema fields
+	 */
+	protected InstanceInformation getHeader() {
+		attributes = new ArrayList<Attribute>();
+
+		/** By definition, getFields() returns the fields in the order of their positions. **/
+		for (Field field : schema.getFields()) {
+			Schema enumSchema = null;
+			Schema fieldSchema = field.schema();
+
+			if (fieldSchema.getType() == Schema.Type.ENUM) {
+				enumSchema = fieldSchema;
+			} else if (fieldSchema.getType() == Schema.Type.UNION) {
+				for (Schema branch : fieldSchema.getTypes()) {
+					if (branch.getType() == Schema.Type.ENUM) { enumSchema = branch; break; }
+				}
+			}
+			/** Currently SAMOA supports only NOMINAL & Numeric Types: every non-enum field gets a plain Attribute. **/
+			attributes.add(enumSchema == null ? new Attribute(field.name())
+					: new Attribute(field.name(), enumSchema.getEnumSymbols()));
+		}
+		return new InstanceInformation(schema.getName(), attributes);
+	}
+
+	/**
+	 * Decides from the schema whether the data is sparse or dense.
+	 * The data counts as sparse as soon as one field is a union with a null branch, i.e. nullable.
+	 * @return true for sparse data, false for dense data
+	 */
+	protected boolean isSparseData() {
+		for (Field field : schema.getFields()) {
+			Schema fieldSchema = field.schema();
+
+			// Only union-typed fields are inspected for a null branch
+			if (fieldSchema.getType() != Schema.Type.UNION) {
+				continue;
+			}
+
+			for (Schema branch : fieldSchema.getTypes()) {
+				if (branch.getType() == Schema.Type.NULL) {
+					/** One nullable attribute is enough to treat the whole data set as sparse **/
+					return true;
+				}
+			}
+		}
+		return false;
+	}
+	
+	@Override
+	public InstanceInformation getStructure() {
+		return this.instanceInformation; // meta-data assigned by the subclass in initializeSchema()
+	}
+
+	/** Error messages shared by all types of Avro Loaders */
+	protected static final String AVRO_LOADER_INVALID_TYPE_ERROR = "Invalid data type in the Avro data"; 
+	protected static final String AVRO_LOADER_SCHEMA_READ_ERROR = "Exception while reading the schema from Avro File"; 
+	protected static final String AVRO_LOADER_INSTANCE_READ_ERROR = "Exception while reading the Instance from Avro File.";
+}
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
index 556caaa..707d7f2 100644
--- a/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Instances.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.io.InputStream;
 
 /**
  * 
@@ -45,8 +46,11 @@
    * The instances.
    */
   protected List<Instance> instances;
+  
+  transient protected Loader loader;
+  
 
-  transient protected ArffLoader arff;
+  protected static enum AVRO_ENCODING_FORMAT{JSON,BINARY} // Avro encodings selectable through the InputStream constructor
 
   protected int classAttribute;
 
@@ -69,12 +73,24 @@
   }
 
   public Instances(Reader reader, int size, int classAttribute) {
-    this.classAttribute = classAttribute;
-    arff = new ArffLoader(reader, 0, classAttribute);
-    this.instanceInformation = arff.getStructure();
-    this.instances = new ArrayList<>();
+	  this.classAttribute = classAttribute;
+	  loader = new ArffLoader(reader, 0, classAttribute);
+	  this.instanceInformation = loader.getStructure();
+	  this.instances = new ArrayList<>();
   }
 
+  /** Builds Instances over an Avro stream; BINARY (any case) selects the binary loader, anything else - including null - the JSON loader. */
+  public Instances(InputStream inputStream, int classAttribute, String encodingFormat) {
+    this.classAttribute = classAttribute;
+    if (AVRO_ENCODING_FORMAT.BINARY.name().equalsIgnoreCase(encodingFormat)) {
+      loader = new AvroBinaryLoader(inputStream, classAttribute);
+    } else {
+      loader = new AvroJsonLoader(inputStream, classAttribute);
+    }
+    this.instanceInformation = loader.getStructure();
+    this.instances = new ArrayList<>();
+  }
+	
   public Instances(Instances chunk, int capacity) {
     this(chunk);
   }
@@ -175,18 +191,22 @@
 
   public boolean readInstance(Reader fileReader) {
 
-    // ArffReader arff = new ArffReader(reader, this, m_Lines, 1);
-    if (arff == null) {
-      arff = new ArffLoader(fileReader, 0, this.classAttribute);
-    }
-    Instance inst = arff.readInstance(fileReader);
-    if (inst != null) {
-      inst.setDataset(this);
-      add(inst);
-      return true;
-    } else {
-      return false;
-    }
+	  if (loader == null) {
+		  loader = new ArffLoader(fileReader, 0, this.classAttribute);
+	  }
+	  return readInstance() ; // NOTE(review): fileReader is only used to create the loader lazily; once a loader exists the argument is ignored
+  }
+
+  public boolean readInstance() {
+    if (loader == null) { // transient field, only assigned by the Reader/InputStream constructors and readInstance(Reader)
+      throw new IllegalStateException("No Loader set: create Instances from a Reader or InputStream first");
+    }
+    Instance inst = loader.readInstance();
+    if (inst != null) { // null signals that the loader has no more data
+      inst.setDataset(this);
+      add(inst);
+    }
+    return inst != null;
   }
 
   public void delete() {
diff --git a/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
new file mode 100644
index 0000000..f806bf5
--- /dev/null
+++ b/samoa-instances/src/main/java/org/apache/samoa/instances/Loader.java
@@ -0,0 +1,45 @@
+package org.apache.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.Serializable;
+
+/**
+ * Loads Instances from streams of different types of Input Formats e.g ARFF & AVRO
+ * @author jayadeepj
+ */
+
+public interface Loader extends Serializable {
+
+	/**
+	 * Returns the meta-data (relation and attributes) describing the instances of the stream.
+	 * @return the InstanceInformation of the loaded data
+	 */
+	InstanceInformation getStructure();
+
+	/**
+	 * Reads the next instance from the stream.
+	 * @return the Instance read, or null when the stream holds no further instance
+	 */
+	Instance readInstance();
+
+}