fixes #816 introduced better way to setup observers and deprecated old
diff --git a/docs/applications.md b/docs/applications.md
index 5685381..cbf12ba 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -100,12 +100,12 @@
To create an observer, follow these steps:
-1. Create a class that extends [AbstractObserver] like the example below. Please use [slf4j] for
+1. Create one or more classes that extend [Observer] like the example below. Please use [slf4j] for
any logging in observers as [slf4j] supports multiple logging implementations. This is
necessary as Fluo applications have a hard requirement on [logback] when running in YARN.
```java
- public class InvertObserver extends AbstractObserver {
+ public class InvertObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
@@ -114,25 +114,45 @@
// invert row and value
tx.set(value, new Column("inv", "data"), row);
}
+ }
+ ```
+2. Create a class that implements [ObserverProvider] like the example below. The purpose of this
+ class is associate a set Observers with columns that trigger the observers. The class can
+ create multiple observers.
+ ```java
+ class AppObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(new Column("obs", "data"), NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ //setup InvertObserver to be triggered when the column obs:data is modified
+ or.register(new Column("obs", "data"),
+ NotificationType.STRONG,
+ new InvertObserver());
+
+ //Observer is a Functional interface. So Obsevers can be written as lambdas.
+ or.register(new Column("new","data"),
+ NotificationType.WEAK,
+ (tx,row,col) -> {
+ Bytes combined = combineNewAndOld(tx,row);
+ tx.set(row, new Column("current","data"), combined);
+ });
}
}
```
-2. Build a jar containing this class and include this jar in the `lib/` directory of your Fluo
+
+3. Build a jar containing thses classes and include this jar in the `lib/` directory of your Fluo
application.
-3. Configure your Fluo instance to use this observer by modifying the Observer section of
+4. Configure your Fluo instance to use this observer provider by modifying the Observer section of
[fluo.properties].
-4. Restart your Fluo instance so that your Fluo workers load the new observer.
+5. Initialize Fluo. During initialization Fluo will obtain the observed columns from the
+ ObserverProvider and persist the columns in Zookeeper. These columns persisted in Zookeeper
+ are used by transactions to know when to trigger observers.
+6. Start your Fluo instance so that your Fluo workers load the new observer.
## Application Configuration
-Each observer can have its own configuration. This is useful for the case of using the same
-observer code w/ different parameters. However for the case of sharing the same configuration
-across observers, fluo provides a simple mechanism to set and access application specific
-configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.
+For configuring observers, fluo provides a simple mechanism to set and access application specific
+configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.
## Debugging Applications
@@ -195,7 +215,8 @@
[FluoFactory]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
[FluoClient]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
[FluoConfiguration]: ../modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
-[AbstractObserver]: ../modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+[Observer]: ../modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+[ObserverProvider]: ../modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
[fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
[API]: https://fluo.apache.org/apidocs/
[metrics]: metrics.md
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
index aa6039e..8c7f1d2 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
@@ -25,7 +25,9 @@
public static final String CONFIG_ACCUMULO_INSTANCE_NAME = CONFIG + "/accumulo.instance.name";
public static final String CONFIG_ACCUMULO_INSTANCE_ID = CONFIG + "/accumulo.instance.id";
public static final String CONFIG_FLUO_APPLICATION_ID = CONFIG + "/fluo.application.id";
- public static final String CONFIG_FLUO_OBSERVERS = CONFIG + "/fluo.observers";
+ @Deprecated
+ public static final String CONFIG_FLUO_OBSERVERS1 = CONFIG + "/fluo.observers";
+ public static final String CONFIG_FLUO_OBSERVERS2 = CONFIG + "/fluo.observers2";
public static final String CONFIG_SHARED = CONFIG + "/shared.config";
public static final String ORACLE = "/oracle";
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
index 140351e..fcbe4db 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
@@ -15,8 +15,10 @@
package org.apache.fluo.api.client;
+import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
/**
* Client interface for Fluo. Fluo clients will have shared resources used by all objects created by
@@ -63,6 +65,8 @@
* keeping config files consistent across a cluster. To update this configuration, use
* {@link FluoAdmin#updateSharedConfig()}. Changes made to the returned Configuration will
* not update Zookeeper.
+ * @see FluoConfiguration#getAppConfiguration()
+ * @see Context#getAppConfiguration()
*/
SimpleConfiguration getAppConfiguration();
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 53a4819..c759dbb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -27,6 +27,8 @@
import com.google.common.base.Preconditions;
import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +88,19 @@
/** The properties below get loaded into/from Zookeeper */
// Observer
+ @Deprecated
public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";
+ /**
+ * @since 1.1.0
+ */
+ public static final String OBSERVER_PROVIDER = FLUO_PREFIX + ".observer.provider";
+
+ /**
+ * @since 1.1.0
+ */
+ public static final String OBSERVER_PROVIDER_DEFAULT = "";
+
// Transaction
public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
public static final String TRANSACTION_ROLLBACK_TIME_PROP = TRANSACTION_PREFIX + ".rollback.time";
@@ -281,6 +294,11 @@
return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
}
+ /**
+ * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+ * {@link #getObserverProvider()}
+ */
+ @Deprecated
public List<ObserverSpecification> getObserverSpecifications() {
List<ObserverSpecification> configList = new ArrayList<>();
@@ -344,6 +362,37 @@
return max + 1;
}
+ /**
+ * Configure the observer provider that Fluo workers will use.
+ *
+ * @since 1.1.0
+ *
+ * @param className Name of a class that implements {@link ObserverProvider}. Must be non-null and
+ * non-empty.
+ */
+ public void setObserverProvider(String className) {
+ setNonEmptyString(OBSERVER_PROVIDER, className);
+ }
+
+ /**
+ * Calls {@link #setObserverProvider(String)} with the class name.
+ *
+ * @since 1.1.0
+ */
+ public void setObserverProvider(Class<? extends ObserverProvider> clazz) {
+ setObserverProvider(clazz.getName());
+ }
+
+ /**
+ * @return The configured {@link ObserverProvider} class name. If one was not configured, returns
+ * {@value #OBSERVER_PROVIDER_DEFAULT}
+ * @since 1.1.0
+ */
+ public String getObserverProvider() {
+ return getString(OBSERVER_PROVIDER, OBSERVER_PROVIDER_DEFAULT);
+ }
+
+ @Deprecated
private void addObserver(ObserverSpecification oconf, int next) {
Map<String, String> params = oconf.getConfiguration().toMap();
StringBuilder paramString = new StringBuilder();
@@ -359,7 +408,11 @@
/**
* Adds an {@link ObserverSpecification} to the configuration using a unique integer prefix thats
* not currently in use.
+ *
+ * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+ * {@link #getObserverProvider()}
*/
+ @Deprecated
public FluoConfiguration addObserver(ObserverSpecification oconf) {
int next = getNextObserverId();
addObserver(oconf, next);
@@ -368,7 +421,11 @@
/**
* Adds multiple observers using unique integer prefixes for each.
+ *
+ * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+ * {@link #getObserverProvider()}
*/
+ @Deprecated
public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers) {
int next = getNextObserverId();
for (ObserverSpecification oconf : observers) {
@@ -379,7 +436,11 @@
/**
* Removes any configured observers.
+ *
+ * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+ * {@link #getObserverProvider()}
*/
+ @Deprecated
public FluoConfiguration clearObservers() {
Iterator<String> iter1 = getKeys(OBSERVER_PREFIX.substring(0, OBSERVER_PREFIX.length() - 1));
while (iter1.hasNext()) {
@@ -429,7 +490,7 @@
* to subset will be reflected in this configuration, but with the prefix added. This
* method is useful for setting application configuration before initialization. For
* reading application configuration after initialization, see
- * {@link FluoClient#getAppConfiguration()}
+ * {@link FluoClient#getAppConfiguration()} and {@link Context#getAppConfiguration()}
*/
public SimpleConfiguration getAppConfiguration() {
return subset(APP_PREFIX);
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
index 76829c9..9ebb47b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
@@ -26,7 +26,10 @@
* {@link FluoConfiguration#addObserver(ObserverSpecification)}.
*
* @since 1.0.0
+ * @deprecated since 1.1.0. The methods that used this class in {@link FluoConfiguration} were
+ * deprecated.
*/
+@Deprecated
public class ObserverSpecification {
private final String className;
private final Map<String, String> configMap;
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index 077eac7..59c4321 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -58,7 +58,7 @@
private final int offset;
private final int length;
- private WeakReference<String> utf8String;
+ private transient WeakReference<String> utf8String;
public static final Bytes EMPTY = new Bytes(new byte[0]);
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
index 24505a5..6c191f9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
@@ -26,7 +26,11 @@
* user.
*
* @since 1.0.0
+ * @deprecated since 1.1.0. This class was deprecated for two reasons. First the methods its
+ * overrides were deprecated. Second, the methods it overrides were made into Java 8
+ * default methods.
*/
+@Deprecated
public abstract class AbstractObserver implements Observer {
@Override
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
index 381784a..21f4a2a 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
@@ -17,6 +17,7 @@
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
@@ -24,13 +25,15 @@
/**
* Implemented by users to a watch a {@link Column} and be notified of changes to the Column via the
- * {@link #process(TransactionBase, Bytes, Column)} method. An observer is created for each worker
- * thread and reused for the lifetime of a worker thread. Consider extending
- * {@link AbstractObserver} as it will let you optionally implement {@link #init(Context)} and
- * {@link #close()}. The abstract class will also shield you from the addition of interface methods.
+ * {@link #process(TransactionBase, Bytes, Column)} method.
+ *
+ * <p>
+ * In Fluo version 1.1.0 this was converted to a functional interface. This change along with the
+ * introduction of {@link ObserverProvider} allows Observers to be written as lambdas.
*
* @since 1.0.0
*/
+@FunctionalInterface
public interface Observer {
/**
@@ -44,7 +47,9 @@
* A {@link Column} and {@link NotificationType} pair
*
* @since 1.0.0
+ * @deprecated since 1.1.0. The method that used this class was deprecated.
*/
+ @Deprecated
class ObservedColumn {
private final Column col;
private final NotificationType notificationType;
@@ -61,11 +66,22 @@
public NotificationType getType() {
return notificationType;
}
+
+ /**
+ * @since 1.1.0
+ */
+ @Override
+ public String toString() {
+ return col + " " + notificationType;
+ }
}
/**
* @since 1.0.0
+ *
+ * @deprecated since 1.1.0. The method that used this interface was deprecated.
*/
+ @Deprecated
interface Context {
/**
* @return A configuration object with application configuration like that returned by
@@ -88,8 +104,14 @@
* Implemented by user to initialize Observer.
*
* @param context Observer context
+ *
+ * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+ * {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+ * observers are configured the old way by
+ * {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
- void init(Context context) throws Exception;
+ @Deprecated
+ default void init(Context context) throws Exception {}
/**
* Implemented by users to process notifications on a {@link ObservedColumn}. If a notification
@@ -107,11 +129,25 @@
* then an exception will be thrown. It is safe to assume that {@link #init(Context)} will be
* called before this method. If the return value of the method is derived from what is passed to
* {@link #init(Context)}, then the derivation process should be deterministic.
+ *
+ * @deprecated since 1.1.0 Fluo will no longer call this method when observers are configured by
+ * {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+ * observers are configured the old way by
+ * {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
- ObservedColumn getObservedColumn();
+ @Deprecated
+ default ObservedColumn getObservedColumn() {
+ throw new UnsupportedOperationException();
+ }
/**
* Implemented by user to close resources used by Observer
+ *
+ * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+ * {@link FluoConfiguration#setObserverProvider(String)}. Its only called when
+ * observers are configured the old way by
+ * {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
*/
- void close();
+ @Deprecated
+ default void close() {}
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
new file mode 100644
index 0000000..c172268
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import java.util.function.BiConsumer;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * Fluo Workers use this class to register {@link Observer}s to process notifications.
+ * Implementations of this class should register zero or more {@link Observer}s.
+ *
+ * <p>
+ * When Fluo is initialized {@link #provideColumns(BiConsumer, Context)} is called. The columns it
+ * registers are stored in Zookeeper. Transactions will use the columns stored in Zookeeper to
+ * determine when to set notifications. When Workers call {@link #provide(Registry, Context)}, the
+ * columns registered must be the same as those registered during initialization. If this is not the
+ * case, then the worker will fail to start.
+ *
+ * @see FluoConfiguration#setObserverProvider(String)
+ * @since 1.1.0
+ */
+public interface ObserverProvider {
+
+ /**
+ * @since 1.1.0
+ */
+ interface Context {
+ /**
+ * @return A configuration object with application configuration like that returned by
+ * {@link FluoClient#getAppConfiguration()}
+ */
+ SimpleConfiguration getAppConfiguration();
+
+ /**
+ * @return A {@link MetricsReporter} to report application metrics from observers.
+ */
+ MetricsReporter getMetricsReporter();
+ }
+
+ /**
+ * Observers are registered with the worker using this interface. Registering an {@link Observer}s
+ * relates it to the columns that trigger it.
+ *
+ * @since 1.1.0
+ */
+ interface Registry {
+ void register(Column observedColumn, NotificationType ntfyType, Observer observer);
+
+ /**
+ * This method was created to allow Observers written as lambda to be passed {@link String}
+ * instead of {@link Bytes} for the row.
+ *
+ * <pre>
+ * <code>
+ * void provide(ObserverRegistry or, Context ctx) {
+ * or.registers(someColumn, WEAK, (tx,row,col) -> {
+ * //row is of type String
+ * };
+ * }
+ * </code>
+ * </pre>
+ */
+ void registers(Column observedColumn, NotificationType ntfyType, StringObserver observer);
+ }
+
+ /**
+ * This is method is called by Fluo Workers to register observers to process notifications.
+ *
+ * <p>
+ * Observers registered may be called concurrently by multiple threads to process different
+ * notifications. Observers should be tolerant of this.
+ *
+ * @param or Register observers with this.
+ */
+ void provide(Registry or, Context ctx);
+
+ /**
+ * Called during Fluo initialization to determine what columns are being observed. The default
+ * implementation of this method calls {@link #provide(Registry, Context)} and ignores Observers.
+ *
+ * @param colRegistry register all observed columns with this consumer
+ */
+ default void provideColumns(BiConsumer<Column, NotificationType> colRegistry, Context ctx) {
+ Registry or = new Registry() {
+ @Override
+ public void registers(Column oc, NotificationType nt, StringObserver obs) {
+ colRegistry.accept(oc, nt);
+ }
+
+ @Override
+ public void register(Column oc, NotificationType nt, Observer obs) {
+ colRegistry.accept(oc, nt);
+ }
+ };
+
+ provide(or, ctx);
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
new file mode 100644
index 0000000..f57a98e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * @since 1.1.0
+ */
+@FunctionalInterface
+public interface StringObserver extends Observer {
+
+ @Override
+ default void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ process(tx, row.toString(), col);
+ }
+
+ abstract void process(TransactionBase tx, String row, Column col) throws Exception;
+}
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 38705db..3e36ba1 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -209,6 +209,7 @@
Assert.assertEquals("33", config.getReporterConfiguration("jmx").getString("frequency"));
}
+ @SuppressWarnings("deprecation")
private void assertIAE(String value) {
FluoConfiguration config = new FluoConfiguration();
try {
@@ -219,6 +220,7 @@
}
}
+ @SuppressWarnings("deprecation")
@Test
public void testObserverConfig() {
FluoConfiguration config = new FluoConfiguration();
@@ -247,6 +249,7 @@
Assert.assertEquals(0, ocList.get(0).getConfiguration().toMap().size());
}
+ @SuppressWarnings("deprecation")
@Test
public void testObserverConfig2() {
FluoConfiguration config = new FluoConfiguration();
@@ -374,7 +377,7 @@
c1.setAccumuloZookeepers("localhost:7171");
c1.setInstanceZookeepers("localhost:7171/testS");
c1.setWorkerThreads(100);
- c1.addObserver(new ObserverSpecification("com.foo.Observer1"));
+ c1.setObserverProvider("com.foo.MyObserverProvider");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(baos);
@@ -395,4 +398,22 @@
in.close();
}
+
+ @Test(expected = NullPointerException.class)
+ public void testNullObserverProvider() {
+ FluoConfiguration fc = new FluoConfiguration();
+ fc.setObserverProvider((String) null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyObserverProvider() {
+ FluoConfiguration fc = new FluoConfiguration();
+ fc.setObserverProvider("");
+ }
+
+ @Test
+ public void testNoObserverProvider() {
+ FluoConfiguration fc = new FluoConfiguration();
+ Assert.assertEquals("", fc.getObserverProvider());
+ }
}
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index de20ac5..d6784a4 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -26,6 +26,10 @@
<description>The modules contains the core implementation code of Apache Fluo.</description>
<dependencies>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 8d059a5..62914e0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -15,6 +15,7 @@
package org.apache.fluo.core.client;
+import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.EnumSet;
@@ -38,17 +39,11 @@
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.observer.ObserverUtil;
import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.fluo.core.worker.ObserverContext;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -208,42 +203,6 @@
@Override
public void updateSharedConfig() {
- logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
-
- Map<Column, ObserverSpecification> colObservers = new HashMap<>();
- Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
- for (ObserverSpecification ospec : config.getObserverSpecifications()) {
-
- Observer observer;
- try {
- observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
- } catch (ClassNotFoundException e1) {
- throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
- + "found. Check for class name misspellings or failure to include "
- + "the observer jar.", e1);
- } catch (InstantiationException | IllegalAccessException e2) {
- throw new FluoException("Observer class '" + ospec.getClassName()
- + "' could not be created.", e2);
- }
-
- SimpleConfiguration oc = ospec.getConfiguration();
- logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
- oc.toMap());
- try {
- observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
- } catch (Exception e) {
- throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
- e);
- }
-
- ObservedColumn observedCol = observer.getObservedColumn();
- if (observedCol.getType() == NotificationType.STRONG) {
- colObservers.put(observedCol.getColumn(), ospec);
- } else {
- weakObservers.put(observedCol.getColumn(), ospec);
- }
- }
-
Properties sharedProps = new Properties();
Iterator<String> iter = config.getKeys();
while (iter.hasNext()) {
@@ -257,8 +216,13 @@
try {
CuratorFramework curator = getAppCurator();
- Operations.updateObservers(curator, colObservers, weakObservers);
- Operations.updateSharedConfig(curator, sharedProps);
+ ObserverUtil.initialize(curator, config);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ sharedProps.store(baos, "Shared java props");
+
+ CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
+ CuratorUtil.NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
throw new FluoException("Failed to update shared configuration in Zookeeper", e);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
deleted file mode 100644
index 16851a6..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.client;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.core.util.ColumnUtil;
-import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility methods for initializing Zookeeper and Accumulo
- */
-public class Operations {
-
- private Operations() {}
-
- private static final Logger logger = LoggerFactory.getLogger(Operations.class);
-
- // TODO refactor all method in this class to take a properties object... if so the prop keys would
- // need to be public
-
- public static void updateSharedConfig(CuratorFramework curator, Properties sharedProps)
- throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- sharedProps.store(baos, "Shared java props");
-
- CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
- CuratorUtil.NodeExistsPolicy.OVERWRITE);
- }
-
- public static void updateObservers(CuratorFramework curator,
- Map<Column, ObserverSpecification> colObservers,
- Map<Column, ObserverSpecification> weakObservers) throws Exception {
-
- // TODO check that no workers are running... or make workers watch this znode
-
- String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS;
- try {
- curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
- } catch (NoNodeException nne) {
- // it's ok if node doesn't exist
- } catch (Exception e) {
- logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
- + "], error=[" + e.getMessage() + "]");
- throw new RuntimeException(e);
- }
-
- byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
- CuratorUtil.putData(curator, observerPath, serializedObservers,
- CuratorUtil.NodeExistsPolicy.OVERWRITE);
- }
-
- private static void serializeObservers(DataOutputStream dos,
- Map<Column, ObserverSpecification> colObservers) throws IOException {
- // TODO use a human readable serialized format like json
-
- Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
-
- WritableUtils.writeVInt(dos, colObservers.size());
-
- for (Entry<Column, ObserverSpecification> entry : es) {
- ColumnUtil.writeColumn(entry.getKey(), dos);
- dos.writeUTF(entry.getValue().getClassName());
- Map<String, String> params = entry.getValue().getConfiguration().toMap();
- WritableUtils.writeVInt(dos, params.size());
- for (Entry<String, String> pentry : params.entrySet()) {
- dos.writeUTF(pentry.getKey());
- dos.writeUTF(pentry.getValue());
- }
- }
- }
-
- private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
- Map<Column, ObserverSpecification> weakObservers) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (DataOutputStream dos = new DataOutputStream(baos)) {
- serializeObservers(dos, colObservers);
- serializeObservers(dos, weakObservers);
- }
-
- byte[] serializedObservers = baos.toByteArray();
- return serializedObservers;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 30b398d..4ad6ec9 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -16,17 +16,10 @@
package org.apache.fluo.core.impl;
import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.Connector;
@@ -35,16 +28,14 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.metrics.MetricsReporter;
import org.apache.fluo.core.metrics.MetricNames;
import org.apache.fluo.core.metrics.MetricsReporterImpl;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.ObserverUtil;
import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
/**
* Holds common environment configuration and shared resources
@@ -54,9 +45,7 @@
private String table;
private Authorizations auths = new Authorizations();
private String accumuloInstance;
- private Map<Column, ObserverSpecification> observers;
- private Map<Column, ObserverSpecification> weakObservers;
- private Set<Column> allObserversColumns;
+ private RegisteredObservers observers;
private Connector conn;
private String accumuloInstanceID;
private String fluoApplicationID;
@@ -105,8 +94,6 @@
this.auths = env.auths;
this.accumuloInstance = env.accumuloInstance;
this.observers = env.observers;
- this.weakObservers = env.weakObservers;
- this.allObserversColumns = env.allObserversColumns;
this.conn = env.conn;
this.accumuloInstanceID = env.accumuloInstanceID;
this.fluoApplicationID = env.fluoApplicationID;
@@ -136,18 +123,10 @@
new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE),
StandardCharsets.UTF_8);
+ observers = ObserverUtil.load(curator);
+
ByteArrayInputStream bais =
- new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS));
- DataInputStream dis = new DataInputStream(bais);
-
- observers = Collections.unmodifiableMap(readObservers(dis));
- weakObservers = Collections.unmodifiableMap(readObservers(dis));
- allObserversColumns = new HashSet<>();
- allObserversColumns.addAll(observers.keySet());
- allObserversColumns.addAll(weakObservers.keySet());
- allObserversColumns = Collections.unmodifiableSet(allObserversColumns);
-
- bais = new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
+ new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
Properties sharedProps = new Properties();
sharedProps.load(bais);
@@ -164,29 +143,6 @@
}
}
- private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
- throws IOException {
-
- HashMap<Column, ObserverSpecification> omap = new HashMap<>();
-
- int num = WritableUtils.readVInt(dis);
- for (int i = 0; i < num; i++) {
- Column col = ColumnUtil.readColumn(dis);
- String clazz = dis.readUTF();
- Map<String, String> params = new HashMap<>();
- int numParams = WritableUtils.readVInt(dis);
- for (int j = 0; j < numParams; j++) {
- String k = dis.readUTF();
- String v = dis.readUTF();
- params.put(k, v);
- }
-
- ObserverSpecification ospec = new ObserverSpecification(clazz, params);
- omap.put(col, ospec);
- }
-
- return omap;
- }
public void setAuthorizations(Authorizations auths) {
this.auths = auths;
@@ -216,14 +172,10 @@
return fluoApplicationID;
}
- public Map<Column, ObserverSpecification> getObservers() {
+ public RegisteredObservers getConfiguredObservers() {
return observers;
}
- public Map<Column, ObserverSpecification> getWeakObservers() {
- return weakObservers;
- }
-
public String getTable() {
return table;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index cd08482..6ff9437 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -42,6 +42,8 @@
import org.apache.fluo.core.util.FluoCondition;
import org.apache.fluo.core.util.SpanUtil;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
/**
* This is utility code for either rolling forward or back failed transactions. A transaction is
* deemed to have failed if the reading transaction waited too long or the transactor id does not
@@ -242,7 +244,7 @@
LockValue lv = new LockValue(entry.getValue().get());
ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), lockTs,
- commitTs, env.getObservers().keySet(), mut);
+ commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index e8398f8..322ae77 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -78,6 +78,9 @@
import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.util.SpanUtil;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
/**
* Transaction implementation
*/
@@ -125,9 +128,10 @@
this.env = env;
this.stats = new TxStats(env);
this.startTs = startTs;
- this.observedColumns = env.getObservers().keySet();
+ this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
- if (trigger != null && env.getWeakObservers().containsKey(trigger.getColumn())) {
+ if (trigger != null
+ && env.getConfiguredObservers().getObservedColumns(WEAK).contains(trigger.getColumn())) {
this.weakNotification = trigger;
} else {
this.notification = trigger;
@@ -310,7 +314,7 @@
Objects.requireNonNull(row);
Objects.requireNonNull(col);
- if (!env.getWeakObservers().containsKey(col)) {
+ if (!env.getConfiguredObservers().getObservedColumns(WEAK).contains(col)) {
throw new IllegalArgumentException("Column not configured for weak notifications " + col);
}
@@ -1022,7 +1026,7 @@
HashMap<Bytes, Mutation> mutations = new HashMap<>();
- if (env.getObservers().containsKey(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
+ if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
Flutation m = new Flutation(env, cd.prow);
Notification.put(env, m, cd.pcol, commitTs);
mutations.put(cd.prow, m);
@@ -1031,7 +1035,7 @@
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
- if (env.getObservers().containsKey(colUpdates.getKey())) {
+ if (observedColumns.contains(colUpdates.getKey())) {
Bytes val = colUpdates.getValue();
if (isWrite(val) && !isDelete(val)) {
Mutation m = mutations.get(rowUpdates.getKey());
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
new file mode 100644
index 0000000..bac18c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+/*
+ * This interface enables abstracting the new and old way on configuring observers.
+ */
+public interface ObserverStore {
+ boolean handles(FluoConfiguration config);
+
+ void clear(CuratorFramework curator) throws Exception;
+
+ void update(CuratorFramework curator, FluoConfiguration config) throws Exception;
+
+ RegisteredObservers load(CuratorFramework curator) throws Exception;
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
new file mode 100644
index 0000000..0e2f94a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.v1.ObserverStoreV1;
+import org.apache.fluo.core.observer.v2.ObserverStoreV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ObserverUtil {
+
+ private static Logger logger = LoggerFactory.getLogger(ObserverUtil.class);
+
+ public static void initialize(CuratorFramework curator, FluoConfiguration config) {
+
+ logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
+
+ ObserverStore ov1 = new ObserverStoreV1();
+ ObserverStore ov2 = new ObserverStoreV2();
+
+ if (ov1.handles(config) && ov2.handles(config)) {
+ throw new IllegalArgumentException(
+ "Old and new observers configuration present. There can only be one.");
+ }
+
+ try {
+ if (ov1.handles(config)) {
+ ov2.clear(curator);
+ ov1.update(curator, config);
+ } else if (ov2.handles(config)) {
+ ov1.clear(curator);
+ ov2.update(curator, config);
+ }
+ } catch (Exception e) {
+ throw new FluoException("Failed to update shared configuration in Zookeeper", e);
+ }
+ }
+
+ public static RegisteredObservers load(CuratorFramework curator) throws Exception {
+ ObserverStore ov1 = new ObserverStoreV1();
+ ObserverStore ov2 = new ObserverStoreV2();
+
+ // try to load observers using old and new config
+ RegisteredObservers co = ov1.load(curator);
+ if (co == null) {
+ co = ov2.load(curator);
+ }
+
+ if (co == null) {
+ // no observers configured, so return an empty provider
+ co = new RegisteredObservers() {
+ @Override
+ public Observers getObservers(Environment env) {
+ return new Observers() {
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void returnObserver(Observer o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Observer getObserver(Column col) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public Set<Column> getObservedColumns(NotificationType nt) {
+ return Collections.emptySet();
+ }
+ };
+ }
+
+ return co;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
new file mode 100644
index 0000000..d4cc366
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+
+public interface Observers extends AutoCloseable {
+ Observer getObserver(Column col);
+
+ void returnObserver(Observer o);
+
+ @Override
+ void close();
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
new file mode 100644
index 0000000..0b07c1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Set;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.core.impl.Environment;
+
+public interface RegisteredObservers {
+ Set<Column> getObservedColumns(Observer.NotificationType nt);
+
+ Observers getObservers(Environment env);
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
similarity index 96%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
rename to modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
index 47d5997..f37f2c1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
@@ -13,7 +13,7 @@
* the License.
*/
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v1;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.metrics.MetricsReporter;
@@ -21,6 +21,7 @@
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.metrics.DummyMetricsReporter;
+@SuppressWarnings("deprecation")
public class ObserverContext implements Observer.Context {
private final SimpleConfiguration observerConfig;
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
new file mode 100644
index 0000000..754ca28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v1;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.ColumnUtil;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Support for observers configured the old way.
+ */
+@SuppressWarnings("deprecation")
+public class ObserverStoreV1 implements ObserverStore {
+
+ private static final Logger logger = LoggerFactory.getLogger(ObserverStoreV1.class);
+
+ @Override
+ public boolean handles(FluoConfiguration config) {
+ Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+ return !obsSpecs.isEmpty();
+ }
+
+ @Override
+ public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+ Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+
+ Map<Column, ObserverSpecification> colObservers = new HashMap<>();
+ Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
+
+ for (ObserverSpecification ospec : obsSpecs) {
+ Observer observer;
+ try {
+ observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
+ } catch (ClassNotFoundException e1) {
+ throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
+ + "found. Check for class name misspellings or failure to include "
+ + "the observer jar.", e1);
+ } catch (InstantiationException | IllegalAccessException e2) {
+ throw new FluoException("Observer class '" + ospec.getClassName()
+ + "' could not be created.", e2);
+ }
+
+ SimpleConfiguration oc = ospec.getConfiguration();
+ logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
+ oc.toMap());
+ try {
+ observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
+ } catch (Exception e) {
+ throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
+ e);
+ }
+
+ ObservedColumn observedCol = observer.getObservedColumn();
+ if (observedCol.getType() == NotificationType.STRONG) {
+ colObservers.put(observedCol.getColumn(), ospec);
+ } else {
+ weakObservers.put(observedCol.getColumn(), ospec);
+ }
+ }
+
+ updateObservers(curator, colObservers, weakObservers);
+ }
+
+ private static void updateObservers(CuratorFramework curator,
+ Map<Column, ObserverSpecification> colObservers,
+ Map<Column, ObserverSpecification> weakObservers) throws Exception {
+
+ // TODO check that no workers are running... or make workers watch this znode
+
+ String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS1;
+ try {
+ curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
+ } catch (NoNodeException nne) {
+ // it's ok if node doesn't exist
+ } catch (Exception e) {
+ logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
+ + "], error=[" + e.getMessage() + "]");
+ throw new RuntimeException(e);
+ }
+
+ byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
+ CuratorUtil.putData(curator, observerPath, serializedObservers,
+ CuratorUtil.NodeExistsPolicy.OVERWRITE);
+ }
+
+ private static void serializeObservers(DataOutputStream dos,
+ Map<Column, ObserverSpecification> colObservers) throws IOException {
+ // TODO use a human readable serialized format like json
+
+ Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
+
+ WritableUtils.writeVInt(dos, colObservers.size());
+
+ for (Entry<Column, ObserverSpecification> entry : es) {
+ ColumnUtil.writeColumn(entry.getKey(), dos);
+ dos.writeUTF(entry.getValue().getClassName());
+ Map<String, String> params = entry.getValue().getConfiguration().toMap();
+ WritableUtils.writeVInt(dos, params.size());
+ for (Entry<String, String> pentry : params.entrySet()) {
+ dos.writeUTF(pentry.getKey());
+ dos.writeUTF(pentry.getValue());
+ }
+ }
+ }
+
+ private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
+ Map<Column, ObserverSpecification> weakObservers) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (DataOutputStream dos = new DataOutputStream(baos)) {
+ serializeObservers(dos, colObservers);
+ serializeObservers(dos, weakObservers);
+ }
+
+ byte[] serializedObservers = baos.toByteArray();
+ return serializedObservers;
+ }
+
+
+ private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
+ throws IOException {
+
+ HashMap<Column, ObserverSpecification> omap = new HashMap<>();
+
+ int num = WritableUtils.readVInt(dis);
+ for (int i = 0; i < num; i++) {
+ Column col = ColumnUtil.readColumn(dis);
+ String clazz = dis.readUTF();
+ Map<String, String> params = new HashMap<>();
+ int numParams = WritableUtils.readVInt(dis);
+ for (int j = 0; j < numParams; j++) {
+ String k = dis.readUTF();
+ String v = dis.readUTF();
+ params.put(k, v);
+ }
+
+ ObserverSpecification ospec = new ObserverSpecification(clazz, params);
+ omap.put(col, ospec);
+ }
+
+ return omap;
+ }
+
+ @Override
+ public RegisteredObservers load(CuratorFramework curator) throws Exception {
+
+ Map<Column, ObserverSpecification> observers;
+ Map<Column, ObserverSpecification> weakObservers;
+
+ ByteArrayInputStream bais;
+ try {
+ bais =
+ new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1));
+ } catch (NoNodeException nne) {
+ return null;
+ }
+ DataInputStream dis = new DataInputStream(bais);
+
+ observers = Collections.unmodifiableMap(readObservers(dis));
+ weakObservers = Collections.unmodifiableMap(readObservers(dis));
+
+
+ return new RegisteredObservers() {
+
+ @Override
+ public Observers getObservers(Environment env) {
+ return new ObserversV1(env, observers, weakObservers);
+ }
+
+ @Override
+ public Set<Column> getObservedColumns(NotificationType nt) {
+ switch (nt) {
+ case STRONG:
+ return observers.keySet();
+ case WEAK:
+ return weakObservers.keySet();
+ default:
+ throw new IllegalArgumentException("Unknown notification type " + nt);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void clear(CuratorFramework curator) throws Exception {
+ try {
+ curator.delete().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1);
+ } catch (NoNodeException nne) {
+ // nothing to delete
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
similarity index 83%
rename from modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
rename to modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index 285a69a..65c1b2f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -13,7 +13,7 @@
* the License.
*/
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v1;
import java.util.ArrayList;
import java.util.HashMap;
@@ -24,15 +24,19 @@
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Observers implements AutoCloseable {
+@SuppressWarnings("deprecation")
+class ObserversV1 implements Observers {
- private static final Logger log = LoggerFactory.getLogger(Observers.class);
+ private static final Logger log = LoggerFactory.getLogger(ObserversV1.class);
private Environment env;
Map<Column, List<Observer>> observers = new HashMap<>();
+ Map<Column, ObserverSpecification> strongObservers;
+ Map<Column, ObserverSpecification> weakObservers;
private List<Observer> getObserverList(Column col) {
List<Observer> observerList;
@@ -46,8 +50,11 @@
return observerList;
}
- public Observers(Environment env) {
+ public ObserversV1(Environment env, Map<Column, ObserverSpecification> strongObservers,
+ Map<Column, ObserverSpecification> weakObservers) {
this.env = env;
+ this.strongObservers = strongObservers;
+ this.weakObservers = weakObservers;
}
public Observer getObserver(Column col) {
@@ -63,9 +70,9 @@
Observer observer = null;
- ObserverSpecification observerConfig = env.getObservers().get(col);
+ ObserverSpecification observerConfig = strongObservers.get(col);
if (observerConfig == null) {
- observerConfig = env.getWeakObservers().get(col);
+ observerConfig = weakObservers.get(col);
}
if (observerConfig != null) {
@@ -119,4 +126,5 @@
observers = null;
}
+
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
new file mode 100644
index 0000000..3787be3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * this class created for json serialization
+ */
+class JsonObservedColumn {
+ private byte[] fam;
+ private byte[] qual;
+ private byte[] vis;
+ private String notificationType;
+
+ JsonObservedColumn(Column col, NotificationType nt) {
+ this.fam = col.getFamily().toArray();
+ this.qual = col.getQualifier().toArray();
+ this.vis = col.getVisibility().toArray();
+ this.notificationType = nt.name();
+ }
+
+ public Column getColumn() {
+ return new Column(Bytes.of(fam), Bytes.of(qual), Bytes.of(vis));
+ }
+
+ public NotificationType getNotificationType() {
+ return NotificationType.valueOf(notificationType);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
new file mode 100644
index 0000000..44f229b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * this class created for json serialization
+ */
+class JsonObservers {
+ String obsProviderClass;
+ List<JsonObservedColumn> observedColumns;
+
+ JsonObservers(String obsProviderClass, Map<Column, NotificationType> columns) {
+ this.obsProviderClass = obsProviderClass;
+ this.observedColumns =
+ columns.entrySet().stream()
+ .map(entry -> new JsonObservedColumn(entry.getKey(), entry.getValue()))
+ .collect(toList());
+ }
+
+ public String getObserverProviderClass() {
+ return obsProviderClass;
+ }
+
+ public Map<Column, NotificationType> getObservedColumns() {
+ return observedColumns.stream().collect(
+ toMap(JsonObservedColumn::getColumn, JsonObservedColumn::getNotificationType));
+ }
+
+ @Override
+ public String toString() {
+ return obsProviderClass + " " + getObservedColumns();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
similarity index 70%
copy from modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
copy to modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
index 47d5997..3d2b1f3 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
@@ -13,30 +13,27 @@
* the License.
*/
-package org.apache.fluo.core.worker;
+package org.apache.fluo.core.observer.v2;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.metrics.MetricsReporter;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.metrics.DummyMetricsReporter;
-public class ObserverContext implements Observer.Context {
+public class ObserverProviderContextImpl implements Context {
- private final SimpleConfiguration observerConfig;
- private final SimpleConfiguration appConfig;
+ private SimpleConfiguration appConfig;
private final Environment env;
- public ObserverContext(SimpleConfiguration appConfig, SimpleConfiguration observerConfig) {
+ public ObserverProviderContextImpl(SimpleConfiguration appConfig) {
this.appConfig = appConfig;
- this.observerConfig = observerConfig;
this.env = null;
}
- public ObserverContext(Environment env, SimpleConfiguration observerConfig) {
+ public ObserverProviderContextImpl(Environment env) {
this.env = env;
this.appConfig = null;
- this.observerConfig = observerConfig;
}
@Override
@@ -48,15 +45,11 @@
}
@Override
- public SimpleConfiguration getObserverConfiguration() {
- return observerConfig;
- }
-
- @Override
public MetricsReporter getMetricsReporter() {
if (env == null) {
return new DummyMetricsReporter();
}
return env.getMetricsReporter();
}
+
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
new file mode 100644
index 0000000..2e544cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.fluo.accumulo.util.ZookeeperPath.CONFIG_FLUO_OBSERVERS2;
+
+/*
+ * Support for observers configured the new way.
+ */
+public class ObserverStoreV2 implements ObserverStore {
+
+ @Override
+ public boolean handles(FluoConfiguration config) {
+ return !config.getObserverProvider().isEmpty();
+ }
+
+ @Override
+ public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+ String obsProviderClass = config.getObserverProvider();
+
+ ObserverProvider observerProvider = newObserverProvider(obsProviderClass);
+
+ Map<Column, NotificationType> obsCols = new HashMap<>();
+ BiConsumer<Column, NotificationType> obsColConsumer = (col, nt) -> {
+ Objects.requireNonNull(col, "Observed column must be non-null");
+ Objects.requireNonNull(nt, "Notification type must be non-null");
+ Preconditions.checkArgument(!obsCols.containsKey(col), "Duplicate observed column %s", col);
+ obsCols.put(col, nt);
+ };
+
+ observerProvider.provideColumns(obsColConsumer,
+ new ObserverProviderContextImpl(config.getAppConfiguration()));
+
+ Gson gson = new Gson();
+ String json = gson.toJson(new JsonObservers(obsProviderClass, obsCols));
+ CuratorUtil.putData(curator, CONFIG_FLUO_OBSERVERS2, json.getBytes(UTF_8),
+ CuratorUtil.NodeExistsPolicy.OVERWRITE);
+
+ }
+
+ static ObserverProvider newObserverProvider(String obsProviderClass) {
+ ObserverProvider observerProvider;
+ try {
+ observerProvider =
+ Class.forName(obsProviderClass).asSubclass(ObserverProvider.class).newInstance();
+ } catch (ClassNotFoundException e1) {
+ throw new FluoException("ObserverProvider class '" + obsProviderClass + "' was not "
+ + "found. Check for class name misspellings or failure to include "
+ + "the observer provider jar.", e1);
+ } catch (InstantiationException | IllegalAccessException e2) {
+ throw new FluoException("ObserverProvider class '" + obsProviderClass
+ + "' could not be created.", e2);
+ }
+ return observerProvider;
+ }
+
+ @Override
+ public RegisteredObservers load(CuratorFramework curator) throws Exception {
+ byte[] data;
+ try {
+ data = curator.getData().forPath(CONFIG_FLUO_OBSERVERS2);
+ } catch (NoNodeException nne) {
+ return null;
+ }
+ String json = new String(data, UTF_8);
+ JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
+
+ Set<Column> weakColumns = new HashSet<>();
+ Set<Column> strongColumns = new HashSet<>();
+
+ for (Entry<Column, NotificationType> entry : jco.getObservedColumns().entrySet()) {
+ switch (entry.getValue()) {
+ case STRONG:
+ strongColumns.add(entry.getKey());
+ break;
+ case WEAK:
+ weakColumns.add(entry.getKey());
+ break;
+ default:
+ throw new IllegalStateException("Unknown notification type " + entry.getValue());
+ }
+ }
+
+ return new RegisteredObservers() {
+
+ @Override
+ public Observers getObservers(Environment env) {
+ return new ObserversV2(env, jco, strongColumns, weakColumns);
+ }
+
+ @Override
+ public Set<Column> getObservedColumns(NotificationType nt) {
+ switch (nt) {
+ case STRONG:
+ return strongColumns;
+ case WEAK:
+ return weakColumns;
+ default:
+ throw new IllegalArgumentException("Unknown notification type " + nt);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void clear(CuratorFramework curator) throws Exception {
+ try {
+ curator.delete().forPath(CONFIG_FLUO_OBSERVERS2);
+ } catch (NoNodeException nne) {
+ // nothing to delete
+ }
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
new file mode 100644
index 0000000..d9d4a97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Registry;
+import org.apache.fluo.api.observer.StringObserver;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ObserversV2 implements Observers {
+
+ private static final Logger log = LoggerFactory.getLogger(ObserversV2.class);
+
+ Map<Column, Observer> observers;
+
+ public ObserversV2(Environment env, JsonObservers jco, Set<Column> strongColumns,
+ Set<Column> weakColumns) {
+ observers = new HashMap<>();
+
+ ObserverProvider obsProvider =
+ ObserverStoreV2.newObserverProvider(jco.getObserverProviderClass());
+
+ ObserverProviderContextImpl ctx = new ObserverProviderContextImpl(env);
+
+ Registry or = new Registry() {
+
+ @Override
+ public void register(Column col, NotificationType nt, Observer obs) {
+ try {
+ Method closeMethod = obs.getClass().getMethod("close");
+ if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
+ log.warn(
+ "Observer {} implements close(). Close is not called on Observers created using ObserverProvider."
+ + " Close is only called on Observers configured the old way.", obs.getClass()
+ .getName());
+ }
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException("Failed to check if close() is implemented", e);
+ }
+
+ if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
+ throw new IllegalArgumentException("Column " + col
+ + " not previously configured for strong notifications");
+ }
+
+ if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
+ throw new IllegalArgumentException("Column " + col
+ + " not previously configured for weak notifications");
+ }
+
+ if (observers.containsKey(col)) {
+ throw new IllegalArgumentException("Duplicate observed column " + col);
+ }
+
+ observers.put(col, obs);
+ }
+
+ @Override
+ public void registers(Column col, NotificationType nt, StringObserver obs) {
+ register(col, nt, obs);
+ }
+ };
+
+ obsProvider.provide(or, ctx);
+
+ // the following check ensures observers are provided for all previously configured columns
+ SetView<Column> diff =
+ Sets.difference(observers.keySet(), Sets.union(strongColumns, weakColumns));
+ if (diff.size() > 0) {
+ throw new FluoException("ObserverProvider " + jco.getObserverProviderClass()
+ + " did not provide observers for columns " + diff);
+ }
+ }
+
+ @Override
+ public Observer getObserver(Column col) {
+ return observers.get(col);
+ }
+
+ @Override
+ public void returnObserver(Observer o) {}
+
+ @Override
+ public void close() {}
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 1834835..56aa78d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -29,6 +29,7 @@
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Hex;
import org.slf4j.Logger;
@@ -50,7 +51,7 @@
this.queue = new PriorityBlockingQueue<>();
this.executor = FluoExecutors.newFixedThreadPool(numThreads, queue, "ntfyProc");
this.tracker = new NotificationTracker();
- this.observers = new Observers(env);
+ this.observers = env.getConfiguredObservers().getObservers(env);
env.getSharedResources().getMetricRegistry()
.register(env.getMetricNames().getNotificationQueued(), new Gauge<Integer>() {
@Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 895fc21..ff582ed 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -23,6 +23,7 @@
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.log.TracingTransaction;
+import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.util.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties
index ad0c97f..f9f603d 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties
@@ -60,10 +60,9 @@
# Observer properties
# -------------------
-# Specifies observers
-# fluo.observer.0=com.foo.Observer1
-# Can optionally have configuration key values
-# fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2
+# Specifies an observer provider. This should be the name of a class that
+# implements org.apache.fluo.api.observer.ObserverProvider.
+#fluo.observer.provider=com.foo.AppObserverProvider
# Transaction properties
# ----------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 8f6b425..8f644eb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -16,8 +16,6 @@
package org.apache.fluo.integration;
import java.io.File;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Connector;
@@ -30,8 +28,8 @@
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -80,8 +78,15 @@
conn = miniAccumulo.getConnector(USER, new PasswordToken(PASSWORD));
}
- protected List<ObserverSpecification> getObservers() {
- return Collections.emptyList();
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return null;
+ }
+
+ protected void setupObservers(FluoConfiguration fc) {
+ Class<? extends ObserverProvider> ofc = getObserverProviderClass();
+ if (ofc != null) {
+ fc.setObserverProvider(ofc);
+ }
}
public String getCurTableName() {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
index 656a5d9..481e6e8 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -75,7 +75,7 @@
config.setAccumuloZookeepers(miniAccumulo.getZooKeepers());
config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
- config.addObservers(getObservers());
+ setupObservers(config);
config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
config.setMiniStartAccumulo(false);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
index 0d28381..bcef827 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
@@ -49,7 +49,7 @@
config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
config.setAccumuloTable(getNextTableName());
config.setWorkerThreads(5);
- config.addObservers(getObservers());
+ setupObservers(config);
config.setMiniStartAccumulo(false);
setConfig(config);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index 22f8ea2..7c48cc3 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -15,9 +15,6 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
@@ -25,25 +22,28 @@
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
public class AppConfigIT extends ITBaseMini {
+ public static final Column DF_COL = new Column("data", "foo");
+ public static final Column DB_COL = new Column("data", "bar");
+
@Override
protected void setAppConfig(SimpleConfiguration config) {
config.setProperty("myapp.sizeLimit", 50000);
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(TestObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return TestObserverProvider.class;
}
@Test
@@ -89,32 +89,24 @@
public void load(TransactionBase tx, Context context) throws Exception {
int limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
if (data < limit) {
- tx.set(row, new Column("data", "foo"), Integer.toString(data));
+ tx.set(row, DF_COL, Integer.toString(data));
}
}
}
- public static class TestObserver extends AbstractObserver {
-
- private int limit;
-
+ public static class TestObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(new Column("data", "foo"), NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ int limit = ctx.getAppConfiguration().getInt("myapp.sizeLimit");
+
+ or.registers(DF_COL, STRONG, (tx, row, col) -> {
+ int d = Integer.parseInt(tx.gets(row, col));
+ if (2 * d < limit) {
+ tx.set(row.toString(), DB_COL, Integer.toString(2 * d));
+ }
+ });
}
- @Override
- public void init(Context context) {
- limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
- }
-
- @Override
- public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- int d = Integer.parseInt(tx.gets(row.toString(), col));
- if (2 * d < limit) {
- tx.set(row.toString(), new Column("data", "bar"), Integer.toString(2 * d));
- }
- }
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 3af4fdc..f927102 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -15,9 +15,7 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
@@ -32,11 +30,9 @@
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.integration.ITBaseMini;
@@ -81,27 +77,22 @@
}
}
- public static class TotalObserver extends AbstractObserver {
-
+ public static class CollisionObserverProvider implements ObserverProvider {
@Override
- public Observer.ObservedColumn getObservedColumn() {
- return new Observer.ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
- }
+ public void provide(Registry or, Context ctx) {
+ or.registers(STAT_CHANGED, NotificationType.WEAK, (tx, row, col) -> {
+ int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
+ int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
- @Override
- public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
- String row = rowBytes.toString();
- int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
- int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
-
- tx.set(row, STAT_PROCESSED, total + "");
- TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+ tx.set(row, STAT_PROCESSED, total + "");
+ TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+ });
}
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return CollisionObserverProvider.class;
}
@Override
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index 6b4d279..d86f2ef 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -15,9 +15,7 @@
package org.apache.fluo.integration.impl;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
@@ -32,12 +30,12 @@
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.CommitException;
import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.Notification;
@@ -54,6 +52,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
import static org.apache.fluo.integration.BankUtil.BALANCE;
public class FailureIT extends ITBaseImpl {
@@ -61,22 +60,21 @@
@Rule
public ExpectedException exception = ExpectedException.none();
- public static class NullObserver extends AbstractObserver {
-
+ public static class NullObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {}
+ }
+ public static class FailuresObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(new Column("attr", "lastupdate"), NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ or.register(new Column("attr", "lastupdate"), STRONG, new NullObserver());
}
}
@Override
- protected List<ObserverSpecification> getObservers() {
- List<ObserverSpecification> observed = new ArrayList<>();
- observed.add(new ObserverSpecification(NullObserver.class.getName()));
- return observed;
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return FailuresObserverProvider.class;
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index e67d4d9..288bed9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -18,7 +18,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
@@ -28,16 +27,15 @@
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
@@ -52,23 +50,19 @@
public class FluoIT extends ITBaseImpl {
- public static class BalanceObserver extends AbstractObserver {
-
+ public static class FluoITObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(BALANCE, NotificationType.STRONG);
- }
-
- @Override
- public void process(TransactionBase tx, Bytes row, Column col) {
- Assert.fail();
+ public void provide(Registry or, Context ctx) {
+ or.register(BALANCE, NotificationType.STRONG, (tx, row, col) -> {
+ Assert.fail();
+ });
}
}
@Override
- protected List<org.apache.fluo.api.config.ObserverSpecification> getObservers() {
- return Arrays.asList(new ObserverSpecification(BalanceObserver.class.getName()));
- };
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return FluoITObserverProvider.class;
+ }
@Test
public void testFluoFactory() throws Exception {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
index b496955..e0a08ac 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
@@ -15,8 +15,6 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
-import java.util.List;
import java.util.Map.Entry;
import com.google.common.collect.Iterables;
@@ -24,14 +22,14 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
-import org.apache.fluo.integration.impl.WeakNotificationIT.SimpleObserver;
+import org.apache.fluo.integration.impl.WeakNotificationIT.WeakNotificationITObserverProvider;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -60,8 +58,8 @@
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return WeakNotificationITObserverProvider.class;
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index bfd8e25..5c9c02f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -23,6 +23,7 @@
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
@@ -35,6 +36,7 @@
import org.junit.Assert;
import org.junit.Test;
+@Deprecated
public class ObserverConfigIT extends ITBaseMini {
public static class ConfigurableObserver extends AbstractObserver {
@@ -95,7 +97,7 @@
}
@Override
- protected List<ObserverSpecification> getObservers() {
+ protected void setupObservers(FluoConfiguration fc) {
List<ObserverSpecification> observers = new ArrayList<>();
observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
@@ -108,7 +110,7 @@
observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
"observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
- return observers;
+ fc.addObservers(observers);
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index fe4b0d6..5381952 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -22,14 +22,16 @@
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
/**
* Test an observer notifying the column its observing. This is a useful pattern for exporting data.
*/
@@ -39,15 +41,9 @@
private static final Column EXPORT_CHECK_COL = new Column("export", "check");
private static final Column EXPORT_COUNT_COL = new Column("export", "count");
- @Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(ExportingObserver.class.getName()));
- }
-
private static List<String> exports = new ArrayList<>();
- public static class ExportingObserver extends AbstractObserver {
-
+ public static class ExportingObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
String r = row.toString();
@@ -69,13 +65,20 @@
private void export(Bytes row, String exportCount) {
exports.add(exportCount);
}
+ }
+ public static class SelfNtfyObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(EXPORT_COUNT_COL, NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ or.register(EXPORT_COUNT_COL, STRONG, new ExportingObserver());
}
}
+ @Override
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return SelfNtfyObserverProvider.class;
+ }
+
@Test
public void test1() throws Exception {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index 1d065e1..ce002cb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -15,15 +15,10 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.impl.TransactorNode;
@@ -32,30 +27,26 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
public class StrongNotificationIT extends ITBaseMini {
private static final Column OC = new Column("f", "q");
private static final Column RC = new Column("f", "r");
- public static class SimpleObserver extends AbstractObserver {
-
+ public static class StrongNtfyObserverProvider implements ObserverProvider {
@Override
- public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
- String r = row.toString();
-
- String v = tx.gets(r, col);
- tx.set(v, RC, r);
- }
-
- @Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(OC, NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ or.register(OC, STRONG, (tx, row, col) -> {
+ Bytes v = tx.get(row, col);
+ tx.set(v, RC, row);
+ });
}
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return StrongNtfyObserverProvider.class;
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 22c6632..2bd4ce9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -15,18 +15,15 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
@@ -36,13 +33,14 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
public class WeakNotificationIT extends ITBaseMini {
private static final Column STAT_COUNT = new Column("stat", "count");
private static final Column STAT_CHECK = new Column("stat", "check");
- public static class SimpleObserver extends AbstractObserver {
-
+ public static class SimpleObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
@@ -61,16 +59,18 @@
tx.set(row.toString(), STAT_COUNT, sum + "");
}
}
+ }
+ public static class WeakNotificationITObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(STAT_CHECK, NotificationType.WEAK);
+ public void provide(Registry or, Context ctx) {
+ or.register(STAT_CHECK, WEAK, new SimpleObserver());
}
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return WeakNotificationITObserverProvider.class;
}
@Test
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 13bb7a7..9dcf6dd 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -15,9 +15,7 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Scanner;
@@ -25,11 +23,10 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
import org.apache.fluo.core.oracle.Stamp;
@@ -39,38 +36,35 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
public class WeakNotificationOverlapIT extends ITBaseImpl {
private static final Column STAT_TOTAL = new Column("stat", "total");
private static final Column STAT_PROCESSED = new Column("stat", "processed");
private static final Column STAT_CHANGED = new Column("stat", "changed");
- public static class TotalObserver extends AbstractObserver {
-
- @Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
+ private static final StringObserver TOTAL_OBSERVER = (tx, row, col) -> {
+ String totalStr = tx.gets(row, STAT_TOTAL);
+ if (totalStr == null) {
+ return;
}
+ Integer total = Integer.parseInt(totalStr);
+ int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
+ tx.set(row, new Column("stat", "processed"), total + "");
+ TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+ };
+ public static class WeakNtfyObserverProvider implements ObserverProvider {
@Override
- public void process(TransactionBase tx, Bytes row, Column col) {
- String r = row.toString();
- String totalStr = tx.gets(r, STAT_TOTAL);
- if (totalStr == null) {
- return;
- }
- Integer total = Integer.parseInt(totalStr);
- int processed = TestUtil.getOrDefault(tx, r, STAT_PROCESSED, 0);
- tx.set(r, new Column("stat", "processed"), total + "");
- TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+ public void provide(Registry or, Context ctx) {
+ or.registers(STAT_CHANGED, WEAK, TOTAL_OBSERVER);
}
}
-
-
@Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return WeakNtfyObserverProvider.class;
}
@Test
@@ -92,7 +86,7 @@
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx2, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx2, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx3
ttx2.done();
@@ -103,7 +97,7 @@
Assert.assertEquals(1, countNotifications());
TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
- new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
@@ -132,7 +126,7 @@
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx6, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx6, Bytes.of("1"), STAT_CHANGED);
// should not delete notification created by ttx7
ttx6.done();
@@ -143,7 +137,7 @@
snap3.done();
TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
- new TotalObserver().process(ttx8, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx8, Bytes.of("1"), STAT_CHANGED);
ttx8.done();
Assert.assertEquals(0, countNotifications());
@@ -182,7 +176,7 @@
Assert.assertEquals(1, countNotifications());
- new TotalObserver().process(ttx3, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx3, Bytes.of("1"), STAT_CHANGED);
ttx3.done();
Assert.assertEquals(1, countNotifications());
@@ -191,7 +185,7 @@
}
TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
- new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+ TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
ttx4.done();
Assert.assertEquals(0, countNotifications());
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 2406b82..9db0ce4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -15,23 +15,19 @@
package org.apache.fluo.integration.impl;
-import java.util.Collections;
-import java.util.List;
-
import com.google.common.collect.Iterables;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl.CommitData;
+import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.Observers;
import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestTransaction;
@@ -39,6 +35,8 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
/**
* A simple test that added links between nodes in a graph. There is an observer that updates an
* index of node degree.
@@ -50,21 +48,10 @@
private static Column observedColumn = LAST_UPDATE;
- @Override
- protected List<ObserverSpecification> getObservers() {
- return Collections.singletonList(new ObserverSpecification(DegreeIndexer.class.getName()));
- }
-
- public static class DegreeIndexer implements Observer {
+ public static class DegreeIndexer implements StringObserver {
@Override
- public void init(Context context) {}
-
- @Override
- public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
-
- String row = rowBytes.toString();
-
+ public void process(TransactionBase tx, String row, Column col) throws Exception {
// get previously calculated degree
String degree = tx.gets(row, DEGREE);
@@ -84,14 +71,18 @@
tx.delete("IDEG" + degree, new Column("node", row));
}
}
+ }
+ public static class WorkerITObserverProvider implements ObserverProvider {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(observedColumn, NotificationType.STRONG);
+ public void provide(Registry or, Context ctx) {
+ or.register(observedColumn, STRONG, new DegreeIndexer());
}
+ }
- @Override
- public void close() {}
+ @Override
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return WorkerITObserverProvider.class;
}
@Test
@@ -152,15 +143,16 @@
public void testDiffObserverConfig() throws Exception {
observedColumn = new Column("attr2", "lastupdate");
try {
- try (Environment env = new Environment(config); Observers observers = new Observers(env)) {
- observers.getObserver(LAST_UPDATE);
+ try (Environment env = new Environment(config);
+ Observers op = env.getConfiguredObservers().getObservers(env)) {
+ op.getObserver(LAST_UPDATE);
}
Assert.fail();
- } catch (IllegalStateException ise) {
+ } catch (IllegalArgumentException ise) {
Assert.assertTrue(ise.getMessage().contains(
- "Mismatch between configured column and class column"));
+ "Column attr2 lastupdate not previously configured for strong notifications"));
} finally {
observedColumn = LAST_UPDATE;
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 02a4fd3..01f04e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -18,7 +18,6 @@
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
@@ -32,14 +31,15 @@
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestUtil;
import org.apache.log4j.Level;
@@ -49,6 +49,8 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
public class LogIT extends ITBaseMini {
private static final Column STAT_COUNT = new Column("stat", "count");
@@ -100,13 +102,7 @@
}
}
- public static class BinaryObserver extends AbstractObserver {
-
- @Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(bCol2, NotificationType.WEAK);
- }
-
+ public static class BinaryObserver implements Observer {
@Override
public void process(TransactionBase tx, Bytes row, Column col) {
tx.get(bRow1, bCol2);
@@ -115,23 +111,24 @@
}
}
- public static class TestObserver extends AbstractObserver {
-
+ public static class TestObserver implements StringObserver {
@Override
- public ObservedColumn getObservedColumn() {
- return new ObservedColumn(STAT_COUNT, NotificationType.WEAK);
+ public void process(TransactionBase tx, String row, Column col) {
+ TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row, col)));
}
+ }
+ public static class LogItObserverProvider implements ObserverProvider {
@Override
- public void process(TransactionBase tx, Bytes row, Column col) {
- TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row.toString(), col)));
+ public void provide(Registry or, Context ctx) {
+ or.registers(STAT_COUNT, WEAK, new TestObserver());
+ or.register(bCol2, WEAK, new BinaryObserver());
}
}
@Override
- protected List<ObserverSpecification> getObservers() {
- return Arrays.asList(new ObserverSpecification(TestObserver.class.getName()),
- new ObserverSpecification(BinaryObserver.class.getName()));
+ protected Class<? extends ObserverProvider> getObserverProviderClass() {
+ return LogItObserverProvider.class;
}
@Test
diff --git a/pom.xml b/pom.xml
index ef2eb78..9c0a694 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
<version>1.32</version>
</dependency>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+ <dependency>
<!-- Guava 13.0.1 is required by Twill (due to beta method usage) -->
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>