More clean up
- toString / hashCode / equals goodness
- unneeded doubled commgroup references removed
- some logging tweaks
diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java
index 78fff21..1209bcb 100644
--- a/src/main/java/org/apache/ode/jacob/ChannelRef.java
+++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java
@@ -76,5 +76,35 @@
return null;
}
-
+
+ @Override
+ public String toString() {
+ return "ChannelRef [target=" + target + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((target == null) ? 0 : target.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ChannelRef other = (ChannelRef) obj;
+ if (target == null) {
+ if (other.target != null)
+ return false;
+ } else if (!target.equals(other.target))
+ return false;
+ return true;
+ }
+
}
diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java
index 5d62135..ea3e901 100644
--- a/src/main/java/org/apache/ode/jacob/Message.java
+++ b/src/main/java/org/apache/ode/jacob/Message.java
@@ -110,6 +110,26 @@
+ (headers != null ? "headers=" + headers + ", " : "")
+ (body != null ? "body=" + body : "") + "]";
}
+
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (id ^ (id >>> 32));
+ return result;
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Message other = (Message) obj;
+ if (id != other.id)
+ return false;
+ return true;
+ }
public static Message copyFrom(Message message) {
Message result = new Message();
diff --git a/src/main/java/org/apache/ode/jacob/soup/Comm.java b/src/main/java/org/apache/ode/jacob/soup/Comm.java
index ce842cb..d2a6a98 100644
--- a/src/main/java/org/apache/ode/jacob/soup/Comm.java
+++ b/src/main/java/org/apache/ode/jacob/soup/Comm.java
@@ -29,7 +29,6 @@
*/
public abstract class Comm extends ExecutionQueueObject {
private CommChannel _channel;
- private CommGroup _group;
protected Comm() {
}
@@ -42,27 +41,11 @@
return _channel;
}
- public void setChannel(CommChannel channel) {
- _channel = channel;
- }
-
- public CommGroup getGroup() {
- return _group;
- }
-
- public void setGroup(CommGroup group) {
- if (_group != null) {
- throw new IllegalStateException("Attempted to call setGroup() twice!");
- }
- _group = group;
- }
-
public String toString() {
// TODO: maybe find a better way to do a toString and replace ObjectPrinter
return new StringBuilder("{")
.append(this.getClass().getSimpleName())
.append(" chnl=").append(_channel)
- .append(", group=").append(_group)
.append(" }").toString();
}
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/CommGroup.java b/src/main/java/org/apache/ode/jacob/soup/CommGroup.java
index bd6c10a..de5e602 100644
--- a/src/main/java/org/apache/ode/jacob/soup/CommGroup.java
+++ b/src/main/java/org/apache/ode/jacob/soup/CommGroup.java
@@ -55,7 +55,6 @@
}
public void add(Comm comm) {
- comm.setGroup(this);
_comms.add(comm);
}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index 8e4265b..d788afe 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -118,20 +118,20 @@
LOG.trace(">> add (channel={})", channel);
verifyNew(channel);
- ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter, channel.getType().getName(), channel
- .getDescription());
+ ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter,
+ channel.getDescription());
_channels.put(cframe.getId(), cframe);
assignId(channel, cframe.getId());
}
public void enqueueMessage(Message message) {
- LOG.trace(">> enqueueReaction (message={})", message);
+ LOG.trace(">> enqueueMessage (message={})", message);
_messages.add(message);
}
public Message dequeueMessage() {
- LOG.trace(">> dequeueReaction ()");
+ LOG.trace(">> dequeueMessage ()");
Message message = null;
if (!_messages.isEmpty()) {
@@ -156,8 +156,9 @@
throw new IllegalStateException("Send attempted on channel containing replicated send! Channel= "
+ comm.getChannel());
}
- if (group.isReplicated())
+ if (group.isReplicated()) {
chnlFrame.replicatedSend = true;
+ }
CommSend commSend = (CommSend) comm;
MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMessage());
@@ -169,10 +170,11 @@
throw new IllegalStateException(
"Receive attempted on channel containing replicated receive! Channel= " + comm.getChannel());
}
- if (group.isReplicated())
+ if (group.isReplicated()) {
chnlFrame.replicatedRecv = true;
+ }
CommRecv commRecv = (CommRecv) comm;
- ObjectFrame oframe = new ObjectFrame(commGroupFrame, chnlFrame, commRecv.getListener());
+ ListenerFrame oframe = new ListenerFrame(commGroupFrame, chnlFrame, commRecv.getListener());
commGroupFrame.commFrames.add(oframe);
chnlFrame.objFrames.add(oframe);
}
@@ -289,7 +291,7 @@
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
sos.writeInt(cframe.objFrames.size());
- for (Iterator<ObjectFrame> j = cframe.objFrames.iterator(); j.hasNext();) {
+ for (Iterator<ListenerFrame> j = cframe.objFrames.iterator(); j.hasNext();) {
sos.writeObject(j.next());
}
sos.writeInt(cframe.msgFrames.size());
@@ -367,10 +369,10 @@
ChannelFrame cframe = _channels.get(channel.getId());
while (cframe != null && !cframe.msgFrames.isEmpty() && !cframe.objFrames.isEmpty()) {
MessageFrame mframe = cframe.msgFrames.iterator().next();
- ObjectFrame oframe = cframe.objFrames.iterator().next();
+ ListenerFrame oframe = cframe.objFrames.iterator().next();
Message msg = Message.copyFrom(mframe.message);
- msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe._continuation));
+ msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe.listener));
enqueueMessage(msg);
if (!mframe.commGroupFrame.replicated) {
@@ -407,7 +409,7 @@
// Add all channels reference in the group to the GC candidate set.
for (Iterator<CommFrame> i = groupFrame.commFrames.iterator(); i.hasNext();) {
CommFrame frame = i.next();
- if (frame instanceof ObjectFrame) {
+ if (frame instanceof ListenerFrame) {
assert frame.channelFrame.objFrames.contains(frame);
frame.channelFrame.objFrames.remove(frame);
} else {
@@ -438,7 +440,7 @@
boolean replicatedRecv;
- Set<ObjectFrame> objFrames = new LinkedHashSet<ObjectFrame>();
+ Set<ListenerFrame> objFrames = new LinkedHashSet<ListenerFrame>();
Set<MessageFrame> msgFrames = new LinkedHashSet<MessageFrame>();
@@ -448,7 +450,7 @@
public ChannelFrame() {
}
- public ChannelFrame(Class<?> type, int id, String name, String description) {
+ public ChannelFrame(Class<?> type, int id, String description) {
this.type = type;
this.id = id;
this.description = description;
@@ -462,7 +464,7 @@
return refCount;
}
- public Set<ObjectFrame> getObjFrames() {
+ public Set<ListenerFrame> getObjFrames() {
return objFrames;
}
@@ -479,7 +481,7 @@
replicatedRecv = in.readBoolean();
int cnt = in.readInt();
for (int i = 0; i < cnt; ++i) {
- objFrames.add((ObjectFrame) in.readObject());
+ objFrames.add((ListenerFrame) in.readObject());
}
cnt = in.readInt();
for (int i = 0; i < cnt; ++i) {
@@ -495,7 +497,7 @@
out.writeBoolean(replicatedSend);
out.writeBoolean(replicatedRecv);
out.writeInt(objFrames.size());
- for (Iterator<ObjectFrame> i = objFrames.iterator(); i.hasNext();) {
+ for (Iterator<ListenerFrame> i = objFrames.iterator(); i.hasNext();) {
out.writeObject(i.next());
}
out.writeInt(msgFrames.size());
@@ -564,28 +566,28 @@
}
}
- protected static class ObjectFrame extends CommFrame implements Externalizable {
+ protected static class ListenerFrame extends CommFrame implements Externalizable {
private static final long serialVersionUID = -7212430608484116919L;
- MessageListener _continuation;
+ MessageListener listener;
// Used for deserialization
- public ObjectFrame() {
+ public ListenerFrame() {
}
- public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener continuation) {
+ public ListenerFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener listener) {
super(commGroupFrame, channelFrame);
- this._continuation = continuation;
+ this.listener = listener;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- _continuation = (ChannelListener)in.readObject();
+ listener = (ChannelListener)in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeObject(_continuation);
+ out.writeObject(listener);
}
}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 3b709e4..5b8a07d 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -196,18 +196,23 @@
return buf.toString();
}
- static String stringify(Object[] list) {
- if (list == null) {
+ static String stringify(Object obj) {
+ if (obj == null) {
return "";
}
- StringBuffer buf = new StringBuffer();
- for (int i = 0; i < list.length; ++i) {
- if (i > 0) {
- buf.append(',');
+
+ if (obj instanceof Object[]) {
+ StringBuffer buf = new StringBuffer();
+ for (int i = 0; i < ((Object[])obj).length; ++i) {
+ if (i > 0) {
+ buf.append(',');
+ }
+ buf.append(((Object[])obj)[i]);
}
- buf.append(list[i]);
+ return buf.toString();
+ } else {
+ return obj.toString();
}
- return buf.toString();
}
public void setClassLoader(ClassLoader classLoader) {
@@ -252,11 +257,6 @@
}
public Channel message(Channel channel, Method method, Object[] args) {
- LOG.trace(">> [{}] : {} ! {} ({})", _cycle, channel, method.getName(),
- LOG.isTraceEnabled() ? stringify(args) : null);
-
- _statistics.messagesSent++;
-
Channel replyChannel = null;
CommChannel replyCommChannel = null;
// Check for synchronous methods; create a synchronization channel
@@ -277,6 +277,11 @@
}
public void sendMessage(Message msg) {
+ LOG.trace(">> [{}] : {} ! {} ({})", _cycle, msg.getTo(), msg.getAction(),
+ LOG.isTraceEnabled() ? stringify(msg.getBody()) : null);
+
+ _statistics.messagesSent++;
+
CommGroup grp = new CommGroup(false);
CommSend send = new CommSend(msg);
grp.add(send);
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 0c6ef73..40f4440 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -25,6 +25,12 @@
import static org.apache.ode.jacob.Jacob.sendMessage;
import static org.apache.ode.jacob.Jacob.subscribe;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.Message;
@@ -321,14 +327,19 @@
@Override
public void run() {
simpleHelloWorld();
-// reliableHelloWorld();
-// sequencedHelloWorld();
+ reliableHelloWorld();
+ sequencedHelloWorld();
calculusHelloWorld();
}
public static void main(String args[]) throws Exception {
// enable logging
- //BasicConfigurator.configure();
+ // BasicConfigurator.configure();
+ List<Logger> loggers = Collections.<Logger>list(LogManager.getCurrentLoggers());
+ loggers.add(LogManager.getRootLogger());
+ for ( Logger logger : loggers ) {
+ logger.setLevel(Level.OFF);
+ }
SmileFactory sf = null;
// // enable smile:
@@ -360,7 +371,7 @@
public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception {
byte[] json = mapper.writeValueAsBytes(in);
// print json
- System.out.println(new String(json));
+ // System.out.println(new String(json));
JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class);
return q2;
}