Initial version of a Jackson-serializable ExecutionQueue.
diff --git a/pom.xml b/pom.xml
index 48f7644..7e2acc9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,7 @@
<properties>
<slf4j.version>1.7.2</slf4j.version>
<junit.version>4.11</junit.version>
+ <jackson.version>2.1.3</jackson.version>
</properties>
<dependencyManagement>
@@ -48,6 +49,11 @@
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -56,6 +62,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
<!-- test -->
<dependency>
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
new file mode 100644
index 0000000..cab8380
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.Channel;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.vpu.ChannelFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class ChannelProxyDeserializer extends StdDeserializer<Channel> {
+
+ private static final long serialVersionUID = 1L;
+
+ public ChannelProxyDeserializer() {
+ super(Channel.class);
+ }
+
+ @Override
+ public Channel deserialize(JsonParser jp, DeserializationContext ctxt)
+ throws IOException, JsonProcessingException {
+
+ String type = null;
+ int id = -1;
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String fieldname = jp.getCurrentName();
+ if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+ // if we're not already on the field, advance by one.
+ jp.nextToken();
+ }
+ if ("channelType".equals(fieldname)) {
+ type = jp.getText();
+ } else if ("channelId".equals(fieldname)) {
+ id = jp.getIntValue();
+ }
+ }
+
+ if (type == null) {
+ throw ctxt.mappingException(Channel.class);
+ }
+
+ if (id < 0) {
+ throw ctxt.mappingException(Channel.class);
+ }
+
+
+ try {
+ CommChannel channel = new CommChannel(ctxt.findClass(type));
+ channel.setId(id);
+ return (Channel)ChannelFactory.createChannel(channel, channel.getType());
+
+ } catch (ClassNotFoundException e) {
+ throw ctxt.instantiationException(Channel.class, e);
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
new file mode 100644
index 0000000..eadb813
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxySerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.Channel;
+import org.apache.ode.jacob.ChannelProxy;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.vpu.ChannelFactory;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ChannelProxySerializer extends StdSerializer<ChannelProxy>{
+
+ protected ChannelProxySerializer() {
+ super(ChannelProxy.class);
+ }
+
+ @Override
+ public void serialize(ChannelProxy value, JsonGenerator jgen,
+ SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ jgen.writeStartObject();
+ serializeContents(value, jgen, provider);
+ jgen.writeEndObject();
+ }
+
+ @Override
+ public void serializeWithType(ChannelProxy value, JsonGenerator jgen, SerializerProvider provider,
+ TypeSerializer typeSer)
+ throws IOException, JsonGenerationException
+ {
+ typeSer.writeTypePrefixForObject(value, jgen);
+ serializeContents(value, jgen, provider);
+ typeSer.writeTypeSuffixForObject(value, jgen);
+ }
+
+ private void serializeContents(ChannelProxy value, JsonGenerator jgen,
+ SerializerProvider provider) throws JsonGenerationException, IOException {
+ CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value);
+ ClassNameIdResolver idResolver = new ClassNameIdResolver(provider.constructType(commChannel.getType()), provider.getTypeFactory());
+ jgen.writeStringField("channelType", idResolver.idFromBaseType());
+ jgen.writeNumberField("channelId", (Integer)commChannel.getId());
+ }
+
+}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java
new file mode 100644
index 0000000..75b7fba
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationDeserializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.soup.Continuation;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+public class ContinuationDeserializer extends StdDeserializer<Continuation> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected ContinuationDeserializer() {
+ super(Continuation.class);
+ }
+
+ @Override
+ public Continuation deserialize(JsonParser jp,
+ DeserializationContext ctxt) throws IOException,
+ JsonProcessingException {
+
+ JacobObject target = null;
+ String methodName = null;
+ Object[] args = null;
+
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String fieldname = jp.getCurrentName();
+ if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+ // if we're not already on the field, advance by one.
+ jp.nextToken();
+ }
+
+ if ("target".equals(fieldname)) {
+ target = jp.readValueAs(JacobObject.class);
+ } else if ("method".equals(fieldname)) {
+ methodName = jp.getText();
+ } if ("args".equals(fieldname)) {
+ args = jp.readValueAs(Object[].class);
+ }
+ }
+
+ if (target == null) {
+ throw ctxt.mappingException(Continuation.class);
+ }
+
+ if (methodName == null) {
+ throw ctxt.mappingException(Continuation.class);
+ }
+
+ return new Continuation(target, target.getMethod(methodName), args);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
new file mode 100644
index 0000000..242b602
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+
+import org.apache.ode.jacob.soup.Continuation;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class ContinuationSerializer extends StdSerializer<Continuation> {
+
+ public ContinuationSerializer() {
+ super(Continuation.class);
+ }
+
+ @Override
+ public void serialize(Continuation value, JsonGenerator jgen,
+ SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ jgen.writeStartObject();
+ serializeContents(value, jgen, provider);
+ jgen.writeEndObject();
+ }
+
+
+ @Override
+ public void serializeWithType(Continuation value, JsonGenerator jgen,
+ SerializerProvider provider, TypeSerializer typeSer)
+ throws IOException, JsonProcessingException {
+ typeSer.writeTypePrefixForObject(value, jgen);
+ serializeContents(value, jgen, provider);
+ typeSer.writeTypeSuffixForObject(value, jgen);
+ }
+
+ private void serializeContents(Continuation value, JsonGenerator jgen,
+ SerializerProvider provider) throws JsonGenerationException, IOException {
+
+ jgen.writeObjectField("target", value.getClosure());
+ jgen.writeStringField("method", value.getMethod().getName());
+ jgen.writeObjectField("args", value.getArgs());
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java
new file mode 100644
index 0000000..d17fcfa
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ContinuationValueInstantiator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.lang.reflect.Method;
+
+import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.soup.Continuation;
+
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.deser.CreatorProperty;
+import com.fasterxml.jackson.databind.deser.SettableBeanProperty;
+import com.fasterxml.jackson.databind.deser.ValueInstantiator;
+
+public class ContinuationValueInstantiator extends ValueInstantiator {
+
+ @Override
+ public String getValueTypeDesc() {
+ return Continuation.class.getName();
+ }
+
+ @Override
+ public boolean canCreateFromObjectWith() {
+ return true;
+ }
+
+ @Override
+ public SettableBeanProperty[] getFromObjectArguments(
+ DeserializationConfig config) {
+ return new CreatorProperty[] {
+ new CreatorProperty("_closure", config.constructType(JacobObject.class), null, null, null, 0, null),
+ new CreatorProperty("_method", config.constructType(Method.class), null, null, null, 1, null),
+ new CreatorProperty("_args", config.constructType(Object[].class), null, null, null, 2, null)};
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
new file mode 100644
index 0000000..eeb1333
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.ode.jacob.Channel;
+import org.apache.ode.jacob.ChannelProxy;
+import org.apache.ode.jacob.soup.Continuation;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+/**
+ * Variant of {@link org.apache.ode.jacob.vpu.ExecutionQueueImpl} that can be
+ * serialized and deserialized with Jackson.
+ */
+public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
+
+ public JacksonExecutionQueueImpl() {
+ super(null);
+ }
+
+ public static ObjectMapper configureMapper() {
+
+ SimpleModule sm = new SimpleModule("jacobmodule");
+ sm.addSerializer(ChannelProxy.class, new ChannelProxySerializer());
+ sm.addSerializer(Continuation.class, new ContinuationSerializer());
+ sm.addSerializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplSerializer());
+ sm.addDeserializer(JacksonExecutionQueueImpl.class, new ExecutionQueueImplDeserializer());
+ sm.addDeserializer(Continuation.class, new ContinuationDeserializer());
+ sm.addDeserializer(Channel.class, new ChannelProxyDeserializer());
+
+ ObjectMapper om = new ObjectMapper();
+ om.registerModule(sm);
+ om.disable(MapperFeature.AUTO_DETECT_CREATORS);
+ om.disable(MapperFeature.AUTO_DETECT_GETTERS);
+ om.disable(MapperFeature.AUTO_DETECT_IS_GETTERS);
+ om.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
+
+ om.setDefaultTyping(new JacobTypeResolverBuilder());
+
+ om.enable(SerializationFeature.WRITE_ENUMS_USING_INDEX);
+ om.enable(SerializationFeature.INDENT_OUTPUT);
+
+ return om;
+ }
+
+
+ public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl> {
+
+ public ExecutionQueueImplSerializer() {
+ super(JacksonExecutionQueueImpl.class);
+ }
+
+ @Override
+ public void serialize(JacksonExecutionQueueImpl value, JsonGenerator jgen,
+ SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ jgen.writeStartObject();
+ serializeContents(value, jgen, provider);
+ jgen.writeEndObject();
+ }
+
+
+ @Override
+ public void serializeWithType(JacksonExecutionQueueImpl value, JsonGenerator jgen,
+ SerializerProvider provider, TypeSerializer typeSer)
+ throws IOException, JsonProcessingException {
+ typeSer.writeTypePrefixForObject(value, jgen);
+ serializeContents(value, jgen, provider);
+ typeSer.writeTypeSuffixForObject(value, jgen);
+ }
+
+ private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen,
+ SerializerProvider provider) throws JsonGenerationException, IOException {
+
+ jgen.writeNumberField("objIdCounter", value._objIdCounter);
+ jgen.writeNumberField("currentCycle", value._currentCycle);
+
+ jgen.writeObjectField("continuations", value._reactions.toArray(new Continuation[] {}));
+ jgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {}));
+ jgen.writeObjectField("global", value._gdata);
+ }
+ }
+
+ public static class ExecutionQueueImplDeserializer extends StdDeserializer<JacksonExecutionQueueImpl> {
+
+ private static final long serialVersionUID = 1L;
+
+ public ExecutionQueueImplDeserializer() {
+ super(JacksonExecutionQueueImpl.class);
+ }
+
+ @Override
+ public JacksonExecutionQueueImpl deserialize(JsonParser jp,
+ DeserializationContext ctxt) throws IOException,
+ JsonProcessingException {
+
+ JacksonExecutionQueueImpl soup = new JacksonExecutionQueueImpl();
+
+ while (jp.nextToken() != JsonToken.END_OBJECT) {
+ String fieldname = jp.getCurrentName();
+ if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+ // if we're not already on the field, advance by one.
+ jp.nextToken();
+ }
+
+ if ("objIdCounter".equals(fieldname)) {
+ soup._objIdCounter = jp.getIntValue();
+ } else if ("currentCycle".equals(fieldname)) {
+ soup._currentCycle = jp.getIntValue();
+ } else if ("continuations".equals(fieldname)) {
+ Continuation[] cs = (Continuation[])jp.readValueAs(Continuation[].class);
+ soup._reactions = new HashSet<Continuation>(Arrays.asList(cs));
+ } else if ("channels".equals(fieldname)) {
+ soup._channels = new HashMap<Integer, ChannelFrame>();
+ ChannelFrame[] frames = jp.readValueAs(ChannelFrame[].class);
+ for (ChannelFrame f : frames) {
+ soup._channels.put(f.getId(), f);
+ }
+ }
+
+ }
+ return soup;
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
new file mode 100644
index 0000000..8e6a982
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jacob.soup.jackson;
+
+import java.util.Collection;
+
+import org.apache.ode.jacob.Channel;
+import org.apache.ode.jacob.ChannelProxy;
+import org.apache.ode.jacob.JacobObject;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
+import com.fasterxml.jackson.databind.jsontype.TypeIdResolver;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer;
+import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver;
+import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
+import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+
+public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
+
+ public JacobTypeResolverBuilder() {
+ init(JsonTypeInfo.Id.CLASS, null);
+ inclusion(JsonTypeInfo.As.PROPERTY);
+ typeProperty("@class");
+ }
+
+
+ @Override
+ protected TypeIdResolver idResolver(MapperConfig<?> config,
+ JavaType baseType, Collection<NamedType> subtypes, boolean forSer,
+ boolean forDeser) {
+ return new ChannelAwareTypeIdResolver(baseType, config.getTypeFactory());
+ }
+
+
+ @Override
+ public TypeSerializer buildTypeSerializer(SerializationConfig config,
+ JavaType baseType, Collection<NamedType> subtypes) {
+
+ return useForType(baseType) ? super.buildTypeSerializer(config, baseType, subtypes) : null;
+ }
+
+ private boolean useForType(JavaType t) {
+ if (JacobObject.class.isAssignableFrom(t.getRawClass())) {
+ //System.err.println("XXX: JO " + t);
+ return true;
+ }
+
+ if (Channel.class.isAssignableFrom(t.getRawClass())) {
+ //System.err.println("XXX: CH " + t);
+ return true;
+ }
+
+ //if (!t.isConcrete()) {
+ if (t.getRawClass() == Object.class) {
+ //System.err.println("XXX: CON " + t + "- " + t.isConcrete());
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
+ JavaType baseType, Collection<NamedType> subtypes) {
+
+ if (useForType(baseType)) {
+ if (baseType.isInterface() && Channel.class.isAssignableFrom(baseType.getRawClass())) {
+ TypeIdResolver idRes = idResolver(config, baseType, subtypes, false, true);
+ return new AsPropertyTypeDeserializer(baseType, idRes,
+ _typeProperty, _typeIdVisible, Channel.class);
+ } else {
+ return super.buildTypeDeserializer(config, baseType, subtypes);
+ }
+ }
+
+ return null;
+ }
+
+ public static class ChannelAwareTypeIdResolver extends TypeIdResolverBase {
+
+ private ClassNameIdResolver delegate;
+
+ protected ChannelAwareTypeIdResolver(JavaType baseType,
+ TypeFactory typeFactory) {
+ super(baseType, typeFactory);
+ delegate = new ClassNameIdResolver(baseType, typeFactory);
+ }
+
+ @Override
+ public String idFromValue(Object value) {
+ if (value instanceof ChannelProxy) {
+ return "<<channelproxy>>";
+ }
+ return delegate.idFromValue(value);
+ }
+
+ @Override
+ public String idFromValueAndType(Object value, Class<?> suggestedType) {
+ return delegate.idFromValueAndType(value, suggestedType);
+ }
+
+ @Override
+ public JavaType typeFromId(String id) {
+ if ("<<channelproxy>>".equals(id)) {
+ return null; // force jackson to use default impl
+ }
+ return delegate.typeFromId(id);
+ }
+
+ @Override
+ public Id getMechanism() {
+ return Id.CUSTOM;
+ }
+
+ }
+
+}
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index 2581b57..362be0c 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -80,21 +80,21 @@
* forward progress; this scenario would occur if a maximum processign
* time-per-instance policy were in effect.
*/
- private Set<Continuation> _reactions = new HashSet<Continuation>();
+ protected Set<Continuation> _reactions = new HashSet<Continuation>();
- private Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>();
+ protected Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>();
/**
* The "expected" cycle counter, use to detect database serialization
* issues.
*/
- private int _currentCycle;
+ protected int _currentCycle;
- private int _objIdCounter;
+ protected int _objIdCounter;
private ReplacementMap _replacementMap;
- private Serializable _gdata;
+ protected Serializable _gdata;
private Map<Object, LinkedList<IndexedObject>> _index = new HashMap<Object, LinkedList<IndexedObject>>();
@@ -442,7 +442,7 @@
return _gdata;
}
- private static class ChannelFrame implements Externalizable {
+ protected static class ChannelFrame implements Externalizable {
Class<?> type;
int id;
@@ -534,7 +534,7 @@
}
@SuppressWarnings("serial")
- private static class CommGroupFrame implements Serializable {
+ protected static class CommGroupFrame implements Serializable {
boolean replicated;
public Set<CommFrame> commFrames = new HashSet<CommFrame>();
@@ -543,7 +543,7 @@
}
}
- private static class CommFrame implements Externalizable {
+ protected static class CommFrame implements Externalizable {
CommGroupFrame commGroupFrame;
ChannelFrame channelFrame;
@@ -566,7 +566,7 @@
}
}
- private static class ObjectFrame extends CommFrame implements Externalizable {
+ protected static class ObjectFrame extends CommFrame implements Externalizable {
private static final long serialVersionUID = -7212430608484116919L;
ChannelListener _continuation;
@@ -743,7 +743,7 @@
}
}
- private static final class ChannelRef implements Externalizable {
+ protected static final class ChannelRef implements Externalizable {
private Class<?> _type;
private Integer _id;
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index c0f5d05..d755789 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -24,9 +24,13 @@
import org.apache.ode.jacob.Synch;
import org.apache.ode.jacob.Val;
import org.apache.ode.jacob.examples.sequence.Sequence;
-import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.ode.jacob.soup.jackson.JacksonExecutionQueueImpl;
import org.apache.ode.jacob.vpu.JacobVPU;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* Simple Hello World example to showcase different
* features and approaches of the Jacob API.
@@ -37,20 +41,22 @@
@SuppressWarnings("serial")
public class HelloWorld extends JacobRunnable {
- public interface Callback<T, R extends Channel> extends Channel {
+ public static interface Callback<T, R extends Channel> extends Channel {
public void invoke(T value, R callback);
}
static class ReliablePrinterProcess extends JacobRunnable {
- private Callback<String, Synch> _in;
- public ReliablePrinterProcess(Callback<String, Synch> in) {
- _in = in;
+ private Callback<String, Synch> in;
+
+ @JsonCreator
+ public ReliablePrinterProcess(@JsonProperty("in") Callback<String, Synch> in) {
+ this.in = in;
}
public void run() {
object(true, new ReceiveProcess() {
private static final long serialVersionUID = 1L;
- }.setChannel(_in).setReceiver(new Callback<String, Synch>(){
+ }.setChannel(in).setReceiver(new Callback<String, Synch>(){
@Override
public void invoke(String value, Synch callback) {
System.out.println(value);
@@ -63,8 +69,9 @@
static class ReliableStringEmitterProcess extends JacobRunnable {
private String str;
private Callback<String, Synch> to;
-
- public ReliableStringEmitterProcess(String str, Callback<String, Synch> to) {
+
+ @JsonCreator
+ public ReliableStringEmitterProcess(@JsonProperty("str")String str, @JsonProperty("to") Callback<String, Synch> to) {
this.str = str;
this.to = to;
}
@@ -85,26 +92,30 @@
static class PrinterProcess extends JacobRunnable {
private Val _in;
- public PrinterProcess(Val in) {
+
+ @JsonCreator
+ public PrinterProcess(@JsonProperty("in") Val in) {
_in = in;
}
public void run() {
- object(true, new ReceiveProcess() {
- private static final long serialVersionUID = 1L;
- }.setChannel(_in).setReceiver(new Val(){
- public void val(Object o) {
- System.out.println(o);
- }
- }));
+ object(true, new PrinterProcessReceiveProcess().setChannel(_in).setReceiver(new PrinterProcessVal()));
+ }
+
+ static class PrinterProcessReceiveProcess extends ReceiveProcess {}
+ static class PrinterProcessVal implements Val {
+ public void val(Object o) {
+ System.out.println(o);
+ }
}
}
static class StringEmitterProcess extends JacobRunnable {
private String str;
private Val to;
-
- public StringEmitterProcess(String str, Val to) {
+
+ @JsonCreator
+ public StringEmitterProcess(@JsonProperty("str") String str, @JsonProperty("to") Val to) {
this.str = str;
this.to = to;
}
@@ -117,20 +128,29 @@
static class ForwarderProcess extends JacobRunnable {
private Val in;
private Val out;
- public ForwarderProcess(Val in, Val out) {
+
+ @JsonCreator
+ public ForwarderProcess(@JsonProperty("in") Val in, @JsonProperty("out") Val out) {
this.in = in;
this.out = out;
}
public void run() {
- object(true, new ReceiveProcess() {
- private static final long serialVersionUID = 1L;
- }.setChannel(in).setReceiver(new Val(){
- public void val(Object o) {
- out.val(o);
- }
- }));
+ object(true, new ForwarderProcessReceiveProcess().setChannel(in).setReceiver(new ForwarderProcessVal(out)));
}
+
+ static class ForwarderProcessReceiveProcess extends ReceiveProcess {}
+ static class ForwarderProcessVal implements Val {
+ private Val out;
+ @JsonCreator
+ public ForwarderProcessVal(@JsonProperty("out")Val out) {
+ this.out = out;
+ }
+ public void val(Object o) {
+ out.val(o);
+ }
+ }
+
}
private void simpleHelloWorld() {
@@ -153,6 +173,7 @@
// (new(callback).!out(hello).?callback) | (new(callback).!out(world).?callback)
// new(rout)
+ @SuppressWarnings("unchecked")
Callback<String, Synch> rout = newChannel(Callback.class, "reliableHelloWorld-rout");
// *(?rout(str).!sysout(str))
instance(new ReliablePrinterProcess(rout));
@@ -170,21 +191,49 @@
// new(out)
final Val out = newChannel(Val.class, "sequencedHelloWorld-out");
+ // *(?out(str).!sysout(str))
+ instance(new PrinterProcess(out));
+
final String[] greeting = {"Hello", "World"};
- instance(new Sequence(greeting.length, null) {
- @Override
- protected JacobRunnable doStep(final int step, final Synch done) {
- return new JacobRunnable() {
- @Override
- public void run() {
- instance(new StringEmitterProcess(greeting[step], out));
- done.ret();
- }
- };
- }
- });
+
+ instance(new HWSequence(greeting, out, null));
+ }
+
+ static class HWSequence extends Sequence {
+
+ private final String[] greetings;
+ private final Val out;
+
+ @JsonCreator
+ public HWSequence(@JsonProperty("greetings") String[] greetings, @JsonProperty("out") Val out, @JsonProperty("done") Synch done) {
+ super(greetings.length, done);
+ this.greetings = greetings;
+ this.out = out;
+ }
+
+ @Override
+ protected JacobRunnable doStep(int step, Synch done) {
+ return new SequenceItemEmitter(greetings[step], done);
+ }
+
+ class SequenceItemEmitter extends JacobRunnable {
+ private final String string;
+ private final Synch done;
+
+ public SequenceItemEmitter(String string, Synch done) {
+ this.string = string;
+ this.done = done;
+ }
+
+ @Override
+ public void run() {
+ instance(new StringEmitterProcess(string, out));
+ done.ret();
+ }
+ }
}
+
@Override
public void run() {
simpleHelloWorld();
@@ -192,15 +241,31 @@
sequencedHelloWorld();
}
- public static void main(String args[]) {
+ public static void main(String args[]) throws Exception {
+ long start = System.currentTimeMillis();
+ ObjectMapper mapper = JacksonExecutionQueueImpl.configureMapper();
JacobVPU vpu = new JacobVPU();
- vpu.setContext(new ExecutionQueueImpl(null));
+ JacksonExecutionQueueImpl queue = new JacksonExecutionQueueImpl();
+ vpu.setContext(queue);
vpu.inject(new HelloWorld());
while (vpu.execute()) {
+ queue = loadAndRestoreQueue(mapper, queue);
System.out.println(vpu.isComplete() ? "<0>" : ".");
//vpu.dumpState();
}
vpu.dumpState();
+ System.out.println("Runtime in ms: " + (System.currentTimeMillis() - start));
+ }
+
+ public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception {
+ String json = mapper.writeValueAsString(in);
+ // System.out.println(json);
+ JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class);
+ //String json2 = mapper.writeValueAsString(q2);
+
+ // System.out.println("----");
+ // System.out.println(json2);
+ return q2;
}
}