blob: 7520f909fa9468915915aecd6f84805a7251c67f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.jacob.vpu;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.ode.jacob.IndexedObject;
import org.apache.ode.jacob.Message;
import org.apache.ode.jacob.MessageListener;
import org.apache.ode.jacob.oo.Channel;
import org.apache.ode.jacob.oo.ChannelListener;
import org.apache.ode.jacob.soup.Comm;
import org.apache.ode.jacob.soup.CommChannel;
import org.apache.ode.jacob.soup.CommGroup;
import org.apache.ode.jacob.soup.CommRecv;
import org.apache.ode.jacob.soup.CommSend;
import org.apache.ode.jacob.soup.ExecutionQueue;
import org.apache.ode.jacob.soup.ExecutionQueueObject;
import org.apache.ode.jacob.soup.ReplacementMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A fast, in-memory {@link org.apache.ode.jacob.soup.ExecutionQueue} implementation.
*/
public class ExecutionQueueImpl implements ExecutionQueue {
public static ConcurrentHashMap<String, ObjectStreamClass> _classDescriptors =
new ConcurrentHashMap<String, ObjectStreamClass>();
private static final Logger LOG = LoggerFactory.getLogger(ExecutionQueueImpl.class);
private ClassLoader _classLoader;
/**
* Cached set of enqueued {@link Message} objects (i.e. those read using
* {@link #enqueueMessage(Message)}).
* These reactions are "cached"--that is it is not sent directly to the DAO
* layer--to minimize unnecessary serialization/deserialization of closures.
* This is a pretty useful optimization, as most {@link Message}s are
* enqueued, and then immediately dequeued in the next cycle. By caching
* {@link Message}s, we eliminate practically all serialization of
* these objects, the only exception being cases where the system decides to
* stop processing a particular soup despite the soup being able to make
* forward progress; this scenario would occur if a maximum processign
* time-per-instance policy were in effect.
*/
protected Set<Message> _messages = new LinkedHashSet<Message>();
protected Map<Integer, ChannelFrame> _channels = new LinkedHashMap<Integer, ChannelFrame>();
/**
* The "expected" cycle counter, use to detect database serialization
* issues.
*/
protected int _currentCycle;
protected int _objIdCounter;
private ReplacementMap _replacementMap;
protected Serializable _gdata;
private Map<Object, LinkedList<IndexedObject>> _index = new LinkedHashMap<Object, LinkedList<IndexedObject>>();
public ExecutionQueueImpl() {}
public ExecutionQueueImpl(ClassLoader classLoader) {
_classLoader = classLoader;
}
public void setClassLoader(ClassLoader classLoader) {
_classLoader = classLoader;
}
public void setReplacementMap(ReplacementMap replacementMap) {
_replacementMap = replacementMap;
}
public Map<Object, LinkedList<IndexedObject>> getIndex() {
return _index;
}
public void add(CommChannel channel) {
LOG.trace(">> add (channel={})", channel);
verifyNew(channel);
ChannelFrame cframe = new ChannelFrame(channel.getType(), ++_objIdCounter,
channel.getDescription());
_channels.put(cframe.getId(), cframe);
assignId(channel, cframe.getId());
}
public void enqueueMessage(Message message) {
LOG.trace(">> enqueueMessage (message={})", message);
_messages.add(message);
}
public Message dequeueMessage() {
LOG.trace(">> dequeueMessage ()");
Message message = null;
if (!_messages.isEmpty()) {
Iterator<Message> it = _messages.iterator();
message = it.next();
it.remove();
}
return message;
}
public void add(CommGroup group) {
LOG.trace(">> add (group={})", group);
verifyNew(group);
CommGroupFrame commGroupFrame = new CommGroupFrame(group.isReplicated());
for (Iterator<Comm> i = group.getElements(); i.hasNext();) {
Comm comm = i.next();
ChannelFrame chnlFrame = findChannelFrame(comm.getChannel().getId());
if (comm instanceof CommSend) {
if (chnlFrame.replicatedSend) {
// TODO: JACOB "bad-process" ex
throw new IllegalStateException("Send attempted on channel containing replicated send! Channel= "
+ comm.getChannel());
}
if (group.isReplicated()) {
chnlFrame.replicatedSend = true;
}
CommSend commSend = (CommSend) comm;
MessageFrame mframe = new MessageFrame(commGroupFrame, chnlFrame, commSend.getMessage());
commGroupFrame.commFrames.add(mframe);
chnlFrame.msgFrames.add(mframe);
} else if (comm instanceof CommRecv) {
if (chnlFrame.replicatedRecv) {
// TODO: JACOB "bad-process" ex
throw new IllegalStateException(
"Receive attempted on channel containing replicated receive! Channel= " + comm.getChannel());
}
if (group.isReplicated()) {
chnlFrame.replicatedRecv = true;
}
CommRecv commRecv = (CommRecv) comm;
ListenerFrame oframe = new ListenerFrame(commGroupFrame, chnlFrame, commRecv.getListener());
commGroupFrame.commFrames.add(oframe);
chnlFrame.objFrames.add(oframe);
}
}
// Match communications.
for (Iterator<Comm> i = group.getElements(); i.hasNext();) {
Comm comm = i.next();
matchCommunications(comm.getChannel());
}
}
private ChannelFrame findChannelFrame(Object id) {
ChannelFrame chnlFrame = _channels.get(id);
if (chnlFrame == null) {
throw new IllegalArgumentException("No such channel; id=" + id);
}
return chnlFrame;
}
public int cycle() {
LOG.trace(">> cycle ()");
return ++_currentCycle;
}
public String createExport(CommChannel channel) {
LOG.trace(">> createExport (channel={})", channel);
ChannelFrame cframe = findChannelFrame(channel.getId());
cframe.refCount++;
return channel.getId().toString();
}
public CommChannel consumeExport(String exportId) {
LOG.trace(">> consumeExport (exportId={})", exportId);
Integer id = Integer.valueOf(exportId);
ChannelFrame cframe = findChannelFrame(id);
cframe.refCount--;
CommChannel commChannel = new CommChannel(cframe.type);
commChannel.setId(id);
commChannel.setDescription("EXPORTED CHANNEL");
return commChannel;
}
public boolean hasReactions() {
return !_messages.isEmpty();
}
public void flush() {
LOG.trace(">> flush ()");
}
public void read(InputStream iis) throws IOException, ClassNotFoundException {
_channels.clear();
_messages.clear();
_index.clear();
ExecutionQueueInputStream sis = new ExecutionQueueInputStream(iis);
_objIdCounter = sis.readInt();
_currentCycle = sis.readInt();
int reactions = sis.readInt();
for (int i = 0; i < reactions; ++i) {
_messages.add((Message)sis.readObject());
}
int numChannels = sis.readInt();
for (int i = 0; i < numChannels; ++i) {
int objFrames = sis.readInt();
for (int j = 0; j < objFrames; ++j) {
sis.readObject();
}
int msgFrames = sis.readInt();
for (int j = 0; j < msgFrames; ++j) {
sis.readObject();
}
}
numChannels = sis.readInt();
for (int i = 0; i < numChannels; ++i) {
ChannelFrame cframe = (ChannelFrame) sis.readObject();
_channels.put(cframe.getId(), cframe);
}
_gdata = (Serializable) sis.readObject();
sis.close();
}
private void index(IndexedObject object) {
LinkedList<IndexedObject> vals = _index.get(object.getKey());
if (vals == null) {
vals = new LinkedList<IndexedObject>();
_index.put(object.getKey(), vals);
}
vals.add(object);
}
public void write(OutputStream oos) throws IOException {
flush();
ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos);
// XQXMLOutputStream sos = createObjectOutputStream(new OutputStreamWriter(oos));
sos.writeInt(_objIdCounter);
sos.writeInt(_currentCycle);
// Write out the reactions.
sos.writeInt(_messages.size());
for (Message m : _messages) {
sos.writeObject(m);
}
sos.writeInt(_channels.values().size());
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
sos.writeInt(cframe.objFrames.size());
for (Iterator<ListenerFrame> j = cframe.objFrames.iterator(); j.hasNext();) {
sos.writeObject(j.next());
}
sos.writeInt(cframe.msgFrames.size());
for (Iterator<MessageFrame> j = cframe.msgFrames.iterator(); j.hasNext();) {
sos.writeObject(j.next());
}
}
Set<Object> referencedChannels = sos.getSerializedChannels();
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
if (referencedChannels.contains(Integer.valueOf(cframe.id)) || cframe.refCount > 0) {
// skip
} else {
LOG.debug("GC Channel: {}", cframe);
i.remove();
}
}
sos.writeInt(_channels.values().size());
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
ChannelFrame cframe = i.next();
LOG.debug("Writing Channel: {}", cframe);
sos.writeObject(cframe);
}
// Write the global data.
sos.writeObject(_gdata);
sos.close();
}
public boolean isComplete() {
// If we have more reactions we're not done.
if (!_messages.isEmpty()) {
return false;
}
// If we have no reactions, but there are some channels that have
// external references, we are not done.
for (Iterator<ChannelFrame> i = _channels.values().iterator(); i.hasNext();) {
if (i.next().refCount > 0) {
return false;
}
}
return true;
}
public void dumpState(PrintStream ps) {
ps.print(this.toString());
ps.println(" state dump:");
ps.println("-- GENERAL INFO");
ps.println(" Current Cycle : " + _currentCycle);
ps.println(" Num. Reactions : " + _messages.size());
if (!_messages.isEmpty()) {
ps.println("-- REACTIONS");
int cnt = 0;
for (Message m : _messages) {
ps.println(" #" + (++cnt) + ": " + m.toString());
}
}
if (!_channels.isEmpty()) {
ps.println("-- CHANNELS");
int cnt = 0;
for (ChannelFrame channel : _channels.values()) {
ps.println(" #" + (++cnt) + ": " + channel.toString());
}
}
}
private void matchCommunications(CommChannel channel) {
LOG.trace(">> matchCommunications (channel={})", channel);
ChannelFrame cframe = _channels.get(channel.getId());
while (cframe != null && !cframe.msgFrames.isEmpty() && !cframe.objFrames.isEmpty()) {
MessageFrame mframe = cframe.msgFrames.iterator().next();
ListenerFrame oframe = cframe.objFrames.iterator().next();
Message msg = Message.copyFrom(mframe.message);
msg.setTo(new org.apache.ode.jacob.ChannelRef(oframe.listener));
enqueueMessage(msg);
if (!mframe.commGroupFrame.replicated) {
removeCommGroup(mframe.commGroupFrame);
}
if (!oframe.commGroupFrame.replicated) {
removeCommGroup(oframe.commGroupFrame);
}
}
// Do some cleanup, if the channel is empty we can remove it from memory.
// if (cframe != null && cframe.msgFrames.isEmpty() &&
// cframe.objFrames.isEmpty() && cframe.refCount ==0)
// _channels.values().remove(cframe);
}
/**
* Verify that a {@link ExecutionQueueObject} is new, that is it has not
* already been added to the soup.
*
* @param so object to check.
* @throws IllegalArgumentException in case the object is not new
*/
private void verifyNew(ExecutionQueueObject so) throws IllegalArgumentException {
if (so.getId() != null)
throw new IllegalArgumentException("The object " + so + " is not new!");
}
private void assignId(ExecutionQueueObject so, Integer id) {
so.setId(id);
}
private void removeCommGroup(CommGroupFrame groupFrame) {
// Add all channels reference in the group to the GC candidate set.
for (Iterator<CommFrame> i = groupFrame.commFrames.iterator(); i.hasNext();) {
CommFrame frame = i.next();
if (frame instanceof ListenerFrame) {
assert frame.channelFrame.objFrames.contains(frame);
frame.channelFrame.objFrames.remove(frame);
} else {
assert frame instanceof MessageFrame;
assert frame.channelFrame.msgFrames.contains(frame);
frame.channelFrame.msgFrames.remove(frame);
}
}
}
public void setGlobalData(Serializable data) {
_gdata = data;
}
public Serializable getGlobalData() {
return _gdata;
}
protected static class ChannelFrame implements Externalizable {
Class<?> type;
int id;
/** External Reference Count */
int refCount;
boolean replicatedSend;
boolean replicatedRecv;
Set<ListenerFrame> objFrames = new LinkedHashSet<ListenerFrame>();
Set<MessageFrame> msgFrames = new LinkedHashSet<MessageFrame>();
public String description;
// Used for deserialization
public ChannelFrame() {
}
public ChannelFrame(Class<?> type, int id, String description) {
this.type = type;
this.id = id;
this.description = description;
}
public Integer getId() {
return Integer.valueOf(id);
}
public int getRefCount() {
return refCount;
}
public Set<ListenerFrame> getObjFrames() {
return objFrames;
}
public Set<MessageFrame> getMsgFrames() {
return msgFrames;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
type = (Class<?>)in.readObject();
id = in.readInt();
description = in.readUTF();
refCount = in.readInt();
replicatedSend = in.readBoolean();
replicatedRecv = in.readBoolean();
int cnt = in.readInt();
for (int i = 0; i < cnt; ++i) {
objFrames.add((ListenerFrame) in.readObject());
}
cnt = in.readInt();
for (int i = 0; i < cnt; ++i) {
msgFrames.add((MessageFrame) in.readObject());
}
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(type);
out.writeInt(id);
out.writeUTF(description == null ? "" : description);
out.writeInt(refCount);
out.writeBoolean(replicatedSend);
out.writeBoolean(replicatedRecv);
out.writeInt(objFrames.size());
for (Iterator<ListenerFrame> i = objFrames.iterator(); i.hasNext();) {
out.writeObject(i.next());
}
out.writeInt(msgFrames.size());
for (Iterator<MessageFrame> i = msgFrames.iterator(); i.hasNext();)
out.writeObject(i.next());
}
public String toString() {
StringBuffer buf = new StringBuffer(32);
buf.append("{CFRAME ");
buf.append(type == null ? "untyped" : type.getSimpleName());
buf.append(':');
buf.append(description);
buf.append('#');
buf.append(id);
buf.append(" refCount=");
buf.append(refCount);
buf.append(", msgs=");
buf.append(msgFrames.size());
if (replicatedSend) {
buf.append("R");
}
buf.append(", objs=");
buf.append(objFrames.size());
if (replicatedRecv) {
buf.append("R");
}
buf.append("}");
return buf.toString();
}
}
@SuppressWarnings("serial")
protected static class CommGroupFrame implements Serializable {
boolean replicated;
public Set<CommFrame> commFrames = new LinkedHashSet<CommFrame>();
// default constructor for deserialization
public CommGroupFrame() {}
public CommGroupFrame(boolean replicated) {
this.replicated = replicated;
}
}
protected static class CommFrame implements Externalizable {
CommGroupFrame commGroupFrame;
ChannelFrame channelFrame;
public CommFrame() {
}
CommFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame) {
this.commGroupFrame = commGroupFrame;
this.channelFrame = channelFrame;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
commGroupFrame = (CommGroupFrame) in.readObject();
channelFrame = (ChannelFrame) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(commGroupFrame);
out.writeObject(channelFrame);
}
}
protected static class ListenerFrame extends CommFrame implements Externalizable {
private static final long serialVersionUID = -7212430608484116919L;
MessageListener listener;
// Used for deserialization
public ListenerFrame() {
}
public ListenerFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, MessageListener listener) {
super(commGroupFrame, channelFrame);
this.listener = listener;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
listener = (ChannelListener)in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeObject(listener);
}
}
protected static class MessageFrame extends CommFrame implements Externalizable {
private static final long serialVersionUID = -1112437852498126297L;
Message message;
// Used for deserialization
public MessageFrame() {
}
public MessageFrame(CommGroupFrame commFrame, ChannelFrame channelFrame, Message msg) {
super(commFrame, channelFrame);
this.message = msg;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
message = (Message)in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeObject(message);
}
}
/**
* DOCUMENTME.
* <p>
* Created on Feb 16, 2004 at 8:09:48 PM.
* </p>
*
* @author Maciej Szefler <a href="mailto:mbs@fivesight.com">mbs</a>
*/
private class ExecutionQueueOutputStream extends ObjectOutputStream {
private Set<Object> _serializedChannels = new HashSet<Object>();
public ExecutionQueueOutputStream(OutputStream outputStream) throws IOException {
super(new GZIPOutputStream(outputStream));
enableReplaceObject(true);
}
public Set<Object> getSerializedChannels() {
return _serializedChannels;
}
protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
if (Serializable.class.isAssignableFrom(desc.forClass())) {
writeBoolean(true);
writeUTF(desc.getName());
} else {
writeBoolean(false);
super.writeClassDescriptor(desc);
}
}
/**
* Use this method to spy on any channels that are being serialized to
* this stream.
*
* @param obj
* @return
* @throws IOException
*/
protected Object replaceObject(Object obj) throws IOException {
if (!Serializable.class.isAssignableFrom(obj.getClass())) {
throw new IllegalArgumentException("Cannot replace non Serializable instance of " + obj.getClass());
}
if (obj instanceof org.apache.ode.jacob.oo.ChannelProxy) {
CommChannel commChannel = ChannelFactory.getBackend((Channel)obj);
_serializedChannels.add(commChannel.getId());
return new ChannelRef(commChannel.getType(), (Integer) commChannel.getId());
} else if (_replacementMap != null && _replacementMap.isReplaceable(obj)) {
Object replacement = _replacementMap.getReplacement(obj);
LOG.debug("ReplacmentMap: getReplacement({}) = {}", obj, replacement);
return replacement;
}
return obj;
}
}
/**
*/
public class ExecutionQueueInputStream extends ObjectInputStream {
private Set<CommChannel> _deserializedChannels = new HashSet<CommChannel>();
public ExecutionQueueInputStream(InputStream in) throws IOException {
super(new GZIPInputStream(in));
enableResolveObject(true);
}
public Set<CommChannel> getSerializedChannels() {
return _deserializedChannels;
}
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
return Class.forName(desc.getName(), true, _classLoader);
}
protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
boolean ser = readBoolean();
if (ser) {
String clsName = readUTF();
ObjectStreamClass cached = _classDescriptors.get(clsName);
if (cached == null) {
cached = ObjectStreamClass.lookup(Class.forName(clsName, true, _classLoader));
_classDescriptors.put(clsName, cached);
}
return cached;
}
return super.readClassDescriptor();
}
protected Object resolveObject(Object obj) throws IOException {
Object resolved;
if (obj instanceof ChannelRef) {
// We know this is a channel reference, so we have to resolve
// the channel.
ChannelRef oref = (ChannelRef) obj;
CommChannel channel = new CommChannel(oref._type);
channel.setId(oref._id);
_deserializedChannels.add(channel);
resolved = ChannelFactory.createChannel(channel, channel.getType());
} else if (_replacementMap != null && _replacementMap.isReplacement(obj)) {
resolved = _replacementMap.getOriginal(obj);
LOG.debug("ReplacementMap: getOriginal({}) = {}", obj, resolved);
} else {
resolved = obj;
}
if (resolved != null && resolved instanceof IndexedObject)
index((IndexedObject) resolved);
return resolved;
}
}
protected static final class ChannelRef implements Externalizable {
private Class<?> _type;
private Integer _id;
private ChannelRef(Class<?> type, Integer id) {
_type = type;
_id = id;
}
// Used for deserialization
public ChannelRef() {
}
public boolean equals(Object obj) {
return ((ChannelRef) obj)._id.equals(_id);
}
public int hashCode() {
return _id.hashCode();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(_type);
out.writeInt(_id.intValue());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
_type = (Class<?>)in.readObject();
_id = Integer.valueOf(in.readInt());
}
}
}