blob: b58ce6bfe598bc93ecdb76bf55cdfeab55c4134c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.amqp_1_0.client;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.UnsignedLong;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Demo extends Util
{
private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
private static final String OPCODE = "opcode";
private static final String ACTION = "action";
private static final String MESSAGE_ID = "message-id";
private static final String VENDOR = "vendor";
private static final String LOG = "log";
private static final String RECEIVED = "received";
private static final String TEST = "test";
private static final String APACHE = "apache";
private static final String SENT = "sent";
private static final String LINK_REF = "link-ref";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String SASL_USER = "sasl-user";
private static final String SASL_PASSWORD = "sasl-password";
private static final String ROLE = "role";
private static final String ADDRESS = "address";
private static final String SENDER = "sender";
private static final String SEND_MESSAGE = "send-message";
private static final String ANNOUNCE = "announce";
private static final String MESSAGE_VENDOR = "message-vendor";
private static final String CREATE_LINK = "create-link";
public static void main(String[] args)
{
new Demo(args).run();
}
public Demo(String[] args)
{
super(args);
}
@Override
protected boolean hasLinkDurableOption()
{
return false;
}
@Override
protected boolean hasLinkNameOption()
{
return false;
}
@Override
protected boolean hasResponseQueueOption()
{
return false;
}
@Override
protected boolean hasSizeOption()
{
return false;
}
@Override
protected boolean hasBlockOption()
{
return false;
}
@Override
protected boolean hasStdInOption()
{
return false;
}
@Override
protected boolean hasTxnOption()
{
return false;
}
@Override
protected boolean hasModeOption()
{
return true;
}
@Override
protected boolean hasCountOption()
{
return false;
}
@Override
protected boolean hasWindowSizeOption()
{
return false;
}
public void run()
{
try
{
final String vendor = getArgs()[0];
final String queue = "control";
String message = "";
Connection conn = newConnection();
Session session = conn.createSession();
Receiver responseReceiver;
responseReceiver = session.createTemporaryQueueReceiver();
responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
Sender s = session.createSender(queue, getWindowSize(), getMode());
Properties properties = new Properties();
properties.setMessageId(java.util.UUID.randomUUID());
properties.setReplyTo(responseReceiver.getAddress());
HashMap appPropMap = new HashMap();
ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
appPropMap.put(OPCODE, ANNOUNCE);
appPropMap.put(VENDOR, vendor);
appPropMap.put(ADDRESS,responseReceiver.getAddress());
AmqpValue amqpValue = new AmqpValue(message);
Section[] sections = { properties, appProperties, amqpValue};
final Message message1 = new Message(Arrays.asList(sections));
s.send(message1);
Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
boolean done = false;
while(!done)
{
boolean wait = true;
Message m = responseReceiver.receive(false);
if(m != null)
{
List<Section> payload = m.getPayload();
wait = false;
ApplicationProperties props = m.getApplicationProperties();
Map map = props.getValue();
String op = (String) map.get(OPCODE);
if("reset".equals(op))
{
for(Sender sender : sendingLinks.values())
{
try
{
sender.close();
Session session1 = sender.getSession();
session1.close();
session1.getConnection().close();
}
catch(Exception e)
{
e.printStackTrace();
}
}
for(Receiver receiver : receivingLinks.values())
{
try
{
receiver.close();
receiver.getSession().close();
receiver.getSession().getConnection().close();
}
catch(Exception e)
{
e.printStackTrace();
}
}
sendingLinks.clear();
receivingLinks.clear();
}
else if(CREATE_LINK.equals(op))
{
Object linkRef = map.get(LINK_REF);
String host = (String) map.get(HOST);
Object o = map.get(PORT);
int port = Integer.parseInt(String.valueOf(o));
String user = (String) map.get(SASL_USER);
String password = (String) map.get(SASL_PASSWORD);
String role = (String) map.get(ROLE);
String address = (String) map.get(ADDRESS);
System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
try{
Connection conn2 = new Connection(host, port, user, password, host);
Session session2 = conn2.createSession();
if(sendingLinks.containsKey(linkRef))
{
try
{
sendingLinks.remove(linkRef).close();
}
catch (Exception e)
{
}
}
if(receivingLinks.containsKey(linkRef))
{
try
{
receivingLinks.remove(linkRef).close();
}
catch (Exception e)
{
}
}
if(SENDER.equals(role))
{
System.err.println("%%% Creating sender (" + linkRef + ")");
Sender sender = session2.createSender(address);
sendingLinks.put(linkRef, sender);
}
else
{
System.err.println("%%% Creating receiver (" + linkRef + ")");
Receiver receiver2 = session2.createReceiver(address);
receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
receivingLinks.put(linkRef, receiver2);
}
}
catch(Exception e)
{
e.printStackTrace();
}
}
else if(SEND_MESSAGE.equals(op))
{
Sender sender = sendingLinks.get(map.get(LINK_REF));
Properties m2props = new Properties();
Object messageId = map.get(MESSAGE_ID);
m2props.setMessageId(messageId);
Map m2propmap = new HashMap();
m2propmap.put(OPCODE, TEST);
m2propmap.put(VENDOR, vendor);
ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
sender.send(m2);
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
m3propmap.put(ACTION, SENT);
m3propmap.put(MESSAGE_ID, messageId);
m3propmap.put(VENDOR, vendor);
m3propmap.put(MESSAGE_VENDOR, vendor);
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+messageId)));
s.send(m3);
}
responseReceiver.acknowledge(m);
}
else
{
for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
{
m = entry.getValue().receive(false);
if(m != null)
{
wait = false;
System.err.println("%%% Received message from " + entry.getKey());
Properties mp = m.getProperties();
ApplicationProperties ap = m.getApplicationProperties();
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
m3propmap.put(ACTION, RECEIVED);
m3propmap.put(MESSAGE_ID, mp.getMessageId());
m3propmap.put(VENDOR, vendor);
m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+mp.getMessageId())));
s.send(m3);
entry.getValue().acknowledge(m);
}
}
}
if(wait)
{
try
{
Thread.sleep(500l);
}
catch (InterruptedException e)
{
e.printStackTrace(); //TODO.
}
}
}
s.close();
session.close();
conn.close();
}
catch (Connection.ConnectionException e)
{
e.printStackTrace(); //TODO.
}
catch (Sender.SenderClosingException e)
{
e.printStackTrace(); //TODO.
}
catch (Sender.SenderCreationException e)
{
e.printStackTrace(); //TODO.
}
catch (AmqpErrorException e)
{
e.printStackTrace(); //TODO.
}
}
protected boolean hasSingleLinkPerConnectionMode()
{
return false;
}
protected void printUsage(Options options)
{
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(USAGE_STRING, options );
}
}