fixes #816 introduced better way to setup observers and deprecated old
diff --git a/docs/applications.md b/docs/applications.md
index 5685381..cbf12ba 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -100,12 +100,12 @@
 
 To create an observer, follow these steps:
 
-1. Create a class that extends [AbstractObserver] like the example below. Please use [slf4j] for
+1. Create one or more classes that extend [Observer] like the example below. Please use [slf4j] for
    any logging in observers as [slf4j] supports multiple logging implementations. This is
    necessary as Fluo applications have a hard requirement on [logback] when running in YARN.
 
     ```java
-    public class InvertObserver extends AbstractObserver {
+    public class InvertObserver implements Observer {
 
       @Override
       public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
@@ -114,25 +114,45 @@
         // invert row and value
         tx.set(value, new Column("inv", "data"), row);
       }
+    }
+    ```
+2.  Create a class that implements [ObserverProvider] like the example below.  The purpose of this
+    class is associate a set Observers with columns that trigger the observers.  The class can
+    create multiple observers.
 
+    ```java
+    class AppObserverProvider implements ObserverProvider {
       @Override
-      public ObservedColumn getObservedColumn() {
-        return new ObservedColumn(new Column("obs", "data"), NotificationType.STRONG);
+      public void provide(Registry or, Context ctx) {
+        //setup InvertObserver to be triggered when the column obs:data is modified
+        or.register(new Column("obs", "data"),
+                           NotificationType.STRONG,
+                           new InvertObserver());
+        
+        //Observer is a Functional interface.  So Obsevers can be written as lambdas.
+        or.register(new Column("new","data"),
+                           NotificationType.WEAK,
+                           (tx,row,col) -> { 
+                             Bytes combined = combineNewAndOld(tx,row);
+                             tx.set(row, new Column("current","data"), combined);
+                           });
       }
     }
     ```
-2.  Build a jar containing this class and include this jar in the `lib/` directory of your Fluo
+
+3.  Build a jar containing thses classes and include this jar in the `lib/` directory of your Fluo
     application.
-3.  Configure your Fluo instance to use this observer by modifying the Observer section of
+4.  Configure your Fluo instance to use this observer provider by modifying the Observer section of
     [fluo.properties].
-4.  Restart your Fluo instance so that your Fluo workers load the new observer.
+5.  Initialize Fluo.  During initialization Fluo will obtain the observed columns from the 
+    ObserverProvider and persist the columns in Zookeeper.  These columns persisted in Zookeeper
+    are used by transactions to know when to trigger observers.
+6.  Start your Fluo instance so that your Fluo workers load the new observer.
 
 ## Application Configuration
 
-Each observer can have its own configuration. This is useful for the case of using the same
-observer code w/ different parameters. However for the case of sharing the same configuration
-across observers, fluo provides a simple mechanism to set and access application specific
-configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.
+For configuring observers, fluo provides a simple mechanism to set and access application specific
+configuration.  See the javadoc on [FluoClient].getAppConfiguration() for more details.
 
 ## Debugging Applications
 
@@ -195,7 +215,8 @@
 [FluoFactory]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
 [FluoClient]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
 [FluoConfiguration]: ../modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
-[AbstractObserver]: ../modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+[Observer]: ../modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+[ObserverProvider]: ../modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
 [fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
 [API]: https://fluo.apache.org/apidocs/
 [metrics]: metrics.md
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
index aa6039e..8c7f1d2 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
@@ -25,7 +25,9 @@
   public static final String CONFIG_ACCUMULO_INSTANCE_NAME = CONFIG + "/accumulo.instance.name";
   public static final String CONFIG_ACCUMULO_INSTANCE_ID = CONFIG + "/accumulo.instance.id";
   public static final String CONFIG_FLUO_APPLICATION_ID = CONFIG + "/fluo.application.id";
-  public static final String CONFIG_FLUO_OBSERVERS = CONFIG + "/fluo.observers";
+  @Deprecated
+  public static final String CONFIG_FLUO_OBSERVERS1 = CONFIG + "/fluo.observers";
+  public static final String CONFIG_FLUO_OBSERVERS2 = CONFIG + "/fluo.observers2";
   public static final String CONFIG_SHARED = CONFIG + "/shared.config";
 
   public static final String ORACLE = "/oracle";
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
index 140351e..fcbe4db 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
@@ -15,8 +15,10 @@
 
 package org.apache.fluo.api.client;
 
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
 
 /**
  * Client interface for Fluo. Fluo clients will have shared resources used by all objects created by
@@ -63,6 +65,8 @@
    *         keeping config files consistent across a cluster. To update this configuration, use
    *         {@link FluoAdmin#updateSharedConfig()}. Changes made to the returned Configuration will
    *         not update Zookeeper.
+   * @see FluoConfiguration#getAppConfiguration()
+   * @see Context#getAppConfiguration()
    */
   SimpleConfiguration getAppConfiguration();
 
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 53a4819..c759dbb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -27,6 +27,8 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,8 +88,19 @@
 
   /** The properties below get loaded into/from Zookeeper */
   // Observer
+  @Deprecated
   public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";
 
+  /**
+   * @since 1.1.0
+   */
+  public static final String OBSERVER_PROVIDER = FLUO_PREFIX + ".observer.provider";
+
+  /**
+   * @since 1.1.0
+   */
+  public static final String OBSERVER_PROVIDER_DEFAULT = "";
+
   // Transaction
   public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
   public static final String TRANSACTION_ROLLBACK_TIME_PROP = TRANSACTION_PREFIX + ".rollback.time";
@@ -281,6 +294,11 @@
     return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
   }
 
+  /**
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
+   */
+  @Deprecated
   public List<ObserverSpecification> getObserverSpecifications() {
 
     List<ObserverSpecification> configList = new ArrayList<>();
@@ -344,6 +362,37 @@
     return max + 1;
   }
 
