SAMOA-58: The issue described in https://issues.apache.org/jira/browse/SAMOA-58 was apparently more complicated than the previous commit expected. While that commit did succeed in replacing the first exhausted file stream with a new one, the loader was not changed and would return null. This rework of AvroFileStream, FileStream and ArffFileStream hopefully cleans things up a bit and allows multi-file streams of either type (Avro or Arff).
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
index 9f8a322..417eb2e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
@@ -20,7 +20,9 @@
* #L%
*/
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.moa.core.InstanceExample;
@@ -44,6 +46,7 @@
-1, -1, Integer.MAX_VALUE);
protected InstanceExample lastInstanceRead;
+ private BufferedReader fileReader;
@Override
public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
@@ -56,32 +59,39 @@
@Override
protected void reset() {
try {
- if (this.fileReader != null)
- this.fileReader.close();
-
fileSource.reset();
} catch (IOException ioe) {
throw new RuntimeException("FileStream restart failed.", ioe);
}
- if (!getNextFileReader()) {
+ if (!getNextFileStream()) {
hitEndOfStream = true;
throw new RuntimeException("FileStream is empty.");
}
}
@Override
- protected boolean getNextFileReader() {
- boolean ret = super.getNextFileReader();
- if (ret) {
- this.instances = new Instances(this.fileReader, 1, -1);
- if (this.classIndexOption.getValue() < 0) {
- this.instances.setClassIndex(this.instances.numAttributes() - 1);
- } else if (this.classIndexOption.getValue() > 0) {
- this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+ protected boolean getNextFileStream() {
+ if (this.fileReader != null)
+ try {
+ this.fileReader.close();
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
}
+
+ this.inputStream = this.fileSource.getNextInputStream();
+ if (inputStream == null)
+ return false;
+
+ this.fileReader = new BufferedReader(new InputStreamReader(this.inputStream));
+ this.instances = new Instances(this.fileReader, 1, -1);
+ if (this.classIndexOption.getValue() < 0) {
+ this.instances.setClassIndex(this.instances.numAttributes() - 1);
+ } else if (this.classIndexOption.getValue() > 0) {
+ this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
}
- return ret;
+
+ return true;
}
@Override
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
index 5b4e755..59bf22b 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
@@ -54,7 +54,7 @@
protected InstanceExample lastInstanceRead;
/** Represents the binary input stream of avro data **/
- protected transient InputStream inputStream = null;
+ //protected transient InputStream inputStream = null;
/** The extension to be considered for the files **/
private static final String AVRO_FILE_EXTENSION = "avro";
@@ -87,6 +87,7 @@
*
* @return
*/
+ @Override
protected boolean getNextFileStream() {
if (this.inputStream != null)
try {
@@ -97,8 +98,7 @@
}
this.inputStream = this.fileSource.getNextInputStream();
-
- if (this.inputStream == null)
+ if (inputStream == null)
return false;
this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue());
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
index 2998b22..cfa8de5 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
@@ -52,7 +52,8 @@
"LocalFileStreamSource");
protected transient FileStreamSource fileSource;
- protected transient Reader fileReader;
+ //protected transient Reader fileReader;
+ protected transient InputStream inputStream;
protected Instances instances;
protected boolean hitEndOfStream;
@@ -81,7 +82,7 @@
@Override
public boolean hasMoreInstances() {
if (this.hitEndOfStream) {
- if (getNextFileReader()) {
+ if (getNextFileStream()) {
this.hitEndOfStream = false;
return hasMoreInstances();
} else {
@@ -115,38 +116,18 @@
protected void reset() {
try {
- if (this.fileReader != null)
- this.fileReader.close();
-
fileSource.reset();
} catch (IOException ioe) {
throw new RuntimeException("FileStream restart failed.", ioe);
}
- if (!getNextFileReader()) {
+ if (!getNextFileStream()) {
hitEndOfStream = true;
throw new RuntimeException("FileStream is empty.");
}
-
- this.instances = new Instances(this.fileReader, 1, -1);
- this.instances.setClassIndex(this.instances.numAttributes() - 1);
}
- protected boolean getNextFileReader() {
- if (this.fileReader != null)
- try {
- this.fileReader.close();
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
-
- InputStream inputStream = this.fileSource.getNextInputStream();
- if (inputStream == null)
- return false;
-
- this.fileReader = new BufferedReader(new InputStreamReader(inputStream));
- return true;
- }
+ protected abstract boolean getNextFileStream();
protected boolean readNextInstanceFromStream() {
if (!hasStarted) {
@@ -158,7 +139,7 @@
if (readNextInstanceFromFile())
return true;
- if (!getNextFileReader()) {
+ if (!getNextFileStream()) {
this.hitEndOfStream = true;
return false;
}