[JACOB-6] Add dsl support for synchronizing and sequencing processes
diff --git a/src/main/java/org/apache/ode/jacob/Jacob.java b/src/main/java/org/apache/ode/jacob/Jacob.java
index cc7570c..51967c0 100644
--- a/src/main/java/org/apache/ode/jacob/Jacob.java
+++ b/src/main/java/org/apache/ode/jacob/Jacob.java
@@ -41,7 +41,7 @@
      *

      * @param concretion the concretion of a process template

      */

-    public static void instance(Runnable concretion) {

+    public static void instance(RunnableProcess concretion) {

         JacobVPU.activeJacobThread().instance(concretion);

     }

 

diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
index b7e61a6..5b9fabd 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
@@ -18,10 +18,22 @@
  */
 package org.apache.ode.jacob.oo;
 
+import java.util.Arrays;
+
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.vpu.JacobVPU;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ode.jacob.Jacob.instance;
+import static org.apache.ode.jacob.Jacob.newChannel;
+import static org.apache.ode.jacob.Jacob.object;
 
 
 public final class ProcessUtil {
+	// TODO: add more logging at TRACE level
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessUtil.class);
+
     private ProcessUtil() {
         // Utility class
     }
@@ -49,4 +61,178 @@
     	return new ReceiveProcess().setChannel(proxy).setReceiver(listener);
     }
 
+	/**
+	 * 
+	 * @return	A noop RunnableProcess
+	 */
+	public static RunnableProcess nil() {
+		return new Nil();
+	}
+
+	/**
+	 * 
+	 * @param callback
+	 * @return a RunnableProcess that wraps a return notification in a separate process
+	 * 
+	 */
+	public static RunnableProcess terminator(final Synch callback) {
+		return callback != null ? new Terminator(callback) : null;
+	}
+
+	/**
+	 * 
+	 * @param callback
+	 * @param process
+	 * @return
+	 * 
+	 * Returns a synchronized process embedding the runnable process. Once the process finishes it 
+	 * will notify that on the callback return channel
+	 */
+    public static Synchronized sync(final Synch callback, final RunnableProcess process) {
+    	return new SynchronizedWrapper(callback, process);
+    }
+
+	/**
+	 * 
+	 * @param callback
+	 * @param process
+	 * @return
+	 * 
+	 * Intercepts the execution of a synchronized process and executes an interceptor before the 
+	 * termination of the process is actually signaled
+	 */
+	public static Synchronized intercept(final Synchronized process, final RunnableProcess interceptor) {
+		if (interceptor == null) {
+			return process;
+		}
+		Synch callback = newChannel(Synch.class, "");
+		object(receive(callback, new InterceptorSynch(process.getCallback(), interceptor)));
+		process.setCallback(callback);
+		return process;
+    }
+
+	/**
+	 * 
+	 * @param processes
+	 * @return a Synchronized process
+	 * 
+	 * Ensures the sequential execution of processes
+	 */
+	public static Synchronized sequence(final RunnableProcess... processes) {
+    	return sequence(null, processes);
+    }
+
+	/**
+	 * 
+	 * @param callback
+	 * @param processes
+	 * @return
+	 * 
+	 * Ensures the sequential execution of processes. After the execution is complete a 
+	 * notification is sent to the callback channel
+	 */
+    public static Synchronized sequence(final Synch callback, final RunnableProcess... processes) {
+    	return new SequenceProcess(callback, processes);
+    }
+
+    // Helpers Process composers
+
+    /**
+     * TODO: Document me
+     */
+    public static class Nil extends RunnableProcess {
+		private static final long serialVersionUID = 1L;
+		public void run() {
+			// do nothing
+		}
+    }
+
+    /**
+     * TODO: Document me
+     */
+    public static class Terminator extends RunnableProcess {
+		private static final long serialVersionUID = 1L;
+		protected Synch callback;
+		public Terminator(final Synch callback) {
+			this.callback = callback;
+		}
+		public Synch getCallback() {
+			return callback;
+		}
+		public void run() {
+			callback.ret();
+		}
+    }
+
+    public static abstract class Synchronized extends RunnableProcess {
+		private static final long serialVersionUID = 1L;
+		protected Synch callback;
+
+		public abstract void execute();
+
+		public Synchronized(final Synch callback) {
+			setCallback(callback);
+		}
+		public Synch getCallback() {
+			return callback;
+		}
+		public void setCallback(final Synch callback) {
+			this.callback = callback;
+		}
+		public void run() {
+			execute();
+			if (callback != null) {
+				callback.ret();
+			}
+		}
+    }
+
+    public static class SynchronizedWrapper extends Synchronized {
+		private static final long serialVersionUID = 1L;
+		protected final RunnableProcess process;
+		public SynchronizedWrapper(final Synch callback, final RunnableProcess process) {
+			super(callback);
+			this.process = process;
+		}
+		public void execute() {
+			process.run();
+		}
+    }
+
+    public static final class InterceptorSynch implements Synch {
+		private static final long serialVersionUID = 1L;
+		protected final RunnableProcess interceptor;
+		private final Synch target;
+		public InterceptorSynch(final Synch target, final RunnableProcess interceptor) {
+			this.target = target;
+			this.interceptor = interceptor;
+		}
+		public void ret() {
+			instance(sync(target, interceptor));
+		}
+    }
+
+    public static final class SequenceProcess extends Synchronized {
+		private static final long serialVersionUID = 1L;
+		private final RunnableProcess[] processes;
+
+		public SequenceProcess(final Synch callback, final RunnableProcess[] processes) {
+			super(callback);
+			this.processes = processes;
+		}
+
+		public void execute() {
+			// can only sequence synchronized processes
+			final Synchronized current = ensureSynchronized(processes[0]);
+			instance(intercept(current, processes.length > 1 ? 
+				sequence(this.callback, Arrays.copyOfRange(processes, 1, processes.length)) :
+				terminator(this.callback)));
+			this.callback = null;
+		}
+		
+		public Synchronized ensureSynchronized(RunnableProcess process) {
+			return process instanceof Synchronized ? (Synchronized)process : sync(null, process);
+		}
+    }
+
 }
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index c709a95..72e33ce 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -27,6 +27,7 @@
 import org.apache.ode.jacob.JacobThread;
 import org.apache.ode.jacob.Message;
 import org.apache.ode.jacob.MessageListener;
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.oo.Channel;
 import org.apache.ode.jacob.oo.ChannelListener;
 import org.apache.ode.jacob.oo.ClassUtil;