+  /**
+   * Configure the observer provider that Fluo workers will use.
+   *
+   * @since 1.1.0
+   *
+   * @param className Name of a class that implements {@link ObserverProvider}. Must be non-null and
+   *        non-empty.
+   */
+  public void setObserverProvider(String className) {
+    setNonEmptyString(OBSERVER_PROVIDER, className);
+  }
+
+  /**
+   * Calls {@link #setObserverProvider(String)} with the class name.
+   *
+   * @since 1.1.0
+   */
+  public void setObserverProvider(Class<? extends ObserverProvider> clazz) {
+    setObserverProvider(clazz.getName());
+  }
+
+  /**
+   * @return The configured {@link ObserverProvider} class name. If one was not configured, returns
+   *         {@value #OBSERVER_PROVIDER_DEFAULT}
+   * @since 1.1.0
+   */
+  public String getObserverProvider() {
+    return getString(OBSERVER_PROVIDER, OBSERVER_PROVIDER_DEFAULT);
+  }
+
+  @Deprecated
   private void addObserver(ObserverSpecification oconf, int next) {
     Map<String, String> params = oconf.getConfiguration().toMap();
     StringBuilder paramString = new StringBuilder();
@@ -359,7 +408,11 @@
   /**
    * Adds an {@link ObserverSpecification} to the configuration using a unique integer prefix thats
    * not currently in use.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration addObserver(ObserverSpecification oconf) {
     int next = getNextObserverId();
     addObserver(oconf, next);
@@ -368,7 +421,11 @@
 
   /**
    * Adds multiple observers using unique integer prefixes for each.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers) {
     int next = getNextObserverId();
     for (ObserverSpecification oconf : observers) {
@@ -379,7 +436,11 @@
 
   /**
    * Removes any configured observers.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration clearObservers() {
     Iterator<String> iter1 = getKeys(OBSERVER_PREFIX.substring(0, OBSERVER_PREFIX.length() - 1));
     while (iter1.hasNext()) {
@@ -429,7 +490,7 @@
    *         to subset will be reflected in this configuration, but with the prefix added. This
    *         method is useful for setting application configuration before initialization. For
    *         reading application configuration after initialization, see
-   *         {@link FluoClient#getAppConfiguration()}
+   *         {@link FluoClient#getAppConfiguration()} and {@link Context#getAppConfiguration()}
    */
   public SimpleConfiguration getAppConfiguration() {
     return subset(APP_PREFIX);
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
index 76829c9..9ebb47b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
@@ -26,7 +26,10 @@
  * {@link FluoConfiguration#addObserver(ObserverSpecification)}.
  *
  * @since 1.0.0
+ * @deprecated since 1.1.0. The methods that used this class in {@link FluoConfiguration} were
+ *             deprecated.
  */
+@Deprecated
 public class ObserverSpecification {
   private final String className;
   private final Map<String, String> configMap;
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index 077eac7..59c4321 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -58,7 +58,7 @@
   private final int offset;
   private final int length;
 
-  private WeakReference<String> utf8String;
+  private transient WeakReference<String> utf8String;
 
   public static final Bytes EMPTY = new Bytes(new byte[0]);
 
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
index 24505a5..6c191f9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
@@ -26,7 +26,11 @@
  * user.
  *
  * @since 1.0.0
+ * @deprecated since 1.1.0. This class was deprecated for two reasons. First the methods its
+ *             overrides were deprecated. Second, the methods it overrides were made into Java 8
+ *             default methods.
  */
+@Deprecated
 public abstract class AbstractObserver implements Observer {
 
   @Override
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
index 381784a..21f4a2a 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
@@ -17,6 +17,7 @@
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -24,13 +25,15 @@
 
 /**
  * Implemented by users to a watch a {@link Column} and be notified of changes to the Column via the
- * {@link #process(TransactionBase, Bytes, Column)} method. An observer is created for each worker
- * thread and reused for the lifetime of a worker thread. Consider extending
- * {@link AbstractObserver} as it will let you optionally implement {@link #init(Context)} and
- * {@link #close()}. The abstract class will also shield you from the addition of interface methods.
+ * {@link #process(TransactionBase, Bytes, Column)} method.
+ * 
+ * <p>
+ * In Fluo version 1.1.0 this was converted to a functional interface. This change along with the
+ * introduction of {@link ObserverProvider} allows Observers to be written as lambdas.
  *
  * @since 1.0.0
  */
+@FunctionalInterface
 public interface Observer {
 
   /**
@@ -44,7 +47,9 @@
    * A {@link Column} and {@link NotificationType} pair
    *
    * @since 1.0.0
+   * @deprecated since 1.1.0. The method that used this class was deprecated.
    */
+  @Deprecated
   class ObservedColumn {
     private final Column col;
     private final NotificationType notificationType;
@@ -61,11 +66,22 @@
     public NotificationType getType() {
       return notificationType;
     }
+
+    /**
+     * @since 1.1.0
+     */
+    @Override
+    public String toString() {
+      return col + " " + notificationType;
+    }
   }
 
   /**
    * @since 1.0.0
+   *
+   * @deprecated since 1.1.0. The method that used this interface was deprecated.
    */
+  @Deprecated
   interface Context {
     /**
      * @return A configuration object with application configuration like that returned by
@@ -88,8 +104,14 @@
    * Implemented by user to initialize Observer.
    *
    * @param context Observer context
+   *
+   * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  void init(Context context) throws Exception;
+  @Deprecated
+  default void init(Context context) throws Exception {}
 
   /**
    * Implemented by users to process notifications on a {@link ObservedColumn}. If a notification
@@ -107,11 +129,25 @@
    * then an exception will be thrown. It is safe to assume that {@link #init(Context)} will be
    * called before this method. If the return value of the method is derived from what is passed to
    * {@link #init(Context)}, then the derivation process should be deterministic.
+   *
+   * @deprecated since 1.1.0 Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  ObservedColumn getObservedColumn();
+  @Deprecated
+  default ObservedColumn getObservedColumn() {
+    throw new UnsupportedOperationException();
+  }
 
   /**
    * Implemented by user to close resources used by Observer
+   *
+   * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  void close();
+  @Deprecated
+  default void close() {}
 }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
new file mode 100644
index 0000000..c172268
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import java.util.function.BiConsumer;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * Fluo Workers use this class to register {@link Observer}s to process notifications.
+ * Implementations of this class should register zero or more {@link Observer}s.
+ *
+ * <p>
+ * When Fluo is initialized {@link #provideColumns(BiConsumer, Context)} is called. The columns it
+ * registers are stored in Zookeeper. Transactions will use the columns stored in Zookeeper to
+ * determine when to set notifications. When Workers call {@link #provide(Registry, Context)}, the
+ * columns registered must be the same as those registered during initialization. If this is not the
+ * case, then the worker will fail to start.
+ *
+ * @see FluoConfiguration#setObserverProvider(String)
+ * @since 1.1.0
+ */
+public interface ObserverProvider {
+
+  /**
+   * @since 1.1.0
+   */
+  interface Context {
+    /**
+     * @return A configuration object with application configuration like that returned by
+     *         {@link FluoClient#getAppConfiguration()}
+     */
+    SimpleConfiguration getAppConfiguration();
+
+    /**
+     * @return A {@link MetricsReporter} to report application metrics from observers.
+     */
+    MetricsReporter getMetricsReporter();
+  }
+
+  /**
+   * Observers are registered with the worker using this interface. Registering an {@link Observer}s
+   * relates it to the columns that trigger it.
+   *
+   * @since 1.1.0
+   */
+  interface Registry {
+    void register(Column observedColumn, NotificationType ntfyType, Observer observer);
+
+    /**
+     * This method was created to allow Observers written as lambda to be passed {@link String}
+     * instead of {@link Bytes} for the row.
+     * 
+     * <pre>
+     * <code>
+     *   void provide(ObserverRegistry or, Context ctx) {
+     *     or.registers(someColumn, WEAK, (tx,row,col) -> {
+     *      //row is of type String
+     *     };
+     *   }
+     * </code>
+     * </pre>
+     */
+    void registers(Column observedColumn, NotificationType ntfyType, StringObserver observer);
+  }
+
+  /**
+   * This is method is called by Fluo Workers to register observers to process notifications.
+   *
+   * <p>
+   * Observers registered may be called concurrently by multiple threads to process different
+   * notifications. Observers should be tolerant of this.
+   *
+   * @param or Register observers with this.
+   */
+  void provide(Registry or, Context ctx);
+
+  /**
+   * Called during Fluo initialization to determine what columns are being observed. The default
+   * implementation of this method calls {@link #provide(Registry, Context)} and ignores Observers.
+   *
+   * @param colRegistry register all observed columns with this consumer
+   */
+  default void provideColumns(BiConsumer<Column, NotificationType> colRegistry, Context ctx) {
+    Registry or = new Registry() {
+      @Override
+      public void registers(Column oc, NotificationType nt, StringObserver obs) {
+        colRegistry.accept(oc, nt);
+      }
+
+      @Override
+      public void register(Column oc, NotificationType nt, Observer obs) {
+        colRegistry.accept(oc, nt);
+      }
+    };
+
+    provide(or, ctx);
+  }
+}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
new file mode 100644
index 0000000..f57a98e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * @since 1.1.0
+ */
+@FunctionalInterface
+public interface StringObserver extends Observer {
+
+  @Override
+  default void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+    process(tx, row.toString(), col);
+  }
+
+  abstract void process(TransactionBase tx, String row, Column col) throws Exception;
+}
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 38705db..3e36ba1 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -209,6 +209,7 @@
     Assert.assertEquals("33", config.getReporterConfiguration("jmx").getString("frequency"));
   }
 
+  @SuppressWarnings("deprecation")
   private void assertIAE(String value) {
     FluoConfiguration config = new FluoConfiguration();
     try {
@@ -219,6 +220,7 @@
     }
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testObserverConfig() {
     FluoConfiguration config = new FluoConfiguration();
@@ -247,6 +249,7 @@
     Assert.assertEquals(0, ocList.get(0).getConfiguration().toMap().size());
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testObserverConfig2() {
     FluoConfiguration config = new FluoConfiguration();
@@ -374,7 +377,7 @@
     c1.setAccumuloZookeepers("localhost:7171");
     c1.setInstanceZookeepers("localhost:7171/testS");
     c1.setWorkerThreads(100);
-    c1.addObserver(new ObserverSpecification("com.foo.Observer1"));
+    c1.setObserverProvider("com.foo.MyObserverProvider");
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oo = new ObjectOutputStream(baos);
@@ -395,4 +398,22 @@
 
     in.close();
   }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    fc.setObserverProvider((String) null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    fc.setObserverProvider("");
+  }
+
+  @Test
+  public void testNoObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    Assert.assertEquals("", fc.getObserverProvider());
+  }
 }
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index de20ac5..d6784a4 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -26,6 +26,10 @@
   <description>The modules contains the core implementation code of Apache Fluo.</description>
   <dependencies>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 8d059a5..62914e0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.core.client;
 
+import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -38,17 +39,11 @@
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.observer.ObserverUtil;
 import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.fluo.core.worker.ObserverContext;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -208,42 +203,6 @@
   @Override
   public void updateSharedConfig() {
 
-    logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
-
-    Map<Column, ObserverSpecification> colObservers = new HashMap<>();
-    Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
-    for (ObserverSpecification ospec : config.getObserverSpecifications()) {
-
-      Observer observer;
-      try {
-        observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
-      } catch (ClassNotFoundException e1) {
-        throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
-            + "found.  Check for class name misspellings or failure to include "
-            + "the observer jar.", e1);
-      } catch (InstantiationException | IllegalAccessException e2) {
-        throw new FluoException("Observer class '" + ospec.getClassName()
-            + "' could not be created.", e2);
-      }
-
-      SimpleConfiguration oc = ospec.getConfiguration();
-      logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
-          oc.toMap());
-      try {
-        observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
-      } catch (Exception e) {
-        throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
-            e);
-      }
-
-      ObservedColumn observedCol = observer.getObservedColumn();
-      if (observedCol.getType() == NotificationType.STRONG) {
-        colObservers.put(observedCol.getColumn(), ospec);
-      } else {
-        weakObservers.put(observedCol.getColumn(), ospec);
-      }
-    }
-
     Properties sharedProps = new Properties();
     Iterator<String> iter = config.getKeys();
     while (iter.hasNext()) {
@@ -257,8 +216,13 @@
 
     try {
       CuratorFramework curator = getAppCurator();
-      Operations.updateObservers(curator, colObservers, weakObservers);
-      Operations.updateSharedConfig(curator, sharedProps);
+      ObserverUtil.initialize(curator, config);
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      sharedProps.store(baos, "Shared java props");
+
+      CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
+          CuratorUtil.NodeExistsPolicy.OVERWRITE);
     } catch (Exception e) {
       throw new FluoException("Failed to update shared configuration in Zookeeper", e);
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
deleted file mode 100644
index 16851a6..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.client;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.core.util.ColumnUtil;
-import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility methods for initializing Zookeeper and Accumulo
- */
-public class Operations {
-
-  private Operations() {}
-
-  private static final Logger logger = LoggerFactory.getLogger(Operations.class);
-
-  // TODO refactor all method in this class to take a properties object... if so the prop keys would
-  // need to be public
-
-  public static void updateSharedConfig(CuratorFramework curator, Properties sharedProps)
-      throws Exception {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    sharedProps.store(baos, "Shared java props");
-
-    CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
-        CuratorUtil.NodeExistsPolicy.OVERWRITE);
-  }
-
-  public static void updateObservers(CuratorFramework curator,
-      Map<Column, ObserverSpecification> colObservers,
-      Map<Column, ObserverSpecification> weakObservers) throws Exception {
-
-    // TODO check that no workers are running... or make workers watch this znode
-
-    String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS;
-    try {
-      curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
-    } catch (NoNodeException nne) {
-      // it's ok if node doesn't exist
-    } catch (Exception e) {
-      logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
-          + "], error=[" + e.getMessage() + "]");
-      throw new RuntimeException(e);
-    }
-
-    byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
-    CuratorUtil.putData(curator, observerPath, serializedObservers,
-        CuratorUtil.NodeExistsPolicy.OVERWRITE);
-  }
-
-  private static void serializeObservers(DataOutputStream dos,
-      Map<Column, ObserverSpecification> colObservers) throws IOException {
-    // TODO use a human readable serialized format like json
-
-    Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
-
-    WritableUtils.writeVInt(dos, colObservers.size());
-
-    for (Entry<Column, ObserverSpecification> entry : es) {
-      ColumnUtil.writeColumn(entry.getKey(), dos);
-      dos.writeUTF(entry.getValue().getClassName());
-      Map<String, String> params = entry.getValue().getConfiguration().toMap();
-      WritableUtils.writeVInt(dos, params.size());
-      for (Entry<String, String> pentry : params.entrySet()) {
-        dos.writeUTF(pentry.getKey());
-        dos.writeUTF(pentry.getValue());
-      }
-    }
-  }
-
-  private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
-      Map<Column, ObserverSpecification> weakObservers) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (DataOutputStream dos = new DataOutputStream(baos)) {
-      serializeObservers(dos, colObservers);
-      serializeObservers(dos, weakObservers);
-    }
-
-    byte[] serializedObservers = baos.toByteArray();
-    return serializedObservers;
-  }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 30b398d..4ad6ec9 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -16,17 +16,10 @@
 package org.apache.fluo.core.impl;
 
 import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.client.Connector;
@@ -35,16 +28,14 @@
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.metrics.MetricsReporter;
 import org.apache.fluo.core.metrics.MetricNames;
 import org.apache.fluo.core.metrics.MetricsReporterImpl;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.ObserverUtil;
 import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Holds common environment configuration and shared resources
@@ -54,9 +45,7 @@
   private String table;
   private Authorizations auths = new Authorizations();
   private String accumuloInstance;
-  private Map<Column, ObserverSpecification> observers;
-  private Map<Column, ObserverSpecification> weakObservers;
-  private Set<Column> allObserversColumns;
+  private RegisteredObservers observers;
   private Connector conn;
   private String accumuloInstanceID;
   private String fluoApplicationID;
@@ -105,8 +94,6 @@
     this.auths = env.auths;
     this.accumuloInstance = env.accumuloInstance;
     this.observers = env.observers;
-    this.weakObservers = env.weakObservers;
-    this.allObserversColumns = env.allObserversColumns;
     this.conn = env.conn;
     this.accumuloInstanceID = env.accumuloInstanceID;
     this.fluoApplicationID = env.fluoApplicationID;
@@ -136,18 +123,10 @@
           new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE),
               StandardCharsets.UTF_8);
 
