blob: 15587ac234b5d994d97b8f54a4311da0e26c448d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jacob.examples.helloworld;
import static org.apache.ode.jacob.Jacob.*;
import org.apache.ode.jacob.ChannelRef;
import org.apache.ode.jacob.JacobObject;
import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.MessageListener;
import org.apache.ode.jacob.examples.sequence.Sequence;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ReceiveProcess;
import org.apache.ode.jacob.oo.Synch;
import org.apache.ode.jacob.oo.Val;
import org.apache.ode.jacob.soup.CommChannel;
import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl;
import org.apache.ode.jacob.soup.jackson.JacobModule;
import org.apache.ode.jacob.vpu.JacobVPU;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
/**
* Simple Hello World example to showcase different
* features and approaches of the Jacob API.
*
* Inspired by http://scienceblogs.com/goodmath/2007/04/16/back-to-calculus-a-better-intr-1/
*
* @author Tammo van Lessen
*
*/
@SuppressWarnings("serial")
public class HelloWorld extends JacobObject implements Runnable {
public static interface Callback<T, R extends Channel> extends Channel {
public void invoke(T value, R callback);
}
static class ReliablePrinterProcess extends JacobObject implements Runnable {
private Callback<String, Synch> in;
@JsonCreator
public ReliablePrinterProcess(@JsonProperty("in") Callback<String, Synch> in) {
this.in = in;
}
public void run() {
object(true, new ReliablePrinterReceiveProcess().setChannel(in).setReceiver(new ReliablePrinterCallback()));
}
static class ReliablePrinterReceiveProcess extends ReceiveProcess {}
static class ReliablePrinterCallback implements Callback<String, Synch> {
public void invoke(String value, Synch callback) {
System.out.println(value);
callback.ret();
}
}
}
static class ReliableStringEmitterProcess extends JacobObject implements Runnable {
private String str;
private Callback<String, Synch> to;
@JsonCreator
public ReliableStringEmitterProcess(@JsonProperty("str")String str, @JsonProperty("to") Callback<String, Synch> to) {
this.str = str;
this.to = to;
}
public void run() {
Synch callback = newChannel(Synch.class, "callback channel to ACK " + str);
object(new ReliableStringEmitterReceiveProcess().setChannel(callback).setReceiver(new ReliableStringEmitterSynch(str)));
to.invoke(str, callback);
}
static class ReliableStringEmitterReceiveProcess extends ReceiveProcess {}
static class ReliableStringEmitterSynch implements Synch {
private String str;
@JsonCreator
public ReliableStringEmitterSynch(@JsonProperty("str") String str) {
this.str = str;
}
public void ret() {
System.out.println(str + " ACKed");
}
}
}
static class PrinterProcess extends JacobObject implements Runnable {
private Val _in;
@JsonCreator
public PrinterProcess(@JsonProperty("in") Val in) {
_in = in;
}
public void run() {
object(true, new PrinterProcessReceiveProcess().setChannel(_in).setReceiver(new PrinterProcessVal()));
}
static class PrinterProcessReceiveProcess extends ReceiveProcess {}
static class PrinterProcessVal implements Val {
public void val(Object o) {
System.out.println(o);
}
}
}
static class StringEmitterProcess extends JacobObject implements Runnable {
private String str;
private Val to;
@JsonCreator
public StringEmitterProcess(@JsonProperty("str") String str, @JsonProperty("to") Val to) {
this.str = str;
this.to = to;
}
public void run() {
to.val(str);
}
}
static class ForwarderProcess extends JacobObject implements Runnable {
private Val in;
private Val out;
@JsonCreator
public ForwarderProcess(@JsonProperty("in") Val in, @JsonProperty("out") Val out) {
this.in = in;
this.out = out;
}
public void run() {
object(true, new ForwarderProcessReceiveProcess().setChannel(in).setReceiver(new ForwarderProcessVal(out)));
}
static class ForwarderProcessReceiveProcess extends ReceiveProcess {}
static class ForwarderProcessVal implements Val {
private Val out;
@JsonCreator
public ForwarderProcessVal(@JsonProperty("out")Val out) {
this.out = out;
}
public void val(Object o) {
out.val(o);
}
}
}
protected void simpleHelloWorld() {
// new(out)
final Val out = newChannel(Val.class, "simpleHelloWorld-out");
// new(x)
final Val x = newChannel(Val.class, "simpleHelloWorld-x");
// *(?out(str).!sysout(str))
instance(new PrinterProcess(out));
// *(?x(str).!out(str))
instance(new ForwarderProcess(x, out));
// !out(hello) | !out(world)
instance(new StringEmitterProcess("Hello", x));
instance(new StringEmitterProcess("World", x));
}
protected void reliableHelloWorld() {
// reliable version of the code above
// (new(callback).!out(hello).?callback) | (new(callback).!out(world).?callback)
// new(rout)
@SuppressWarnings("unchecked")
Callback<String, Synch> rout = newChannel(Callback.class, "reliableHelloWorld-rout");
// *(?rout(str).!sysout(str))
instance(new ReliablePrinterProcess(rout));
// (new(callback).!out(hello).?callback)
instance(new ReliableStringEmitterProcess("Hello", rout));
// (new(callback).!out(world).?callback)
instance(new ReliableStringEmitterProcess("World", rout));
}
protected void sequencedHelloWorld() {
// send hello world as a sequence
// !out(hello).!out(world)
// new(out)
final Val out = newChannel(Val.class, "sequencedHelloWorld-out");
// *(?out(str).!sysout(str))
instance(new PrinterProcess(out));
final String[] greeting = {"Hello", "World"};
instance(new HWSequence(greeting, out, null));
}
protected void calculusHelloWorld() {
// new(out)
final ChannelRef out = newCommChannel(Val.class, "calculusHelloWorld-out");
// new(x)
final ChannelRef x = newCommChannel(Val.class, "calculusHelloWorld-x");
// *(?out(str).!sysout(str))
subscribe(true, out, new PrinterMessageListener());
// *(?x(str).!out(str))
subscribe(true, x, new ForwarderMessageListener(out));
// !out(hello) | !out(world)
instance(new StringEmitterRunnable("Hello", x));
instance(new StringEmitterRunnable("World", x));
}
static class PrinterMessageListener implements MessageListener {
@Override
public void onMessage(Message msg) {
System.out.println(msg.getBody());
}
}
static class ForwarderMessageListener implements MessageListener {
private ChannelRef to;
@JsonCreator
public ForwarderMessageListener(@JsonProperty("to") ChannelRef to) {
this.to = to;
}
@Override
public void onMessage(Message msg) {
Message msg2 = new Message(to, null, msg.getAction());
msg2.setBody(msg.getBody());
sendMessage(msg2);
}
}
static class StringEmitterRunnable extends JacobObject implements Runnable {
private String str;
private ChannelRef to;
@JsonCreator
public StringEmitterRunnable(@JsonProperty("str") String str, @JsonProperty("to") ChannelRef to) {
this.str = str;
this.to = to;
}
public void run() {
Message msg = new Message(to, null, "printHW");
msg.setBody(str);
sendMessage(msg);
}
}
static class HWSequence extends Sequence {
private final String[] greetings;
private final Val out;
@JsonCreator
public HWSequence(@JsonProperty("greetings") String[] greetings, @JsonProperty("out") Val out, @JsonProperty("done") Synch done) {
super(greetings.length, done);
this.greetings = greetings;
this.out = out;
}
@Override
protected Runnable doStep(int step, Synch done) {
return new SequenceItemEmitter(greetings[step], done, out);
}
static class SequenceItemEmitter extends JacobObject implements Runnable {
private final String string;
private final Synch done;
private final Val out;
@JsonCreator
public SequenceItemEmitter(@JsonProperty("string") String string, @JsonProperty("done") Synch done, @JsonProperty("out") Val out) {
this.string = string;
this.done = done;
this.out = out;
}
@Override
public void run() {
instance(new StringEmitterProcess(string, out));
done.ret();
}
}
}
@Override
public void run() {
// simpleHelloWorld();
// reliableHelloWorld();
// sequencedHelloWorld();
calculusHelloWorld();
}
public static void main(String args[]) throws Exception {
// enable logging
//BasicConfigurator.configure();
SmileFactory sf = null;
// // enable smile:
// sf = new SmileFactory();
// sf.enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES);
// sf.enable(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT);
ObjectMapper mapper = new ObjectMapper(sf);
mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
mapper.registerModule(new JacobModule());
JacobVPU vpu = new JacobVPU();
JacksonExecutionQueueImpl queue = new JacksonExecutionQueueImpl();
vpu.setContext(queue);
long start = System.currentTimeMillis();
vpu.inject(new HelloWorld());
while (vpu.execute()) {
queue = loadAndRestoreQueue(mapper, (JacksonExecutionQueueImpl)vpu.getContext());
vpu.setContext(queue);
System.out.println(vpu.isComplete() ? "<0>" : ".");
//vpu.dumpState();
}
System.out.println("Runtime in ms: " + (System.currentTimeMillis() - start));
vpu.dumpState();
}
public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception {
byte[] json = mapper.writeValueAsBytes(in);
// print json
System.out.println(new String(json));
JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class);
return q2;
}
}