blob: b1a4a8c93e4bdfdb2ee53666761955ff0283f227 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.agent;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpid.agent.binding.BindingContext;
import org.apache.qpid.agent.binding.BindingUtils;
import org.apache.qpid.agent.binding.ClassBinding;
import org.apache.qpid.agent.binding.BindingException;
import org.apache.qpid.agent.binding.MethodBinding;
import org.apache.qpid.agent.binding.ParameterBinding;
import org.apache.qpid.agent.binding.PropertyBinding;
import org.apache.qpid.agent.binding.TypeBinding;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.transport.codec.Decoder;
/**
* The main class for interacting with the QMF bus. Objects which are to be
* managed can be registered with the agent, as can classes to be exposed via
* the schema.
*/
public class Agent implements MessageListener
{
// The following are settings to configure the Agent
protected AMQConnection connection;
protected boolean sessionTransacted = false;
protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
protected String label;
protected UUID systemId;
// this list holds the objects until the agent is started
protected List managedObjects = new ArrayList();
protected List registeredClasses = new ArrayList();
// The following instance variables are not
// able to be set by the end user.
protected Session session;
protected MessageProducer prod;
protected MessageConsumer cons;
protected Queue reply;
protected BindingContext bctx = new BindingContext();
protected Map<Long, ManagedObject> objects = new Hashtable<Long, ManagedObject>();
protected long bbank;
protected long abank;
protected static final Log log = LogFactory.getLog(Agent.class);
protected volatile boolean inside = false;
protected ClassLoader classLoader = null;
public Agent()
{
systemId = UUID.randomUUID();
log.debug(String.format("Agent with uid %s created", systemId
.toString()));
}
public Agent(String label, UUID systemId)
{
this.systemId = systemId;
this.label = label;
log.debug(String.format("Agent with name %s and uid %s created", label,
systemId.toString()));
}
public void register(ManagedObject managedObject)
{
Class managedClass = managedObject.getObjectClass();
long id = managedObject.getId();
ClassBinding cb = bctx.register(managedClass);
managedObject.setManagedClassName(cb.getName());
managedObject.setManagedPackageName(cb.getPackage());
log.debug(String.format(
"Added managed object id '%d' for package '%s' class '%s'", id,
managedObject.getManagedPackageName(), managedObject
.getManagedClassName()));
objects.put(id, managedObject);
managedObjects.add(managedObject);
}
public void registerClass(Class cls)
{
bctx.register(cls);
if (!registeredClasses.contains(cls))
{
registeredClasses.add(cls);
}
}
/**
* Stops the agents connection to the bus
*/
public void stop()
{
try
{
cons.close();
prod.close();
connection.stop();
connection.close();
session.close();
} catch (JMSException e)
{
log.error("Exception:", e);
}
}
/**
* Starts up the agent. Many bean containers may call this by default which
* aids in deployment
*/
public void start()
{
log.debug(String.format("Agent with uid %s and name %s starting",
systemId.toString(), label));
for (Object clsName : registeredClasses.toArray())
{
try
{
Class cls = null;
if (String.class.isAssignableFrom(clsName.getClass()))
{
cls = getClass(clsName.toString());
} else
{
cls = (Class) clsName;
}
this.registerClass(cls);
} catch (Exception e)
{
log.error("Could not register class " + clsName);
}
}
for (Object obj : managedObjects.toArray())
{
this.register((ManagedObject) obj);
}
try
{
session = connection.createSession(sessionTransacted,
acknowledgeMode);
reply = session
.createQueue(String
.format(
"direct://amq.direct//%s-%s?exclusive='True'&autodelete='True'",
label, systemId));
cons = session.createConsumer(reply);
cons.setMessageListener(this);
prod = session.createProducer(null);
} catch (JMSException e)
{
throw new AgentException(e);
}
attachRequest(label, systemId);
try
{
connection.start();
} catch (JMSException e)
{
throw new AgentException(e);
}
}
/**
* Send an event object to the bus
*/
public void raiseEvent(Object value, EventSeverity sev)
{
log.debug(String.format("Sending event of class %s with Severity %s",
value.getClass(), sev.ordinal()));
BBEncoder enc = this.init('e');
ClassBinding cb = bctx.getClassBinding(value.getClass());
String pkg = cb.getPackage();
String cls = cb.getName();
enc.writeStr8(pkg);
enc.writeStr8(cls);
enc.writeBin128(cb.getSchemaHash());
long now = System.currentTimeMillis() * 1000000;
enc.writeInt64(now);
enc.writeUint8((short) sev.ordinal());
for (PropertyBinding p : cb.getProperties())
{
p.getType().encode(enc, BindingUtils.get(p, value));
}
send(
String.format("console.event.%d.%d.%s.%s", bbank, abank, pkg,
cls), enc);
}
public void onMessage(Message message)
{
if (inside)
{
new Throwable().printStackTrace();
}
inside = true;
Decoder dec = readBody(message);
Destination replyTo;
try
{
replyTo = message.getJMSReplyTo();
} catch (JMSException e)
{
throw new AgentException(e);
}
byte[] magic = dec.readBytes(3);
if (magic[0] != 'A' || magic[1] != 'M' || magic[2] != '2')
{
throw new AgentException("bad magic: " + new String(magic));
}
short op = dec.readUint8();
long seq = dec.readUint32();
log.debug("Message recieved: " + (char) op);
switch (op)
{
case 'a':
this.handleAgentAttach(seq, replyTo, dec);
break;
case 'G':
this.handleGetQuery(seq, replyTo, dec);
break;
case 'M':
this.handleMethodRequest(seq, replyTo, dec);
break;
case 'S':
this.handleSchemaRequest(seq, replyTo, dec);
break;
case 'x':
// TODO
break;
default:
throw new IllegalArgumentException("opcode: " + ((char) op));
}
inside = false;
}
protected ClassBinding getClassBinding(ManagedObject mobj)
{
return bctx.getClassBinding(mobj.getObjectClass());
}
private byte[] ensure(int capacity, byte[] body, int size)
{
if (capacity > body.length)
{
byte[] copy = new byte[capacity];
System.arraycopy(body, 0, copy, 0, size);
body = copy;
}
return body;
}
private Decoder readBody(Message message)
{
BytesMessage msg = (BytesMessage) message;
BBDecoder dec = new BBDecoder();
byte[] buf = new byte[1024];
byte[] body = new byte[1024];
int size = 0;
int n;
try
{
while ((n = msg.readBytes(buf)) > 0)
{
body = ensure(size + n, body, size);
System.arraycopy(buf, 0, body, size, n);
size += n;
}
} catch (JMSException e)
{
throw new AgentException(e);
}
dec.init(ByteBuffer.wrap(body, 0, size));
return dec;
}
protected void handleAgentAttach(long seq, Destination replyTo, Decoder dec)
{
log.debug("Agent Attach Message");
bbank = dec.readUint32();
abank = dec.readUint32();
try
{
MessageConsumer mc = session
.createConsumer(session
.createQueue(String
.format(
"management://qpid.management//%s-%s?routingkey='agent.%d.%d'&exclusive='True'&autodelete='True'",
label, systemId, bbank, abank)));
mc.setMessageListener(this);
} catch (JMSException e)
{
throw new AgentException(e);
}
for (String packageName : bctx.getPackages())
{
packageIndication(packageName);
}
for (ClassBinding cb : bctx.getAllBindings())
{
classIndication(cb);
}
for (ManagedObject mo : objects.values())
{
content('i', seq, null, mo);
}
}
protected void handleMethodRequest(long seq, Destination replyTo,
Decoder dec)
{
dec.readUint64(); // first part of object-id
long id = dec.readUint64();
ManagedObject mo = objects.get(id);
if (mo == null)
{
methodResponse(seq, replyTo, 1, String.format(
"no such object: 0x%x", id));
} else
{
dec.readStr8(); // pkg
dec.readStr8(); // cls
dec.readBin128(); // hash
String mname = dec.readStr8();
ClassBinding cls = getClassBinding(mo);
MethodBinding method = cls.getMethod(mname);
if (method == null)
{
methodResponse(seq, replyTo, 2, String.format(
"no such method: %s", mname));
} else
{
log.trace("Handle method: " + method.getName());
List<ParameterBinding> params = method.getInParameters();
Object[] args = new Object[params.size()];
int idx = 0;
for (ParameterBinding p : params)
{
TypeBinding typeBinding = p.getType();
log
.trace(String
.format(
"Decoding parameter with type %s ref package %s ref class %s ",
typeBinding.getCode(), typeBinding
.getRefPackage(),
typeBinding.getRefClass()));
args[idx++] = typeBinding.decode(dec);
log.trace("Done");
}
try
{
Object[] result = mo.invoke(method, args);
methodResponse(seq, replyTo, 0, null, method, result);
} catch (BindingException ex)
{
log
.error(
String
.format(
"An exception occured invoking method %s. Stack trace sent to console.",
method.getName()), ex);
StringWriter str = new StringWriter();
PrintWriter writer = new PrintWriter(str);
ex.printStackTrace(writer);
writer.flush();
methodResponse(seq, replyTo, 7, str.toString());
}
log.trace("Done with method: " + method.getName());
}
}
}
protected void handleGetQuery(long seq, Destination replyTo, Decoder dec)
{
Map<String, Object> data = dec.readMap();
if (data.containsKey("_objectid"))
{
long objId = (Long) data.get("_objectid");
log.debug("Get Request message for object id " + objId);
ManagedObject mo = objects.get(objId);
if (mo == null)
{
methodResponse(seq, replyTo, 1, String.format(
"no such object: 0x%x", objId));
} else
{
content('g', seq, replyTo, mo);
}
} else if (data.containsKey("_class"))
{
String className = (String) data.get("_class");
String packageName = (String) data.get("_package");
log.debug(String.format(
"Get Request message for package '%s' class '%s'",
packageName, className));
for (ManagedObject mo : objects.values())
{
if (mo.getManagedClassName().equals(className))
{
if ((packageName == null) || packageName.equals("")
|| packageName.equals(mo.getManagedPackageName()))
{
content('g', seq, replyTo, mo);
}
}
}
} else
{
for (ManagedObject mo : objects.values())
{
content('g', seq, replyTo, mo);
}
}
complete(seq, replyTo);
}
protected void handleSchemaRequest(long seq, Destination replyTo,
Decoder dec)
{
String pkg = dec.readStr8();
String cls = dec.readStr8();
log.debug(String.format(
"SchemaRequest message for package '%s' class '%s'", pkg, cls));
ClassBinding cb = bctx.getClassBinding(pkg, cls);
if (cb == null)
{
throw new AgentException("no such class: " + pkg + ", " + cls);
}
schemaResponse(seq, cb);
}
protected BBEncoder init(char opcode)
{
return init(opcode, 0);
}
protected BBEncoder init(char opcode, long sequence)
{
BBEncoder enc = new BBEncoder(1024);
enc.init();
enc.writeUint8((short) 'A');
enc.writeUint8((short) 'M');
enc.writeUint8((short) '2');
enc.writeUint8((short) opcode);
enc.writeUint32(sequence);
return enc;
}
protected void send(BBEncoder enc)
{
send("broker", enc);
}
protected void send(Destination dest, BBEncoder enc)
{
try
{
byte[] buf = new byte[1024];
BytesMessage msg = session.createBytesMessage();
ByteBuffer slice = enc.segment();
while (slice.hasRemaining())
{
int n = Math.min(buf.length, slice.remaining());
slice.get(buf, 0, n);
msg.writeBytes(buf, 0, n);
}
msg.setJMSReplyTo(reply);
// ???: I assume this is thread safe.
prod.send(dest, msg);
} catch (JMSException e)
{
throw new AgentException(e);
}
}
protected void send(String routingKey, BBEncoder enc)
{
try
{
send(session
.createQueue("management://qpid.management//?routingkey='"
+ routingKey + "'"), enc);
} catch (JMSException e)
{
throw new AgentException(e);
}
}
protected void attachRequest(String label, UUID systemId)
{
BBEncoder enc = init('A');
enc.writeStr8(label);
enc.writeUuid(systemId);
enc.writeUint32(0);
enc.writeUint32(0);
send(enc);
}
protected void packageIndication(String pkg)
{
BBEncoder enc = init('p');
enc.writeStr8(pkg);
send(enc);
}
protected void classIndication(ClassBinding cb)
{
BBEncoder enc = init('q');
enc.writeUint8(cb.getKind());
enc.writeStr8(cb.getPackage());
enc.writeStr8(cb.getName());
enc.writeBin128(cb.getSchemaHash()); // schema hash?
send(enc);
}
protected void schemaResponse(long seq, ClassBinding cb)
{
BBEncoder enc = init('s', seq);
cb.encode(enc);
send(enc);
}
protected void content(char c, long seq, Destination dest, ManagedObject mo)
{
BBEncoder enc = init(c, seq);
ClassBinding cb = getClassBinding(mo);
String pkg = cb.getPackage();
String cls = cb.getName();
enc.writeStr8(pkg);
enc.writeStr8(cls);
enc.writeBin128(cb.getSchemaHash());
long now = System.currentTimeMillis() * 1000000;
enc.writeUint64(now);
enc.writeUint64(now);
enc.writeUint64(0);
enc.writeUint64(0x0000FFFFFFFFFFFFL & ((bbank << 28) | abank));
enc.writeUint64(mo.getId());
for (PropertyBinding p : cb.getProperties())
{
p.getType().encode(enc, mo.get(p));
}
if (dest == null)
{
send(String.format("console.obj.%d.%d.%s.%s", bbank, abank, pkg,
cls), enc);
} else
{
send(dest, enc);
}
}
protected void complete(long seq, Destination dest)
{
BBEncoder enc = init('z', seq);
enc.writeUint32(0);
enc.writeStr8("");
send(dest, enc);
}
protected void methodResponse(long seq, Destination dest, int status,
String text)
{
methodResponse(seq, dest, status, text, null, null);
}
protected void methodResponse(long seq, Destination dest, int status,
String text, MethodBinding method, Object[] result)
{
BBEncoder enc = init('m', seq);
enc.writeUint32(status);
enc.writeStr16(text == null ? "" : text);
if (method != null)
{
int idx = 0;
for (ParameterBinding p : method.getOutParameters())
{
p.getType().encode(enc, result[idx++]);
}
}
send(dest, enc);
}
protected Class getClass(String className)
{
try
{
if (classLoader != null)
{
return classLoader.loadClass(className);
} else
{
return Class.forName(className);
}
} catch (ClassNotFoundException e)
{
throw new AgentException(String.format(
"No class named %s was found", className), e);
}
}
public String getLabel()
{
return label;
}
public void setLabel(String label)
{
this.label = label;
}
public AMQConnection getConnection()
{
return connection;
}
public void setConnection(AMQConnection connection)
{
this.connection = connection;
}
public boolean isSessionTransacted()
{
return sessionTransacted;
}
public void setSessionTransacted(boolean sessionTransacted)
{
this.sessionTransacted = sessionTransacted;
}
public void setManagedObjects(List objectList)
{
this.managedObjects = objectList;
}
public List getManagedObjects()
{
return managedObjects;
}
public void setRegisteredClasses(List objectList)
{
this.registeredClasses = objectList;
}
public List getRegisteredClasses()
{
return this.registeredClasses;
}
public static void main(String[] args) throws Exception
{
String broker = args[0];
String name = args[1];
String url = String.format(
"amqp://guest:guest@/?brokerlist='tcp://%s'", broker);
AMQConnection conn = new AMQConnection(url);
Agent agent = new Agent(name, UUID.randomUUID());
agent.setConnection(conn);
for (int i = 2; i < args.length; i++)
{
Class<?> cls = Class.forName(args[i]);
agent.register(new ManagedPOJO(cls.newInstance()));
}
agent.start();
while (true)
{
Thread.sleep(1000);
}
}
}