Some refactorings towards a pi-message-oriented soup.
- Refactoring of the soup to remove Method/args and use Messages instead.
- Refactoring of ChannelRefs
- Jackson serialization fixed (which works but could use some more love to become less space-consuming)
diff --git a/src/main/java/org/apache/ode/jacob/ChannelRef.java b/src/main/java/org/apache/ode/jacob/ChannelRef.java
index ef755cc..57adcda 100644
--- a/src/main/java/org/apache/ode/jacob/ChannelRef.java
+++ b/src/main/java/org/apache/ode/jacob/ChannelRef.java
@@ -18,10 +18,42 @@
*/
package org.apache.ode.jacob;
+import java.io.Serializable;
+
+import org.apache.ode.jacob.oo.Channel;
+
+
/**
* TODO: Document...
*/
-public interface ChannelRef {
+public class ChannelRef implements Serializable {
+ public enum Type { JACOB_OBJECT, CHANNEL }
+
+ private static final long serialVersionUID = 1L;
+
+ private final Type type;
+ private final Object target;
+
+ public ChannelRef(Object target) {
+ type = target instanceof Channel ? Type.CHANNEL : Type.JACOB_OBJECT;
+ this.target = target;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getEndpoint(Class<T> clazz) {
+ if (type.equals(Type.JACOB_OBJECT) && JacobObject.class.isAssignableFrom(clazz)) {
+ return (T)target;
+ } else if (type.equals(Type.CHANNEL) && Channel.class.isAssignableFrom(clazz)) {
+ return (T)target;
+ }
+
+ return null;
+ }
+
}
diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java
index 6eaa356..dc6ece6 100644
--- a/src/main/java/org/apache/ode/jacob/Message.java
+++ b/src/main/java/org/apache/ode/jacob/Message.java
@@ -18,6 +18,7 @@
*/
package org.apache.ode.jacob;
+import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,8 +28,10 @@
* TODO: should anything be final here? the class itself?
*/
-public class Message {
- private long id;
+public class Message implements Serializable {
+ private static final long serialVersionUID = -2118625760125445959L;
+
+ private long id;
private ChannelRef to;
private ChannelRef replyTo;
private String action;
@@ -94,4 +97,19 @@
public boolean containsHeader(String header) {
return headers.containsKey(header);
}
+
+ public static Message copyFrom(Message message) {
+ Message result = new Message();
+
+ //XXX: generate id
+ result.setId(0);
+
+ result.setAction(message.getAction());
+ result.setBody(message.getBody());
+ result.setHeaders(message.getHeaders());
+ result.setTo(message.getTo());
+ result.setReplyTo(message.getReplyTo());
+
+ return result;
+ }
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java
deleted file mode 100644
index 72b2bb8..0000000
--- a/src/main/java/org/apache/ode/jacob/oo/ChannelChannelRef.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.jacob.oo;
-
-import org.apache.ode.jacob.ChannelRef;
-
-
-/**
- * TODO: Document...
- */
-
-public class ChannelChannelRef implements ChannelRef {
- private final Channel ref;
-
- public ChannelChannelRef(Channel channel) {
- ref = channel;
- }
-
- public Channel getRef() {
- return ref;
- }
-
-}
diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
index 511ca73..bad88fa 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
@@ -57,8 +57,17 @@
public static Message createMessage(JacobObject target, String action, Object[] args, Channel replyTo) {
Message message = new Message();
- message.setTo(new JacobObjectChannelRef(target));
- message.setReplyTo(replyTo == null ? null : new ChannelChannelRef(replyTo));
+ message.setTo(new ChannelRef(target));
+ message.setReplyTo(replyTo == null ? null : new ChannelRef(replyTo));
+ message.setAction(action);
+ message.setBody(args);
+ return message;
+ }
+
+ public static Message createMessage(Channel target, String action, Object[] args, Channel replyTo) {
+ Message message = new Message();
+ message.setTo(new ChannelRef(target));
+ message.setReplyTo(replyTo == null ? null : new ChannelRef(replyTo));
message.setAction(action);
message.setBody(args);
return message;
@@ -130,24 +139,6 @@
};
}
- public static Expression target() {
- return new Expression() {
- @SuppressWarnings("unchecked")
- public <T> T evaluate(Message message, Class<T> type) {
- ChannelRef ref = message.getTo();
- if (JacobObject.class.equals(type) && ref instanceof JacobObjectChannelRef) {
- return (T)((JacobObjectChannelRef)ref).getRef();
- } else if (Channel.class.equals(type) && ref instanceof ChannelChannelRef) {
- return (T)((ChannelChannelRef)ref).getRef();
- }
- return null;
- }
- };
- }
-
- public static JacobObject getMessageClosure(Message message) {
- return target().evaluate(message, JacobObject.class);
- }
public static Set<Method> getImplementedMethods(Set<Method> methods, Class<?> clazz) {
// TODO: this can be optimized (some 20 times faster in my tests) by keeping a private
diff --git a/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java b/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java
deleted file mode 100644
index a7b7ee7..0000000
--- a/src/main/java/org/apache/ode/jacob/oo/JacobObjectChannelRef.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.jacob.oo;
-
-import org.apache.ode.jacob.ChannelRef;
-import org.apache.ode.jacob.JacobObject;
-
-
-/**
- * TODO: Document...
- */
-
-public class JacobObjectChannelRef implements ChannelRef {
- private final JacobObject ref;
-
- public JacobObjectChannelRef(JacobObject channel) {
- ref = channel;
- }
-
- public JacobObject getRef() {
- return ref;
- }
-
-}
diff --git a/src/main/java/org/apache/ode/jacob/soup/CommSend.java b/src/main/java/org/apache/ode/jacob/soup/CommSend.java
index 182b18a..cf3aa86 100644
--- a/src/main/java/org/apache/ode/jacob/soup/CommSend.java
+++ b/src/main/java/org/apache/ode/jacob/soup/CommSend.java
@@ -18,9 +18,7 @@
*/
package org.apache.ode.jacob.soup;
-import java.lang.reflect.Method;
-
-import org.apache.ode.jacob.oo.Channel;
+import org.apache.ode.jacob.Message;
/**
* Persistent store representation of a message (i.e. method application /
@@ -33,39 +31,24 @@
*/
public class CommSend extends Comm {
- private Method _method;
- private Object[] _args;
- private Channel replyChannel;
+ private Message msg;
protected CommSend() {
}
- public CommSend(CommChannel chnl, Method method, Object[] args, Channel replyChannel) {
+ public CommSend(CommChannel chnl, Message msg) {
super(null, chnl);
- _method = method;
- _args = args;
- this.replyChannel = replyChannel;
+ this.msg = msg;
}
- public Method getMethod() {
- return _method;
+ public Message getMessage() {
+ return msg;
}
- public Object[] getArgs() {
- return _args;
- }
-
- public Channel getReplyChannel() {
- return replyChannel;
- }
-
public String toString() {
StringBuffer buf = new StringBuffer(getChannel().toString());
- buf.append(" ! ").append(_method.getName()).append('(');
- for (int i = 0; _args != null && i < _args.length; ++i) {
- if (i != 0) buf.append(',');
- buf.append(_args[i]);
- }
+ buf.append(" ! ").append(msg.getAction()).append('(');
+ buf.append(msg.getBody());
buf.append(')');
return buf.toString();
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
index 9fafc45..9f423cb 100644
--- a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
+++ b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
@@ -44,9 +44,9 @@
* broth the "original" continuation.
* @param message the {@link Message} to add to the broth
*/
- public void enqueueReaction(Message message);
+ public void enqueueMessage(Message message);
- public Message dequeueReaction();
+ public Message dequeueMessage();
public void add(CommChannel channel);
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java
new file mode 100644
index 0000000..1f21d59
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.ChannelRef;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+/**
+ * Jackson deserializer for Message objects.
+ *
+ * @author Tammo van Lessen
+ *
+ */
+public class ChannelRefDeserializer extends StdDeserializer<ChannelRef> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected ChannelRefDeserializer() {
+ super(ChannelRef.class);
+ }
+
+ @Override
+ public ChannelRef deserialize(JsonParser jp,
+ DeserializationContext ctxt) throws IOException,
+ JsonProcessingException {
+
+ Object target = null;
+
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String fieldname = jp.getCurrentName();
+ if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+ // if we're not already on the field, advance by one.
+ jp.nextToken();
+ }
+
+ if ("target".equals(fieldname)) {
+ target = jp.readValueAs(Object.class);
+ }
+ }
+
+ if (target == null) {
+ throw ctxt.mappingException(ChannelRef.class);
+ }
+
+ return new ChannelRef(target);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
new file mode 100644
index 0000000..bdc4432
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.ChannelRef;
+import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
+import org.apache.ode.jacob.oo.Channel;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+/**
+ * Jackson serializer for {@link Message} objects.
+ *
+ * @author Tammo van Lessen
+ *
+ */
+public class ChannelRefSerializer extends StdSerializer<ChannelRef> {
+
+ public ChannelRefSerializer() {
+ super(ChannelRef.class);
+ }
+
+ @Override
+ public void serialize(ChannelRef value, JsonGenerator jgen,
+ SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ jgen.writeStartObject();
+ serializeContents(value, jgen, provider);
+ jgen.writeEndObject();
+ }
+
+
+ @Override
+ public void serializeWithType(ChannelRef value, JsonGenerator jgen,
+ SerializerProvider provider, TypeSerializer typeSer)
+ throws IOException, JsonProcessingException {
+ typeSer.writeTypePrefixForObject(value, jgen);
+ serializeContents(value, jgen, provider);
+ typeSer.writeTypeSuffixForObject(value, jgen);
+ }
+
+ private void serializeContents(ChannelRef value, JsonGenerator jgen,
+ SerializerProvider provider) throws JsonGenerationException, IOException {
+
+ jgen.writeObjectField("target", value.getEndpoint(value.getType() == ChannelRef.Type.CHANNEL
+ ? Channel.class : JacobObject.class));
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index 0ccb46a..00076e9 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -95,7 +95,7 @@
jgen.writeNumberField("currentCycle", value._currentCycle);
// write continuations
- jgen.writeObjectField("continuations", value._reactions.toArray(new Message[]{}));
+ jgen.writeObjectField("messages", value._messages.toArray(new Message[]{}));
// channel garbage collection
@@ -156,9 +156,9 @@
soup._objIdCounter = jp.getIntValue();
} else if ("currentCycle".equals(fieldname)) {
soup._currentCycle = jp.getIntValue();
- } else if ("continuations".equals(fieldname)) {
+ } else if ("messages".equals(fieldname)) {
Message[] cs = (Message[])jp.readValueAs(Message[].class);
- soup._reactions = new HashSet<Message>(Arrays.asList(cs));
+ soup._messages = new HashSet<Message>(Arrays.asList(cs));
} else if ("channels".equals(fieldname)) {
soup._channels = new HashMap<Integer, ChannelFrame>();
ChannelFrame[] frames = jp.readValueAs(ChannelFrame[].class);
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
index 2d77f2c..0175046 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
@@ -18,6 +18,7 @@
*/
package org.apache.ode.jacob.soup.jackson;
+import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelProxy;
@@ -49,9 +50,11 @@
addSerializer(ChannelProxy.class, cps);
addSerializer(Message.class, new MessageSerializer());
addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer(cps));
+ addSerializer(ChannelRef.class, new ChannelRefSerializer());
addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer());
addDeserializer(Message.class, new MessageDeserializer());
addDeserializer(Channel.class, new ChannelProxyDeserializer());
+ addDeserializer(ChannelRef.class, new ChannelRefDeserializer());
setDeserializerModifier(new BeanDeserializerModifier() {
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
index 66520eb..73b77c3 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
@@ -20,6 +20,7 @@
import java.util.Collection;
+import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelProxy;
@@ -39,7 +40,6 @@
import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver;
import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
-
import com.fasterxml.jackson.databind.type.TypeFactory;
/**
@@ -90,7 +90,11 @@
return true;
}
- if (t.getRawClass() == Object.class) {
+ if (ChannelRef.class.isAssignableFrom(t.getRawClass())) {
+ return true;
+ }
+
+ if (t.getRawClass() == Object.class || t.isArrayType()) {
return true;
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
index 0f74fcb..94b53cb 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
@@ -19,15 +19,16 @@
package org.apache.ode.jacob.soup.jackson;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.Message;
-import org.apache.ode.jacob.oo.Channel;
-import org.apache.ode.jacob.oo.ClassUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -40,6 +41,7 @@
public class MessageDeserializer extends StdDeserializer<Message> {
private static final long serialVersionUID = 1L;
+ private TypeReference<HashMap<String,Object>> mapTypeRef = new TypeReference<HashMap<String, Object>>() {};
protected MessageDeserializer() {
super(Message.class);
@@ -50,10 +52,12 @@
DeserializationContext ctxt) throws IOException,
JsonProcessingException {
- JacobObject target = null;
+ long id = 0;
String action = null;
- Object[] args = null;
- Channel replyTo = null;
+ ChannelRef to = null;
+ ChannelRef replyTo = null;
+ Map<String, Object> headers = null;
+ Object body = null;
while (jp.nextToken() != JsonToken.END_OBJECT) {
String fieldname = jp.getCurrentName();
@@ -62,26 +66,34 @@
jp.nextToken();
}
- if ("target".equals(fieldname)) {
- target = jp.readValueAs(JacobObject.class);
- } else if ("method".equals(fieldname)) {
+ if ("id".equals(fieldname)) {
+ id = jp.getLongValue();
+ } else if ("action".equals(fieldname)) {
action = jp.getText();
- } else if ("args".equals(fieldname)) {
- args = jp.readValueAs(Object[].class);
+ } else if ("to".equals(fieldname)) {
+ to = jp.readValueAs(ChannelRef.class);
} else if ("replyTo".equals(fieldname)) {
- replyTo = jp.readValueAs(Channel.class);
+ replyTo = jp.readValueAs(ChannelRef.class);
+ } else if ("headers".equals(fieldname)) {
+ headers = jp.readValueAs(mapTypeRef);
+ } else if ("body".equals(fieldname)) {
+ body = jp.readValueAs(Object.class);
}
}
- if (target == null) {
- throw ctxt.mappingException(Message.class);
- }
-
if (action == null) {
throw ctxt.mappingException(Message.class);
}
- // TODO: pass the replyTo channel to the Message
- return ClassUtil.createMessage(target, action, args, replyTo);
+ if (to == null) {
+ throw ctxt.mappingException(Message.class);
+ }
+
+ Message msg = new Message(to, replyTo, action);
+ msg.setHeaders(headers);
+ msg.setBody(body);
+ msg.setId(id);
+
+ return msg;
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
index 06c7602..ee3ddd5 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import org.apache.ode.jacob.Message;
-import org.apache.ode.jacob.oo.ClassUtil;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -63,9 +62,19 @@
private void serializeContents(Message value, JsonGenerator jgen,
SerializerProvider provider) throws JsonGenerationException, IOException {
-
- jgen.writeObjectField("target", ClassUtil.getMessageClosure(value));
- jgen.writeStringField("method", value.getAction());
- jgen.writeObjectField("args", value.getBody());
+
+ jgen.writeNumberField("id", value.getId());
+ jgen.writeStringField("action", value.getAction());
+ jgen.writeObjectField("to", value.getTo());
+ if (value.getReplyTo() != null) {
+ jgen.writeObjectField("replyTo", value.getTo());
+ }
+ if (value.getHeaders() != null && !value.getHeaders().isEmpty()) {
+ jgen.writeObjectField("headers", value.getHeaders());
+ }
+ if (value.getBody() != null) {
+ jgen.writeObjectField("body", value.getBody());
+ }
+
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index db667f5..6f64bbb 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -71,7 +71,7 @@
/**
* Cached set of enqueued {@link Message} objects (i.e. those read using
- * {@link #enqueueReaction(Message)}).
+ * {@link #enqueueMessage(Message)}).
* These reactions are "cached"--that is it is not sent directly to the DAO
* layer--to minimize unnecessary serialization/deserialization of closures.
* This is a pretty useful optimization, as most {@link Message}s are
@@ -82,7 +82,7 @@
* forward progress; this scenario would occur if a maximum processign
* time-per-instance policy were in effect.
*/
- protected Set<Message> _reactions = new LinkedHashSet<Message>();
+ protected Set<Message> _messages = new LinkedHashSet<Message>();
protected Map<Integer, ChannelFrame> _channels = new LinkedHashMap<Integer, ChannelFrame>();
@@ -126,18 +126,18 @@
assignId(channel, cframe.getId());
}
- public void enqueueReaction(Message message) {
+ public void enqueueMessage(Message message) {
LOG.trace(">> enqueueReaction (message={})", message);
- _reactions.add(message);
+ _messages.add(message);
}
- public Message dequeueReaction() {
+ public Message dequeueMessage() {
LOG.trace(">> dequeueReaction ()");
Message message = null;
- if (!_reactions.isEmpty()) {
- Iterator<Message> it = _reactions.iterator();
+ if (!_messages.isEmpty()) {
+ Iterator<Message> it = _messages.iterator();
message = it.next();
it.remove();
}
@@ -162,8 +162,7 @@
chnlFrame.replicatedSend = true;
CommSend commSend = (CommSend) comm;
- MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMethod().getName(),
- commSend.getArgs(), commSend.getReplyChannel());
+ MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMessage());
commGroupFrame.commFrames.add(mframe);
chnlFrame.msgFrames.add(mframe);
} else if (comm instanceof CommRecv) {
@@ -222,7 +221,7 @@
}
public boolean hasReactions() {
- return !_reactions.isEmpty();
+ return !_messages.isEmpty();
}
public void flush() {
@@ -231,7 +230,7 @@
public void read(InputStream iis) throws IOException, ClassNotFoundException {
_channels.clear();
- _reactions.clear();
+ _messages.clear();
_index.clear();
ExecutionQueueInputStream sis = new ExecutionQueueInputStream(iis);
@@ -249,7 +248,7 @@
args[j] = sis.readObject();
}
Channel replyTo = (Channel) sis.readObject();
- _reactions.add(ClassUtil.createMessage(closure, ClassUtil.getActionForMethod(method), args, replyTo));
+ _messages.add(ClassUtil.createMessage(closure, ClassUtil.getActionForMethod(method), args, replyTo));
}
int numChannels = sis.readInt();
@@ -292,9 +291,9 @@
sos.writeInt(_currentCycle);
// Write out the reactions.
- sos.writeInt(_reactions.size());
- for (Message m : _reactions) {
- sos.writeObject(ClassUtil.getMessageClosure(m));
+ sos.writeInt(_messages.size());
+ for (Message m : _messages) {
+ sos.writeObject(m.getTo().getEndpoint(JacobObject.class));
// TODO: we need to write the replyTo object too
sos.writeUTF(m.getAction());
Object[] args = (Object[])m.getBody();
@@ -343,7 +342,7 @@
public boolean isComplete() {
// If we have more reactions we're not done.
- if (!_reactions.isEmpty()) {
+ if (!_messages.isEmpty()) {
return false;
}
@@ -362,11 +361,11 @@
ps.println(" state dump:");
ps.println("-- GENERAL INFO");
ps.println(" Current Cycle : " + _currentCycle);
- ps.println(" Num. Reactions : " + _reactions.size());
- if (!_reactions.isEmpty()) {
+ ps.println(" Num. Reactions : " + _messages.size());
+ if (!_messages.isEmpty()) {
ps.println("-- REACTIONS");
int cnt = 0;
- for (Message m : _reactions) {
+ for (Message m : _messages) {
ps.println(" #" + (++cnt) + ": " + m.toString());
}
}
@@ -388,10 +387,10 @@
MessageFrame mframe = cframe.msgFrames.iterator().next();
ObjectFrame oframe = cframe.objFrames.iterator().next();
- Message message = ClassUtil.createMessage(oframe._continuation,
- ClassUtil.getActionForMethod(oframe._continuation.getMethod(mframe.method)),
- mframe.args, mframe.replyChannel);
- enqueueReaction(message);
+ Message msg = Message.copyFrom(mframe.message);
+ msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe._continuation));
+
+ enqueueMessage(msg);
if (!mframe.commGroupFrame.replicated) {
removeCommGroup(mframe.commGroupFrame);
}
@@ -611,38 +610,25 @@
protected static class MessageFrame extends CommFrame implements Externalizable {
private static final long serialVersionUID = -1112437852498126297L;
- String method;
- Object[] args;
- Channel replyChannel;
+ Message message;
// Used for deserialization
public MessageFrame() {
}
- public MessageFrame(CommGroupFrame commFrame, ChannelFrame channelFrame, String method, Object[] args, Channel replyChannel) {
+ public MessageFrame(CommGroupFrame commFrame, ChannelFrame channelFrame, Message msg) {
super(commFrame, channelFrame);
- this.method = method;
- this.args = args == null ? new Class[]{} : args;
- this.replyChannel = replyChannel;
+ this.message = msg;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- method = in.readUTF();
- int numArgs = in.readInt();
- args = new Object[numArgs];
- for (int i = 0; i < numArgs; ++i) {
- args[i] = in.readObject();
- }
+ message = (Message)in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(method);
- out.writeInt(args.length);
- for (int i = 0; i < args.length; ++i) {
- out.writeObject(args[i]);
- }
+ out.writeObject(message);
}
}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index eb66ccf..38e7dd8 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -93,7 +93,7 @@
}
_cycle = _executionQueue.cycle();
- Message rqe = _executionQueue.dequeueReaction();
+ Message rqe = _executionQueue.dequeueMessage();
JacobThreadImpl jt = new JacobThreadImpl(rqe);
long ctime = System.currentTimeMillis();
@@ -145,7 +145,7 @@
public void addReaction(JacobObject jo, String action, Object[] args, String desc) {
LOG.trace(">> addReaction (jo={}, method={}, args={}, desc={})", jo, action, args, desc);
- _executionQueue.enqueueReaction(ClassUtil.createMessage(jo, action, args, null));
+ _executionQueue.enqueueMessage(ClassUtil.createMessage(jo, action, args, null));
++_statistics.runQueueEntries;
}
@@ -267,7 +267,10 @@
}
CommChannel chnl = (CommChannel) ChannelFactory.getBackend((Channel)channel);
CommGroup grp = new CommGroup(false);
- CommSend send = new CommSend(chnl, method, args, replyChannel);
+
+ Message msg = ClassUtil.createMessage(channel, ClassUtil.getActionForMethod(method), args, replyChannel);
+
+ CommSend send = new CommSend(chnl, msg);
grp.add(send);
_executionQueue.add(grp);
return replyChannel;
@@ -387,18 +390,18 @@
LOG.trace(">> [{}] : {}", _cycle, _source);
stackThread();
- Synch replyTo = (Synch)ClassUtil.target().evaluate(message, Channel.class);
+ Channel replyTo = message.getReplyTo() != null ? message.getReplyTo().getEndpoint(Channel.class) : null;
long ctime = System.currentTimeMillis();
try {
- JacobObject target = ClassUtil.getMessageClosure(message);
+ JacobObject target = message.getTo().getEndpoint(JacobObject.class);
if (target instanceof ReceiveProcess) {
((ReceiveProcess)target).onMessage(message);
} else {
((Runnable)target).run();
}
if (replyTo != null) {
- replyTo.ret();
+ ((Synch)replyTo).ret();
}
} finally {
ctime = System.currentTimeMillis() - ctime;
diff --git a/src/test/java/org/apache/ode/jacob/soup/jackson/JacksonSoupTest.java b/src/test/java/org/apache/ode/jacob/soup/jackson/JacksonSoupTest.java
index 6fc3ca0..8a09f50 100644
--- a/src/test/java/org/apache/ode/jacob/soup/jackson/JacksonSoupTest.java
+++ b/src/test/java/org/apache/ode/jacob/soup/jackson/JacksonSoupTest.java
@@ -53,20 +53,20 @@
queue = new JacksonExecutionQueueImpl();
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":1,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess\",\"@id\":1,\"_in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess\",\"@id\":1,\"in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2},\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":2,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":2,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess\",\"@id\":1,\"in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2},\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":3,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":6,\"commGroupFrame\":{\"@id\":7,\"replicated\":true,\"commFrames\":[6]},\"channelFrame\":5,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":8,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":9,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":4,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}},\"method\":\"java.lang.Runnable#run\",\"args\":[]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"Hello\"]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":6,\"commGroupFrame\":{\"@id\":7,\"replicated\":true,\"commFrames\":[6]},\"channelFrame\":5,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":8,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":9,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":5,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"Hello\"]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"World\"]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":6,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"World\"]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"Hello\"]}],\"channels\":[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"}],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":7,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"Hello\"]},{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"World\"]}],\"channels\":[],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":8,\"continuations\":[{\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}},\"method\":\"org.apache.ode.jacob.oo.Val#val\",\"args\":[\"World\"]}],\"channels\":[],\"global\":null}");
- fixtures.add("{\"objIdCounter\":2,\"currentCycle\":9,\"continuations\":[],\"channels\":[],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":1,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess\",\"@id\":1,\"_in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess\",\"@id\":1,\"in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2},\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":2,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":2,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess\",\"@id\":1,\"in\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2},\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":false,\"objFrames\":[],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":3,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"Hello\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":6,\"commGroupFrame\":{\"@id\":7,\"replicated\":true,\"commFrames\":[6]},\"channelFrame\":5,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":8,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":9,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":4,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"java.lang.Runnable#run\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$StringEmitterProcess\",\"@id\":1,\"str\":\"World\",\"to\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":2}}},\"body\":[\"[Ljava.lang.Class;\",[]]},{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}},\"body\":[\"[Ljava.lang.Object;\",[\"Hello\"]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"},{\"@id\":5,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":2,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":6,\"commGroupFrame\":{\"@id\":7,\"replicated\":true,\"commFrames\":[6]},\"channelFrame\":5,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":8,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":9,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-x\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":5,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}},\"body\":[\"[Ljava.lang.Object;\",[\"Hello\"]]},{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}},\"body\":[\"[Ljava.lang.Object;\",[\"World\"]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":6,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$ForwarderProcess$ForwarderProcessVal\",\"@id\":2,\"out\":{\"@class\":\"org.apache.ode.jacob.oo.Val\",\"channelType\":\"org.apache.ode.jacob.oo.Val\",\"channelId\":1}}}},\"body\":[\"[Ljava.lang.Object;\",[\"World\"]]},{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}},\"body\":[\"[Ljava.lang.Object;\",[\"Hello\"]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[{\"@id\":1,\"type\":\"org.apache.ode.jacob.oo.Val\",\"id\":1,\"refCount\":0,\"replicatedSend\":false,\"replicatedRecv\":true,\"objFrames\":[{\"@id\":2,\"commGroupFrame\":{\"@id\":3,\"replicated\":true,\"commFrames\":[2]},\"channelFrame\":1,\"_continuation\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":4,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}}],\"msgFrames\":[],\"description\":\"simpleHelloWorld-out\"}]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":7,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}},\"body\":[\"[Ljava.lang.Object;\",[\"Hello\"]]},{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}},\"body\":[\"[Ljava.lang.Object;\",[\"World\"]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":8,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[{\"id\":0,\"action\":\"org.apache.ode.jacob.oo.Val#val\",\"to\":{\"@class\":\"org.apache.ode.jacob.ChannelRef\",\"target\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessReceiveProcess\",\"@id\":1,\"receiver\":{\"@class\":\"org.apache.ode.jacob.examples.helloworld.HelloWorld$PrinterProcess$PrinterProcessVal\"}}},\"body\":[\"[Ljava.lang.Object;\",[\"World\"]]}]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[]],\"global\":null}");
+ fixtures.add("{\"objIdCounter\":2,\"currentCycle\":9,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[]],\"global\":null}");
}
@Test
public void testEmptySerialization() throws Exception {
- Assert.assertEquals("{\"objIdCounter\":0,\"currentCycle\":0,\"continuations\":[],\"channels\":[],\"global\":null}", mapper.writeValueAsString(queue));
+ Assert.assertEquals("{\"objIdCounter\":0,\"currentCycle\":0,\"messages\":[\"[Lorg.apache.ode.jacob.Message;\",[]],\"channels\":[\"[Lorg.apache.ode.jacob.vpu.ExecutionQueueImpl$ChannelFrame;\",[]],\"global\":null}", mapper.writeValueAsString(queue));
}