blob: d7eb138523cd042d25119b3c2db90873bf64beb7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.example.transport;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.UUID;
/**
* This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as
* the transport for the Client API.
*
* The Demo here runs twice:
* 1. Just to show a simple publish and receive.
* 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechanism.
*/
public class ExistingSocketConnectorDemo implements ConnectionListener
{
    /** Selects which scenario the next constructed demo runs: simple send/receive (false) or failover (true). */
    private static boolean DEMO_FAILOVER = false;

    /**
     * Runs the demo twice: first the simple publish/receive, then the failover scenario.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException
    {
        System.out.println("Testing socket connection to localhost:5672.");
        new ExistingSocketConnectorDemo();

        System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673.");
        DEMO_FAILOVER = true;
        new ExistingSocketConnectorDemo();
    }

    Connection _connection;
    MessageProducer _producer;
    Session _session;

    /** Identifier under which the socket to localhost:5672 is registered. */
    String Socket1_ID = UUID.randomUUID().toString();

    /** Identifier under which the failover socket to localhost:5673 is registered. */
    String Socket2_ID = UUID.randomUUID().toString();

    /**
     * Connection URL. Each entry in the broker list has the form 'socket://&lt;id&gt;', signifying that we will
     * provide the socket ourselves by registering it under that id.
     */
    public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'";

    /**
     * Runs one pass of the demo: opens a socket to localhost:5672, registers it with the transport, creates a
     * connection over it and blocks until one message is received from "Queue".
     *
     * In the simple scenario the message is sent straight away by this constructor. In the failover scenario this
     * instance is registered as the connection listener and the message is only sent from
     * {@link #failoverComplete()}.
     *
     * The connection is closed before the constructor returns, whether it completes normally or throws.
     *
     * @throws IOException if the socket to localhost:5672 cannot be opened
     * @throws JMSException if any JMS operation fails
     */
    public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException
    {
        Socket socket = openSocket("localhost", 5672);
        TransportConnection.registerOpenSocket(Socket1_ID, socket);

        _connection = new AMQConnection(CONNECTION);
        try
        {
            _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue"));
            _producer = _session.createProducer(_session.createQueue("Queue"));
            _connection.start();

            if (!DEMO_FAILOVER)
            {
                _producer.send(_session.createTextMessage("Simple Test"));
            }
            else
            {
                // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover
                ((AMQConnection) _connection).setConnectionListener(this);
                System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672.");
            }

            // We do a blocking receive here so that we can demonstrate failover.
            Message message = consumer.receive();
            System.out.println("Recevied :" + message);
        }
        finally
        {
            // Always release the connection, including when a JMS call above fails part-way through.
            _connection.close();
        }
    }

    /**
     * Opens a socket backed by a SocketChannel and connects it to the given address.
     * If connecting fails the socket is closed again so that it is not leaked.
     *
     * @param host host name to connect to
     * @param port port to connect to
     * @return the connected socket
     * @throws IOException if the channel cannot be opened or the connect fails
     */
    private static Socket openSocket(String host, int port) throws IOException
    {
        Socket socket = SocketChannel.open().socket();
        boolean connected = false;
        try
        {
            socket.connect(new InetSocketAddress(host, port));
            connected = true;
            return socket;
        }
        finally
        {
            if (!connected)
            {
                try
                {
                    socket.close();
                }
                catch (IOException ignored)
                {
                    // Best-effort cleanup: the connect failure is the error the caller needs to see.
                }
            }
        }
    }

    // ConnectionListener Interface

    public void bytesSent(long count)
    {
        //not used in this example
    }

    public void bytesReceived(long count)
    {
        //not used in this example
    }

    /**
     * This method is called before the underlying client library starts to reconnect. This gives us the opportunity
     * to set a new socket for the failover to occur on.
     *
     * @param redirect not used in this example
     * @return true if the socket to localhost:5673 was opened and registered, false if that failed with an
     *         IOException (presumably vetoing the failover - confirm against the ConnectionListener contract)
     */
    public boolean preFailover(boolean redirect)
    {
        try
        {
            Socket socket = openSocket("localhost", 5673);
            // This is the new method to pass in an open socket for the connection to use.
            TransportConnection.registerOpenSocket(Socket2_ID, socket);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public boolean preResubscribe()
    {
        //not used in this example - but must return true to allow the resubscription of existing clients.
        return true;
    }

    /**
     * Sends the test message once failover has completed, so that the blocking receive in the constructor
     * has something to pick up. A send failure is printed rather than propagated.
     */
    public void failoverComplete()
    {
        // Now that failover has completed we can send a message that the receiving thread will pick up
        try
        {
            _producer.send(_session.createTextMessage("Simple Failover Test"));
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}