[cosmetic] Remove annoying tabs from code
diff --git a/src/main/java/org/apache/ode/jacob/Expression.java b/src/main/java/org/apache/ode/jacob/Expression.java
index e6d6a7e..917c09b 100644
--- a/src/main/java/org/apache/ode/jacob/Expression.java
+++ b/src/main/java/org/apache/ode/jacob/Expression.java
@@ -25,5 +25,5 @@
*/
public interface Expression {
- <T> T evaluate(Message message, Class<T> type);
+ <T> T evaluate(Message message, Class<T> type);
}
diff --git a/src/main/java/org/apache/ode/jacob/Message.java b/src/main/java/org/apache/ode/jacob/Message.java
index ea3e901..bb1d791 100644
--- a/src/main/java/org/apache/ode/jacob/Message.java
+++ b/src/main/java/org/apache/ode/jacob/Message.java
@@ -35,73 +35,73 @@
private static final AtomicLong idGen = new AtomicLong();
private long id;
- private ChannelRef to;
+ private ChannelRef to;
private ChannelRef replyTo;
private String action;
private Map<String, Object> headers;
- private Object body;
+ private Object body;
- public Message() {
- id = idGen.incrementAndGet();
- // TODO: always allocating headers may not be a good idea
- // checking for non-null headers in the getters below is
- // not great either; should look into a better option later
- // after finishing pi-calculus refactoring and running some
- // perf tests
- headers = new ConcurrentHashMap<String, Object>();
- }
+ public Message() {
+ id = idGen.incrementAndGet();
+ // TODO: always allocating headers may not be a good idea
+ // checking for non-null headers in the getters below is
+ // not great either; should look into a better option later
+ // after finishing pi-calculus refactoring and running some
+ // perf tests
+ headers = new ConcurrentHashMap<String, Object>();
+ }
- public Message(ChannelRef to, ChannelRef replyTo, String action) {
- this();
- this.to = to;
- this.replyTo = replyTo;
- this.action = action;
- }
+ public Message(ChannelRef to, ChannelRef replyTo, String action) {
+ this();
+ this.to = to;
+ this.replyTo = replyTo;
+ this.action = action;
+ }
- // TODO: add any other convenience methods like addHeader, removeHeader?
- public long getId() {
- return id;
- }
- public void setId(long id) {
- this.id = id;
- }
- public ChannelRef getTo() {
- return to;
- }
- public void setTo(ChannelRef to) {
- this.to = to;
- }
- public ChannelRef getReplyTo() {
- return replyTo;
- }
- public void setReplyTo(ChannelRef replyTo) {
- this.replyTo = replyTo;
- }
- public String getAction() {
- return action;
- }
- public void setAction(String action) {
- this.action = action;
- }
+ // TODO: add any other convenience methods like addHeader, removeHeader?
+ public long getId() {
+ return id;
+ }
+ public void setId(long id) {
+ this.id = id;
+ }
+ public ChannelRef getTo() {
+ return to;
+ }
+ public void setTo(ChannelRef to) {
+ this.to = to;
+ }
+ public ChannelRef getReplyTo() {
+ return replyTo;
+ }
+ public void setReplyTo(ChannelRef replyTo) {
+ this.replyTo = replyTo;
+ }
+ public String getAction() {
+ return action;
+ }
+ public void setAction(String action) {
+ this.action = action;
+ }
- public Map<String, Object> getHeaders() {
- return headers;
- }
- public void setHeaders(Map<String, Object> headers) {
- this.headers = headers;
- }
- public Object getBody() {
- return body;
- }
- public void setBody(Object body) {
- this.body = body;
- }
+ public Map<String, Object> getHeaders() {
+ return headers;
+ }
+ public void setHeaders(Map<String, Object> headers) {
+ this.headers = headers;
+ }
+ public Object getBody() {
+ return body;
+ }
+ public void setBody(Object body) {
+ this.body = body;
+ }
- public boolean containsHeader(String header) {
- return headers.containsKey(header);
- }
-
-
+ public boolean containsHeader(String header) {
+ return headers.containsKey(header);
+ }
+
+
public String toString() {
return "Message [id=" + id + ", "
+ (to != null ? "to=" + to + ", " : "")
@@ -132,16 +132,16 @@
}
public static Message copyFrom(Message message) {
- Message result = new Message();
-
- result.setId(idGen.incrementAndGet());
-
- result.setAction(message.getAction());
- result.setBody(message.getBody());
- result.setHeaders(message.getHeaders());
- result.setTo(message.getTo());
- result.setReplyTo(message.getReplyTo());
-
- return result;
- }
+ Message result = new Message();
+
+ result.setId(idGen.incrementAndGet());
+
+ result.setAction(message.getAction());
+ result.setBody(message.getBody());
+ result.setHeaders(message.getHeaders());
+ result.setTo(message.getTo());
+ result.setReplyTo(message.getReplyTo());
+
+ return result;
+ }
}
diff --git a/src/main/java/org/apache/ode/jacob/MessageListener.java b/src/main/java/org/apache/ode/jacob/MessageListener.java
index 0e2073a..d7818ed 100644
--- a/src/main/java/org/apache/ode/jacob/MessageListener.java
+++ b/src/main/java/org/apache/ode/jacob/MessageListener.java
@@ -26,5 +26,5 @@
*/
public interface MessageListener extends Process, Serializable {
- void onMessage(Message msg);
+ void onMessage(Message msg);
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
index 3cd6916..1a9813e 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ChannelListener.java
@@ -33,20 +33,20 @@
@SuppressWarnings("serial")
public abstract class ChannelListener implements MessageListener {
- public void onMessage(Message msg) {
- Method action = ClassUtil.findActionMethod(getImplementedMethods()).evaluate(msg, Method.class);
- try {
- if (action != null && this instanceof ReceiveProcess) {
- action.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody());
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public Set<Method> getImplementedMethods() {
- return null;
- }
+ public void onMessage(Message msg) {
+ Method action = ClassUtil.findActionMethod(getImplementedMethods()).evaluate(msg, Method.class);
+ try {
+ if (action != null && this instanceof ReceiveProcess) {
+ action.invoke(((ReceiveProcess)this).getReceiver(), (Object[])msg.getBody());
+ }
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public Set<Method> getImplementedMethods() {
+ return null;
+ }
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
index 24d6701..cd4954c 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ClassUtil.java
@@ -53,11 +53,11 @@
}
public static Set<Method> runMethodSet() {
- return RUN_METHOD_SET;
+ return RUN_METHOD_SET;
}
public static Message createMessage(Runnable target, String action, Object[] args, Channel replyTo) {
- Message message = new Message();
+ Message message = new Message();
message.setTo(new ChannelRef(target));
message.setReplyTo(replyTo == null ? null : new ChannelRef(replyTo));
message.setAction(action);
@@ -75,32 +75,32 @@
}
public static String getActionForMethod(Method channelMethod) {
- if (channelMethod == null) {
- return null;
- }
- MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class);
- if (handler != null) {
- return handler.value();
- }
- Class<?> clazz = channelMethod.getDeclaringClass();
- if (Runnable.class.isAssignableFrom(clazz)
- && channelMethod.getName() == "run"
- && channelMethod.getParameterTypes().length == 0) {
- return RUN_METHOD_ACTION;
- }
- if (!Channel.class.isAssignableFrom(clazz)) {
- LOG.trace("Action '{}' can only be defined for a Channel extension", channelMethod.getName());
- return null;
- }
- LOG.trace("Probing Channel class '{}' for declaration of method '{}'", clazz.getName(), channelMethod.getName());
- for (Class<?> c : clazz.getInterfaces()) {
- String action = getChannelMethodAction(c, channelMethod);
- if (action != null) {
- return action;
- }
- }
- // ... if clazz is a Channel interface itself
- return getChannelMethodAction(clazz, channelMethod);
+ if (channelMethod == null) {
+ return null;
+ }
+ MessageHandler handler = channelMethod.getAnnotation(MessageHandler.class);
+ if (handler != null) {
+ return handler.value();
+ }
+ Class<?> clazz = channelMethod.getDeclaringClass();
+ if (Runnable.class.isAssignableFrom(clazz)
+ && channelMethod.getName() == "run"
+ && channelMethod.getParameterTypes().length == 0) {
+ return RUN_METHOD_ACTION;
+ }
+ if (!Channel.class.isAssignableFrom(clazz)) {
+ LOG.trace("Action '{}' can only be defined for a Channel extension", channelMethod.getName());
+ return null;
+ }
+ LOG.trace("Probing Channel class '{}' for declaration of method '{}'", clazz.getName(), channelMethod.getName());
+ for (Class<?> c : clazz.getInterfaces()) {
+ String action = getChannelMethodAction(c, channelMethod);
+ if (action != null) {
+ return action;
+ }
+ }
+ // ... if clazz is a Channel interface itself
+ return getChannelMethodAction(clazz, channelMethod);
}
/**
@@ -112,32 +112,32 @@
* following a convention similar to javadoc.
*/
private static String getChannelMethodAction(Class<?> clazz, Method method) {
- if (Channel.class.isAssignableFrom(clazz)) {
- try {
- Method m = clazz.getMethod(method.getName(), method.getParameterTypes());
- return m.getDeclaringClass().getName() + "#" + m.getName();
- } catch (SecurityException e) { // ignore
- } catch (NoSuchMethodException e) { // ignore
- }
- }
- return null;
+ if (Channel.class.isAssignableFrom(clazz)) {
+ try {
+ Method m = clazz.getMethod(method.getName(), method.getParameterTypes());
+ return m.getDeclaringClass().getName() + "#" + m.getName();
+ } catch (SecurityException e) { // ignore
+ } catch (NoSuchMethodException e) { // ignore
+ }
+ }
+ return null;
}
public static Expression findActionMethod(final Set<Method> implementedMethods) {
- return new Expression() {
- @SuppressWarnings("unchecked")
- public <T> T evaluate(Message message, Class<T> type) {
- String action = message.getAction();
- if (Method.class.equals(type) && action != null) {
- for (Method m : implementedMethods) {
- if (action.equals(ClassUtil.getActionForMethod(m))) {
- return (T)m;
- }
- }
- }
- return null;
- }
- };
+ return new Expression() {
+ @SuppressWarnings("unchecked")
+ public <T> T evaluate(Message message, Class<T> type) {
+ String action = message.getAction();
+ if (Method.class.equals(type) && action != null) {
+ for (Method m : implementedMethods) {
+ if (action.equals(ClassUtil.getActionForMethod(m))) {
+ return (T)m;
+ }
+ }
+ }
+ return null;
+ }
+ };
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
index eccec5e..525c701 100644
--- a/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
+++ b/src/main/java/org/apache/ode/jacob/oo/MessageHandler.java
@@ -36,5 +36,5 @@
@Documented
@Target({ElementType.METHOD})
public @interface MessageHandler {
- String value();
+ String value();
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
index 5b9fabd..a79ed6a 100644
--- a/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
+++ b/src/main/java/org/apache/ode/jacob/oo/ProcessUtil.java
@@ -31,7 +31,7 @@
public final class ProcessUtil {
- // TODO: add more logging at TRACE level
+ // TODO: add more logging at TRACE level
private static final Logger LOG = LoggerFactory.getLogger(ProcessUtil.class);
private ProcessUtil() {
@@ -39,14 +39,14 @@
}
public static String exportChannel(Channel channel) {
- if (channel != null && channel instanceof ChannelProxy) {
+ if (channel != null && channel instanceof ChannelProxy) {
// TODO: replace the brute force call on the activeThread with
// something that doesn't expose the implementation once the
// cleaner refactored api becomes available
return JacobVPU.activeJacobThread().exportChannel(channel);
- }
+ }
throw new IllegalArgumentException("Invalid proxy type: "
- + channel == null ? "<null>" : channel.getClass().toString());
+ + channel == null ? "<null>" : channel.getClass().toString());
}
public static CompositeProcess compose(ChannelListener process) {
@@ -54,85 +54,85 @@
return result.or(process);
}
- public static <T extends Channel> ChannelListener receive(T proxy, T listener) {
- // TODO: NOTE: this *only* works when the listener doesn't need to be Serializable really
- // because we cannot declare a static serialVersionUID like this
- // once we fix serialization, this can be simplified significantly via a dsl
- return new ReceiveProcess().setChannel(proxy).setReceiver(listener);
+ public static <T extends Channel> ChannelListener receive(T proxy, T listener) {
+ // TODO: NOTE: this *only* works when the listener doesn't need to be Serializable really
+ // because we cannot declare a static serialVersionUID like this
+ // once we fix serialization, this can be simplified significantly via a dsl
+ return new ReceiveProcess().setChannel(proxy).setReceiver(listener);
}
- /**
- *
- * @return A noop RunnableProcess
- */
- public static RunnableProcess nil() {
- return new Nil();
- }
+ /**
+ *
+ * @return A noop RunnableProcess
+ */
+ public static RunnableProcess nil() {
+ return new Nil();
+ }
- /**
- *
- * @param callback
- * @return a RunnableProcess that wraps a return notification in a separate process
- *
- */
- public static RunnableProcess terminator(final Synch callback) {
- return callback != null ? new Terminator(callback) : null;
- }
+ /**
+ *
+ * @param callback
+ * @return a RunnableProcess that wraps a return notification in a separate process
+ *
+ */
+ public static RunnableProcess terminator(final Synch callback) {
+ return callback != null ? new Terminator(callback) : null;
+ }
- /**
- *
- * @param callback
- * @param process
- * @return
- *
- * Returns a synchronized process embedding the runnable process. Once the process finishes it
- * will notify that on the callback return channel
- */
+ /**
+ *
+ * @param callback
+ * @param process
+ * @return
+ *
+ * Returns a synchronized process embedding the runnable process. Once the process finishes it
+ * will notify that on the callback return channel
+ */
public static Synchronized sync(final Synch callback, final RunnableProcess process) {
- return new SynchronizedWrapper(callback, process);
+ return new SynchronizedWrapper(callback, process);
}
- /**
- *
- * @param callback
- * @param process
- * @return
- *
- * Intercepts the execution of a synchronized process and executes an interceptor before the
- * termination of the process is actually signaled
- */
- public static Synchronized intercept(final Synchronized process, final RunnableProcess interceptor) {
- if (interceptor == null) {
- return process;
- }
- Synch callback = newChannel(Synch.class, "");
- object(receive(callback, new InterceptorSynch(process.getCallback(), interceptor)));
- process.setCallback(callback);
- return process;
+ /**
+ *
+ * @param callback
+ * @param process
+ * @return
+ *
+ * Intercepts the execution of a synchronized process and executes an interceptor before the
+ * termination of the process is actually signaled
+ */
+ public static Synchronized intercept(final Synchronized process, final RunnableProcess interceptor) {
+ if (interceptor == null) {
+ return process;
+ }
+ Synch callback = newChannel(Synch.class, "");
+ object(receive(callback, new InterceptorSynch(process.getCallback(), interceptor)));
+ process.setCallback(callback);
+ return process;
}
- /**
- *
- * @param processes
- * @return a Synchronized process
- *
- * Ensures the sequential execution of processes
- */
- public static Synchronized sequence(final RunnableProcess... processes) {
- return sequence(null, processes);
+ /**
+ *
+ * @param processes
+ * @return a Synchronized process
+ *
+ * Ensures the sequential execution of processes
+ */
+ public static Synchronized sequence(final RunnableProcess... processes) {
+ return sequence(null, processes);
}
- /**
- *
- * @param callback
- * @param processes
- * @return
- *
- * Ensures the sequential execution of processes. After the execution is complete a
- * notification is sent to the callback channel
- */
+ /**
+ *
+ * @param callback
+ * @param processes
+ * @return
+ *
+ * Ensures the sequential execution of processes. After the execution is complete a
+ * notification is sent to the callback channel
+ */
public static Synchronized sequence(final Synch callback, final RunnableProcess... processes) {
- return new SequenceProcess(callback, processes);
+ return new SequenceProcess(callback, processes);
}
// Helpers Process composers
@@ -141,98 +141,98 @@
* TODO: Document me
*/
public static class Nil extends RunnableProcess {
- private static final long serialVersionUID = 1L;
- public void run() {
- // do nothing
- }
+ private static final long serialVersionUID = 1L;
+ public void run() {
+ // do nothing
+ }
}
/**
* TODO: Document me
*/
public static class Terminator extends RunnableProcess {
- private static final long serialVersionUID = 1L;
- protected Synch callback;
- public Terminator(final Synch callback) {
- this.callback = callback;
- }
- public Synch getCallback() {
- return callback;
- }
- public void run() {
- callback.ret();
- }
+ private static final long serialVersionUID = 1L;
+ protected Synch callback;
+ public Terminator(final Synch callback) {
+ this.callback = callback;
+ }
+ public Synch getCallback() {
+ return callback;
+ }
+ public void run() {
+ callback.ret();
+ }
}
public static abstract class Synchronized extends RunnableProcess {
- private static final long serialVersionUID = 1L;
- protected Synch callback;
+ private static final long serialVersionUID = 1L;
+ protected Synch callback;
- public abstract void execute();
+ public abstract void execute();
- public Synchronized(final Synch callback) {
- setCallback(callback);
- }
- public Synch getCallback() {
- return callback;
- }
- public void setCallback(final Synch callback) {
- this.callback = callback;
- }
- public void run() {
- execute();
- if (callback != null) {
- callback.ret();
- }
- }
+ public Synchronized(final Synch callback) {
+ setCallback(callback);
+ }
+ public Synch getCallback() {
+ return callback;
+ }
+ public void setCallback(final Synch callback) {
+ this.callback = callback;
+ }
+ public void run() {
+ execute();
+ if (callback != null) {
+ callback.ret();
+ }
+ }
}
public static class SynchronizedWrapper extends Synchronized {
- private static final long serialVersionUID = 1L;
- protected final RunnableProcess process;
- public SynchronizedWrapper(final Synch callback, final RunnableProcess process) {
- super(callback);
- this.process = process;
- }
- public void execute() {
- process.run();
- }
+ private static final long serialVersionUID = 1L;
+ protected final RunnableProcess process;
+ public SynchronizedWrapper(final Synch callback, final RunnableProcess process) {
+ super(callback);
+ this.process = process;
+ }
+ public void execute() {
+ process.run();
+ }
}
public static final class InterceptorSynch implements Synch {
- private static final long serialVersionUID = 1L;
- protected final RunnableProcess interceptor;
- private final Synch target;
- public InterceptorSynch(final Synch target, final RunnableProcess interceptor) {
- this.target = target;
- this.interceptor = interceptor;
- }
- public void ret() {
- instance(sync(target, interceptor));
- }
+ private static final long serialVersionUID = 1L;
+ protected final RunnableProcess interceptor;
+ private final Synch target;
+ public InterceptorSynch(final Synch target, final RunnableProcess interceptor) {
+ this.target = target;
+ this.interceptor = interceptor;
+ }
+ public void ret() {
+ instance(sync(target, interceptor));
+ }
}
public static final class SequenceProcess extends Synchronized {
- private static final long serialVersionUID = 1L;
- private final RunnableProcess[] processes;
+ private static final long serialVersionUID = 1L;
+ private final RunnableProcess[] processes;
- public SequenceProcess(final Synch callback, final RunnableProcess[] processes) {
- super(callback);
- this.processes = processes;
- }
+ public SequenceProcess(final Synch callback, final RunnableProcess[] processes) {
+ super(callback);
+ this.processes = processes;
+ }
- public void execute() {
- // can only sequence synchronized processes
- final Synchronized current = ensureSynchronized(processes[0]);
- instance(intercept(current, processes.length > 1 ?
- sequence(this.callback, Arrays.copyOfRange(processes, 1, processes.length)) :
- terminator(this.callback)));
- this.callback = null;
- }
-
- public Synchronized ensureSynchronized(RunnableProcess process) {
- return process instanceof Synchronized ? (Synchronized)process : sync(null, process);
- }
+ public void execute() {
+ // can only sequence synchronized processes
+ final Synchronized current = ensureSynchronized(processes[0]);
+ instance(intercept(current, processes.length > 1 ?
+ sequence(this.callback, Arrays.copyOfRange(processes, 1, processes.length)) :
+ terminator(this.callback)));
+ this.callback = null;
+ }
+
+ public Synchronized ensureSynchronized(RunnableProcess process) {
+ return process instanceof Synchronized ? (Synchronized)process : sync(null, process);
+ }
}
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/Synch.java b/src/main/java/org/apache/ode/jacob/oo/Synch.java
index 80160b2..4025353 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Synch.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Synch.java
@@ -28,6 +28,6 @@
public interface Synch extends Channel {
- public void ret();
+ public void ret();
}
diff --git a/src/main/java/org/apache/ode/jacob/oo/Val.java b/src/main/java/org/apache/ode/jacob/oo/Val.java
index edc9b8a..681378f 100644
--- a/src/main/java/org/apache/ode/jacob/oo/Val.java
+++ b/src/main/java/org/apache/ode/jacob/oo/Val.java
@@ -24,6 +24,6 @@
*/
public interface Val extends Channel {
- public void val(Object retVal);
+ public void val(Object retVal);
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java
index 1f21d59..38bb6e9 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelRefDeserializer.java
@@ -36,9 +36,9 @@
*/
public class ChannelRefDeserializer extends StdDeserializer<ChannelRef> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- protected ChannelRefDeserializer() {
+ protected ChannelRefDeserializer() {
super(ChannelRef.class);
}
@@ -67,4 +67,4 @@
return new ChannelRef(target);
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index 3a5c71d..7a35fd2 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -54,10 +54,10 @@
private static final Logger LOG = LoggerFactory.getLogger(JacksonExecutionQueueImpl.class);
- public JacksonExecutionQueueImpl() {
- super(null);
- }
-
+ public JacksonExecutionQueueImpl() {
+ super(null);
+ }
+
public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl> {
private Set<Integer> usedChannels = new LinkedHashSet<Integer>();
@@ -94,7 +94,7 @@
usedChannels.clear();
- // write metadata
+ // write metadata
jgen.writeNumberField("objIdCounter", value._objIdCounter);
jgen.writeNumberField("currentCycle", value._currentCycle);
@@ -136,9 +136,9 @@
public static class ExecutionQueueImplDeserializer extends StdDeserializer<JacksonExecutionQueueImpl> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public ExecutionQueueImplDeserializer() {
+ public ExecutionQueueImplDeserializer() {
super(JacksonExecutionQueueImpl.class);
}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
index 94b53cb..16c82b7 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/MessageDeserializer.java
@@ -40,10 +40,10 @@
*/
public class MessageDeserializer extends StdDeserializer<Message> {
- private static final long serialVersionUID = 1L;
- private TypeReference<HashMap<String,Object>> mapTypeRef = new TypeReference<HashMap<String, Object>>() {};
+ private static final long serialVersionUID = 1L;
+ private TypeReference<HashMap<String,Object>> mapTypeRef = new TypeReference<HashMap<String, Object>>() {};
- protected MessageDeserializer() {
+ protected MessageDeserializer() {
super(Message.class);
}
@@ -96,4 +96,4 @@
return msg;
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 72e33ce..0c3bdea 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -301,7 +301,7 @@
}
public Channel newChannel(Class<?> channelType, String description) {
- return JacobVPU.this.newChannel(channelType, description);
+ return JacobVPU.this.newChannel(channelType, description);
}
public ChannelRef newCommChannel(String description) {
@@ -422,7 +422,7 @@
LOG.trace(">> [{}] : {}", _cycle, _source);
stackThread();
- CommChannel replyTo = message.getReplyTo() != null ? message.getReplyTo().getEndpoint(CommChannel.class) : null;
+ CommChannel replyTo = message.getReplyTo() != null ? message.getReplyTo().getEndpoint(CommChannel.class) : null;
long ctime = System.currentTimeMillis();
try {
@@ -447,7 +447,7 @@
//XXX: All replys have the same Synch.ret() action
sendMessage(ClassUtil.createMessage(replyTo, ClassUtil.SYNCH_RET_METHOD_ACTION, null, null));
}
- } finally {
+ } finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;
unstackThread();
diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/CELL_.java b/src/test/java/org/apache/ode/jacob/examples/cell/CELL_.java
index 5c4c7ce..5358263 100644
--- a/src/test/java/org/apache/ode/jacob/examples/cell/CELL_.java
+++ b/src/test/java/org/apache/ode/jacob/examples/cell/CELL_.java
@@ -44,7 +44,7 @@
}
@SuppressWarnings("serial")
- public void run() {
+ public void run() {
// INSTANTIATION{Cell(run,val)}
// ==> run ? [ read(r)={...} & write(newVal)={...} ]
object(new ReceiveProcess().setChannel(_self).setReceiver(new Cell() {
diff --git a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
index d34524e..0bffb85 100644
--- a/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
+++ b/src/test/java/org/apache/ode/jacob/examples/cell/Cell.java
@@ -31,12 +31,12 @@
* Read the value of the cell.
* @param replyTo channel to which the value of the cell is sent
*/
- public void read(Val replyTo);
+ public void read(Val replyTo);
/**
* Write the value of the cell.
* @param newVal new value of the cell
*/
- public void write(Object newVal);
+ public void write(Object newVal);
}
diff --git a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
index b894c89..974d4e1 100644
--- a/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
+++ b/src/test/java/org/apache/ode/jacob/examples/eratosthenes/NaturalNumberStream.java
@@ -32,6 +32,6 @@
*/
public interface NaturalNumberStream extends Channel {
- public void val(int n, Synch ret);
+ public void val(int n, Synch ret);
}
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 5165f8b..6810efe 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -285,39 +285,39 @@
static class HWSequence extends Sequence {
- private final String[] greetings;
- private final Val out;
+ private final String[] greetings;
+ private final Val out;
- @JsonCreator
- public HWSequence(@JsonProperty("greetings") String[] greetings, @JsonProperty("out") Val out, @JsonProperty("done") Synch done) {
- super(greetings.length, done);
- this.greetings = greetings;
- this.out = out;
- }
-
- @Override
- protected RunnableProcess doStep(int step, Synch done) {
- return new SequenceItemEmitter(greetings[step], done, out);
+ @JsonCreator
+ public HWSequence(@JsonProperty("greetings") String[] greetings, @JsonProperty("out") Val out, @JsonProperty("done") Synch done) {
+ super(greetings.length, done);
+ this.greetings = greetings;
+ this.out = out;
}
- static class SequenceItemEmitter extends RunnableProcess {
- private final String string;
- private final Synch done;
- private final Val out;
+ @Override
+ protected RunnableProcess doStep(int step, Synch done) {
+ return new SequenceItemEmitter(greetings[step], done, out);
+ }
- @JsonCreator
- public SequenceItemEmitter(@JsonProperty("string") String string, @JsonProperty("done") Synch done, @JsonProperty("out") Val out) {
- this.string = string;
- this.done = done;
- this.out = out;
- }
+ static class SequenceItemEmitter extends RunnableProcess {
+ private final String string;
+ private final Synch done;
+ private final Val out;
- @Override
- public void run() {
- instance(new StringEmitterProcess(string, out));
- done.ret();
- }
- }
+ @JsonCreator
+ public SequenceItemEmitter(@JsonProperty("string") String string, @JsonProperty("done") Synch done, @JsonProperty("out") Val out) {
+ this.string = string;
+ this.done = done;
+ this.out = out;
+ }
+
+ @Override
+ public void run() {
+ instance(new StringEmitterProcess(string, out));
+ done.ret();
+ }
+ }
}
@@ -329,8 +329,8 @@
calculusHelloWorld();
}
- @SuppressWarnings("unchecked")
- public static void main(String args[]) throws Exception {
+ @SuppressWarnings("unchecked")
+ public static void main(String args[]) throws Exception {
// enable logging
// BasicConfigurator.configure();
List<Logger> loggers = Collections.<Logger>list(LogManager.getCurrentLoggers());
diff --git a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
index 140daee..08b09d5 100644
--- a/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
+++ b/src/test/java/org/apache/ode/jacob/examples/synch/SynchPrint.java
@@ -31,6 +31,6 @@
*/
public interface SynchPrint extends Channel {
- public Synch print(String msg);
+ public Synch print(String msg);
}
diff --git a/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
index daf056e..b8fb24c 100644
--- a/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
+++ b/src/test/java/org/apache/ode/jacob/oo/JacobOOMappingTest.java
@@ -27,43 +27,43 @@
public class JacobOOMappingTest {
- @Test
- public void testProcessAction() {
- String action = ClassUtil.getActionForMethod(methodOf(TestProcess.class, "run"));
- Assert.assertEquals("java.lang.Runnable#run", action);
- }
+ @Test
+ public void testProcessAction() {
+ String action = ClassUtil.getActionForMethod(methodOf(TestProcess.class, "run"));
+ Assert.assertEquals("java.lang.Runnable#run", action);
+ }
- @Test
- public void testDefaultChannelAction() {
- String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "one"));
- Assert.assertEquals("org.apache.ode.jacob.oo.JacobOOMappingTest$TestChannel#one", action);
- }
+ @Test
+ public void testDefaultChannelAction() {
+ String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "one"));
+ Assert.assertEquals("org.apache.ode.jacob.oo.JacobOOMappingTest$TestChannel#one", action);
+ }
- @Test
- public void testCustomChannelAction() {
- String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "two"));
- Assert.assertEquals("TestChannel-custom", action);
- }
+ @Test
+ public void testCustomChannelAction() {
+ String action = ClassUtil.getActionForMethod(methodOf(TestChannel.class, "two"));
+ Assert.assertEquals("TestChannel-custom", action);
+ }
- private final Method methodOf(Class<?> clazz, String name) {
+ private final Method methodOf(Class<?> clazz, String name) {
try {
return clazz.getMethod(name, new Class[]{});
} catch (Exception e) {
- // ignore
+ // ignore
}
return null;
- }
+ }
- public static interface TestChannel extends Channel {
- void one();
- @MessageHandler("TestChannel-custom") void two();
- }
+ public static interface TestChannel extends Channel {
+ void one();
+ @MessageHandler("TestChannel-custom") void two();
+ }
- public static class TestProcess implements Runnable {
- public void run() {
- // do nothing
- }
- }
+ public static class TestProcess implements Runnable {
+ public void run() {
+ // do nothing
+ }
+ }
}
diff --git a/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
index 1bd938c..ca63de2 100644
--- a/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
+++ b/src/test/java/org/apache/ode/jacob/oo/SequentialProcessingTest.java
@@ -38,223 +38,223 @@
public class SequentialProcessingTest {
@SuppressWarnings("serial")
- @Test
+ @Test
public void testParallelProcesses() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- instance(intercept(sync(null, atomicProcess("A", out)), atomicProcess("0", out)));
- instance(intercept(sync(null, atomicProcess("B", out)), atomicProcess("1", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ instance(intercept(sync(null, atomicProcess("A", out)), atomicProcess("0", out)));
+ instance(intercept(sync(null, atomicProcess("B", out)), atomicProcess("1", out)));
+ }
});
- // parallelism is proven by process "B" being executed before "A0" and "A1"
- Assert.assertEquals("AB01", out.toString());
+ // parallelism is proven by process "B" being executed before "A0" and "A1"
+ Assert.assertEquals("AB01", out.toString());
}
@SuppressWarnings("serial")
- @Test
+ @Test
public void testSynchronizeProcess() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- instance(sync(c1, atomicProcess("A", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ instance(sync(c1, atomicProcess("A", out)));
+ }
});
- // Return hook "x" is executed after process "A"
- Assert.assertEquals("Ax", out.toString());
+ // Return hook "x" is executed after process "A"
+ Assert.assertEquals("Ax", out.toString());
}
@SuppressWarnings("serial")
- @Test
+ @Test
public void testSynchronizeSynchronizedProcess() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- Synch c2 = Jacob.newChannel(Synch.class, "");
- object(receive(c2, new Synch() {
- public void ret() {
- out.append("y");
- }
- }));
- Synchronized process = synchronizedProcess(c1, "S", out);
- instance(sync(c2, process));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ Synch c2 = Jacob.newChannel(Synch.class, "");
+ object(receive(c2, new Synch() {
+ public void ret() {
+ out.append("y");
+ }
+ }));
+ Synchronized process = synchronizedProcess(c1, "S", out);
+ instance(sync(c2, process));
+ }
});
- // Both return hooks "x" and "y" are executed after synchronized process "S"
- Assert.assertEquals("Sxy", out.toString());
+ // Both return hooks "x" and "y" are executed after synchronized process "S"
+ Assert.assertEquals("Sxy", out.toString());
}
@SuppressWarnings("serial")
- @Test
+ @Test
public void testInterceptProcess() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- instance(intercept(sync(c1, atomicProcess("A", out)), atomicProcess("B", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ instance(intercept(sync(c1, atomicProcess("A", out)), atomicProcess("B", out)));
+ }
});
- // Return interceptor "B" is executed after process "A", but before the hook "x"
- Assert.assertEquals("ABx", out.toString());
+ // Return interceptor "B" is executed after process "A", but before the hook "x"
+ Assert.assertEquals("ABx", out.toString());
}
@SuppressWarnings("serial")
- @Test
+ @Test
public void testInterceptSynchronizedProcess() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- Synch c2 = Jacob.newChannel(Synch.class, "");
- object(receive(c2, new Synch() {
- public void ret() {
- out.append("y");
- }
- }));
- instance(intercept(sync(c1, atomicProcess("A", out)), sync(c2, atomicProcess("B", out))));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ Synch c2 = Jacob.newChannel(Synch.class, "");
+ object(receive(c2, new Synch() {
+ public void ret() {
+ out.append("y");
+ }
+ }));
+ instance(intercept(sync(c1, atomicProcess("A", out)), sync(c2, atomicProcess("B", out))));
+ }
});
- // Return interceptor "B" is executed after process "A", but before the hook "x"
- Assert.assertEquals("AByx", out.toString());
+ // Return interceptor "B" is executed after process "A", but before the hook "x"
+ Assert.assertEquals("AByx", out.toString());
}
@SuppressWarnings("serial")
@Test
public void testSimpleSequence() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Jacob.instance(sequence(
- atomicProcess("A", out),
- atomicProcess("B", out),
- atomicProcess("C", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Jacob.instance(sequence(
+ atomicProcess("A", out),
+ atomicProcess("B", out),
+ atomicProcess("C", out)));
+ }
});
- // TODO: explain
- Assert.assertEquals("ABC", out.toString());
+ // TODO: explain
+ Assert.assertEquals("ABC", out.toString());
}
@SuppressWarnings("serial")
@Test
public void testSynchronizedSequence() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- Jacob.instance(sequence(c1,
- atomicProcess("A", out),
- atomicProcess("B", out),
- atomicProcess("C", out),
- atomicProcess("D", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ Jacob.instance(sequence(c1,
+ atomicProcess("A", out),
+ atomicProcess("B", out),
+ atomicProcess("C", out),
+ atomicProcess("D", out)));
+ }
});
- // TODO: explain
- Assert.assertEquals("ABCDx", out.toString());
+ // TODO: explain
+ Assert.assertEquals("ABCDx", out.toString());
}
@SuppressWarnings("serial")
@Test
public void testTransitiveSequence() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- Jacob.instance(sequence(c1,
- atomicProcess("A", out),
- sequence(atomicProcess("B", out), atomicProcess("C", out)),
- atomicProcess("D", out)));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ Jacob.instance(sequence(c1,
+ atomicProcess("A", out),
+ sequence(atomicProcess("B", out), atomicProcess("C", out)),
+ atomicProcess("D", out)));
+ }
});
- // TODO: explain
- Assert.assertEquals("ABCDx", out.toString());
+ // TODO: explain
+ Assert.assertEquals("ABCDx", out.toString());
}
@SuppressWarnings("serial")
@Test
public void testSequenceComposition() {
- final StringBuffer out = new StringBuffer();
- executeProcess(new RunnableProcess() {
- public void run() {
- Synch c1 = Jacob.newChannel(Synch.class, "");
- object(receive(c1, new Synch() {
- public void ret() {
- out.append("x");
- }
- }));
- Synch c2 = Jacob.newChannel(Synch.class, "");
- object(receive(c2, new Synch() {
- public void ret() {
- out.append("y");
- }
- }));
- // just test a more complex scenario once
- Jacob.instance(sequence(c1,
- sequence(
- sequence(
- atomicProcess("A", out),
- atomicProcess("B", out),
- atomicProcess("C", out)),
- atomicProcess("D", out)),
- atomicProcess("E", out),
- sequence(c2,
- atomicProcess("F", out),
- sequence(
- atomicProcess("G", out),
- atomicProcess("H", out)))));
- }
+ final StringBuffer out = new StringBuffer();
+ executeProcess(new RunnableProcess() {
+ public void run() {
+ Synch c1 = Jacob.newChannel(Synch.class, "");
+ object(receive(c1, new Synch() {
+ public void ret() {
+ out.append("x");
+ }
+ }));
+ Synch c2 = Jacob.newChannel(Synch.class, "");
+ object(receive(c2, new Synch() {
+ public void ret() {
+ out.append("y");
+ }
+ }));
+ // just test a more complex scenario once
+ Jacob.instance(sequence(c1,
+ sequence(
+ sequence(
+ atomicProcess("A", out),
+ atomicProcess("B", out),
+ atomicProcess("C", out)),
+ atomicProcess("D", out)),
+ atomicProcess("E", out),
+ sequence(c2,
+ atomicProcess("F", out),
+ sequence(
+ atomicProcess("G", out),
+ atomicProcess("H", out)))));
+ }
});
- // TODO: explain
- Assert.assertEquals("ABCDEFGHxy", out.toString());
+ // TODO: explain
+ Assert.assertEquals("ABCDEFGHxy", out.toString());
}
@SuppressWarnings("serial")
- protected RunnableProcess atomicProcess(final String id, final StringBuffer out) {
- return new RunnableProcess() {
- public void run() {
- out.append(id);
- }
- };
+ protected RunnableProcess atomicProcess(final String id, final StringBuffer out) {
+ return new RunnableProcess() {
+ public void run() {
+ out.append(id);
+ }
+ };
}
@SuppressWarnings("serial")
- protected Synchronized synchronizedProcess(final Synch callback, final String id, final StringBuffer out) {
- return new Synchronized(callback) {
- public void execute() {
- out.append(id);
- }
- };
+ protected Synchronized synchronizedProcess(final Synch callback, final String id, final StringBuffer out) {
+ return new Synchronized(callback) {
+ public void execute() {
+ out.append(id);
+ }
+ };
}
protected void executeProcess(final RunnableProcess process) {
diff --git a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
index 4468699..5c23c6b 100644
--- a/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
+++ b/src/test/java/org/apache/ode/jacob/vpu/ProxyConstructorTimingTest.java
@@ -102,18 +102,18 @@
}
public interface Greeter extends Channel {
- String hello(String name);
+ String hello(String name);
}
@SuppressWarnings("serial")
- public class GreeterImpl implements Greeter {
+ public class GreeterImpl implements Greeter {
public String hello(String name) {
return "Hello " + name;
}
}
@SuppressWarnings("serial")
- public class GreeterImpl2 implements Greeter {
+ public class GreeterImpl2 implements Greeter {
public String hello(String name) {
return "";
}