fixes #829 Changed API to allow giving observers an optional ID
diff --git a/docs/applications.md b/docs/applications.md
index cbf12ba..45e3412 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -118,24 +118,22 @@
```
2. Create a class that implements [ObserverProvider] like the example below. The purpose of this
class is associate a set Observers with columns that trigger the observers. The class can
- create multiple observers.
+ register multiple observers.
```java
class AppObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
//setup InvertObserver to be triggered when the column obs:data is modified
- or.register(new Column("obs", "data"),
- NotificationType.STRONG,
- new InvertObserver());
+ or.forColumn(new Column("obs", "data"), NotificationType.STRONG)
+ .useObserver(new InvertObserver());
//Observer is a Functional interface. So Obsevers can be written as lambdas.
- or.register(new Column("new","data"),
- NotificationType.WEAK,
- (tx,row,col) -> {
- Bytes combined = combineNewAndOld(tx,row);
- tx.set(row, new Column("current","data"), combined);
- });
+ or.forColumn(new Column("new","data"), NotificationType.WEAK)
+ .useObserver((tx,row,col) -> {
+ Bytes combined = combineNewAndOld(tx,row);
+ tx.set(row, new Column("current","data"), combined);
+ });
}
}
```
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java b/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
index 903e2f6..b4512b9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
@@ -24,6 +24,7 @@
*
* @since 1.0.0
*/
+@FunctionalInterface
public interface Loader {
/**
@@ -37,7 +38,7 @@
SimpleConfiguration getAppConfiguration();
/**
- * @return A {@link MetricsReporter} to report application metrics from this observer
+ * @return A {@link MetricsReporter} to report application metrics from this loader
*/
MetricsReporter getMetricsReporter();
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index c06e072..a593d46 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -30,6 +30,15 @@
void execute(Loader loader);
/**
+ * Same as {@link #execute(Loader)}, but allows specifing an identity. The identity is used in
+ * metrics and trace logging. When an identity is not supplied, the class name is used. In the
+ * case of lambdas the class name may not be the same in different processes.
+ *
+ * @since 1.1.0
+ */
+ void execute(String identity, Loader loader);
+
+ /**
* Waits for all queued and running Loader task to complete, then cleans up resources.
*/
@Override
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java b/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
new file mode 100644
index 0000000..2c2f772
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import java.util.function.BiConsumer;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider.Registry.ObserverArgument;
+
+// Intentionally package private
+class ColumnProviderRegistry implements ObserverProvider.Registry.ObserverArgument,
+ ObserverProvider.Registry.IdentityOption {
+
+ private BiConsumer<Column, NotificationType> colRegistry;
+ private NotificationType nt;
+ private Column col;
+
+ ColumnProviderRegistry(Column col, NotificationType nt,
+ BiConsumer<Column, NotificationType> colRegistry) {
+ this.col = col;
+ this.nt = nt;
+ this.colRegistry = colRegistry;
+ }
+
+ @Override
+ public ObserverArgument withId(String alias) {
+ return this;
+ }
+
+ @Override
+ public void useObserver(Observer observer) {
+ colRegistry.accept(col, nt);
+ }
+
+ @Override
+ public void useStrObserver(StringObserver observer) {
+ useObserver(observer);
+ }
+
+}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
index c172268..a4b42bb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
@@ -64,23 +64,59 @@
* @since 1.1.0
*/
interface Registry {
- void register(Column observedColumn, NotificationType ntfyType, Observer observer);
/**
- * This method was created to allow Observers written as lambda to be passed {@link String}
- * instead of {@link Bytes} for the row.
+ * The terminal part of a Fluent API for registering observers.
*
- * <pre>
- * <code>
- * void provide(ObserverRegistry or, Context ctx) {
- * or.registers(someColumn, WEAK, (tx,row,col) -> {
- * //row is of type String
- * };
- * }
- * </code>
- * </pre>
+ * @since 1.1.0
*/
- void registers(Column observedColumn, NotificationType ntfyType, StringObserver observer);
+ interface ObserverArgument {
+
+ /**
+ * Calling this method registers the given observer using the parameters previously passed to
+ * the Fluent API.
+ *
+ */
+ void useObserver(Observer observer);
+
+ /**
+ * Calling this method registers the given observer using the parameters previously passed to
+ * the Fluent API.
+ *
+ * <p>
+ * This method was created to allow Observers written as lambda to be passed {@link String}
+ * instead of {@link Bytes} for the row.
+ *
+ * <pre>
+ * <code>
+ * void provide(ObserverRegistry or, Context ctx) {
+ * or.forColumn(someColumn, WEAK).useStrObserver((tx,row,col) -> {
+ * //row is of type String
+ * };
+ * }
+ * </code>
+ * </pre>
+ */
+ void useStrObserver(StringObserver observer);
+ }
+
+ /**
+ * One part of a Fluent API for registering observers.
+ *
+ * @since 1.1.0
+ */
+ interface IdentityOption extends ObserverArgument {
+ /**
+ * Optionally set the name used to identify the observer in logging and metrics. If not set,
+ * the column name is used.
+ */
+ ObserverArgument withId(String identity);
+ }
+
+ /**
+ * A fluent entry point for registering an observer.
+ */
+ IdentityOption forColumn(Column observedColumn, NotificationType ntfyType);
}
/**
@@ -103,13 +139,8 @@
default void provideColumns(BiConsumer<Column, NotificationType> colRegistry, Context ctx) {
Registry or = new Registry() {
@Override
- public void registers(Column oc, NotificationType nt, StringObserver obs) {
- colRegistry.accept(oc, nt);
- }
-
- @Override
- public void register(Column oc, NotificationType nt, Observer obs) {
- colRegistry.accept(oc, nt);
+ public IdentityOption forColumn(Column observedColumn, NotificationType ntfyType) {
+ return new ColumnProviderRegistry(observedColumn, ntfyType, colRegistry);
}
};
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
index 403fb66..1b589cf 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
@@ -60,13 +60,13 @@
private final AsyncCommitObserver aco;
private final int size;
private final AtomicBoolean finished = new AtomicBoolean(false);
- private final Class<?> txExecClass;
+ private final String alias;
private void finish(TxResult status) {
if (finished.compareAndSet(false, true)) {
commitingTransactions.decrementAndGet();
tx.getStats().setCommitFinishTime(System.currentTimeMillis());
- tx.getStats().report(status.toString(), txExecClass);
+ tx.getStats().report(status.toString(), alias);
memoryLimit.release(size);
try {
tx.close();
@@ -76,12 +76,11 @@
}
}
- public CQCommitObserver(AsyncTransaction tx, AsyncCommitObserver aco, Class<?> txExecClass,
- int size) {
+ public CQCommitObserver(AsyncTransaction tx, AsyncCommitObserver aco, String alias, int size) {
this.tx = tx;
this.aco = aco;
this.size = size;
- this.txExecClass = txExecClass;
+ this.alias = alias;
}
@Override
@@ -123,15 +122,15 @@
}
- public void beginCommit(AsyncTransaction tx, Class<?> txExecClass, AsyncCommitObserver aco) {
+ public void beginCommit(AsyncTransaction tx, String alias, AsyncCommitObserver aco) {
Objects.requireNonNull(tx);
- Objects.requireNonNull(txExecClass);
+ Objects.requireNonNull(alias);
Objects.requireNonNull(aco);
int size = tx.getSize();
memoryLimit.acquire(size);
commitingTransactions.incrementAndGet();
- CQCommitObserver myAco = new CQCommitObserver(tx, aco, txExecClass, size);
+ CQCommitObserver myAco = new CQCommitObserver(tx, aco, alias, size);
tx.getStats().setCommitBeginTime(System.currentTimeMillis());
tx.commitAsync(myAco);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index ad36fd2..916dcca 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -61,6 +61,7 @@
AsyncTransaction txi;
Loader loader;
private AtomicBoolean done = new AtomicBoolean(false);
+ private String identity;
private void close() {
txi = null;
@@ -75,7 +76,8 @@
}
- public LoaderCommitObserver(Loader loader2) {
+ public LoaderCommitObserver(String alias, Loader loader2) {
+ this.identity = alias;
this.loader = loader2;
}
@@ -111,7 +113,7 @@
txi = new TransactionImpl(env);
if (TracingTransaction.isTracingEnabled()) {
- txi = new TracingTransaction(txi, loader.getClass());
+ txi = new TracingTransaction(txi, loader.getClass(), identity);
}
Loader.Context context = new Loader.Context() {
@@ -128,7 +130,7 @@
try {
loader.load(txi, context);
- env.getSharedResources().getCommitManager().beginCommit(txi, loader.getClass(), this);
+ env.getSharedResources().getCommitManager().beginCommit(txi, identity, this);
} catch (Exception e) {
setException(e);
close();
@@ -183,6 +185,11 @@
@Override
public void execute(Loader loader) {
+ execute(loader.getClass().getSimpleName(), loader);
+ }
+
+ @Override
+ public void execute(String alias, Loader loader) {
if (exceptionRef.get() != null) {
throw new RuntimeException("Previous failure", exceptionRef.get());
}
@@ -199,7 +206,7 @@
try {
commiting.increment();
- executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(loader)));
+ executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(alias, loader)));
} catch (RejectedExecutionException rje) {
semaphore.release();
commiting.decrement();
@@ -235,5 +242,4 @@
env.getSharedResources().getBatchWriter().waitForAsyncFlush();
}
}
-
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
index f1ae480..030014c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
@@ -142,29 +142,28 @@
timedOutLocks += amt;
}
- public void report(String status, Class<?> execClass) {
+ public void report(String status, String alias) {
MetricNames names = env.getMetricNames();
MetricRegistry registry = env.getSharedResources().getMetricRegistry();
- String sn = execClass.getSimpleName();
if (getLockWaitTime() > 0) {
- MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxLockWaitTime(sn)).update(
- getLockWaitTime(), TimeUnit.MILLISECONDS);
+ MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxLockWaitTime(alias))
+ .update(getLockWaitTime(), TimeUnit.MILLISECONDS);
}
- MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(sn)).update(
+ MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(alias)).update(
getReadTime(), TimeUnit.MILLISECONDS);
if (getCollisions() > 0) {
- registry.meter(names.getTxWithCollision(sn)).mark();
- registry.meter(names.getTxCollisions(sn)).mark(getCollisions());
+ registry.meter(names.getTxWithCollision(alias)).mark();
+ registry.meter(names.getTxCollisions(alias)).mark(getCollisions());
}
- registry.meter(names.getTxEntriesSet(sn)).mark(getEntriesSet());
- registry.meter(names.getTxEntriesRead(sn)).mark(getEntriesReturned());
+ registry.meter(names.getTxEntriesSet(alias)).mark(getEntriesSet());
+ registry.meter(names.getTxEntriesRead(alias)).mark(getEntriesReturned());
if (getTimedOutLocks() > 0) {
- registry.meter(names.getTxLocksTimedout(sn)).mark(getTimedOutLocks());
+ registry.meter(names.getTxLocksTimedout(alias)).mark(getTimedOutLocks());
}
if (getDeadLocks() > 0) {
- registry.meter(names.getTxLocksDead(sn)).mark(getDeadLocks());
+ registry.meter(names.getTxLocksDead(alias)).mark(getDeadLocks());
}
- registry.meter(names.getTxStatus(status.toLowerCase(), sn)).mark();
+ registry.meter(names.getTxStatus(status.toLowerCase(), alias)).mark();
}
public void setCommitBeginTime(long t) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index 646113f..62f61a1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -61,11 +61,11 @@
}
public TracingTransaction(AsyncTransaction tx) {
- this(tx, null, null);
+ this(tx, null, null, null);
}
- public TracingTransaction(AsyncTransaction tx, Class<?> clazz) {
- this(tx, null, clazz);
+ public TracingTransaction(AsyncTransaction tx, Class<?> clazz, String txExecId) {
+ this(tx, null, clazz, txExecId);
}
private String encB(Collection<Bytes> columns) {
@@ -96,7 +96,8 @@
+ "=" + enc(e.getValue())));
}
- public TracingTransaction(AsyncTransaction tx, Notification notification, Class<?> clazz) {
+ public TracingTransaction(AsyncTransaction tx, Notification notification, Class<?> clazz,
+ String txExecId) {
this.tx = tx;
this.txid = tx.getStartTimestamp();
@@ -114,6 +115,10 @@
if (clazz != null) {
log.trace("txid: {} class: {}", txid, clazz.getName());
}
+
+ if (txExecId != null) {
+ log.trace("txid: {} identity: {}", txid, txExecId);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
index 0e2f94a..b311282 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
@@ -90,6 +90,11 @@
public Observer getObserver(Column col) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public String getObserverId(Column col) {
+ throw new UnsupportedOperationException();
+ }
};
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
index d4cc366..09576f0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
@@ -19,6 +19,8 @@
import org.apache.fluo.api.observer.Observer;
public interface Observers extends AutoCloseable {
+ String getObserverId(Column col);
+
Observer getObserver(Column col);
void returnObserver(Observer o);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index 65c1b2f..f75377e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -19,7 +19,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.Observer;
@@ -37,6 +39,7 @@
Map<Column, List<Observer>> observers = new HashMap<>();
Map<Column, ObserverSpecification> strongObservers;
Map<Column, ObserverSpecification> weakObservers;
+ Map<Column, String> aliases;
private List<Observer> getObserverList(Column col) {
List<Observer> observerList;
@@ -55,6 +58,19 @@
this.env = env;
this.strongObservers = strongObservers;
this.weakObservers = weakObservers;
+ this.aliases = new HashMap<>();
+
+ for (Entry<Column, ObserverSpecification> e : Iterables.concat(strongObservers.entrySet(),
+ weakObservers.entrySet())) {
+ ObserverSpecification observerConfig = e.getValue();
+ try {
+ String alias =
+ Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).getSimpleName();
+ aliases.put(e.getKey(), alias);
+ } catch (ClassNotFoundException e1) {
+ throw new RuntimeException(e1);
+ }
+ }
}
public Observer getObserver(Column col) {
@@ -127,4 +143,8 @@
observers = null;
}
+ @Override
+ public String getObserverId(Column col) {
+ return aliases.get(col);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
new file mode 100644
index 0000000..979b867
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
+
+public class ObserverRegistry implements ObserverProvider.Registry {
+
+ private static final Logger log = LoggerFactory.getLogger(ObserverRegistry.class);
+
+ Map<Column, Observer> observers;
+ Map<Column, String> aliases;
+ private Set<Column> strongColumns;
+ private Set<Column> weakColumns;
+
+ private class FluentRegistration implements ObserverProvider.Registry.IdentityOption,
+ ObserverProvider.Registry.ObserverArgument {
+
+ private Column col;
+ private NotificationType ntfyType;
+ private String alias;
+
+ FluentRegistration(Column col, NotificationType ntfyType) {
+ this.col = col;
+ this.ntfyType = ntfyType;
+ }
+
+ @Override
+ public void useObserver(Observer observer) {
+ register(col, ntfyType, alias, observer);
+ }
+
+ @Override
+ public void useStrObserver(StringObserver observer) {
+ register(col, ntfyType, alias, observer);
+ }
+
+ @Override
+ public ObserverArgument withId(String alias) {
+ this.alias = alias;
+ return this;
+ }
+ }
+
+ ObserverRegistry(Set<Column> strongColumns, Set<Column> weakColumns) {
+ this.observers = new HashMap<>();
+ this.aliases = new HashMap<>();
+ this.strongColumns = strongColumns;
+ this.weakColumns = weakColumns;
+ }
+
+ @Override
+ public IdentityOption forColumn(Column observedColumn, NotificationType ntfyType) {
+ return new FluentRegistration(observedColumn, ntfyType);
+ }
+
+ private void register(Column col, NotificationType nt, String alias, Observer obs) {
+ try {
+ Method closeMethod = obs.getClass().getMethod("close");
+ if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
+ log.warn(
+ "Observer {} implements close(). Close is not called on Observers registered using ObserverProvider."
+ + " Close is only called on Observers configured the old way.", obs.getClass()
+ .getName());
+ }
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException("Failed to check if close() is implemented", e);
+ }
+
+ if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
+ throw new IllegalArgumentException("Column " + col
+ + " not previously configured for strong notifications");
+ }
+
+ if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
+ throw new IllegalArgumentException("Column " + col
+ + " not previously configured for weak notifications");
+ }
+
+ if (observers.containsKey(col)) {
+ throw new IllegalArgumentException("Duplicate observed column " + col);
+ }
+
+ observers.put(col, obs);
+ aliases.put(col, alias);
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
index d9d4a97..22b4fab 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
@@ -15,8 +15,6 @@
package org.apache.fluo.core.observer.v2;
-import java.lang.reflect.Method;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -25,71 +23,31 @@
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.api.observer.ObserverProvider;
-import org.apache.fluo.api.observer.ObserverProvider.Registry;
-import org.apache.fluo.api.observer.StringObserver;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.observer.Observers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.fluo.core.util.Hex;
class ObserversV2 implements Observers {
- private static final Logger log = LoggerFactory.getLogger(ObserversV2.class);
-
Map<Column, Observer> observers;
+ Map<Column, String> aliases;
public ObserversV2(Environment env, JsonObservers jco, Set<Column> strongColumns,
Set<Column> weakColumns) {
- observers = new HashMap<>();
ObserverProvider obsProvider =
ObserverStoreV2.newObserverProvider(jco.getObserverProviderClass());
ObserverProviderContextImpl ctx = new ObserverProviderContextImpl(env);
- Registry or = new Registry() {
-
- @Override
- public void register(Column col, NotificationType nt, Observer obs) {
- try {
- Method closeMethod = obs.getClass().getMethod("close");
- if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
- log.warn(
- "Observer {} implements close(). Close is not called on Observers created using ObserverProvider."
- + " Close is only called on Observers configured the old way.", obs.getClass()
- .getName());
- }
- } catch (NoSuchMethodException | SecurityException e) {
- throw new RuntimeException("Failed to check if close() is implemented", e);
- }
-
- if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
- throw new IllegalArgumentException("Column " + col
- + " not previously configured for strong notifications");
- }
-
- if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
- throw new IllegalArgumentException("Column " + col
- + " not previously configured for weak notifications");
- }
-
- if (observers.containsKey(col)) {
- throw new IllegalArgumentException("Duplicate observed column " + col);
- }
-
- observers.put(col, obs);
- }
-
- @Override
- public void registers(Column col, NotificationType nt, StringObserver obs) {
- register(col, nt, obs);
- }
- };
-
+ ObserverRegistry or = new ObserverRegistry(strongColumns, weakColumns);
obsProvider.provide(or, ctx);
+ this.observers = or.observers;
+ this.aliases = or.aliases;
+ this.observers.forEach((k, v) -> aliases.computeIfAbsent(k, col -> Hex.encNonAscii(col, ":")));
+
// the following check ensures observers are provided for all previously configured columns
SetView<Column> diff =
Sets.difference(observers.keySet(), Sets.union(strongColumns, weakColumns));
@@ -109,4 +67,9 @@
@Override
public void close() {}
+
+ @Override
+ public String getObserverId(Column col) {
+ return aliases.get(col);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
index d2c1bd7..fefa2a5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
@@ -56,8 +56,12 @@
}
public static String encNonAscii(Column col) {
+ return encNonAscii(col, " ");
+ }
+
+ public static String encNonAscii(Column col, String sep) {
StringBuilder sb = new StringBuilder();
- encNonAscii(sb, col, " ");
+ encNonAscii(sb, col, sep);
return sb.toString();
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index ff582ed..76c6c66 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -77,17 +77,18 @@
@Override
public void run() {
Observer observer = observers.getObserver(notification.getColumn());
+ String observerId = observers.getObserverId(notification.getColumn());
try {
AsyncTransaction atx = new TransactionImpl(env, notification);
if (TracingTransaction.isTracingEnabled()) {
- atx = new TracingTransaction(atx, notification, observer.getClass());
+ atx = new TracingTransaction(atx, notification, observer.getClass(), observerId);
}
observer.process(atx, notification.getRow(), notification.getColumn());
CommitManager commitManager = env.getSharedResources().getCommitManager();
- commitManager.beginCommit(atx, observer.getClass(), new WorkTaskCommitObserver());
+ commitManager.beginCommit(atx, observerId, new WorkTaskCommitObserver());
} catch (Exception e) {
log.error("Failed to process work " + Hex.encNonAscii(notification), e);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index 7c48cc3..809eba7 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -99,7 +99,7 @@
public void provide(Registry or, Context ctx) {
int limit = ctx.getAppConfiguration().getInt("myapp.sizeLimit");
- or.registers(DF_COL, STRONG, (tx, row, col) -> {
+ or.forColumn(DF_COL, STRONG).useStrObserver((tx, row, col) -> {
int d = Integer.parseInt(tx.gets(row, col));
if (2 * d < limit) {
tx.set(row.toString(), DB_COL, Integer.toString(2 * d));
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index f927102..e8e7aac 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -80,7 +80,7 @@
public static class CollisionObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.registers(STAT_CHANGED, NotificationType.WEAK, (tx, row, col) -> {
+ or.forColumn(STAT_CHANGED, NotificationType.WEAK).useStrObserver((tx, row, col) -> {
int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index d86f2ef..e18d72a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -68,7 +68,7 @@
public static class FailuresObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(new Column("attr", "lastupdate"), STRONG, new NullObserver());
+ or.forColumn(new Column("attr", "lastupdate"), STRONG).useObserver(new NullObserver());
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 288bed9..0265d0e 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -53,7 +53,7 @@
public static class FluoITObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(BALANCE, NotificationType.STRONG, (tx, row, col) -> {
+ or.forColumn(BALANCE, NotificationType.STRONG).useObserver((tx, row, col) -> {
Assert.fail();
});
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index 5381952..345bde2 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -70,7 +70,7 @@
public static class SelfNtfyObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(EXPORT_COUNT_COL, STRONG, new ExportingObserver());
+ or.forColumn(EXPORT_COUNT_COL, STRONG).useObserver(new ExportingObserver());
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index ce002cb..820f48c 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -37,7 +37,7 @@
public static class StrongNtfyObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(OC, STRONG, (tx, row, col) -> {
+ or.forColumn(OC, STRONG).useObserver((tx, row, col) -> {
Bytes v = tx.get(row, col);
tx.set(v, RC, row);
});
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 2bd4ce9..52803ef 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -64,7 +64,7 @@
public static class WeakNotificationITObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(STAT_CHECK, WEAK, new SimpleObserver());
+ or.forColumn(STAT_CHECK, WEAK).useObserver(new SimpleObserver());
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 9dcf6dd..0711c72 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -58,7 +58,7 @@
public static class WeakNtfyObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.registers(STAT_CHANGED, WEAK, TOTAL_OBSERVER);
+ or.forColumn(STAT_CHANGED, WEAK).useStrObserver(TOTAL_OBSERVER);
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 66791b5..cd2584a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -76,7 +76,7 @@
public static class WorkerITObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.register(observedColumn, STRONG, new DegreeIndexer());
+ or.forColumn(observedColumn, STRONG).useObserver(new DegreeIndexer());
}
}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 01f04e4..7e1e279 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -121,8 +121,8 @@
public static class LogItObserverProvider implements ObserverProvider {
@Override
public void provide(Registry or, Context ctx) {
- or.registers(STAT_COUNT, WEAK, new TestObserver());
- or.register(bCol2, WEAK, new BinaryObserver());
+ or.forColumn(STAT_COUNT, WEAK).useStrObserver(new TestObserver());
+ or.forColumn(bCol2, WEAK).useObserver(new BinaryObserver());
}
}