@@ -175,7 +176,7 @@
      * the injected process. This method is equivalent to the parallel operator,
      * but is intended to be used from outside of an active {@link JacobThread}.
      */
-    public void inject(Runnable concretion) {
+    public void inject(RunnableProcess concretion) {
         LOG.debug("injecting {}", concretion);
         addReaction(concretion, ClassUtil.RUN_METHOD_ACTION, new Class[]{},
             (LOG.isInfoEnabled() ? concretion.toString() : null));
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 4dac54a..5165f8b 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -296,7 +296,7 @@
 		}
 
 		@Override
-		protected Runnable doStep(int step, Synch done) {
+		protected RunnableProcess doStep(int step, Synch done) {
 			return new SequenceItemEmitter(greetings[step], done, out);
         }
 
diff --git a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
index 8ea152c..c7b7e6c 100644
--- a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
+++ b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
@@ -68,7 +68,7 @@
      * @param done notification after step completion
      * @return runnable process
      */
-    protected abstract Runnable doStep(int step, Synch done);
+    protected abstract RunnableProcess doStep(int step, Synch done);
 
     public static class SequenceData {
         public int _steps;
diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
index c68c7f4..e11a517 100644
--- a/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
+++ b/src/test/java/org/apache/ode/jacob/oo/JacobChannelsTest.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.ode.jacob.RunnableProcess;
 import org.apache.ode.jacob.soup.CommChannel;
 import org.apache.ode.jacob.vpu.ChannelFactory;
 import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
@@ -55,7 +56,7 @@
         vpu.setContext(new ExecutionQueueImpl());
 
         final List<String> result = new ArrayList<String>();
-        vpu.inject(new Runnable() {
+        vpu.inject(new RunnableProcess() {
             public void run() {
                 Val v = (Val)vpu.newChannel(Val.class, "");
                 object(receive(v, new Val() {
@@ -88,7 +89,7 @@
         vpu.setContext(new ExecutionQueueImpl());
 
         final List<String> result = new ArrayList<String>();
-        vpu.inject(new Runnable() {
+        vpu.inject(new RunnableProcess() {
             public void run() {
                 Val v = (Val)vpu.newChannel(Val.class, "");
                 object(ProcessUtil.compose(receive(v, new Val() {
diff --git a/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
new file mode 100644
index 0000000..1bd938c
--- /dev/null
+++ b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.oo;
+
+
+import org.apache.ode.jacob.Jacob;
+import org.apache.ode.jacob.RunnableProcess;
+import org.apache.ode.jacob.oo.ProcessUtil.Synchronized;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.ode.jacob.vpu.JacobVPU;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.ode.jacob.Jacob.instance;
+import static org.apache.ode.jacob.Jacob.object;
+import static org.apache.ode.jacob.oo.ProcessUtil.intercept;
+import static org.apache.ode.jacob.oo.ProcessUtil.receive;
+import static org.apache.ode.jacob.oo.ProcessUtil.sequence;
+import static org.apache.ode.jacob.oo.ProcessUtil.sync;
+
+
+public class SequentialProcessingTest {
+
+    @SuppressWarnings("serial")
+	@Test
+    public void testParallelProcesses() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+				instance(intercept(sync(null, atomicProcess("A", out)), atomicProcess("0", out)));
+				instance(intercept(sync(null, atomicProcess("B", out)), atomicProcess("1", out)));
+			}
+        });
+    	// parallelism is proven by process "B" being executed before "A0" and "A1"
+    	Assert.assertEquals("AB01", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+	@Test
+    public void testSynchronizeProcess() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+				instance(sync(c1, atomicProcess("A", out)));
+			}
+        });
+    	// Return hook "x" is executed after process "A" 
+    	Assert.assertEquals("Ax", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+	@Test
+    public void testSynchronizeSynchronizedProcess() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+		        Synch c2 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c2, new Synch() {
+					public void ret() {
+						out.append("y");
+					}
+				}));
+		        Synchronized process = synchronizedProcess(c1, "S", out);
+				instance(sync(c2, process));
+			}
+        });
+    	// Both return hooks "x" and "y" are executed after synchronized process "S" 
+    	Assert.assertEquals("Sxy", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+	@Test
+    public void testInterceptProcess() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+				instance(intercept(sync(c1, atomicProcess("A", out)), atomicProcess("B", out)));
+			}
+        });
+    	// Return interceptor "B" is executed after process "A", but before the hook "x" 
+    	Assert.assertEquals("ABx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+	@Test
+    public void testInterceptSynchronizedProcess() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+		        Synch c2 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c2, new Synch() {
+					public void ret() {
+						out.append("y");
+					}
+				}));
+				instance(intercept(sync(c1, atomicProcess("A", out)), sync(c2, atomicProcess("B", out))));
+			}
+        });
+    	// Return interceptor "B" is executed after process "A", but before the hook "x" 
+    	Assert.assertEquals("AByx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSimpleSequence() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+				Jacob.instance(sequence(
+			        atomicProcess("A", out), 
+			        atomicProcess("B", out), 
+			        atomicProcess("C", out)));
+			}
+        });
+    	// TODO: explain 
+    	Assert.assertEquals("ABC", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSynchronizedSequence() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+				Jacob.instance(sequence(c1,
+			        atomicProcess("A", out), 
+			        atomicProcess("B", out), 
+			        atomicProcess("C", out), 
+			        atomicProcess("D", out)));
+			}
+        });
+    	// TODO: explain 
+    	Assert.assertEquals("ABCDx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testTransitiveSequence() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+				Jacob.instance(sequence(c1,
+			        atomicProcess("A", out), 
+			        sequence(atomicProcess("B", out), atomicProcess("C", out)), 
+			        atomicProcess("D", out)));
+			}
+        });
+    	// TODO: explain 
+    	Assert.assertEquals("ABCDx", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+    @Test
+    public void testSequenceComposition() {
+    	final StringBuffer out = new StringBuffer();
+    	executeProcess(new RunnableProcess() {
+			public void run() {
+		        Synch c1 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c1, new Synch() {
+					public void ret() {
+						out.append("x");
+					}
+				}));
+		        Synch c2 = Jacob.newChannel(Synch.class, "");
+		        object(receive(c2, new Synch() {
+					public void ret() {
+						out.append("y");
+					}
+				}));
+		        // just test a more complex scenario once
+				Jacob.instance(sequence(c1,
+				    sequence(
+				        sequence(
+				        	atomicProcess("A", out),
+				        	atomicProcess("B", out),
+				        	atomicProcess("C", out)),
+				        atomicProcess("D", out)), 
+				    atomicProcess("E", out), 
+			        sequence(c2,
+			        	atomicProcess("F", out), 
+		        	    sequence(
+		        	    	atomicProcess("G", out),
+        	                atomicProcess("H", out)))));
+			}
+        });
+    	// TODO: explain 
+    	Assert.assertEquals("ABCDEFGHxy", out.toString());
+    }
+
+    @SuppressWarnings("serial")
+	protected RunnableProcess atomicProcess(final String id, final StringBuffer out) {
+    	return new RunnableProcess() {
+			public void run() {
+				out.append(id);
+			}
+    	};
+    }
+
+    @SuppressWarnings("serial")
+	protected Synchronized synchronizedProcess(final Synch callback, final String id, final StringBuffer out) {
+    	return new Synchronized(callback) {
+			public void execute() {
+				out.append(id);
+			}
+    	};
+    }
+    
+    protected void executeProcess(final RunnableProcess process) {
+        final JacobVPU vpu = new JacobVPU();
+        vpu.setContext(new ExecutionQueueImpl());
+        vpu.inject(process);
+        while (vpu.execute()) {
+            // keep doing it...
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
index b50e45b..50ccad9 100644
--- a/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
+++ b/src/test/java/org/apache/ode/jacob/vpu/ChannelRefTest.java
@@ -1,69 +1,71 @@
-/*

- * Licensed to the Apache Software Foundation (ASF) under one

- * or more contributor license agreements.  See the NOTICE file

- * distributed with this work for additional information

- * regarding copyright ownership.  The ASF licenses this file

- * to you under the Apache License, Version 2.0 (the

- * "License"); you may not use this file except in compliance

- * with the License.  You may obtain a copy of the License at

- *

- *    http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing,

- * software distributed under the License is distributed on an

- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

- * KIND, either express or implied.  See the License for the

- * specific language governing permissions and limitations

- * under the License.

- */

-package org.apache.ode.jacob.vpu;

-

-import static org.apache.ode.jacob.Jacob.newCommChannel;

-import static org.junit.Assert.*;

-import org.apache.ode.jacob.ChannelRef;

-import org.apache.ode.jacob.oo.Synch;

-import org.apache.ode.jacob.oo.Val;

-import org.apache.ode.jacob.soup.CommChannel;

-

-import org.junit.Test;

-

-public class ChannelRefTest {

-

-    @Test

-    public void testConnectWithInterface() {

-        JacobVPU vpu = new JacobVPU();

-        vpu.setContext(new ExecutionQueueImpl());

-        

-        vpu.inject(new Runnable() {

-            @Override

-            public void run() {

-                ChannelRef cref = newCommChannel("unbound channel");

-                CommChannel cchannel = cref.getEndpoint(CommChannel.class);

-                assertNotNull(cchannel);

-                assertNull(cchannel.getType());

-                

-                // now connect it to Val.class

-                Val val = cref.getEndpoint(Val.class);

-                assertNotNull(val);

-                assertEquals(Val.class, cchannel.getType());

-                

-                // now try to associate it with a different channel interface

-                try {

-                    cref.getEndpoint(Synch.class);

-                    fail("we should get an IllegalStateException");

-                } catch (IllegalStateException e) {

-                    assertEquals("ChannelRef is already associated with a channel of a different type", e.getMessage());

-                }

-                

-                // now try to associate with the same channel

-                Val val2 = cref.getEndpoint(Val.class);

-                assertNotNull(val2);

-                assertSame(val, val2);

-            }

-        });

-        

-        assertEquals(true, vpu.getContext().hasReactions());

-        vpu.execute();

-        assertEquals(false, vpu.getContext().hasReactions());

-    }

-}

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.vpu;
+
+import static org.apache.ode.jacob.Jacob.newCommChannel;
+import static org.junit.Assert.*;
+
+import org.apache.ode.jacob.ChannelRef;
+import org.apache.ode.jacob.RunnableProcess;
+import org.apache.ode.jacob.oo.Synch;
+import org.apache.ode.jacob.oo.Val;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.junit.Test;
+
+public class ChannelRefTest {
+
+    @Test
+    public void testConnectWithInterface() {
+        JacobVPU vpu = new JacobVPU();
+        vpu.setContext(new ExecutionQueueImpl());
+
+        vpu.inject(new RunnableProcess() {
+            @Override
+            public void run() {
+                ChannelRef cref = newCommChannel("unbound channel");
+                CommChannel cchannel = cref.getEndpoint(CommChannel.class);
+                assertNotNull(cchannel);
+                assertNull(cchannel.getType());
+
+                // now connect it to Val.class
+                Val val = cref.getEndpoint(Val.class);
+                assertNotNull(val);
+                assertEquals(Val.class, cchannel.getType());
+
+                // now try to associate it with a different channel interface
+                try {
+                    cref.getEndpoint(Synch.class);
+                    fail("we should get an IllegalStateException");
+                } catch (IllegalStateException e) {
+                    assertEquals("ChannelRef is already associated with a channel of a different type", e.getMessage());
+                }
+
+                // now try to associate with the same channel
+                Val val2 = cref.getEndpoint(Val.class);
+                assertNotNull(val2);
+                assertSame(val, val2);
+            }
+        });
+
+        assertEquals(true, vpu.getContext().hasReactions());
+        vpu.execute();
+        assertEquals(false, vpu.getContext().hasReactions());
+    }
+
+}
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index 91bd688..c1d9d01 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -16,10 +16,11 @@
 #
 
 # Set root logger level to WARN and its only appender to CONSOLE
-log4j.rootLogger=TRACE, file
+log4j.rootLogger=INFO, file
 
 # log4j properties to work with command line tools.
-log4j.category.org.apache.ode=TRACE
+log4j.category.org.apache.ode=DEBUG
+#log4j.category.org.apache.ode.jacob.oo=TRACE
 
 # Console appender
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender