// blob: aa7f55abea92fe8fa2d35d1bf1c9eb7a1bd3b754 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.apollo.amqp.test
import com.swiftmq.amqp.AMQPContext
import com.swiftmq.amqp.v100.client.Connection
import com.swiftmq.amqp.v100.client.QoS
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue
import com.swiftmq.amqp.v100.types.AMQPString
import com.swiftmq.amqp.v100.client.ExceptionListener
/**
 * Round-trip smoke test using the SwiftMQ AMQP 1.0 client: sends `nMsgs` text
 * messages to a queue and then consumes them again over the same connection.
 *
 * `port` is not defined in this class; it is expected to be provided by
 * `AmqpTestSupport`.
 */
class SwiftMQClientTest extends AmqpTestSupport {

  test("broker") {
    // val port = 5672
    // val queue = "testqueue"
    val queue = "/queue/testqueue"
    val nMsgs = 1
    val qos = QoS.AT_MOST_ONCE
    val ctx = new AMQPContext(AMQPContext.CLIENT)

    try {
      val connection = new Connection(ctx, "127.0.0.1", port, false)
      connection.setContainerId("client")
      connection.setIdleTimeout(-1)
      connection.setMaxFrameSize(1024 * 4)
      connection.setExceptionListener(new ExceptionListener() {
        def onException(e: Exception): Unit = {
          e.printStackTrace()
        }
      })
      connection.connect()

      // From here on the connection is open, so make sure it is closed even
      // when sending or receiving throws.
      try {
        // ---- Send phase ----
        val data = "x" * 10 // 1024*20
        val sendSession = connection.createSession(10, 10)
        val producer = sendSession.createProducer(queue, qos)
        for (i <- 0 until nMsgs) {
          val msg = new com.swiftmq.amqp.v100.messaging.AMQPMessage
          val s = "Message #" + (i + 1)
          println("Sending " + s)
          msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data)))
          producer.send(msg)
        }
        producer.close()
        sendSession.close()

        // ---- Receive phase ----
        val receiveSession = connection.createSession(10, 10)
        val consumer = receiveSession.createConsumer(queue, 100, qos, true, null)
        // Receive messages non-transacted
        for (_ <- 0 until nMsgs) {
          // receive() is a Java API that may hand back null; wrap it so a
          // missing message is reported instead of being dereferenced.
          Option(consumer.receive()) match {
            case Some(msg) =>
              Option(msg.getAmqpValue()).map(_.getValue) match {
                case Some(value: AMQPString) =>
                  println("Received: " + value.getValue())
                case other =>
                  // Body is absent or not a string; report it rather than
                  // failing with a MatchError.
                  println("Received message without a string body: " + other)
              }
              // Accept only messages that are not already settled.
              if (!msg.isSettled()) {
                msg.accept()
              }
            case None =>
              println("No message received")
          }
        }
        consumer.close()
        receiveSession.close()
      } finally {
        connection.close()
      }
    } catch {
      // NOTE(review): exceptions are only printed, so this test cannot fail on
      // a broker/client error. Kept as in the original; confirm whether the
      // exception should be rethrown to fail the test.
      case e: Exception =>
        e.printStackTrace()
    }
  }
}