fixes - #833 (#863) made internal column data immutable
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
index 754ca28..6f814b3 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -21,12 +21,12 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.api.config.FluoConfiguration;
@@ -38,9 +38,9 @@
import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.api.observer.Observer.ObservedColumn;
import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.observer.RegisteredObservers;
-import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.RegisteredObservers;
import org.apache.fluo.core.util.ColumnUtil;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.hadoop.io.WritableUtils;
@@ -161,7 +161,8 @@
private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
throws IOException {
- HashMap<Column, ObserverSpecification> omap = new HashMap<>();
+ ImmutableMap.Builder<Column, ObserverSpecification> omapBuilder =
+ new ImmutableMap.Builder<Column, ObserverSpecification>();
int num = WritableUtils.readVInt(dis);
for (int i = 0; i < num; i++) {
@@ -176,10 +177,9 @@
}
ObserverSpecification ospec = new ObserverSpecification(clazz, params);
- omap.put(col, ospec);
+ omapBuilder.put(col, ospec);
}
-
- return omap;
+ return omapBuilder.build();
}
@Override
@@ -197,8 +197,8 @@
}
DataInputStream dis = new DataInputStream(bais);
- observers = Collections.unmodifiableMap(readObservers(dis));
- weakObservers = Collections.unmodifiableMap(readObservers(dis));
+ observers = readObservers(dis);
+ weakObservers = readObservers(dis);
return new RegisteredObservers() {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
index 2e544cd..768271c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -16,7 +16,6 @@
package org.apache.fluo.core.observer.v2;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -24,6 +23,7 @@
import java.util.function.BiConsumer;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.api.config.FluoConfiguration;
@@ -32,9 +32,9 @@
import org.apache.fluo.api.observer.Observer.NotificationType;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.observer.RegisteredObservers;
-import org.apache.fluo.core.observer.Observers;
import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.RegisteredObservers;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -46,6 +46,9 @@
*/
public class ObserverStoreV2 implements ObserverStore {
+ ImmutableSet<Column> weakColumns = ImmutableSet.of();
+ ImmutableSet<Column> strongColumns = ImmutableSet.of();
+
@Override
public boolean handles(FluoConfiguration config) {
return !config.getObserverProvider().isEmpty();
@@ -102,22 +105,25 @@
String json = new String(data, UTF_8);
JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
- Set<Column> weakColumns = new HashSet<>();
- Set<Column> strongColumns = new HashSet<>();
+ ImmutableSet.Builder<Column> weakColumnsBuilder = new ImmutableSet.Builder<Column>();
+ ImmutableSet.Builder<Column> strongColumnsBuilder = new ImmutableSet.Builder<Column>();
for (Entry<Column, NotificationType> entry : jco.getObservedColumns().entrySet()) {
switch (entry.getValue()) {
case STRONG:
- strongColumns.add(entry.getKey());
+ strongColumnsBuilder.add(entry.getKey());
break;
case WEAK:
- weakColumns.add(entry.getKey());
+ weakColumnsBuilder.add(entry.getKey());
break;
default:
throw new IllegalStateException("Unknown notification type " + entry.getValue());
}
}
+ strongColumns = strongColumnsBuilder.build();
+ weakColumns = weakColumnsBuilder.build();
+
return new RegisteredObservers() {
@Override