GOSSIP-89 - Refactor gossip examples; common code in base class.
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java b/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java
deleted file mode 100644
index 279bff1..0000000
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/ExampleCommon.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.examples;
-
-public class ExampleCommon {
-
- private boolean clearTerminalScreen = true;
-
- /*
- * Look for -s in args. If there, suppress terminal-clear on write results Shift args for
- * positional args, if necessary
- */
- public String[] checkArgsForClearFlag(String[] args) {
- int pos = 0;
- for (int i = 0; i < args.length; i++) {
- if (args[i].equals("-s")) {
- clearTerminalScreen = false;
- } else {
- // in the case of the -s flag, shift args
- // down by one slot; this will end up with
- // a duplicate entry in the last position of args,
- // but this is ok, because it will be ignored
- args[pos++] = args[i];
- }
- }
- return args;
- }
-
- public void optionallyClearTerminal() {
- if (clearTerminalScreen) {
- System.out.print("\033[H\033[2J");
- System.out.flush();
- }
- }
-}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java b/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java
new file mode 100644
index 0000000..21861bb
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/RunStandardExamples.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.IOException;
+
+public class RunStandardExamples {
+
+ private static boolean WILL_READ = true;
+
+ private static boolean WILL_NOT_READ = false;
+
+ public static void main(String[] args) {
+ if ((args.length < 1) || args[0].equals("-h") || args[0].equals("--help") || args.length < 2) {
+ System.out.print(usage());
+ return;
+ }
+ try {
+ int example = intFromString(args[0]);
+ int channel = intFromString(args[1]);
+ if ((example < 1) || (example > 4) || (channel < 0) || (channel > 2)) {
+ System.out.print(usage());
+ return;
+ }
+ runExaple(example, channel);
+ } catch (Exception e) {
+ System.out.print(usage());
+ }
+ }
+
+ private static void runExaple(int exampleNumber, int channel) throws IOException {
+ String[] args = stanardArgs(channel, new String[4]);
+ if (exampleNumber == 1) {
+ StandAloneNode example = new StandAloneNode(args);
+ example.exec(WILL_NOT_READ);
+ } else if (exampleNumber == 2) {
+ StandAloneNodeCrdtOrSet example = new StandAloneNodeCrdtOrSet(args);
+ example.exec(WILL_READ);
+ } else if (exampleNumber == 3) {
+ StandAlonePNCounter example = new StandAlonePNCounter(args);
+ example.exec(WILL_READ);
+ } else if (exampleNumber == 4) {
+ args = extendedArgs(channel, new String[6]);
+ StandAloneDatacenterAndRack example = new StandAloneDatacenterAndRack(args);
+ example.exec(WILL_READ);
+ }
+ }
+
+ private static String[] stanardArgs(int channel, String[] args) {
+ // see README.md for examples
+ args[0] = "udp://localhost:1000" + channel;
+ args[1] = "" + channel;
+ args[2] = "udp://localhost:10000";
+ args[3] = "0";
+ return args;
+ }
+
+ private static String[] extendedArgs(int channel, String[] args) {
+ args = stanardArgs(channel, args);
+ // see README.md for examples
+ if (channel == 0) {
+ args[4] = "1";
+ args[5] = "2";
+ }
+ if (channel == 1) {
+ args[4] = "1";
+ args[5] = "3";
+ }
+ if (channel == 2) {
+ args[4] = "2";
+ args[5] = "2";
+ }
+ return args;
+ }
+
+ private static int intFromString(String string) {
+ return Integer.parseInt(string);
+ }
+
+ private static String usage() {
+ return "Select and run (usually in a seperate terminal window) \n"
+ + "one of the the standard Examples,\n" + " 1. StandAloneNode\n"
+ + " 2. StandAloneNodeCrdtOrSet\n" + " 3. StandAlonePNCounter\n"
+ + " 4. StandAloneDatacenterAndRack\n" + "(See README.md in this modules)\n" + "\n"
+ + "Usage: mvn exec:java -Dexec.mainClass=org.apache.gossip.examples.RunStandardExamples -Dexec.args=\"s c\"\n"
+ + "where...\n" + " s - int - the example number from above\n"
+ + " c - int - the channel number: 0, 1, or 2\n";
+ }
+
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
index 1a2643c..2336e87 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -18,8 +18,8 @@
package org.apache.gossip.examples;
+import java.io.IOException;
import java.net.URI;
-import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -30,12 +30,20 @@
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
-public class StandAloneDatacenterAndRack {
+public class StandAloneDatacenterAndRack extends StandAloneExampleBase {
- private static ExampleCommon common = new ExampleCommon();
+ public static void main(String[] args) throws InterruptedException, IOException {
+ StandAloneDatacenterAndRack example = new StandAloneDatacenterAndRack(args);
+ boolean willRead = true;
+ example.exec(willRead);
+ }
- public static void main(String[] args) throws UnknownHostException, InterruptedException {
- args = common.checkArgsForClearFlag(args);
+ StandAloneDatacenterAndRack(String[] args) {
+ args = super.checkArgsForClearFlag(args);
+ initGossipManager(args);
+ }
+
+ void initGossipManager(String[] args) {
GossipSettings s = new GossipSettings();
s.setWindowSize(1000);
s.setGossipInterval(100);
@@ -48,20 +56,17 @@
props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
GossipManager manager = GossipManagerBuilder.newBuilder().cluster("mycluster")
- .uri(URI.create(args[0]))
- .id(args[1])
- .gossipSettings(s)
+ .uri(URI.create(args[0])).id(args[1]).gossipSettings(s)
.gossipMembers(
Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
- .properties(props)
- .build();
+ .properties(props).build();
manager.init();
- while (true) {
- common.optionallyClearTerminal();
- System.out.println("Live: " + manager.getLiveMembers());
- System.out.println("Dead: " + manager.getDeadMembers());
- Thread.sleep(2000);
- }
+ setGossipService(manager);
+ }
+
+ @Override
+ void printValues(GossipManager gossipService) {
+ return;
}
}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java
new file mode 100644
index 0000000..02c2ee7
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneExampleBase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+
+abstract class StandAloneExampleBase {
+ private String lastInput = "{none}";
+
+ private boolean clearTerminalScreen = true;
+
+ private GossipManager gossipService = null;
+
+ abstract void printValues(GossipManager gossipService);
+
+ boolean processReadLoopInput(String line) {
+ return true;
+ }
+
+ void exec(boolean willRead) throws IOException {
+ gossipService.init();
+ startMonitorLoop(gossipService);
+ if (willRead) {
+ startBlockingReadLoop();
+ }
+ }
+
+ /*
+ * Look for -s in args. If there, suppress terminal-clear on write results: shift args for
+ * positional args, if necessary
+ */
+ String[] checkArgsForClearFlag(String[] args) {
+ int pos = 0;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-s")) {
+ clearTerminalScreen = false;
+ } else {
+ // in the case of the -s flag, shift args
+ // down by one slot; this will end up with
+ // a duplicate entry in the last position of args,
+ // but this is ok, because it will be ignored
+ args[pos++] = args[i];
+ }
+ }
+ return args;
+ }
+
+ private void optionallyClearTerminal() {
+ if (clearTerminalScreen) {
+ System.out.print("\033[H\033[2J");
+ System.out.flush();
+ }
+ }
+
+ private void setLastInput(String input, boolean valid) {
+ lastInput = input;
+ if (!valid) {
+ lastInput += " (invalid)";
+ }
+ }
+
+ String getLastInput() {
+ return lastInput;
+ }
+
+ private void startMonitorLoop(GossipManager gossipService) {
+ new Thread(() -> {
+ while (true) {
+ optionallyClearTerminal();
+ printLiveMembers(gossipService);
+ printDeadMambers(gossipService);
+ printValues(gossipService);
+ try {
+ Thread.sleep(2000);
+ } catch (Exception ignore) {
+ }
+ }
+ }).start();
+ }
+
+ private void printLiveMembers(GossipManager gossipService) {
+ List<LocalMember> members = gossipService.getLiveMembers();
+ if (members.isEmpty()) {
+ System.out.println("Live: (none)");
+ return;
+ }
+ System.out.println("Live: " + members.get(0));
+ for (int i = 1; i < members.size(); i++) {
+ System.out.println(" : " + members.get(i));
+ }
+ }
+
+ private void printDeadMambers(GossipManager gossipService) {
+ List<LocalMember> members = gossipService.getDeadMembers();
+ if (members.isEmpty()) {
+ System.out.println("Dead: (none)");
+ return;
+ }
+ System.out.println("Dead: " + members.get(0));
+ for (int i = 1; i < members.size(); i++) {
+ System.out.println(" : " + members.get(i));
+ }
+ }
+
+ private void startBlockingReadLoop() throws IOException {
+ String line;
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ boolean valid = processReadLoopInput(line);
+ setLastInput(line, valid);
+ }
+ }
+ }
+
+ void initGossipManager(String[] args) {
+ GossipSettings s = new GossipSettings();
+ s.setWindowSize(1000);
+ s.setGossipInterval(100);
+ GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
+ .uri(URI.create(args[0])).id(args[1])
+ .gossipMembers(Collections
+ .singletonList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+ .gossipSettings(s).build();
+ setGossipService(gossipService);
+ }
+
+ void setGossipService(GossipManager gossipService) {
+ this.gossipService = gossipService;
+ }
+
+ GossipManager getGossipManager() {
+ return this.gossipService;
+ }
+
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
index 70c3e4d..953e784 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNode.java
@@ -17,37 +17,26 @@
*/
package org.apache.gossip.examples;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
+import java.io.IOException;
+
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
-public class StandAloneNode {
+public class StandAloneNode extends StandAloneExampleBase {
- private static ExampleCommon common = new ExampleCommon();
+ private static boolean WILL_READ = false;
- public static void main(String[] args) throws UnknownHostException, InterruptedException {
- args = common.checkArgsForClearFlag(args);
- GossipSettings s = new GossipSettings();
- s.setWindowSize(1000);
- s.setGossipInterval(100);
- GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
- .uri(URI.create(args[0]))
- .id(args[1])
- .gossipMembers(
- Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
- .gossipSettings(s)
- .build();
- gossipService.init();
- while (true) {
- common.optionallyClearTerminal();
- System.out.println("Live: " + gossipService.getLiveMembers());
- System.out.println("Dead: " + gossipService.getDeadMembers());
- Thread.sleep(2000);
- }
+ public static void main(String[] args) throws InterruptedException, IOException {
+ StandAloneNode example = new StandAloneNode(args);
+ example.exec(WILL_READ);
+ }
+
+ StandAloneNode(String[] args) {
+ args = super.checkArgsForClearFlag(args);
+ super.initGossipManager(args);
+ }
+
+ @Override
+ void printValues(GossipManager gossipService) {
}
}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
index 78c7782..a184bc4 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java
@@ -17,85 +17,89 @@
*/
package org.apache.gossip.examples;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Arrays;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.RemoteMember;
+
import org.apache.gossip.crdt.GrowOnlyCounter;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
-public class StandAloneNodeCrdtOrSet {
+public class StandAloneNodeCrdtOrSet extends StandAloneExampleBase {
- private static ExampleCommon common = new ExampleCommon();
+ private static final String INDEX_KEY_FOR_SET = "abc";
+
+ private static final String INDEX_KEY_FOR_COUNTER = "def";
public static void main(String[] args) throws InterruptedException, IOException {
- args = common.checkArgsForClearFlag(args);
- GossipSettings s = new GossipSettings();
- s.setWindowSize(1000);
- s.setGossipInterval(100);
- GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster("mycluster")
- .uri(URI.create(args[0]))
- .id(args[1])
- .gossipMembers(
- Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
- .gossipSettings(s)
- .build();
- gossipService.init();
-
- new Thread(() -> {
- while (true) {
- common.optionallyClearTerminal();
- System.out.println("Live: " + gossipService.getLiveMembers());
- System.out.println("Dead: " + gossipService.getDeadMembers());
- System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? ""
- : gossipService.findCrdt("abc").value()));
- System.out.println("********** " + gossipService.findCrdt("abc"));
- System.out.println("^^^^^^^^^^ " + (gossipService.findCrdt("def") == null ? ""
- : gossipService.findCrdt("def").value()));
- System.out.println("$$$$$$$$$$ " + gossipService.findCrdt("def"));
- try {
- Thread.sleep(2000);
- } catch (Exception e) {
- }
- }
- }).start();
-
- String line = null;
- try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- char op = line.charAt(0);
- String val = line.substring(2);
- if (op == 'a') {
- addData(val, gossipService);
- } else if (op == 'r') {
- removeData(val, gossipService);
- } else if (op == 'g') {
- gcount(val, gossipService);
- }
- if (op == 'l') {
- listen(val, gossipService);
- }
- }
- }
+ StandAloneNodeCrdtOrSet example = new StandAloneNodeCrdtOrSet(args);
+ boolean willRead = true;
+ example.exec(willRead);
}
-
+
+ StandAloneNodeCrdtOrSet(String[] args) {
+ args = super.checkArgsForClearFlag(args);
+ super.initGossipManager(args);
+ }
+
+ void printValues(GossipManager gossipService) {
+ System.out.println("Last Input: " + getLastInput());
+ System.out.println("---------- Or Set " + (gossipService.findCrdt(INDEX_KEY_FOR_SET) == null
+ ? "" : gossipService.findCrdt(INDEX_KEY_FOR_SET).value()));
+ System.out.println("********** " + gossipService.findCrdt(INDEX_KEY_FOR_SET));
+ System.out.println(
+ "^^^^^^^^^^ Grow Only Counter" + (gossipService.findCrdt(INDEX_KEY_FOR_COUNTER) == null
+ ? "" : gossipService.findCrdt(INDEX_KEY_FOR_COUNTER).value()));
+ System.out.println("$$$$$$$$$$ " + gossipService.findCrdt(INDEX_KEY_FOR_COUNTER));
+ }
+
+ boolean processReadLoopInput(String line) {
+ boolean valid = true;
+ char op = line.charAt(0);
+ String val = line.substring(2);
+ if (op == 'a') {
+ addData(val, getGossipManager());
+ } else if (op == 'r') {
+ removeData(val, getGossipManager());
+ } else if (op == 'g') {
+ if (isNonNegativeNumber(val)) {
+ gcount(val, getGossipManager());
+ } else {
+ valid = false;
+ }
+ } else if (op == 'l') {
+ if ((val == INDEX_KEY_FOR_SET) || (val == INDEX_KEY_FOR_COUNTER)) {
+ listen(val, getGossipManager());
+ } else {
+ valid = false;
+ }
+ } else {
+ valid = false;
+ }
+ return valid;
+ }
+
+ private boolean isNonNegativeNumber(String val) {
+ long l = 0;
+ try {
+ Long n = Long.parseLong(val);
+ l = n.longValue();
+ } catch (Exception e) {
+ return false;
+ }
+ return (l >= 0);
+ }
+
private static void listen(String val, GossipManager gossipManager) {
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (key.equals(val)) {
- System.out.println("Event Handler fired! " + oldValue + " " + newValue);
+ System.out.println(
+ "Event Handler fired for key = '" + key + "'! " + oldValue + " " + newValue);
}
});
}
-
+
private static void gcount(String val, GossipManager gossipManager) {
- GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
+ GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt(INDEX_KEY_FOR_COUNTER);
Long l = Long.valueOf(val);
if (c == null) {
c = new GrowOnlyCounter(new GrowOnlyCounter.Builder(gossipManager).increment((l)));
@@ -104,7 +108,7 @@
}
SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(Long.MAX_VALUE);
- m.setKey("def");
+ m.setKey(INDEX_KEY_FOR_COUNTER);
m.setPayload(c);
m.setTimestamp(System.currentTimeMillis());
gossipManager.merge(m);
@@ -112,10 +116,10 @@
private static void removeData(String val, GossipManager gossipService) {
@SuppressWarnings("unchecked")
- OrSet<String> s = (OrSet<String>) gossipService.findCrdt("abc");
+ OrSet<String> s = (OrSet<String>) gossipService.findCrdt(INDEX_KEY_FOR_SET);
SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(Long.MAX_VALUE);
- m.setKey("abc");
+ m.setKey(INDEX_KEY_FOR_SET);
m.setPayload(new OrSet<String>(s, new OrSet.Builder<String>().remove(val)));
m.setTimestamp(System.currentTimeMillis());
gossipService.merge(m);
@@ -124,7 +128,7 @@
private static void addData(String val, GossipManager gossipService) {
SharedDataMessage m = new SharedDataMessage();
m.setExpireAt(Long.MAX_VALUE);
- m.setKey("abc");
+ m.setKey(INDEX_KEY_FOR_SET);
m.setPayload(new OrSet<String>(val));
m.setTimestamp(System.currentTimeMillis());
gossipService.merge(m);
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
index b0015be..23e949b 100644
--- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
@@ -17,116 +17,55 @@
*/
package org.apache.gossip.examples;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.gossip.GossipSettings;
-import org.apache.gossip.LocalMember;
-import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.PNCounter;
import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
-public class StandAlonePNCounter {
- private static ExampleCommon common = new ExampleCommon();
- private static String lastInput = "{None}";
+public class StandAlonePNCounter extends StandAloneExampleBase {
public static void main(String[] args) throws InterruptedException, IOException {
- args = common.checkArgsForClearFlag(args);
- GossipSettings s = new GossipSettings();
- s.setWindowSize(1000);
- s.setGossipInterval(100);
- GossipManager gossipService = GossipManagerBuilder
- .newBuilder()
- .cluster("mycluster")
- .uri(URI.create(args[0])).id(args[1])
- .gossipMembers(
- Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
- .gossipSettings(s)
- .build();
- gossipService.init();
-
- new Thread(() -> {
- while (true) {
- common.optionallyClearTerminal();
- printLiveMembers(gossipService);
- printDeadMambers(gossipService);
- printValues(gossipService);
- try {
- Thread.sleep(2000);
- } catch (Exception ignore) {
- }
- }
- }).start();
-
- String line = null;
- try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- char op = line.charAt(0);
- char blank = line.charAt(1);
- String val = line.substring(2);
- Long l = null;
- boolean valid = true;
- try {
- l = Long.valueOf(val);
- } catch (NumberFormatException ex) {
- valid = false;
- }
- valid = valid &&
- (
- (blank == ' ') &&
- ((op == 'i') || (op == 'd'))
- );
- if (valid) {
- if (op == 'i') {
- increment(l, gossipService);
- } else if (op == 'd') {
- decrement(l, gossipService);
- }
- }
- setLastInput(line,valid);
- }
- }
+ StandAlonePNCounter example = new StandAlonePNCounter(args);
+ boolean willRead = true;
+ example.exec(willRead);
}
- private static void printValues(GossipManager gossipService) {
+ StandAlonePNCounter(String[] args) {
+ args = super.checkArgsForClearFlag(args);
+ super.initGossipManager(args);
+ }
+
+ void printValues(GossipManager gossipService) {
System.out.println("Last Input: " + getLastInput());
System.out.println("---------- " + (gossipService.findCrdt("myPNCounter") == null ? ""
: gossipService.findCrdt("myPNCounter").value()));
System.out.println("********** " + gossipService.findCrdt("myPNCounter"));
}
- private static void printDeadMambers(GossipManager gossipService) {
- List<LocalMember> members = gossipService.getDeadMembers();
- if (members.isEmpty()) {
- System.out.println("Dead: (none)");
- return;
+ boolean processReadLoopInput(String line) {
+ char op = line.charAt(0);
+ char blank = line.charAt(1);
+ String val = line.substring(2);
+ Long l = null;
+ boolean valid = true;
+ try {
+ l = Long.valueOf(val);
+ } catch (NumberFormatException ex) {
+ valid = false;
}
- System.out.println("Dead: " + members.get(0));
- for (int i = 1; i < members.size(); i++) {
- System.out.println(" : " + members.get(i));
+ valid = valid && ((blank == ' ') && ((op == 'i') || (op == 'd')));
+ if (valid) {
+ if (op == 'i') {
+ increment(l, getGossipManager());
+ } else if (op == 'd') {
+ decrement(l, getGossipManager());
+ }
}
+ return valid;
}
- private static void printLiveMembers(GossipManager gossipService) {
- List<LocalMember> members = gossipService.getLiveMembers();
- if (members.isEmpty()) {
- System.out.println("Live: (none)");
- return;
- }
- System.out.println("Live: " + members.get(0));
- for (int i = 1; i < members.size(); i++) {
- System.out.println(" : " + members.get(i));
- }
- }
-
- private static void increment(Long l, GossipManager gossipManager) {
+ void increment(Long l, GossipManager gossipManager) {
PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
if (c == null) {
c = new PNCounter(new PNCounter.Builder(gossipManager).increment((l)));
@@ -141,7 +80,7 @@
gossipManager.merge(m);
}
- private static void decrement(Long l, GossipManager gossipManager) {
+ void decrement(Long l, GossipManager gossipManager) {
PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
if (c == null) {
c = new PNCounter(new PNCounter.Builder(gossipManager).decrement((l)));
@@ -155,16 +94,5 @@
m.setTimestamp(System.currentTimeMillis());
gossipManager.merge(m);
}
-
- private static void setLastInput(String input, boolean valid) {
- lastInput = input;
- if (! valid) {
- lastInput += " (invalid)";
- }
- }
-
- private static String getLastInput() {
- return lastInput;
- }
}
\ No newline at end of file