+      observers = ObserverUtil.load(curator);
+
       ByteArrayInputStream bais =
-          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS));
-      DataInputStream dis = new DataInputStream(bais);
-
-      observers = Collections.unmodifiableMap(readObservers(dis));
-      weakObservers = Collections.unmodifiableMap(readObservers(dis));
-      allObserversColumns = new HashSet<>();
-      allObserversColumns.addAll(observers.keySet());
-      allObserversColumns.addAll(weakObservers.keySet());
-      allObserversColumns = Collections.unmodifiableSet(allObserversColumns);
-
-      bais = new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
+          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
       Properties sharedProps = new Properties();
       sharedProps.load(bais);
 
@@ -164,29 +143,6 @@
     }
   }
 
-  private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
-      throws IOException {
-
-    HashMap<Column, ObserverSpecification> omap = new HashMap<>();
-
-    int num = WritableUtils.readVInt(dis);
-    for (int i = 0; i < num; i++) {
-      Column col = ColumnUtil.readColumn(dis);
-      String clazz = dis.readUTF();
-      Map<String, String> params = new HashMap<>();
-      int numParams = WritableUtils.readVInt(dis);
-      for (int j = 0; j < numParams; j++) {
-        String k = dis.readUTF();
-        String v = dis.readUTF();
-        params.put(k, v);
-      }
-
-      ObserverSpecification ospec = new ObserverSpecification(clazz, params);
-      omap.put(col, ospec);
-    }
-
-    return omap;
-  }
 
   public void setAuthorizations(Authorizations auths) {
     this.auths = auths;
@@ -216,14 +172,10 @@
     return fluoApplicationID;
   }
 
