blob: 44709fbaf3f486ea1e031fb3792220f01c0c7d99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.tools;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.google.gson.Gson;
/**
 * Command-line tool that subscribes to a topic on an ActiveMQ broker, converts each
 * received {@link ObjectMessage} payload to JSON with Gson, and writes it to a file.
 *
 * Two modes are used, chosen in the constructor:
 * <ul>
 *   <li>Unfiltered: every object message is written. With no timeout the tool exits after
 *       the first message; with a timeout each message goes to a file with a numeric
 *       generation suffix until the timeout expires.</li>
 *   <li>Filtered (topic name contains "metrics" or "inventory" and at least one agent name
 *       was supplied): a message is written only if its JSON text contains the name of an
 *       agent that has not been seen yet; the tool exits once every agent has been seen.</li>
 * </ul>
 *
 * The name of each file written is printed on stdout.
 */
public class DuccPubListener
    implements MessageListener
{
    String host;                 // broker host name
    int port;                    // broker port
    String topic_str;            // name of the topic to subscribe to
    String[] agents;             // agent names to watch for in filtered mode
    String outfn = null;         // base name of the output file(s)
    int generation = 0;          // counter appended to the file name when a timeout is in effect
    int timeout = 0;             // seconds to listen; <= 0 means stop after the first message

    // agent name -> whether a message mentioning that agent has already been written
    Map<String, Boolean> agentPresence = new HashMap<String, Boolean>();
    int total_agents = 0;        // number of watched agents not yet seen

    Connection connection;
    Session session;
    Topic topic;

    boolean do_filter = false;   // true when messages are filtered by agent name

    /**
     * Records the settings and decides whether agent filtering applies.
     *
     * @param host      broker host name
     * @param port      broker port
     * @param topic_str topic to subscribe to; filtering is enabled only if it contains
     *                  "metrics" or "inventory"
     * @param timeout   seconds to listen; values <= 0 mean "exit after the first message"
     * @param outfn     base name for output files
     * @param agents    agent names to look for; null is treated as an empty list
     */
    public DuccPubListener (String host, int port, String topic_str, int timeout, String outfn, String[] agents)
    {
        this.host = host;
        this.port = port;
        this.topic_str = topic_str;
        this.timeout = timeout;
        this.outfn = outfn;
        // Tolerate a null array so that receivedAgent() can always iterate safely.
        this.agents = (agents == null) ? new String[0] : agents;

        for ( String a : this.agents ) {
            agentPresence.put(a, false);
        }
        // Duplicated names collapse in the map, so count the distinct entries.
        total_agents = agentPresence.size();

        if ( ((topic_str.indexOf("metrics") >= 0 ) || (topic_str.indexOf("inventory") >= 0) )
             && ( total_agents > 0 ) ) {
            do_filter = true;
        }
    }

    /**
     * Connects to tcp://host:port, subscribes this object as listener on the topic and
     * starts delivery. If a positive timeout is set, blocks for that many seconds and
     * then calls {@link #close()}.
     *
     * @throws Throwable whatever the JMS/ActiveMQ calls or close() throw
     */
    protected void connect()
        throws Throwable
    {
        String url = "tcp://" + host + ":" + port;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        connection = factory.createConnection();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic(topic_str);

        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);

        connection.start();

        if ( timeout > 0 ) {
            try {
                // Multiply as long: an int product overflows for timeouts above ~24 days.
                Thread.sleep(timeout * 1000L);
            } catch ( InterruptedException e ) {
                // An interrupt simply ends the listening period early. close() follows
                // immediately and normally terminates the JVM, so the flag is not restored.
            }
            close();
        }
    }

    /**
     * Closes the broker connection and terminates the JVM with exit status 0.
     *
     * @throws Throwable if closing the connection fails (the JVM is then not exited here)
     */
    protected void close()
        throws Throwable
    {
        connection.close();
        System.exit(0);
    }

    /**
     * For agents - we might be looking for multiple messages.  Searches the message text for
     * the names of watched agents that have not been seen yet, marks every one found as seen
     * and decrements the count of outstanding agents for each.
     *
     * @param msg message text (JSON) to search
     * @return the last newly-seen agent name found in msg, or null if none was found
     */
    String receivedAgent(String msg)
    {
        String answer = null;
        for ( String a : agents ) {
            if ( Boolean.TRUE.equals(agentPresence.get(a)) ) {
                continue;                      // already handled this agent
            }
            if ( msg.indexOf(a) >= 0 ) {
                agentPresence.put(a, true);
                total_agents--;
                answer = a;
            }
        }
        return answer;
    }

    /**
     * Writes msg to the file fn, replacing any existing content, and prints fn on stdout
     * when the write succeeded. I/O failures are reported with a stack trace and otherwise
     * ignored.
     *
     * @param fn  name of the file to write
     * @param msg text to write
     */
    void writeString(String fn, String msg)
    {
        try {
            FileWriter fw = new FileWriter(fn);
            try {
                fw.write(msg);
            } finally {
                // Always release the file handle, even when the write fails.
                fw.close();
            }
            System.out.println(fn);            // for filters to know what was written
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * JMS callback. Object messages are converted to JSON and written to a file according
     * to the mode described on the class; all other message types are ignored. Any failure
     * is printed and swallowed so that it does not propagate into the JMS provider.
     *
     * @param message the message delivered by the provider
     */
    public void onMessage(Message message)
    {
        try {
            if ( message instanceof ObjectMessage ) {
                Object obj = ((ObjectMessage)message).getObject();
                Gson gson = new Gson();
                String msg = gson.toJson(obj);
                String suffix = ".json";

                if ( !do_filter ) {
                    String out = outfn + suffix;
                    // With a timeout several messages may arrive; number the files.
                    if ( timeout > 0 ) out = out + "." + (++generation);
                    writeString(out, msg);
                    // Without a timeout a single message is all that is wanted.
                    if ( timeout <= 0 ) close();
                } else {
                    String agent = receivedAgent(msg);
                    if ( agent != null ) {
                        String out = outfn + "." + agent + suffix;
                        writeString(out, msg);
                    }
                    // Done as soon as every watched agent has been seen.
                    if ( total_agents == 0 ) close();
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    /**
     * Runs the listener: calls {@link #connect()} and prints any failure.
     */
    public void run()
    {
        try {
            connect();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints an optional message followed by the usage text, then exits with status 1.
     *
     * @param msg message to print first, or null for none
     */
    static void usage(String msg)
    {
        if ( msg != null ) System.out.println(msg);
        System.out.println("Usage:");
        System.out.println(" DuccPubListener args");
        System.out.println("Where args are:");
        System.out.println(" -host AMQhostname");
        System.out.println(" Default is \"localhost\"");
        System.out.println(" -port AMQportname");
        System.out.println(" Default is 61616");
        System.out.println(" -topic topic");
        System.out.println(" No default, required");
        System.out.println(" -output outputFileName");
        System.out.println(" Default is topic");
        System.out.println(" -timeout timeInSeconds");
        System.out.println(" How long to listen. Default is 0. If 0, exit after first message.");
        System.out.println(" -agent agentname");
        System.out.println(" For agent broadcasts, which agent to listen for. Default is \"all\"");
        System.exit(1);
    }

    /**
     * Tells whether arg selects the given option. Abbreviations are accepted ("-ho" selects
     * "-host"), but the argument must contain at least one character after the dash;
     * otherwise an empty string or a lone "-" would be a prefix of every option.
     */
    private static boolean isOption(String option, String arg)
    {
        return arg.length() >= 2 && option.startsWith(arg);
    }

    /**
     * Returns the value following the option at args[i], or prints usage and exits if the
     * option is the last argument.
     */
    private static String optionValue(String[] args, int i)
    {
        if ( i + 1 >= args.length ) {
            usage("Missing value for " + args[i]);
        }
        return args[i + 1];
    }

    /**
     * Command-line entry point. Parses option/value pairs, applies defaults, and runs a
     * listener. Invalid or incomplete arguments print the usage text and exit with status 1.
     *
     * @param args command-line arguments as described by {@link #usage(String)}
     */
    public static void main(String[] args)
    {
        if ( args.length == 0 ) usage(null);

        // NOTE(review): with this default and a "metrics"/"inventory" topic the filter looks
        // for the literal text "all" inside each message - confirm that this is intended.
        String[] agents = {"all"};
        String amqhost = "localhost";
        int amqport = 61616;
        int timeout = 0;
        String topic = null;
        String outfn = null;

        // Every option takes exactly one value, so step through the arguments in pairs.
        // The order of the tests decides which option an ambiguous abbreviation selects
        // (for example "-t" selects -topic).
        for ( int i = 0; i < args.length; i += 2 ) {
            String opt = args[i];

            if ( isOption("-agent", opt) ) {
                agents = optionValue(args, i).split(",");
            } else if ( isOption("-host", opt) ) {
                amqhost = optionValue(args, i);
            } else if ( isOption("-port", opt) ) {
                String v = optionValue(args, i);
                try {
                    amqport = Integer.parseInt(v);
                } catch ( NumberFormatException e ) {
                    usage("AMQ port is not numeric");
                }
            } else if ( isOption("-topic", opt) ) {
                topic = optionValue(args, i);
            } else if ( isOption("-timeout", opt) ) {
                String v = optionValue(args, i);
                try {
                    timeout = Integer.parseInt(v);
                } catch ( NumberFormatException e ) {
                    usage("Timeout is not numeric");
                }
            } else if ( isOption("-output", opt) ) {
                outfn = optionValue(args, i);
            } else {
                usage(null);
            }
        }

        if ( topic == null ) {
            usage("Must specify topic");
        }
        if ( outfn == null ) {
            outfn = topic;
        }

        DuccPubListener dl = new DuccPubListener(amqhost, amqport, topic, timeout, outfn, agents);
        dl.run();
    }
}