Completely replaced the Continuation/Method based reactions with a Message based model
diff --git a/src/main/java/org/apache/ode/jacob/soup/Continuation.java b/src/main/java/org/apache/ode/jacob/soup/Continuation.java
deleted file mode 100644
index b325b78..0000000
--- a/src/main/java/org/apache/ode/jacob/soup/Continuation.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ode.jacob.soup;
-
-import org.apache.ode.jacob.JacobObject;
-import org.apache.ode.jacob.Message;
-import org.apache.ode.jacob.oo.Channel;
-import org.apache.ode.jacob.oo.ClassUtil;
-
-
-/**
- * DOCUMENTME.
- * <p>Created on Feb 16, 2004 at 9:23:40 PM.</p>
- *
- * @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
- */
-public class Continuation extends ExecutionQueueObject {
- private final Message message;
-
- public Continuation(JacobObject target, String action, Object[] args, Channel replyTo) {
- message = ClassUtil.createMessage(target, action, args, replyTo);
- }
-
- public Message getMessage() {
- return message;
- }
-
- public String toString () {
- return new StringBuilder("{")
- .append(this.getClass().getSimpleName())
- .append(", method=").append(message.getAction())
- .append("}").toString();
- }
-
-}
diff --git a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
index e32ca7c..9fafc45 100644
--- a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
+++ b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java
@@ -20,6 +20,8 @@
import java.io.PrintStream;
+import org.apache.ode.jacob.Message;
+
/**
* The soup, the reactive "broth" that underlies the JACOB system. The {@link ExecutionQueue}
* implementation is responsible for implementing the JACOB reactive rules and
@@ -40,11 +42,11 @@
* Add a continuation to the broth. This operation is sometimes
* referred to as an "injection"; it can be used to inject into the
* broth the "original" continuation.
- * @param continuation the {@link Continuation} to add to the broth
+ * @param message the {@link Message} to add to the broth
*/
- public void enqueueReaction(Continuation continuation);
+ public void enqueueReaction(Message message);
- public Continuation dequeueReaction();
+ public Message dequeueReaction();
public void add(CommChannel channel);
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index c3d100d..0ccb46a 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -26,7 +26,7 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.ode.jacob.soup.Continuation;
+import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,7 +95,7 @@
jgen.writeNumberField("currentCycle", value._currentCycle);
// write continuations
- jgen.writeObjectField("continuations", value._reactions.toArray(new Continuation[] {}));
+ jgen.writeObjectField("continuations", value._reactions.toArray(new Message[]{}));
// channel garbage collection
@@ -157,8 +157,8 @@
} else if ("currentCycle".equals(fieldname)) {
soup._currentCycle = jp.getIntValue();
} else if ("continuations".equals(fieldname)) {
- Continuation[] cs = (Continuation[])jp.readValueAs(Continuation[].class);
- soup._reactions = new HashSet<Continuation>(Arrays.asList(cs));
+ Message[] cs = (Message[])jp.readValueAs(Message[].class);
+ soup._reactions = new HashSet<Message>(Arrays.asList(cs));
} else if ("channels".equals(fieldname)) {
soup._channels = new HashMap<Integer, ChannelFrame>();
ChannelFrame[] frames = jp.readValueAs(ChannelFrame[].class);
@@ -168,7 +168,6 @@
} else if ("global".equals(fieldname)) {
soup._gdata = jp.readValueAs(Serializable.class);
}
-
}
return soup;
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
index f42b4fe..2d77f2c 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
@@ -18,9 +18,9 @@
*/
package org.apache.ode.jacob.soup.jackson;
+import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelProxy;
-import org.apache.ode.jacob.soup.Continuation;
import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplDeserializer;
import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
@@ -47,10 +47,10 @@
final ChannelProxySerializer cps = new ChannelProxySerializer();
addSerializer(ChannelProxy.class, cps);
- addSerializer(Continuation.class, new MessageSerializer());
+ addSerializer(Message.class, new MessageSerializer());
addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer(cps));
addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer());
- addDeserializer(Continuation.class, new MessageDeserializer());
+ addDeserializer(Message.class, new MessageDeserializer());
addDeserializer(Channel.class, new ChannelProxyDeserializer());
setDeserializerModifier(new BeanDeserializerModifier() {
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
index 28e71c1..0f74fcb 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
@@ -21,8 +21,9 @@
import java.io.IOException;
import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.oo.Channel;
-import org.apache.ode.jacob.soup.Continuation;
+import org.apache.ode.jacob.oo.ClassUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,21 +32,21 @@
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
/**
- * Jackson deserializer for Continuation objects.
+ * Jackson deserializer for Message objects.
*
* @author Tammo van Lessen
*
*/
-public class MessageDeserializer extends StdDeserializer<Continuation> {
+public class MessageDeserializer extends StdDeserializer<Message> {
private static final long serialVersionUID = 1L;
protected MessageDeserializer() {
- super(Continuation.class);
+ super(Message.class);
}
@Override
- public Continuation deserialize(JsonParser jp,
+ public Message deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException,
JsonProcessingException {
@@ -73,14 +74,14 @@
}
if (target == null) {
- throw ctxt.mappingException(Continuation.class);
+ throw ctxt.mappingException(Message.class);
}
if (action == null) {
- throw ctxt.mappingException(Continuation.class);
+ throw ctxt.mappingException(Message.class);
}
- // TODO: pass the replyTo channel to the Continuation
- return new Continuation(target, action, args, replyTo);
+ // TODO: pass the replyTo channel to the Message
+ return ClassUtil.createMessage(target, action, args, replyTo);
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
index 81525c8..06c7602 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageSerializer.java
@@ -20,8 +20,8 @@
import java.io.IOException;
+import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.oo.ClassUtil;
-import org.apache.ode.jacob.soup.Continuation;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -31,19 +31,19 @@
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
/**
- * Jackson serializer for Continuation objects.
+ * Jackson serializer for {@link Message} objects.
*
* @author Tammo van Lessen
*
*/
-public class MessageSerializer extends StdSerializer<Continuation> {
+public class MessageSerializer extends StdSerializer<Message> {
public MessageSerializer() {
- super(Continuation.class);
+ super(Message.class);
}
@Override
- public void serialize(Continuation value, JsonGenerator jgen,
+ public void serialize(Message value, JsonGenerator jgen,
SerializerProvider provider) throws IOException,
JsonGenerationException {
jgen.writeStartObject();
@@ -53,7 +53,7 @@
@Override
- public void serializeWithType(Continuation value, JsonGenerator jgen,
+ public void serializeWithType(Message value, JsonGenerator jgen,
SerializerProvider provider, TypeSerializer typeSer)
throws IOException, JsonProcessingException {
typeSer.writeTypePrefixForObject(value, jgen);
@@ -61,11 +61,11 @@
typeSer.writeTypeSuffixForObject(value, jgen);
}
- private void serializeContents(Continuation value, JsonGenerator jgen,
+ private void serializeContents(Message value, JsonGenerator jgen,
SerializerProvider provider) throws JsonGenerationException, IOException {
- jgen.writeObjectField("target", ClassUtil.getMessageClosure(value.getMessage()));
- jgen.writeStringField("method", value.getMessage().getAction());
- jgen.writeObjectField("args", value.getMessage().getBody());
+ jgen.writeObjectField("target", ClassUtil.getMessageClosure(value));
+ jgen.writeStringField("method", value.getAction());
+ jgen.writeObjectField("args", value.getBody());
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index 80a2e82..db667f5 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -43,6 +43,7 @@
import org.apache.ode.jacob.IndexedObject;
import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelListener;
import org.apache.ode.jacob.oo.ClassUtil;
@@ -51,7 +52,6 @@
import org.apache.ode.jacob.soup.CommGroup;
import org.apache.ode.jacob.soup.CommRecv;
import org.apache.ode.jacob.soup.CommSend;
-import org.apache.ode.jacob.soup.Continuation;
import org.apache.ode.jacob.soup.ExecutionQueue;
import org.apache.ode.jacob.soup.ExecutionQueueObject;
import org.apache.ode.jacob.soup.ReplacementMap;
@@ -70,19 +70,19 @@
private ClassLoader _classLoader;
/**
- * Cached set of enqueued {@link Continuation} objects (i.e. those read using
- * {@link #enqueueReaction(org.apache.ode.jacob.soup.Continuation)}).
+ * Cached set of enqueued {@link Message} objects (i.e. those read using
+ * {@link #enqueueReaction(Message)}).
* These reactions are "cached"--that is it is not sent directly to the DAO
* layer--to minimize unnecessary serialization/deserialization of closures.
- * This is a pretty useful optimization, as most {@link Continuation}s are
+ * This is a pretty useful optimization, as most {@link Message}s are
* enqueued, and then immediately dequeued in the next cycle. By caching
- * {@link Continuation}s, we eliminate practically all serialization of
+ * {@link Message}s, we eliminate practically all serialization of
* these objects, the only exception being cases where the system decides to
* stop processing a particular soup despite the soup being able to make
* forward progress; this scenario would occur if a maximum processign
* time-per-instance policy were in effect.
*/
- protected Set<Continuation> _reactions = new LinkedHashSet<Continuation>();
+ protected Set<Message> _reactions = new LinkedHashSet<Message>();
protected Map<Integer, ChannelFrame> _channels = new LinkedHashMap<Integer, ChannelFrame>();
@@ -126,23 +126,22 @@
assignId(channel, cframe.getId());
}
- public void enqueueReaction(Continuation continuation) {
- LOG.trace(">> enqueueReaction (continuation={})", continuation);
+ public void enqueueReaction(Message message) {
+ LOG.trace(">> enqueueReaction (message={})", message);
- verifyNew(continuation);
- _reactions.add(continuation);
+ _reactions.add(message);
}
- public Continuation dequeueReaction() {
+ public Message dequeueReaction() {
LOG.trace(">> dequeueReaction ()");
- Continuation continuation = null;
+ Message message = null;
if (!_reactions.isEmpty()) {
- Iterator<Continuation> it = _reactions.iterator();
- continuation = it.next();
+ Iterator<Message> it = _reactions.iterator();
+ message = it.next();
it.remove();
}
- return continuation;
+ return message;
}
public void add(CommGroup group) {
@@ -250,7 +249,7 @@
args[j] = sis.readObject();
}
Channel replyTo = (Channel) sis.readObject();
- _reactions.add(new Continuation(closure, ClassUtil.getActionForMethod(method), args, replyTo));
+ _reactions.add(ClassUtil.createMessage(closure, ClassUtil.getActionForMethod(method), args, replyTo));
}
int numChannels = sis.readInt();
@@ -294,10 +293,11 @@
// Write out the reactions.
sos.writeInt(_reactions.size());
- for (Continuation c : _reactions) {
- sos.writeObject(ClassUtil.getMessageClosure(c.getMessage()));
- sos.writeUTF(c.getMessage().getAction());
- Object[] args = (Object[])c.getMessage().getBody();
+ for (Message m : _reactions) {
+ sos.writeObject(ClassUtil.getMessageClosure(m));
+ // TODO: we need to write the replyTo object too
+ sos.writeUTF(m.getAction());
+ Object[] args = (Object[])m.getBody();
sos.writeInt(args == null ? 0 : args.length);
for (Object a : args) {
sos.writeObject(a);
@@ -366,8 +366,8 @@
if (!_reactions.isEmpty()) {
ps.println("-- REACTIONS");
int cnt = 0;
- for (Continuation continuation : _reactions) {
- ps.println(" #" + (++cnt) + ": " + continuation.toString());
+ for (Message m : _reactions) {
+ ps.println(" #" + (++cnt) + ": " + m.toString());
}
}
if (!_channels.isEmpty()) {
@@ -388,12 +388,10 @@
MessageFrame mframe = cframe.msgFrames.iterator().next();
ObjectFrame oframe = cframe.objFrames.iterator().next();
- Continuation continuation = new Continuation(oframe._continuation,
- ClassUtil.getActionForMethod(oframe._continuation.getMethod(mframe.method)), mframe.args, mframe.replyChannel);
- if (LOG.isInfoEnabled()) {
- continuation.setDescription(channel + " ? {...} | " + channel + " ! " + mframe.method + "(...)");
- }
- enqueueReaction(continuation);
+ Message message = ClassUtil.createMessage(oframe._continuation,
+ ClassUtil.getActionForMethod(oframe._continuation.getMethod(mframe.method)),
+ mframe.args, mframe.replyChannel);
+ enqueueReaction(message);
if (!mframe.commGroupFrame.replicated) {
removeCommGroup(mframe.commGroupFrame);
}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 0e3a081..eb66ccf 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -18,7 +18,6 @@
*/
package org.apache.ode.jacob.vpu;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
@@ -37,7 +36,6 @@
import org.apache.ode.jacob.soup.CommGroup;
import org.apache.ode.jacob.soup.CommRecv;
import org.apache.ode.jacob.soup.CommSend;
-import org.apache.ode.jacob.soup.Continuation;
import org.apache.ode.jacob.soup.ExecutionQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,7 +93,7 @@
}
_cycle = _executionQueue.cycle();
- Continuation rqe = _executionQueue.dequeueReaction();
+ Message rqe = _executionQueue.dequeueReaction();
JacobThreadImpl jt = new JacobThreadImpl(rqe);
long ctime = System.currentTimeMillis();
@@ -147,9 +145,7 @@
public void addReaction(JacobObject jo, String action, Object[] args, String desc) {
LOG.trace(">> addReaction (jo={}, method={}, args={}, desc={})", jo, action, args, desc);
- Continuation continuation = new Continuation(jo, action, args, null);
- continuation.setDescription(desc);
- _executionQueue.enqueueReaction(continuation);
+ _executionQueue.enqueueReaction(ClassUtil.createMessage(jo, action, args, null));
++_statistics.runQueueEntries;
}
@@ -233,7 +229,6 @@
}
private class JacobThreadImpl implements Runnable, JacobThread {
- private final JacobObject _methodBody;
private final Message message;
/** Text string identifying the left side of the reduction (for debug). */
@@ -242,13 +237,9 @@
/** Text string identifying the target class and method (for debug) . */
private String _targetStr = "Unknown";
- JacobThreadImpl(Continuation rqe) {
- assert rqe != null;
-
- _methodBody = ClassUtil.getMessageClosure(rqe.getMessage());
- message = rqe.getMessage();
- _source = rqe.getDescription();
- _targetStr = rqe.getMessage().getAction();
+ JacobThreadImpl(Message msg) {
+ message = msg;
+ _targetStr = msg.getAction();
}
public void instance(Runnable template) {
@@ -400,24 +391,15 @@
long ctime = System.currentTimeMillis();
try {
- if (_methodBody instanceof ReceiveProcess) {
- ((ReceiveProcess)_methodBody).onMessage(message);
- // _method.invoke(((ReceiveProcess)_methodBody).getReceiver(), args);
+ JacobObject target = ClassUtil.getMessageClosure(message);
+ if (target instanceof ReceiveProcess) {
+ ((ReceiveProcess)target).onMessage(message);
} else {
- ((Runnable)_methodBody).run();
+ ((Runnable)target).run();
}
if (replyTo != null) {
replyTo.ret();
}
-/*
- } catch (IllegalAccessException iae) {
- throw new RuntimeException("MethodNotAccessible: " + _method.getName() + " in " + _method.getDeclaringClass().getName(), iae);
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- throw (target instanceof RuntimeException)
- ? (RuntimeException) target
- : new RuntimeException("ClientMethodException: " + _method.getName() + " in " + _methodBody.getClass().getName(), target);
-*/
} finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;
@@ -426,7 +408,7 @@
}
public String toString() {
- return "PT[ " + _methodBody + " ]";
+ return "PT[ " + message.getAction() + " ]";
}
private void stackThread() {