fixing the garbage collection for the calculus case.
diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java
index 680c383..f963efb 100644
--- a/src/main/java/org/apache/ode/jacob/Jacob.java
+++ b/src/main/java/org/apache/ode/jacob/Jacob.java
@@ -71,7 +71,7 @@
* @param channel
* @return
*/
- public static CommChannel newCommChannel(Class<?> channelType, String description) {
+ public static ChannelRef newCommChannel(Class<?> channelType, String description) {
return JacobVPU.activeJacobThread().newCommChannel(channelType, null, description);
}
@@ -103,11 +103,11 @@
JacobVPU.activeJacobThread().sendMessage(message);
}
- public static void subscribe(boolean replicate, CommChannel channel, MessageListener messageListener) throws IllegalArgumentException {
+ public static void subscribe(boolean replicate, ChannelRef channel, MessageListener messageListener) throws IllegalArgumentException {
JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListener);
}
- public static void subscribe(boolean replicate, CommChannel channel, MessageListener[] messageListeners) throws IllegalArgumentException {
+ public static void subscribe(boolean replicate, ChannelRef channel, MessageListener[] messageListeners) throws IllegalArgumentException {
JacobVPU.activeJacobThread().subscribe(replicate, channel, messageListeners);
}
diff --git a/src/main/java/org/apache/ode/jacob/JacobThread.java b/src/main/java/org/apache/ode/jacob/JacobThread.java
index 0b71422..d89168a 100644
--- a/src/main/java/org/apache/ode/jacob/JacobThread.java
+++ b/src/main/java/org/apache/ode/jacob/JacobThread.java
@@ -47,7 +47,7 @@
* @param description
* @return
*/
- public CommChannel newCommChannel(Class<?> channelType, String creator, String description);
+ public ChannelRef newCommChannel(Class<?> channelType, String creator, String description);
/**
* DOCUMENT ME
@@ -71,8 +71,8 @@
*/
public void sendMessage(Message message);
- public void subscribe(boolean replicate, CommChannel channel, MessageListener methodList) throws IllegalArgumentException;
- public void subscribe(boolean replicate, CommChannel channel, MessageListener[] methodList) throws IllegalArgumentException;
+ public void subscribe(boolean replicate, ChannelRef channel, MessageListener methodList) throws IllegalArgumentException;
+ public void subscribe(boolean replicate, ChannelRef channel, MessageListener[] methodList) throws IllegalArgumentException;
// OO oriented API
diff --git a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
index 82679fc..f6a9315 100644
--- a/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
+++ b/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java
@@ -28,7 +28,7 @@
/**
* A unique idefntifer for this object in the queue (should only be set by queue).
*/
- private Object _id;
+ private Integer _id;
/**
* A human-readable description of the object.
@@ -46,14 +46,14 @@
_description = description;
}
- public void setId(Object id) {
+ public void setId(Integer id) {
if (_id != null) {
throw new IllegalStateException("Object id already set for " + this);
}
_id = id;
}
- public Object getId() {
+ public Integer getId() {
return _id;
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
index c28ad23..95948db 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
@@ -19,12 +19,11 @@
package org.apache.ode.jacob.soup.jackson;
import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelProxy;
import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
import org.apache.ode.jacob.vpu.ChannelFactory;
import com.fasterxml.jackson.core.JsonGenerationException;
@@ -45,10 +44,11 @@
*/
public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{
- private final Set<Integer> serializedChannels = new LinkedHashSet<Integer>();
+ private final ExecutionQueueImplSerializer executionQueueImplSerializer;
- protected ChannelProxySerializer() {
+ protected ChannelProxySerializer(ExecutionQueueImplSerializer executionQueueImplSerializer) {
super(ChannelProxy.class);
+ this.executionQueueImplSerializer = executionQueueImplSerializer;
}
@Override
@@ -79,11 +79,7 @@
jgen.writeNumberField("channelId", cid);
// save channel id for garbage collection
- serializedChannels.add(cid);
- }
-
- public Set<Integer> getSerializedChannels() {
- return serializedChannels;
+ executionQueueImplSerializer.markChannelUsed(cid);
}
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
index 4e226f1..4144279 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefSerializer.java
@@ -22,9 +22,10 @@
import java.lang.reflect.Field;
import org.apache.ode.jacob.ChannelRef;
-import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.ChannelRef.Type;
import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -41,8 +42,11 @@
*/
public class ChannelRefSerializer extends StdSerializer<ChannelRef> {
- public ChannelRefSerializer() {
+ private final ExecutionQueueImplSerializer executionQueueImplSerializer;
+
+ public ChannelRefSerializer(ExecutionQueueImplSerializer executionQueueImplSerializer) {
super(ChannelRef.class);
+ this.executionQueueImplSerializer = executionQueueImplSerializer;
}
@Override
@@ -72,6 +76,10 @@
targetField.setAccessible(true);
jgen.writeObjectField("target", targetField.get(value));
+ if (value.getType() == Type.CHANNEL) {
+ executionQueueImplSerializer.markChannelUsed(value.getEndpoint(CommChannel.class).getId());
+ }
+
} catch (Exception ex) {
throw new RuntimeException(ex);
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index 73f700b..1a1fe8d 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.ode.jacob.Message;
@@ -59,11 +60,14 @@
public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl> {
- private ChannelProxySerializer channelProxySerializer;
-
- public ExecutionQueueImplSerializer(ChannelProxySerializer cps) {
+ private Set<Integer> usedChannels = new LinkedHashSet<Integer>();
+
+ public ExecutionQueueImplSerializer() {
super(JacksonExecutionQueueImpl.class);
- this.channelProxySerializer = cps;
+ }
+
+ public void markChannelUsed(int channelId) {
+ usedChannels.add(channelId);
}
@Override
@@ -88,7 +92,7 @@
private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen,
SerializerProvider provider) throws JsonGenerationException, IOException {
- channelProxySerializer.getSerializedChannels().clear();
+ usedChannels.clear();
// write metadata
jgen.writeNumberField("objIdCounter", value._objIdCounter);
@@ -111,14 +115,13 @@
nullgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {}));
// remove unreferenced channels (and keep those which have been exported using export()).
- Set<Integer> referencedChannels = channelProxySerializer.getSerializedChannels();
for (Iterator<ChannelFrame> i = value._channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
- if (referencedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0) {
+ if (usedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0) {
// skip
} else {
LOG.debug("GC Channel: {}", cframe);
- //i.remove();
+ i.remove();
}
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
index 0175046..ddab9aa 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobModule.java
@@ -24,6 +24,7 @@
import org.apache.ode.jacob.oo.ChannelProxy;
import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplDeserializer;
import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl.ExecutionQueueImplSerializer;
+import org.apache.ode.jacob.soup.jackson.ChannelProxySerializer;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.BeanDescription;
@@ -46,11 +47,11 @@
public JacobModule() {
super("jacob-module", Version.unknownVersion());
- final ChannelProxySerializer cps = new ChannelProxySerializer();
- addSerializer(ChannelProxy.class, cps);
+ final ExecutionQueueImplSerializer cqis = new ExecutionQueueImplSerializer();
+ addSerializer(ChannelProxy.class, new ChannelProxySerializer(cqis));
addSerializer(Message.class, new MessageSerializer());
- addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer(cps));
- addSerializer(ChannelRef.class, new ChannelRefSerializer());
+ addSerializer(JacksonExecutionQueueImpl.class, cqis);
+ addSerializer(ChannelRef.class, new ChannelRefSerializer(cqis));
addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer());
addDeserializer(Message.class, new MessageDeserializer());
addDeserializer(Channel.class, new ChannelProxyDeserializer());
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index c5a9e10..8e4265b 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -399,7 +399,7 @@
throw new IllegalArgumentException("The object " + so + " is not new!");
}
- private void assignId(ExecutionQueueObject so, Object id) {
+ private void assignId(ExecutionQueueObject so, Integer id) {
so.setId(id);
}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index b70e5ab..d0114e6 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Stack;
+import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.JacobThread;
import org.apache.ode.jacob.Message;
@@ -294,7 +295,7 @@
return ret;
}
- public CommChannel newCommChannel(Class<?> channelType, String creator, String description) {
+ public ChannelRef newCommChannel(Class<?> channelType, String creator, String description) {
CommChannel chnl = new CommChannel(channelType);
chnl.setDescription(description);
_executionQueue.add(chnl);
@@ -302,7 +303,7 @@
LOG.trace(">> [{}] : new {}", _cycle, chnl);
_statistics.channelsCreated++;
- return chnl;
+ return new ChannelRef(chnl);
}
public String exportChannel(Channel channel) {
@@ -375,7 +376,8 @@
_executionQueue.add(grp);
}
- public void subscribe(boolean replicate, CommChannel channel, MessageListener listener) {
+ public void subscribe(boolean replicate, ChannelRef channel, MessageListener listener) {
+ assert channel.getType() == ChannelRef.Type.CHANNEL;
if (LOG.isTraceEnabled()) {
StringBuffer msg = new StringBuffer();
msg.append(_cycle);
@@ -389,13 +391,14 @@
_statistics.numContinuations++;
CommGroup grp = new CommGroup(replicate);
- CommRecv recv = new CommRecv(channel, listener);
+ CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listener);
grp.add(recv);
_executionQueue.add(grp);
}
- public void subscribe(boolean replicate, CommChannel channel, MessageListener listeners[]) {
+ public void subscribe(boolean replicate, ChannelRef channel, MessageListener listeners[]) {
+ assert channel.getType() == ChannelRef.Type.CHANNEL;
if (LOG.isTraceEnabled()) {
StringBuffer msg = new StringBuffer();
msg.append(_cycle);
@@ -413,7 +416,7 @@
CommGroup grp = new CommGroup(replicate);
for (int i = 0; i < listeners.length; ++i) {
- CommRecv recv = new CommRecv(channel, listeners[i]);
+ CommRecv recv = new CommRecv(channel.getEndpoint(CommChannel.class), listeners[i]);
grp.add(recv);
}
_executionQueue.add(grp);
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 51a5a18..15587ac 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -221,9 +221,9 @@
protected void calculusHelloWorld() {
// new(out)
- final CommChannel out = newCommChannel(Val.class, "calculusHelloWorld-out");
+ final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out");
// new(x)
- final CommChannel x = newCommChannel(Val.class, "calculusHelloWorld-x");
+ final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x");
// *(?out(str).!sysout(str))
subscribe(true, out, new PrinterMessageListener());
@@ -244,16 +244,16 @@
}
static class ForwarderMessageListener implements MessageListener {
- private CommChannel to;
+ private ChannelRef to;
@JsonCreator
- public ForwarderMessageListener(@JsonProperty("to") CommChannel to) {
+ public ForwarderMessageListener(@JsonProperty("to") ChannelRef to) {
this.to = to;
}
@Override
public void onMessage(Message msg) {
- Message msg2 = new Message(new ChannelRef(to), null, msg.getAction());
+ Message msg2 = new Message(to, null, msg.getAction());
msg2.setBody(msg.getBody());
sendMessage(msg2);
}
@@ -261,16 +261,16 @@
static class StringEmitterRunnable extends JacobObject implements Runnable {
private String str;
- private CommChannel to;
+ private ChannelRef to;
@JsonCreator
- public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to") CommChannel to) {
+ public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to") ChannelRef to) {
this.str = str;
this.to = to;
}
public void run() {
- Message msg = new Message(new ChannelRef(to), null, "printHW");
+ Message msg = new Message(to, null, "printHW");
msg.setBody(str);
sendMessage(msg);
}