Fixes #813 made SimpleConfiguration serializable
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 801a025..53a4819 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -37,6 +37,8 @@
  */
 public class FluoConfiguration extends SimpleConfiguration {
 
+  private static final long serialVersionUID = 1L;
+
   private static final Logger log = LoggerFactory.getLogger(FluoConfiguration.class);
 
   public static final String FLUO_PREFIX = "fluo";
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
index 9877b18..545f761 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
@@ -15,9 +15,15 @@
 
 package org.apache.fluo.api.config;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -41,17 +47,35 @@
  * @since 1.0.0
  */
 
-public class SimpleConfiguration {
+public class SimpleConfiguration implements Serializable {
 
-  private Configuration internalConfig;
+  private static final long serialVersionUID = 1L;
 
-  public SimpleConfiguration() {
+  private transient Configuration internalConfig;
+
+  private void init() {
     CompositeConfiguration compositeConfig = new CompositeConfiguration();
     compositeConfig.setThrowExceptionOnMissing(true);
     compositeConfig.setDelimiterParsingDisabled(true);
     internalConfig = compositeConfig;
   }
 
+  private void load(InputStream in) {
+    try {
+      PropertiesConfiguration config = new PropertiesConfiguration();
+      // disabled to prevent accumulo classpath value from being shortened
+      config.setDelimiterParsingDisabled(true);
+      config.load(in);
+      ((CompositeConfiguration) internalConfig).addConfiguration(config);
+    } catch (ConfigurationException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  public SimpleConfiguration() {
+    init();
+  }
+
   private SimpleConfiguration(Configuration subset) {
     this.internalConfig = subset;
   }
@@ -77,15 +101,7 @@
    */
   public SimpleConfiguration(InputStream in) {
     this();
-    try {
-      PropertiesConfiguration config = new PropertiesConfiguration();
-      // disabled to prevent accumulo classpath value from being shortened
-      config.setDelimiterParsingDisabled(true);
-      config.load(in);
-      ((CompositeConfiguration) internalConfig).addConfiguration(config);
-    } catch (ConfigurationException e) {
-      throw new IllegalArgumentException(e);
-    }
+    load(in);
   }
 
   /**
@@ -237,4 +253,32 @@
 
     return builder.build();
   }
+
+  /*
+   * These custom serialization methods were added because commons config does not support
+   * serialization.
+   */
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    save(baos);
+
+    byte[] data = baos.toByteArray();
+
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    init();
+
+    int len = in.readInt();
+    byte[] data = new byte[len];
+    in.readFully(data);
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(data);
+    load(bais);
+  }
 }
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index f7d7c97..38705db 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -15,11 +15,16 @@
 
 package org.apache.fluo.api.config;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 import com.google.common.collect.ImmutableMap;
@@ -357,4 +362,37 @@
       }
     }
   }
+
+  @Test
+  public void testSerialization() throws Exception {
+    FluoConfiguration c1 = new FluoConfiguration();
+    c1.setAccumuloUser("fluo");
+    c1.setAccumuloPassword("fc683cd9");
+    c1.setAccumuloTable("fd1");
+    c1.setApplicationName("testS");
+    c1.setAccumuloInstance("I9");
+    c1.setAccumuloZookeepers("localhost:7171");
+    c1.setInstanceZookeepers("localhost:7171/testS");
+    c1.setWorkerThreads(100);
+    c1.addObserver(new ObserverSpecification("com.foo.Observer1"));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oo = new ObjectOutputStream(baos);
+    oo.writeObject(c1);
+    // want to ensure data written after the config is not read by custom deserialization
+    oo.writeObject("testdata");
+    oo.close();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    ObjectInputStream in = new ObjectInputStream(bais);
+
+    FluoConfiguration c2 = (FluoConfiguration) in.readObject();
+    Map<String, String> m2 = c2.toMap();
+    Assert.assertEquals(c1.toMap(), c2.toMap());
+    Assert.assertEquals(9, m2.size());
+
+    Assert.assertEquals("testdata", in.readObject());
+
+    in.close();
+  }
 }