-  public Map<Column, ObserverSpecification> getObservers() {
+  public RegisteredObservers getConfiguredObservers() {
     return observers;
   }
 
-  public Map<Column, ObserverSpecification> getWeakObservers() {
-    return weakObservers;
-  }
-
   public String getTable() {
     return table;
   }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index cd08482..6ff9437 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -42,6 +42,8 @@
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.SpanUtil;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * This is utility code for either rolling forward or back failed transactions. A transaction is
  * deemed to have failed if the reading transaction waited too long or the transactor id does not
@@ -242,7 +244,7 @@
 
       LockValue lv = new LockValue(entry.getValue().get());
       ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), lockTs,
-          commitTs, env.getObservers().keySet(), mut);
+          commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
     }
 
   }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index e8398f8..322ae77 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -78,6 +78,9 @@
 import org.apache.fluo.core.util.Hex;
 import org.apache.fluo.core.util.SpanUtil;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 /**
  * Transaction implementation
  */
@@ -125,9 +128,10 @@
     this.env = env;
     this.stats = new TxStats(env);
     this.startTs = startTs;
-    this.observedColumns = env.getObservers().keySet();
+    this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
 
-    if (trigger != null && env.getWeakObservers().containsKey(trigger.getColumn())) {
+    if (trigger != null
+        && env.getConfiguredObservers().getObservedColumns(WEAK).contains(trigger.getColumn())) {
       this.weakNotification = trigger;
     } else {
       this.notification = trigger;
@@ -310,7 +314,7 @@
     Objects.requireNonNull(row);
     Objects.requireNonNull(col);
 
-    if (!env.getWeakObservers().containsKey(col)) {
+    if (!env.getConfiguredObservers().getObservedColumns(WEAK).contains(col)) {
       throw new IllegalArgumentException("Column not configured for weak notifications " + col);
     }
 
@@ -1022,7 +1026,7 @@
 
     HashMap<Bytes, Mutation> mutations = new HashMap<>();
 
-    if (env.getObservers().containsKey(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
+    if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
       Flutation m = new Flutation(env, cd.prow);
       Notification.put(env, m, cd.pcol, commitTs);
       mutations.put(cd.prow, m);
@@ -1031,7 +1035,7 @@
     for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
 
       for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (env.getObservers().containsKey(colUpdates.getKey())) {
+        if (observedColumns.contains(colUpdates.getKey())) {
           Bytes val = colUpdates.getValue();
           if (isWrite(val) && !isDelete(val)) {
             Mutation m = mutations.get(rowUpdates.getKey());
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
new file mode 100644
index 0000000..bac18c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+/*
+ * This interface enables abstracting the new and old way on configuring observers.
+ */
+public interface ObserverStore {
+  boolean handles(FluoConfiguration config);
+
+  void clear(CuratorFramework curator) throws Exception;
+
+  void update(CuratorFramework curator, FluoConfiguration config) throws Exception;
+
+  RegisteredObservers load(CuratorFramework curator) throws Exception;
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
new file mode 100644
index 0000000..0e2f94a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.v1.ObserverStoreV1;
+import org.apache.fluo.core.observer.v2.ObserverStoreV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ObserverUtil {
+
+  private static Logger logger = LoggerFactory.getLogger(ObserverUtil.class);
+
+  public static void initialize(CuratorFramework curator, FluoConfiguration config) {
+
+    logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
+
+    ObserverStore ov1 = new ObserverStoreV1();
+    ObserverStore ov2 = new ObserverStoreV2();
+
+    if (ov1.handles(config) && ov2.handles(config)) {
+      throw new IllegalArgumentException(
+          "Old and new observers configuration present.  There can only be one.");
+    }
+
+    try {
+      if (ov1.handles(config)) {
+        ov2.clear(curator);
+        ov1.update(curator, config);
+      } else if (ov2.handles(config)) {
+        ov1.clear(curator);
+        ov2.update(curator, config);
+      }
+    } catch (Exception e) {
+      throw new FluoException("Failed to update shared configuration in Zookeeper", e);
+    }
+  }
+
+  public static RegisteredObservers load(CuratorFramework curator) throws Exception {
+    ObserverStore ov1 = new ObserverStoreV1();
+    ObserverStore ov2 = new ObserverStoreV2();
+
+    // try to load observers using old and new config
+    RegisteredObservers co = ov1.load(curator);
+    if (co == null) {
+      co = ov2.load(curator);
+    }
+
+    if (co == null) {
+      // no observers configured, so return an empty provider
+      co = new RegisteredObservers() {
+        @Override
+        public Observers getObservers(Environment env) {
+          return new Observers() {
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public void returnObserver(Observer o) {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Observer getObserver(Column col) {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+
+        @Override
+        public Set<Column> getObservedColumns(NotificationType nt) {
+          return Collections.emptySet();
+        }
+      };
+    }
+
+    return co;
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
new file mode 100644
index 0000000..d4cc366
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+
+public interface Observers extends AutoCloseable {
+  Observer getObserver(Column col);
+
+  void returnObserver(Observer o);
+
+  @Override
+  void close();
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
new file mode 100644
index 0000000..0b07c1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Set;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.core.impl.Environment;
+
+public interface RegisteredObservers {
+  Set<Column> getObservedColumns(Observer.NotificationType nt);
+
+  Observers getObservers(Environment env);
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
similarity index 96%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
rename to modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
index 47d5997..f37f2c1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
@@ -13,7 +13,7 @@
  * the License.
  */
 
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v1;
 
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.metrics.MetricsReporter;
@@ -21,6 +21,7 @@
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.metrics.DummyMetricsReporter;
 
+@SuppressWarnings("deprecation")
 public class ObserverContext implements Observer.Context {
 
   private final SimpleConfiguration observerConfig;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
new file mode 100644
index 0000000..754ca28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v1;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.ColumnUtil;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Support for observers configured the old way.
+ */
+@SuppressWarnings("deprecation")
+public class ObserverStoreV1 implements ObserverStore {
+
+  private static final Logger logger = LoggerFactory.getLogger(ObserverStoreV1.class);
+
+  @Override
+  public boolean handles(FluoConfiguration config) {
+    Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+    return !obsSpecs.isEmpty();
+  }
+
+  @Override
+  public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+    Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+
+    Map<Column, ObserverSpecification> colObservers = new HashMap<>();
+    Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
+
+    for (ObserverSpecification ospec : obsSpecs) {
+      Observer observer;
+      try {
+        observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
+      } catch (ClassNotFoundException e1) {
+        throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
+            + "found.  Check for class name misspellings or failure to include "
+            + "the observer jar.", e1);
+      } catch (InstantiationException | IllegalAccessException e2) {
+        throw new FluoException("Observer class '" + ospec.getClassName()
+            + "' could not be created.", e2);
+      }
+
+      SimpleConfiguration oc = ospec.getConfiguration();
+      logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
+          oc.toMap());
+      try {
+        observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
+      } catch (Exception e) {
+        throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
+            e);
+      }
+
+      ObservedColumn observedCol = observer.getObservedColumn();
+      if (observedCol.getType() == NotificationType.STRONG) {
+        colObservers.put(observedCol.getColumn(), ospec);
+      } else {
+        weakObservers.put(observedCol.getColumn(), ospec);
+      }
+    }
+
+    updateObservers(curator, colObservers, weakObservers);
+  }
+
+  private static void updateObservers(CuratorFramework curator,
+      Map<Column, ObserverSpecification> colObservers,
+      Map<Column, ObserverSpecification> weakObservers) throws Exception {
+
+    // TODO check that no workers are running... or make workers watch this znode
+
+    String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS1;
+    try {
+      curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
+    } catch (NoNodeException nne) {
+      // it's ok if node doesn't exist
+    } catch (Exception e) {
+      logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
+          + "], error=[" + e.getMessage() + "]");
+      throw new RuntimeException(e);
+    }
+
+    byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
+    CuratorUtil.putData(curator, observerPath, serializedObservers,
+        CuratorUtil.NodeExistsPolicy.OVERWRITE);
+  }
+
+  private static void serializeObservers(DataOutputStream dos,
+      Map<Column, ObserverSpecification> colObservers) throws IOException {
+    // TODO use a human readable serialized format like json
+
+    Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
+
+    WritableUtils.writeVInt(dos, colObservers.size());
+
+    for (Entry<Column, ObserverSpecification> entry : es) {
+      ColumnUtil.writeColumn(entry.getKey(), dos);
+      dos.writeUTF(entry.getValue().getClassName());
+      Map<String, String> params = entry.getValue().getConfiguration().toMap();
+      WritableUtils.writeVInt(dos, params.size());
+      for (Entry<String, String> pentry : params.entrySet()) {
+        dos.writeUTF(pentry.getKey());
+        dos.writeUTF(pentry.getValue());
+      }
+    }
+  }
+
+  private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
+      Map<Column, ObserverSpecification> weakObservers) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (DataOutputStream dos = new DataOutputStream(baos)) {
+      serializeObservers(dos, colObservers);
+      serializeObservers(dos, weakObservers);
+    }
+
+    byte[] serializedObservers = baos.toByteArray();
+    return serializedObservers;
+  }
+
+
+  private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
+      throws IOException {
+
+    HashMap<Column, ObserverSpecification> omap = new HashMap<>();
+
+    int num = WritableUtils.readVInt(dis);
+    for (int i = 0; i < num; i++) {
+      Column col = ColumnUtil.readColumn(dis);
+      String clazz = dis.readUTF();
+      Map<String, String> params = new HashMap<>();
+      int numParams = WritableUtils.readVInt(dis);
+      for (int j = 0; j < numParams; j++) {
+        String k = dis.readUTF();
+        String v = dis.readUTF();
+        params.put(k, v);
+      }
+
+      ObserverSpecification ospec = new ObserverSpecification(clazz, params);
+      omap.put(col, ospec);
+    }
+
+    return omap;
+  }
+
+  @Override
+  public RegisteredObservers load(CuratorFramework curator) throws Exception {
+
+    Map<Column, ObserverSpecification> observers;
+    Map<Column, ObserverSpecification> weakObservers;
+
+    ByteArrayInputStream bais;
+    try {
+      bais =
+          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1));
+    } catch (NoNodeException nne) {
+      return null;
+    }
+    DataInputStream dis = new DataInputStream(bais);
+
+    observers = Collections.unmodifiableMap(readObservers(dis));
+    weakObservers = Collections.unmodifiableMap(readObservers(dis));
+
+
+    return new RegisteredObservers() {
+
+      @Override
+      public Observers getObservers(Environment env) {
+        return new ObserversV1(env, observers, weakObservers);
+      }
+
+      @Override
+      public Set<Column> getObservedColumns(NotificationType nt) {
+        switch (nt) {
+          case STRONG:
+            return observers.keySet();
+          case WEAK:
+            return weakObservers.keySet();
+          default:
+            throw new IllegalArgumentException("Unknown notification type " + nt);
+        }
+      }
+    };
+  }
+
+  @Override
+  public void clear(CuratorFramework curator) throws Exception {
+    try {
+      curator.delete().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1);
+    } catch (NoNodeException nne) {
+      // nothing to delete
+    }
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
similarity index 83%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
rename to modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index 285a69a..65c1b2f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -13,7 +13,7 @@
  * the License.
  */
 
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v1;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -24,15 +24,19 @@
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.Observer;
 import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Observers implements AutoCloseable {
+@SuppressWarnings("deprecation")
+class ObserversV1 implements Observers {
 
-  private static final Logger log = LoggerFactory.getLogger(Observers.class);
+  private static final Logger log = LoggerFactory.getLogger(ObserversV1.class);
 
   private Environment env;
   Map<Column, List<Observer>> observers = new HashMap<>();
+  Map<Column, ObserverSpecification> strongObservers;
+  Map<Column, ObserverSpecification> weakObservers;
 
   private List<Observer> getObserverList(Column col) {
     List<Observer> observerList;
@@ -46,8 +50,11 @@
     return observerList;
   }
 
-  public Observers(Environment env) {
+  public ObserversV1(Environment env, Map<Column, ObserverSpecification> strongObservers,
+      Map<Column, ObserverSpecification> weakObservers) {
     this.env = env;
+    this.strongObservers = strongObservers;
+    this.weakObservers = weakObservers;
   }
 
   public Observer getObserver(Column col) {
@@ -63,9 +70,9 @@
 
     Observer observer = null;
 
-    ObserverSpecification observerConfig = env.getObservers().get(col);
+    ObserverSpecification observerConfig = strongObservers.get(col);
     if (observerConfig == null) {
-      observerConfig = env.getWeakObservers().get(col);
+      observerConfig = weakObservers.get(col);
     }
 
     if (observerConfig != null) {
@@ -119,4 +126,5 @@
 
     observers = null;
   }
+
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
new file mode 100644
index 0000000..3787be3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * this class created for json serialization
+ */
+class JsonObservedColumn {
+  private byte[] fam;
+  private byte[] qual;
+  private byte[] vis;
+  private String notificationType;
+
+  JsonObservedColumn(Column col, NotificationType nt) {
+    this.fam = col.getFamily().toArray();
+    this.qual = col.getQualifier().toArray();
+    this.vis = col.getVisibility().toArray();
+    this.notificationType = nt.name();
+  }
+
+  public Column getColumn() {
+    return new Column(Bytes.of(fam), Bytes.of(qual), Bytes.of(vis));
+  }
+
+  public NotificationType getNotificationType() {
+    return NotificationType.valueOf(notificationType);
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
new file mode 100644
index 0000000..44f229b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * this class created for json serialization
+ */
+class JsonObservers {
+  String obsProviderClass;
+  List<JsonObservedColumn> observedColumns;
+
+  JsonObservers(String obsProviderClass, Map<Column, NotificationType> columns) {
+    this.obsProviderClass = obsProviderClass;
+    this.observedColumns =
+        columns.entrySet().stream()
+            .map(entry -> new JsonObservedColumn(entry.getKey(), entry.getValue()))
+            .collect(toList());
+  }
+
+  public String getObserverProviderClass() {
+    return obsProviderClass;
+  }
+
+  public Map<Column, NotificationType> getObservedColumns() {
+    return observedColumns.stream().collect(
+        toMap(JsonObservedColumn::getColumn, JsonObservedColumn::getNotificationType));
+  }
+
+  @Override
+  public String toString() {
+    return obsProviderClass + " " + getObservedColumns();
+  }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
similarity index 70%
copy from modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
copy to modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
index 47d5997..3d2b1f3 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
@@ -13,30 +13,27 @@
  * the License.
  */
 
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v2;
 
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.metrics.MetricsReporter;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.metrics.DummyMetricsReporter;
 
-public class ObserverContext implements Observer.Context {
+public class ObserverProviderContextImpl implements Context {
 
-  private final SimpleConfiguration observerConfig;
-  private final SimpleConfiguration appConfig;
+  private SimpleConfiguration appConfig;
   private final Environment env;
 
-  public ObserverContext(SimpleConfiguration appConfig, SimpleConfiguration observerConfig) {
+  public ObserverProviderContextImpl(SimpleConfiguration appConfig) {
     this.appConfig = appConfig;
-    this.observerConfig = observerConfig;
     this.env = null;
   }
 
-  public ObserverContext(Environment env, SimpleConfiguration observerConfig) {
+  public ObserverProviderContextImpl(Environment env) {
     this.env = env;
     this.appConfig = null;
-    this.observerConfig = observerConfig;
   }
 
   @Override
@@ -48,15 +45,11 @@
   }
 
   @Override
-  public SimpleConfiguration getObserverConfiguration() {
-    return observerConfig;
-  }
-
-  @Override
   public MetricsReporter getMetricsReporter() {
     if (env == null) {
       return new DummyMetricsReporter();
     }
     return env.getMetricsReporter();
   }
+
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
new file mode 100644
index 0000000..2e544cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.fluo.accumulo.util.ZookeeperPath.CONFIG_FLUO_OBSERVERS2;
+
+/*
+ * Support for observers configured the new way.
+ */
+public class ObserverStoreV2 implements ObserverStore {
+
+  @Override
+  public boolean handles(FluoConfiguration config) {
+    return !config.getObserverProvider().isEmpty();
+  }
+
+  @Override
+  public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+    String obsProviderClass = config.getObserverProvider();
+
+    ObserverProvider observerProvider = newObserverProvider(obsProviderClass);
+
+    Map<Column, NotificationType> obsCols = new HashMap<>();
+    BiConsumer<Column, NotificationType> obsColConsumer = (col, nt) -> {
+      Objects.requireNonNull(col, "Observed column must be non-null");
+      Objects.requireNonNull(nt, "Notification type must be non-null");
+      Preconditions.checkArgument(!obsCols.containsKey(col), "Duplicate observed column %s", col);
+      obsCols.put(col, nt);
+    };
+
+    observerProvider.provideColumns(obsColConsumer,
+        new ObserverProviderContextImpl(config.getAppConfiguration()));
+
+    Gson gson = new Gson();
+    String json = gson.toJson(new JsonObservers(obsProviderClass, obsCols));
+    CuratorUtil.putData(curator, CONFIG_FLUO_OBSERVERS2, json.getBytes(UTF_8),
+        CuratorUtil.NodeExistsPolicy.OVERWRITE);
+
+  }
+
+  static ObserverProvider newObserverProvider(String obsProviderClass) {
+    ObserverProvider observerProvider;
+    try {
+      observerProvider =
+          Class.forName(obsProviderClass).asSubclass(ObserverProvider.class).newInstance();
+    } catch (ClassNotFoundException e1) {
+      throw new FluoException("ObserverProvider class '" + obsProviderClass + "' was not "
+          + "found.  Check for class name misspellings or failure to include "
+          + "the observer provider jar.", e1);
+    } catch (InstantiationException | IllegalAccessException e2) {
+      throw new FluoException("ObserverProvider class '" + obsProviderClass
+          + "' could not be created.", e2);
+    }
+    return observerProvider;
+  }
+
+  @Override
+  public RegisteredObservers load(CuratorFramework curator) throws Exception {
+    byte[] data;
+    try {
+      data = curator.getData().forPath(CONFIG_FLUO_OBSERVERS2);
+    } catch (NoNodeException nne) {
+      return null;
+    }
+    String json = new String(data, UTF_8);
+    JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
+
+    Set<Column> weakColumns = new HashSet<>();
+    Set<Column> strongColumns = new HashSet<>();
+
+    for (Entry<Column, NotificationType> entry : jco.getObservedColumns().entrySet()) {
+      switch (entry.getValue()) {
+        case STRONG:
+          strongColumns.add(entry.getKey());
+          break;
+        case WEAK:
+          weakColumns.add(entry.getKey());
+          break;
+        default:
+          throw new IllegalStateException("Unknown notification type " + entry.getValue());
+      }
+    }
+
+    return new RegisteredObservers() {
+
+      @Override
+      public Observers getObservers(Environment env) {
+        return new ObserversV2(env, jco, strongColumns, weakColumns);
+      }
+
+      @Override
+      public Set<Column> getObservedColumns(NotificationType nt) {
+        switch (nt) {
+          case STRONG:
+            return strongColumns;
+          case WEAK:
+            return weakColumns;
+          default:
+            throw new IllegalArgumentException("Unknown notification type " + nt);
+        }
+      }
+    };
+  }
+
+  @Override
+  public void clear(CuratorFramework curator) throws Exception {
+    try {
+      curator.delete().forPath(CONFIG_FLUO_OBSERVERS2);
+    } catch (NoNodeException nne) {
+      // nothing to delete
+    }
+  }
+
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
new file mode 100644
index 0000000..d9d4a97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Registry;
+import org.apache.fluo.api.observer.StringObserver;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ObserversV2 implements Observers {
+
+  private static final Logger log = LoggerFactory.getLogger(ObserversV2.class);
+
+  Map<Column, Observer> observers;
+
+  public ObserversV2(Environment env, JsonObservers jco, Set<Column> strongColumns,
+      Set<Column> weakColumns) {
+    observers = new HashMap<>();
+
+    ObserverProvider obsProvider =
+        ObserverStoreV2.newObserverProvider(jco.getObserverProviderClass());
+
+    ObserverProviderContextImpl ctx = new ObserverProviderContextImpl(env);
+
+    Registry or = new Registry() {
+
+      @Override
+      public void register(Column col, NotificationType nt, Observer obs) {
+        try {
+          Method closeMethod = obs.getClass().getMethod("close");
+          if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
+            log.warn(
+                "Observer {} implements close().  Close is not called on Observers created using ObserverProvider."
+                    + " Close is only called on Observers configured the old way.", obs.getClass()
+                    .getName());
+          }
+        } catch (NoSuchMethodException | SecurityException e) {
+          throw new RuntimeException("Failed to check if close() is implemented", e);
+        }
+
+        if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
+          throw new IllegalArgumentException("Column " + col
+              + " not previously configured for strong notifications");
+        }
+
+        if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
+          throw new IllegalArgumentException("Column " + col
+              + " not previously configured for weak notifications");
+        }
+
+        if (observers.containsKey(col)) {
+          throw new IllegalArgumentException("Duplicate observed column " + col);
+        }
+
+        observers.put(col, obs);
+      }
+
+      @Override
+      public void registers(Column col, NotificationType nt, StringObserver obs) {
+        register(col, nt, obs);
+      }
+    };
+
+    obsProvider.provide(or, ctx);
+
+    // the following check ensures observers are provided for all previously configured columns
+    SetView<Column> diff =
+        Sets.difference(observers.keySet(), Sets.union(strongColumns, weakColumns));
+    if (diff.size() > 0) {
+      throw new FluoException("ObserverProvider " + jco.getObserverProviderClass()
+          + " did not provide observers for columns " + diff);
+    }
+  }
+
+  @Override
+  public Observer getObserver(Column col) {
+    return observers.get(col);
+  }
+
+  @Override
+  public void returnObserver(Observer o) {}
+
+  @Override
+  public void close() {}
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 1834835..56aa78d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -29,6 +29,7 @@
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.util.FluoExecutors;
 import org.apache.fluo.core.util.Hex;
 import org.slf4j.Logger;
@@ -50,7 +51,7 @@
     this.queue = new PriorityBlockingQueue<>();
     this.executor = FluoExecutors.newFixedThreadPool(numThreads, queue, "ntfyProc");
     this.tracker = new NotificationTracker();
-    this.observers = new Observers(env);
+    this.observers = env.getConfiguredObservers().getObservers(env);
     env.getSharedResources().getMetricRegistry()
         .register(env.getMetricNames().getNotificationQueued(), new Gauge<Integer>() {
           @Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 895fc21..ff582ed 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -23,6 +23,7 @@
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.impl.TransactionImpl;
 import org.apache.fluo.core.log.TracingTransaction;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.util.Hex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties
index ad0c97f..f9f603d 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties
@@ -60,10 +60,9 @@
 
 # Observer properties
 # -------------------
-# Specifies observers
-# fluo.observer.0=com.foo.Observer1
-# Can optionally have configuration key values
-# fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2
+# Specifies an observer provider.  This should be the name of a class that
+# implements org.apache.fluo.api.observer.ObserverProvider.
+#fluo.observer.provider=com.foo.AppObserverProvider
 
 # Transaction properties
 # ----------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 8f6b425..8f644eb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -16,8 +16,6 @@
 package org.apache.fluo.integration;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.Connector;
@@ -30,8 +28,8 @@
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -80,8 +78,15 @@
     conn = miniAccumulo.getConnector(USER, new PasswordToken(PASSWORD));
   }
 
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.emptyList();
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return null;
+  }
+
+  protected void setupObservers(FluoConfiguration fc) {
+    Class<? extends ObserverProvider> ofc = getObserverProviderClass();
+    if (ofc != null) {
+      fc.setObserverProvider(ofc);
+    }
   }
 
   public String getCurTableName() {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
index 656a5d9..481e6e8 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -75,7 +75,7 @@
     config.setAccumuloZookeepers(miniAccumulo.getZooKeepers());
     config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
     config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
-    config.addObservers(getObservers());
+    setupObservers(config);
     config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
     config.setMiniStartAccumulo(false);
 
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
index 0d28381..bcef827 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
@@ -49,7 +49,7 @@
     config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
     config.setAccumuloTable(getNextTableName());
     config.setWorkerThreads(5);
-    config.addObservers(getObservers());
+    setupObservers(config);
     config.setMiniStartAccumulo(false);
 
     setConfig(config);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index 22f8ea2..7c48cc3 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -15,9 +15,6 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
@@ -25,25 +22,28 @@
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.integration.ITBaseMini;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 public class AppConfigIT extends ITBaseMini {
 
+  public static final Column DF_COL = new Column("data", "foo");
+  public static final Column DB_COL = new Column("data", "bar");
+
   @Override
   protected void setAppConfig(SimpleConfiguration config) {
     config.setProperty("myapp.sizeLimit", 50000);
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TestObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return TestObserverProvider.class;
   }
 
   @Test
@@ -89,32 +89,24 @@
     public void load(TransactionBase tx, Context context) throws Exception {
       int limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
       if (data < limit) {
-        tx.set(row, new Column("data", "foo"), Integer.toString(data));
+        tx.set(row, DF_COL, Integer.toString(data));
       }
     }
   }
 
-  public static class TestObserver extends AbstractObserver {
-
-    private int limit;
-
+  public static class TestObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(new Column("data", "foo"), NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      int limit = ctx.getAppConfiguration().getInt("myapp.sizeLimit");
+
+      or.registers(DF_COL, STRONG, (tx, row, col) -> {
+        int d = Integer.parseInt(tx.gets(row, col));
+        if (2 * d < limit) {
+          tx.set(row.toString(), DB_COL, Integer.toString(2 * d));
+        }
+      });
     }
 
-    @Override
-    public void init(Context context) {
-      limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
-    }
-
-    @Override
-    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-      int d = Integer.parseInt(tx.gets(row.toString(), col));
-      if (2 * d < limit) {
-        tx.set(row.toString(), new Column("data", "bar"), Integer.toString(2 * d));
-      }
-    }
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 3af4fdc..f927102 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Random;
 
@@ -32,11 +30,9 @@
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.util.UtilWaitThread;
 import org.apache.fluo.integration.ITBaseMini;
@@ -81,27 +77,22 @@
     }
   }
 
-  public static class TotalObserver extends AbstractObserver {
-
+  public static class CollisionObserverProvider implements ObserverProvider {
     @Override
-    public Observer.ObservedColumn getObservedColumn() {
-      return new Observer.ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
-    }
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_CHANGED, NotificationType.WEAK, (tx, row, col) -> {
+        int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
+        int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
 
-    @Override
-    public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
-      String row = rowBytes.toString();
-      int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
-      int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
-
-      tx.set(row, STAT_PROCESSED, total + "");
-      TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+        tx.set(row, STAT_PROCESSED, total + "");
+        TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+      });
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return CollisionObserverProvider.class;
   }
 
   @Override
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index 6b4d279..d86f2ef 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Random;
 
@@ -32,12 +30,12 @@
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.CommitException;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.exceptions.StaleScanException;
 import org.apache.fluo.core.impl.Notification;
@@ -54,6 +52,7 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 import static org.apache.fluo.integration.BankUtil.BALANCE;
 
 public class FailureIT extends ITBaseImpl {
@@ -61,22 +60,21 @@
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
-  public static class NullObserver extends AbstractObserver {
-
+  public static class NullObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {}
+  }
 
+  public static class FailuresObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(new Column("attr", "lastupdate"), NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(new Column("attr", "lastupdate"), STRONG, new NullObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    List<ObserverSpecification> observed = new ArrayList<>();
-    observed.add(new ObserverSpecification(NullObserver.class.getName()));
-    return observed;
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return FailuresObserverProvider.class;
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index e67d4d9..288bed9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -18,7 +18,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -28,16 +27,15 @@
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
@@ -52,23 +50,19 @@
 
 public class FluoIT extends ITBaseImpl {
 
-  public static class BalanceObserver extends AbstractObserver {
-
+  public static class FluoITObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(BALANCE, NotificationType.STRONG);
-    }
-
-    @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      Assert.fail();
+    public void provide(Registry or, Context ctx) {
+      or.register(BALANCE, NotificationType.STRONG, (tx, row, col) -> {
+        Assert.fail();
+      });
     }
   }
 
   @Override
-  protected List<org.apache.fluo.api.config.ObserverSpecification> getObservers() {
-    return Arrays.asList(new ObserverSpecification(BalanceObserver.class.getName()));
-  };
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return FluoITObserverProvider.class;
+  }
 
   @Test
   public void testFluoFactory() throws Exception {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
index b496955..e0a08ac 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
@@ -15,8 +15,6 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map.Entry;
 
 import com.google.common.collect.Iterables;
@@ -24,14 +22,14 @@
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestTransaction;
-import org.apache.fluo.integration.impl.WeakNotificationIT.SimpleObserver;
+import org.apache.fluo.integration.impl.WeakNotificationIT.WeakNotificationITObserverProvider;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -60,8 +58,8 @@
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNotificationITObserverProvider.class;
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index bfd8e25..5c9c02f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -23,6 +23,7 @@
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
@@ -35,6 +36,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+@Deprecated
 public class ObserverConfigIT extends ITBaseMini {
 
   public static class ConfigurableObserver extends AbstractObserver {
@@ -95,7 +97,7 @@
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
+  protected void setupObservers(FluoConfiguration fc) {
     List<ObserverSpecification> observers = new ArrayList<>();
 
     observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
@@ -108,7 +110,7 @@
     observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
         "observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
 
-    return observers;
+    fc.addObservers(observers);
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index fe4b0d6..5381952 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -22,14 +22,16 @@
 
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.integration.ITBaseMini;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * Test an observer notifying the column its observing. This is a useful pattern for exporting data.
  */
@@ -39,15 +41,9 @@
   private static final Column EXPORT_CHECK_COL = new Column("export", "check");
   private static final Column EXPORT_COUNT_COL = new Column("export", "count");
 
-  @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(ExportingObserver.class.getName()));
-  }
-
   private static List<String> exports = new ArrayList<>();
 
-  public static class ExportingObserver extends AbstractObserver {
-
+  public static class ExportingObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
       String r = row.toString();
@@ -69,13 +65,20 @@
     private void export(Bytes row, String exportCount) {
       exports.add(exportCount);
     }
+  }
 
+  public static class SelfNtfyObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(EXPORT_COUNT_COL, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(EXPORT_COUNT_COL, STRONG, new ExportingObserver());
     }
   }
 
+  @Override
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return SelfNtfyObserverProvider.class;
+  }
+
   @Test
   public void test1() throws Exception {
 
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index 1d065e1..ce002cb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -15,15 +15,10 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.impl.TransactorNode;
@@ -32,30 +27,26 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 public class StrongNotificationIT extends ITBaseMini {
 
   private static final Column OC = new Column("f", "q");
   private static final Column RC = new Column("f", "r");
 
-  public static class SimpleObserver extends AbstractObserver {
-
+  public static class StrongNtfyObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-      String r = row.toString();
-
-      String v = tx.gets(r, col);
-      tx.set(v, RC, r);
-    }
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(OC, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(OC, STRONG, (tx, row, col) -> {
+        Bytes v = tx.get(row, col);
+        tx.set(v, RC, row);
+      });
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return StrongNtfyObserverProvider.class;
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 22c6632..2bd4ce9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -15,18 +15,15 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.oracle.Stamp;
@@ -36,13 +33,14 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class WeakNotificationIT extends ITBaseMini {
 
   private static final Column STAT_COUNT = new Column("stat", "count");
   private static final Column STAT_CHECK = new Column("stat", "check");
 
-  public static class SimpleObserver extends AbstractObserver {
-
+  public static class SimpleObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
 
@@ -61,16 +59,18 @@
         tx.set(row.toString(), STAT_COUNT, sum + "");
       }
     }
+  }
 
+  public static class WeakNotificationITObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_CHECK, NotificationType.WEAK);
+    public void provide(Registry or, Context ctx) {
+      or.register(STAT_CHECK, WEAK, new SimpleObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNotificationITObserverProvider.class;
   }
 
   @Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 13bb7a7..9dcf6dd 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Scanner;
@@ -25,11 +23,10 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.oracle.Stamp;
@@ -39,38 +36,35 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class WeakNotificationOverlapIT extends ITBaseImpl {
 
   private static final Column STAT_TOTAL = new Column("stat", "total");
   private static final Column STAT_PROCESSED = new Column("stat", "processed");
   private static final Column STAT_CHANGED = new Column("stat", "changed");
 
-  public static class TotalObserver extends AbstractObserver {
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
+  private static final StringObserver TOTAL_OBSERVER = (tx, row, col) -> {
+    String totalStr = tx.gets(row, STAT_TOTAL);
+    if (totalStr == null) {
+      return;
     }
+    Integer total = Integer.parseInt(totalStr);
+    int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
+    tx.set(row, new Column("stat", "processed"), total + "");
+    TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+  };
 
+  public static class WeakNtfyObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      String r = row.toString();
-      String totalStr = tx.gets(r, STAT_TOTAL);
-      if (totalStr == null) {
-        return;
-      }
-      Integer total = Integer.parseInt(totalStr);
-      int processed = TestUtil.getOrDefault(tx, r, STAT_PROCESSED, 0);
-      tx.set(r, new Column("stat", "processed"), total + "");
-      TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_CHANGED, WEAK, TOTAL_OBSERVER);
     }
   }
 
-
-
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNtfyObserverProvider.class;
   }
 
   @Test
@@ -92,7 +86,7 @@
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx2, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx2, Bytes.of("1"), STAT_CHANGED);
     // should not delete notification created by ttx3
     ttx2.done();
 
@@ -103,7 +97,7 @@
     Assert.assertEquals(1, countNotifications());
 
     TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
     ttx4.done();
 
     Assert.assertEquals(0, countNotifications());
@@ -132,7 +126,7 @@
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx6, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx6, Bytes.of("1"), STAT_CHANGED);
     // should not delete notification created by ttx7
     ttx6.done();
 
@@ -143,7 +137,7 @@
     snap3.done();
 
     TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx8, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx8, Bytes.of("1"), STAT_CHANGED);
     ttx8.done();
 
     Assert.assertEquals(0, countNotifications());
@@ -182,7 +176,7 @@
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx3, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx3, Bytes.of("1"), STAT_CHANGED);
     ttx3.done();
 
     Assert.assertEquals(1, countNotifications());
@@ -191,7 +185,7 @@
     }
 
     TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
     ttx4.done();
 
     Assert.assertEquals(0, countNotifications());
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 2406b82..9db0ce4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -15,23 +15,19 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import com.google.common.collect.Iterables;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.Observers;
 import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestTransaction;
@@ -39,6 +35,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * A simple test that added links between nodes in a graph. There is an observer that updates an
  * index of node degree.
@@ -50,21 +48,10 @@
 
   private static Column observedColumn = LAST_UPDATE;
 
-  @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(DegreeIndexer.class.getName()));
-  }
-
-  public static class DegreeIndexer implements Observer {
+  public static class DegreeIndexer implements StringObserver {
 
     @Override
-    public void init(Context context) {}
-
-    @Override
-    public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
-
-      String row = rowBytes.toString();
-
+    public void process(TransactionBase tx, String row, Column col) throws Exception {
       // get previously calculated degree
       String degree = tx.gets(row, DEGREE);
 
@@ -84,14 +71,18 @@
         tx.delete("IDEG" + degree, new Column("node", row));
       }
     }
+  }
 
+  public static class WorkerITObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(observedColumn, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(observedColumn, STRONG, new DegreeIndexer());
     }
+  }
 
-    @Override
-    public void close() {}
+  @Override
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WorkerITObserverProvider.class;
   }
 
   @Test
@@ -152,15 +143,16 @@
   public void testDiffObserverConfig() throws Exception {
     observedColumn = new Column("attr2", "lastupdate");
     try {
-      try (Environment env = new Environment(config); Observers observers = new Observers(env)) {
-        observers.getObserver(LAST_UPDATE);
+      try (Environment env = new Environment(config);
+          Observers op = env.getConfiguredObservers().getObservers(env)) {
+        op.getObserver(LAST_UPDATE);
       }
 
       Assert.fail();
 
-    } catch (IllegalStateException ise) {
+    } catch (IllegalArgumentException ise) {
       Assert.assertTrue(ise.getMessage().contains(
-          "Mismatch between configured column and class column"));
+          "Column attr2 lastupdate  not previously configured for strong notifications"));
     } finally {
       observedColumn = LAST_UPDATE;
     }
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 02a4fd3..01f04e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -18,7 +18,6 @@
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -32,14 +31,15 @@
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestUtil;
 import org.apache.log4j.Level;
@@ -49,6 +49,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class LogIT extends ITBaseMini {
 
   private static final Column STAT_COUNT = new Column("stat", "count");
@@ -100,13 +102,7 @@
     }
   }
 
-  public static class BinaryObserver extends AbstractObserver {
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(bCol2, NotificationType.WEAK);
-    }
-
+  public static class BinaryObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) {
       tx.get(bRow1, bCol2);
@@ -115,23 +111,24 @@
     }
   }
 
-  public static class TestObserver extends AbstractObserver {
-
+  public static class TestObserver implements StringObserver {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_COUNT, NotificationType.WEAK);
+    public void process(TransactionBase tx, String row, Column col) {
+      TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row, col)));
     }
+  }
 
+  public static class LogItObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row.toString(), col)));
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_COUNT, WEAK, new TestObserver());
+      or.register(bCol2, WEAK, new BinaryObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Arrays.asList(new ObserverSpecification(TestObserver.class.getName()),
-        new ObserverSpecification(BinaryObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return LogItObserverProvider.class;
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index ef2eb78..9c0a694 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
         <version>1.32</version>
       </dependency>
       <dependency>
+        <groupId>com.google.code.gson</groupId>
+        <artifactId>gson</artifactId>
+        <version>2.8.0</version>
+      </dependency>
+      <dependency>
         <!-- Guava 13.0.1 is required by Twill (due to beta method usage) -->
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>