blob: 1a1fe8dde3176ec97902b6326464afbad6763885 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jacob.soup.jackson;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
/**
* Variant of {@link org.apache.ode.jacob.vpu.ExecutionQueueImpl} that can be
* serialized and deserialized with Jackson.
*
* @author Tammo van Lessen
*
*/
public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
private static final Logger LOG = LoggerFactory.getLogger(JacksonExecutionQueueImpl.class);
public JacksonExecutionQueueImpl() {
super(null);
}
public static class ExecutionQueueImplSerializer extends StdSerializer<JacksonExecutionQueueImpl> {
private Set<Integer> usedChannels = new LinkedHashSet<Integer>();
public ExecutionQueueImplSerializer() {
super(JacksonExecutionQueueImpl.class);
}
public void markChannelUsed(int channelId) {
usedChannels.add(channelId);
}
@Override
public void serialize(JacksonExecutionQueueImpl value, JsonGenerator jgen,
SerializerProvider provider) throws IOException,
JsonGenerationException {
jgen.writeStartObject();
serializeContents(value, jgen, provider);
jgen.writeEndObject();
}
@Override
public void serializeWithType(JacksonExecutionQueueImpl value, JsonGenerator jgen,
SerializerProvider provider, TypeSerializer typeSer)
throws IOException, JsonProcessingException {
typeSer.writeTypePrefixForObject(value, jgen);
serializeContents(value, jgen, provider);
typeSer.writeTypeSuffixForObject(value, jgen);
}
private void serializeContents(JacksonExecutionQueueImpl value, JsonGenerator jgen,
SerializerProvider provider) throws JsonGenerationException, IOException {
usedChannels.clear();
// write metadata
jgen.writeNumberField("objIdCounter", value._objIdCounter);
jgen.writeNumberField("currentCycle", value._currentCycle);
// write continuations
jgen.writeObjectField("messages", value._messages.toArray(new Message[]{}));
// channel garbage collection
// - traverse whole object graph and record referenced channel proxies.
// - first, regularily serialize continuations.
// - second, serialize channels to a null serializer in order to record channel references
// without writing them to the stream.
// - remove unused channels.
// - serialize remaining channels.
// write channels to null serializer
JsonGenerator nullgen = new NullJsonGenerator(null, 0, jgen.getCodec());
nullgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {}));
// remove unreferenced channels (and keep those which have been exported using export()).
for (Iterator<ChannelFrame> i = value._channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
if (usedChannels.contains(cframe.getId()) || cframe.getRefCount() > 0) {
// skip
} else {
LOG.debug("GC Channel: {}", cframe);
i.remove();
}
}
// write channels
jgen.writeObjectField("channels", value._channels.values().toArray(new ChannelFrame[] {}));
// write global data
jgen.writeObjectField("global", value._gdata);
}
}
public static class ExecutionQueueImplDeserializer extends StdDeserializer<JacksonExecutionQueueImpl> {
private static final long serialVersionUID = 1L;
public ExecutionQueueImplDeserializer() {
super(JacksonExecutionQueueImpl.class);
}
@Override
public JacksonExecutionQueueImpl deserialize(JsonParser jp,
DeserializationContext ctxt) throws IOException,
JsonProcessingException {
JacksonExecutionQueueImpl soup = new JacksonExecutionQueueImpl();
while (jp.nextToken() != JsonToken.END_OBJECT) {
String fieldname = jp.getCurrentName();
if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
// if we're not already on the field, advance by one.
jp.nextToken();
}
if ("objIdCounter".equals(fieldname)) {
soup._objIdCounter = jp.getIntValue();
} else if ("currentCycle".equals(fieldname)) {
soup._currentCycle = jp.getIntValue();
} else if ("messages".equals(fieldname)) {
Message[] cs = (Message[])jp.readValueAs(Message[].class);
soup._messages = new HashSet<Message>(Arrays.asList(cs));
} else if ("channels".equals(fieldname)) {
soup._channels = new HashMap<Integer, ChannelFrame>();
ChannelFrame[] frames = jp.readValueAs(ChannelFrame[].class);
for (ChannelFrame f : frames) {
soup._channels.put(f.getId(), f);
}
} else if ("global".equals(fieldname)) {
soup._gdata = jp.readValueAs(Serializable.class);
}
}
return soup;
}
}
}