PROTON-1362: remove previously-deprecated Messenger
diff --git a/docs/markdown/index.md b/docs/markdown/index.md
deleted file mode 100644
index 5262c44..0000000
--- a/docs/markdown/index.md
+++ /dev/null
@@ -1,27 +0,0 @@
-
-Proton is a library for speaking AMQP, including:
-
-- The AMQP [Messenger API](messenger/index.html), a simple but powerful interface to send and receive
- messages over AMQP.
-- The [AMQP Protocol Engine](engine/engine.html), a succinct encapsulation of the full
- AMQP protocol machinery.
-
-Proton is designed for maximum embeddability:
-
-- minimal dependencies
-- minimal assumptions about application threading model
-
-Proton is designed to scale up and down:
-
-- transparently supports both simple peer to peer messaging and complex
-globally federated topologies
-
-Proton is multi-lingual:
-
-- Proton-C - a C implementation with lanuage bindings in Python, Php, Perl,
-Ruby, and Java (via JNI).
-- Proton-J - a pure Java implementation
-
-Please see http://qpid.apache.org/proton for a more info.
-
-
diff --git a/docs/markdown/messenger/addressing-and-routing.md b/docs/markdown/messenger/addressing-and-routing.md
deleted file mode 100644
index 9714e1e..0000000
--- a/docs/markdown/messenger/addressing-and-routing.md
+++ /dev/null
@@ -1,210 +0,0 @@
-
-Messenger Addressing and Routing
-=================================================
-
-
-Addressing
--------------------------
-
-An address has the following form:
-
- [ amqp[s]:// ] [user[:password]@] domain [/[name]]
-
-Where domain can be one of:
-
- host | host:port | ip | ip:port | name
-
-The following are valid examples of addresses:
-
- * example.org
- * example.org:1234
- * amqp://example.org
- * amqps://example.org
- * example.org/incoming
- * amqps://example.org/outgoing
- * amqps://fred:trustno1@example.org
- * 127.0.0.1:1234
- * amqps://127.0.0.1:1234
-
-The "/name" part of the address, that optionally follows
-the domain, is not used by the Messenger library.
-For example, if a receiver subscribes to
-
- amqp://~0.0.0.0:5678
-
-Then it will receive messages sent to
-
- amqp://~0.0.0.0:5678
-as well as
- amqp://~0.0.0.0:5678/foo
-
-
-Likewise, if the receiver subscribes to
-
- amqp://~0.0.0.0:5678/foo
-
-it will receive messages addressed to
-
- amqp://~0.0.0.0:5678/foo
- amqp://~0.0.0.0:5678
-
-and
-
- amqp://~0.0.0.0:5678/bar
-
-
-
-
-<br/>
-
-Routing
-------------------------------
-
-### Pattern Matching, Address Translation, and Message Routing ###
-
-The Messenger library provides message routing capability
-with an address translation table. Each entry in the table
-consists of a *pattern* and a *translation*.
-
-You store a new route entry in the table with the call:
-
- pn_messenger_route(messenger, pattern, translation);
-
-
-The address of each outgoing message is compared to the
-table's patterns until the first matching pattern is found,
-or until all patterns have failed to match.
-
-If no pattern matches, then Messenger will send (or attempt
-to send) your message with the address as given.
-
-If a pattern does match your outgoing message's address, then
-Messenger will create a temporary address by transforming
-your message's address. Your message will be sent to the
-transformed address, but **(note!)** the address on your
-outgoing message will not be changed. The receiver will see
-the original, not the transformed address.
-
-
-<br/>
-
-### Two Translation Mechanisms ###
-
-
-Messenger uses two mechanisms to translate addresses.
-The first is simple string substitution.
-
-
- pattern: COLOSSUS
- translation: amqp://0.0.0.0:6666
- input addr: COLOSSUS
- result: amqp://0.0.0.0:6666
-
-
-The second mechanism is wildcard/variable substitution.
-A wildcard in the pattern matches all or part of your
-input address. The part of your input address that matched
-the wildcard is stored. The matched value is then inserted
-into your translated address in place of numbered variables:
-$1, $2, and so on, up to a maximum of 64.
-
-There are two wildcards: * and % .
-The rules for matching are:
-
- * matches anything
- % matches anything but a /
- other characters match themselves
- the whole input addr must be matched
-
-
-Examples of wildcard matching:
-
- pattern: /%/%/%
- translation: $1x$2x$3
- input addr: /foo/bar/baz
- result: fooxbarxbaz
-
- pattern: *
- translation: $1
- inout addr: /foo/bar/baz
- result: /foo/bar/baz
-
- pattern: /*
- translation: $1
- input addr: /foo/bar/baz
- result: foo/bar/baz
-
- pattern: /*baz
- translation: $1
- input addr: /foo/bar/baz
- result: foo/bar/
-
- pattern: /%baz
- translation: $1
- input addr: /foo/bar/baz
- result: FAIL
-
- pattern: /%/baz
- translation: $1
- input addr: /foo/bar/baz
- result: FAIL
-
- pattern: /%/%/baz
- translation: $1
- input addr: /foo/bar/baz
- result: foo
-
- pattern: /*/baz
- translation: $1
- input addr: /foo/bar/baz
- result: foo/bar
-
-
-Examples of route translation usage:
-
- pattern: foo
- translation: amqp://foo.com
- explanation: Any message sent to "foo" will be routed to "amqp://foo.com"
-
-
- pattern: bar/*
- translation: amqp://bar.com/$1
- explanation: Any message sent to bar/<path> will be routed to the corresponding path within the amqp://bar.com domain.
-
-
- pattern: amqp:*
- translation: amqps:$1
- explanation: Route all messages over TLS.
-
-
- pattern: amqp://foo.com/*
- translation: amqp://user:password@foo.com/$1
- explanation: Supply credentials for foo.com.
-
-
- pattern: amqp://*
- translation: amqp://user:password@$1
- explanation: Supply credentials for all domains.
-
-
- pattern: amqp://%/*
- translation: amqp://user:password@proxy/$1/$2
- explanation: Route all addresses through a single proxy while preserving the
- original destination.
-
-
- pattern: *
- translation: amqp://user:password@broker/$1
- explanation: Route any address through a single broker.
-
-
-
-<br/>
-
-### First Match Wins ###
-
-If you create multiple routing rules, each new rule is appended
-to your Messenger's list. At send-time, Messenger looks down the
-list, and the first rule that matches your outgoing message's
-address wins. Thus, when creating your routing rules, you should
-create them in order of most specific first, most general last.
diff --git a/docs/markdown/messenger/index.md b/docs/markdown/messenger/index.md
deleted file mode 100644
index e8ccaa4..0000000
--- a/docs/markdown/messenger/index.md
+++ /dev/null
@@ -1,13 +0,0 @@
-Proton Messenger Documentation
-==========================================
-
-Proton Messenger is a high-level API that lets you build simple but powerful messaging systems.
-
-- Use the [Linux Quick Start](quick-start-linux.html) to download, build, and run your first Messenger example in two minutes.
-
-- Examples and explanations of Messenger's [Sending and Receiving](sending-and-receiving.html) capabilities.
-
-- Use [Message Disposition](message-disposition.html) functionality to create reliable messaging systems.
-
-- Messenger's [Addressing and Routing](addressing-and-routing.html) capabilities allow you to separate application code from installation-specific configuration information.
-
diff --git a/docs/markdown/messenger/message-disposition.md b/docs/markdown/messenger/message-disposition.md
deleted file mode 100644
index bf66d2e..0000000
--- a/docs/markdown/messenger/message-disposition.md
+++ /dev/null
@@ -1,196 +0,0 @@
-Message Disposition
-===============================
-
-
-Messenger disposition operations allow a receiver to accept or
-reject specific messages, or ranges of messages. Senders can
-then detect the disposition of their messages.
-
-
-Message States
----------------------------
-
-Messages have one of four different states:
- `PN_STATUS_UNKNOWN`
- `PN_STATUS_PENDING`
- `PN_STATUS_ACCEPTED`
- `PN_STATUS_REJECTED`
-
-<br/>
-
-
-Windows and Trackers
-----------------------------
-
-<br/>
-
-Messenger does not track the disposition of every message that
-it sends or receives. To set (or get) the disposition of a
-message, that message must be within your incoming (or outgoing)
-window.
-
-( I will use the incoming direction as the example. The outgoing
-direction works similarly. )
-
-When you call
-
- pn_messenger_set_incoming_window(messenger, window_size);
-
-you have only declared the window size. The window is not yet
-created. The window will be created when you issue your first
-call to
-
- pn_messenger_get(messenger, msg);
-
-And the window will be further populated only by further calls to
-pn_messenger_get().
-
-
-
-
-
-
-
-### Receiving ###
-
-To explicitly set or get message dispositions, your messenger
-must set a positive size for its incoming window:
-
- pn_messenger_set_incoming_window(messenger, N);
-
-You can implicity accept messages by simply letting enough
-new messages arrive. As older messages pass beyond the threshold
-of your incoming window size, they will be automatically
-accepted. Thus, if you want to automatically accept all
-messages as they arrive, you can set your incoming window
-size to 0.
-
-To exercise *explicit* control over particular messages or ranges
-of messages, the receiver can use trackers. The call
-
- pn_messenger_incoming_tracker(messenger);
-
-will return a tracker for the message most recently returned
-by a call to
-
- pn_messenger_get(messenger, message);
-With a message that is being tracked, the messenger can accept
-(or reject) that individual message:
-
- pn_messenger_accept(messenger, tracker, 0);
- pn_messenger_reject(messenger, tracker, 0);
-
-Or it can accept (or reject) the tracked message as well as all older
-messages back to the limit of the incoming window:
-
- pn_messenger_accept(messenger, tracker, PN_CUMULATIVE);
- pn_messenger_reject(messenger, tracker, PN_CUMULATIVE);
-
-Once a message is accepted or rejected, its status can no longer
-be changed, even if you have a separate tracker associated with it.
-
-
-
-<br/>
-
-###When to Accept###
-
-Although you *can* accept messages implicitly by letting them fall
-off the edge of your incoming window, you *shouldn't*. Message
-disposition is an important form of communication to the sender.
-The best practice is to let the sender know your response, by
-explicit acceptance or rejection, as soon as you can. Implicitly
-accepting messages by allowing them to fall off the edge of the
-incoming window could delay your response to the sender for an
-unpredictable amount of time.
-
-A nonzero window size places a limit on
-how much state your Messenger needs to track.
-
-<br/>
-
-###Accepting by Accident####
-
-If you allow a message to "fall off the edge" of your incoming
-window before you have explicitly accepted or rejected it, then
-it will be accepted automatically.
-
-But since your incoming window is only filled by calls to
-
- pn_messenger_get(messenger, msg);
-
-messages cannot be forced to fall over the edge by simply
-receiving more messages. Messages will not be forced over the
-edge of the incoming window unless you make too many calls to
-`pn_messenger_get()` without explicitly accepting or rejecting
-the messages.
-
-Your application should accept or reject each message as soon
-as practical after getting and processing it.
-
-
-
-
-<br/>
-<br/>
-
-
-
-### Sending ###
-
-A sender can learn how an individual message has been received
-if it has a positive outgoing window size:
-
- pn_messenger_set_outgoing_window(messenger, N);
-
-and if a tracker has been associated with that message in question.
-This call:
-
- pn_messenger_outgoing_tracker(messenger);
-
-will return a tracker for the message most recently given to:
-
- pn_messenger_put(messenger, message);
-
-To later find the status of the individual tracked message, you can call:
-
- pn_messenger_status(messenger, tracker);
-
-The returned value will be one of
-
-* `PN_STATUS_ACCEPTED`
-* `PN_STATUS_REJECTED` , or
-* `PN_STATUS_PENDING` - If the receiver has not disposed the message yet.
-
-
-If either the sender or the receiver simply declares the message (or range of messages) to
-be settled, with one of these calls:
-
- pn_messenger_settle(messenger, tracker, 0);
- pn_messenger_settle(messenger, tracker, PN_CUMULATIVE);
-
-then the sender will see `PN_STATUS_PENDING` as the status of any
-settled messages.
-
-<br/>
-
-
-### Message Rejection ###
-If a message is rejected by the receiver, it does not mean that
-the message was malformed. Malformed messages cannot be sent.
-Even messages with no content are valid messages.
-Rejection by a receiver should be understood as the receiver
-saying "I don't want this." or possibly "I don't want this *yet*."
-depending on your application.
-The sender could decide to try sending the same message again later,
-or to send the message to another receiver, or to discard it.
-
-The AMQP 1.0 specification permits a distinction
-between *rejecting* the message, and *releasing* the message,
-but the Proton library does not expose the *releasing*
-disposition.
-
-
-
-
-
diff --git a/docs/markdown/messenger/quick-start-linux.md b/docs/markdown/messenger/quick-start-linux.md
deleted file mode 100644
index e8ef466..0000000
--- a/docs/markdown/messenger/quick-start-linux.md
+++ /dev/null
@@ -1,73 +0,0 @@
-Linux Proton Messenger Quick Start
-==============================================
-
-
-On a Linux system, these instructions take you from
-zero to running your first example code. You will
-need root privileges for one of the commands.
-
-
-
-
-Prerequisite Packages
----------------------------------
-
-For a minimum build, you will need packages installed on your
-box for :
-
- subversion
- gcc
- cmake
- libuuid-devel
-
-
-
-Quick Start Commands
----------------------------
-
- svn co http://svn.apache.org/repos/asf/qpid/proton/trunk proton
- cd ./proton
- mkdir ./build
- cd ./build
- cmake ..
- make all
- # Become root and go to your build dir.
- make install
- # Stop being root.
- # Now let's see if it works.
- cd ./proton-c/examples/messenger/c
- ./recv &
- ./send
- # You're done ! ( Kill that recv process. )
- # The output you should see:
-
- Address: amqp://0.0.0.0
- Subject: (no subject)
- Content: "Hello World!"
-
-
-
-
-
-Notes
-----------------------------
-
-1. If you will be editing and checking in code from this tree,
- replace the "svn co" line with this:
-
- svn co https://svn.apache.org/repos/asf/qpid/proton/trunk
-
- You must check out through https, or you will not be able to
- check in code changes from your tree.
-
-
-2. The recv application in the example defaults to the same port
- as the qpid demon. If you happen to have that demon running,
- and using the default port, the recv app above will fail.
-
-
-3. If you don't have root privileges, you can still do the
- "make install" step by setting a non-standard prefix, thus:
- cmake -DCMAKE_INSTALL_PREFIX=/my/path ..
-
-
diff --git a/docs/markdown/messenger/sending-and-receiving.md b/docs/markdown/messenger/sending-and-receiving.md
deleted file mode 100644
index 555075e..0000000
--- a/docs/markdown/messenger/sending-and-receiving.md
+++ /dev/null
@@ -1,144 +0,0 @@
-Sending and Receiving Messages
-=======================================================
-
-The Proton Messenger API provides a mixture of synchronous
-and asynchronous operations to give you flexibility in
-deciding when you application should block waiting for I/O,
-and when it should not.
-
-
-When sending messages, you can:
-
-* send a message immediately,
-* enqueue a message to be sent later,
-* block until all enqueued messages are sent,
-* send enqueued messages until a timeout occurs, or
-* send all messages that can be sent without blocking.
-
-When receiving messages, you can:
-
-* receive messages that can be received without blocking,
-* block until at least one message is received,
-* receive no more than a fixed number of messages.
-
-
-
-Functions
-------------------------------
-
-* `pn_messenger_put(messenger)`
-
- Stage message for later transmission, and possibly
- transmit any messages currently staged that are not
- blocked.
- This function will not block.
-
-
-
-* `pn_messenger_send(messenger)`
-
- If messenger timeout is negative (initial default),
- block until all staged messages have been sent.
-
- If messenger timeout is 0, send all messages that
- can be sent without blocking.
-
- If messenger timeout is positive, send all messages
- that can be sent until timeout expires.
-
- *note: If there are any messages that can be received
- when `pn_messenger_send()` is called, they will
- be received.*
-
-
-
-* `pn_messenger_get(messenger, msg)`
-
- Dequeue the head of the incoming message queue to
- your application.
- This call does not block.
-
-
-
-* `pn_messenger_recv(messenger)`
-
- If messenger timeout is negative(initial default),
- block until at least one message is received.
-
- If timeout is 0, receive whatever messages are available,
- but do not block.
-
- If timeout is positive, receive available messages until
- timeout expires.
-
- *note: If there are any unblocked outgoing messages,
- they will be sent during this call.*
-
-
-
-
-
-Examples
-------------------------------
-
-* send a message immediately
-
- pn_messenger_put(messenger, msg);
- pn_messenger_send(messenger);
-
-
-
-* enqueue a message to be sent later
-
- pn_messenger_put(messenger, msg);
-
- *note:
- The message will be sent whenever it is not blocked and
- the Messenger code has other I/O work to be done.*
-
-
-
-* block until all enqueued messages are sent
-
- pn_messenger_set_timeout(messenger, -1);
- pn_messenger_send(messenger);
-
- *note:
- A negative timeout means 'forever'. That is the initial
- default for a messenger.*
-
-
-
-* send enqueued messages until a timeout occurs
-
- pn_messenger_set_timeout(messenger, 100); /* 100 msec */
- pn_messenger_send(messenger);
-
-
-
-* send all messages that can be sent without blocking
-
- pn_messenger_set_timeout(messenger, 0);
- pn_messenger_send(messenger);
-
-
-
-* receive messages that can be received without blocking
-
- pn_messenger_set_timeout(messenger, 0);
- pn_messenger_recv(messenger, -1);
-
-
-* block until at least one message is received
-
- pn_messenger_set_timeout(messenger, -1);
- pn_messenger_recv(messenger, -1);
-
- *note: -1 is initial messenger default.*
-
-
-
-* receive no more than a fixed number of messages
-
- pn_messenger_recv(messenger, 10);
-
diff --git a/examples/java/messenger/README.txt b/examples/java/messenger/README.txt
deleted file mode 100644
index 20d3788..0000000
--- a/examples/java/messenger/README.txt
+++ /dev/null
@@ -1,22 +0,0 @@
-This directory contains java examples that use the messenger API.
-Based on the python examples in ../py
-
- Send.java - a simple example of using the messenger API to send messages
- Recv.java - a simple example of using the messenger API to receive messages
-
-Note that depending on the address passed into these scripts, you can
-use them in either a peer to peer or a brokered scenario.
-
-For brokered usage:
- java Recv.class amqp://<broker>/<queue>
- java Send.class -a amqp://<broker>/<queue> msg_1 ... msg_n
-
-For peer to peer usage:
- # execute on <host> to receive messages from all local network interfaces
- java Recv.class amqp://~0.0.0.0
- java Send.class -a amqp://<host> msg_1 ... msg_n
-
-Or, use the shell scripts "recv" and "send" to run the java programs:
-recv [-v] [-n MAXMESSAGES] [-a ADDRESS] ... [-a ADDRESS]
-send [-a ADDRESS] [-s SUBJECT] MESSAGE ... MESSAGE
-
diff --git a/examples/java/messenger/pom.xml b/examples/java/messenger/pom.xml
deleted file mode 100644
index a23a4a3..0000000
--- a/examples/java/messenger/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-project</artifactId>
- <version>0.17.0-SNAPSHOT</version>
- <relativePath>../../..</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>proton-j-messenger-example</artifactId>
- <name>proton-j-messenger-example</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>proton-j</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- </dependencies>
-
- <scm>
- <url>http://svn.apache.org/viewvc/qpid/proton/</url>
- </scm>
-</project>
diff --git a/examples/java/messenger/recv b/examples/java/messenger/recv
deleted file mode 100755
index 862700c..0000000
--- a/examples/java/messenger/recv
+++ /dev/null
@@ -1,31 +0,0 @@
-#! /bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Usage: recv [-v] [-n MAXMESSAGES] [-a ADDRESS]*"
-# Subscribes to the given amqp addresses (by default, to amqp://localhost/test),
-# and prints messages received, upt to MAXMESSAGES.
-# Prints message headers and body; -v means print all message properties.
-
-HERE=$(cd $(dirname $0); pwd)
-TOP=$(cd $(dirname $0); cd ../../..; pwd)
-LIBS=$HERE/target/classes:$TOP/proton-j/target/classes
-JFLAGS="-Djava.util.logging.config.file=$HERE/recv.trace.props -cp $LIBS"
-java -cp $LIBS org.apache.qpid.proton.example.Recv "$@"
-
diff --git a/examples/java/messenger/send b/examples/java/messenger/send
deleted file mode 100755
index e7b4b67..0000000
--- a/examples/java/messenger/send
+++ /dev/null
@@ -1,29 +0,0 @@
-#! /bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Usage: send [-a ADDRESS] [-s SUBJECT] MESSAGE ... MESSAGE
-# sends each arg as a text-message to the given adress (by default, to amqp://localhost/test)
-
-HERE=$(cd $(dirname $0); pwd)
-TOP=$(cd $(dirname $0); cd ../../..; pwd)
-LIBS=$HERE/target/classes:$TOP/proton-j/target/classes
-JFLAGS="-Djava.util.logging.config.file=$HERE/send.trace.props -cp $LIBS"
-java -cp $LIBS org.apache.qpid.proton.example.Send "$@"
-
diff --git a/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Recv.java b/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Recv.java
deleted file mode 100644
index 3934cff..0000000
--- a/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Recv.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.example;
-
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.impl.MessengerImpl;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Example/test of the java Messenger/Message API.
- * Based closely qpid src/proton/examples/messenger/py/recv.py
- * @author mberkowitz@sf.org
- * @since 8/4/2013
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class Recv {
- private static Logger tracer = Logger.getLogger("proton.example");
- private boolean verbose = false;
- private int maxct = 0;
- private List<String> addrs = new ArrayList<String>();
-
- private static void usage() {
- System.err.println("Usage: recv [-v] [-n MAXCT] [-a ADDRESS]*");
- System.exit(2);
- }
-
- private Recv(String args[]) {
- int i = 0;
- while (i < args.length) {
- String arg = args[i++];
- if (arg.startsWith("-")) {
- if ("-v".equals(arg)) {
- verbose = true;
- } else if ("-a".equals(arg)) {
- addrs.add(args[i++]);
- } else if ("-n".equals(arg)) {
- maxct = Integer.valueOf(args[i++]);
- } else {
- System.err.println("unknown option " + arg);
- usage();
- }
- } else {
- usage();
- }
- }
- if (addrs.size() == 0) {
- addrs.add("amqp://~0.0.0.0");
- }
- }
-
- private static String safe(Object o) {
- return String.valueOf(o);
- }
-
- private void print(int i, Message msg) {
- StringBuilder b = new StringBuilder("message: ");
- b.append(i).append("\n");
- b.append("Address: ").append(msg.getAddress()).append("\n");
- b.append("Subject: ").append(msg.getSubject()).append("\n");
- if (verbose) {
- b.append("Props: ").append(msg.getProperties()).append("\n");
- b.append("App Props: ").append(msg.getApplicationProperties()).append("\n");
- b.append("Msg Anno: ").append(msg.getMessageAnnotations()).append("\n");
- b.append("Del Anno: ").append(msg.getDeliveryAnnotations()).append("\n");
- } else {
- ApplicationProperties p = msg.getApplicationProperties();
- String s = (p == null) ? "null" : safe(p.getValue());
- b.append("Headers: ").append(s).append("\n");
- }
- b.append(msg.getBody()).append("\n");
- b.append("END").append("\n");
- System.out.println(b.toString());
- }
-
- private void run() {
- try {
- Messenger mng = new MessengerImpl();
- mng.start();
- for (String a : addrs) {
- mng.subscribe(a);
- }
- int ct = 0;
- boolean done = false;
- while (!done) {
- mng.recv();
- while (mng.incoming() > 0) {
- Message msg = mng.get();
- ++ct;
- print(ct, msg);
- if (maxct > 0 && ct >= maxct) {
- done = true;
- break;
- }
- }
- }
- mng.stop();
- } catch (Exception e) {
- tracer.log(Level.SEVERE, "proton error", e);
- }
- }
-
- public static void main(String args[]) {
- Recv o = new Recv(args);
- o.run();
- }
-}
-
diff --git a/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Send.java b/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Send.java
deleted file mode 100644
index 6f4a919..0000000
--- a/examples/java/messenger/src/main/java/org/apache/qpid/proton/example/Send.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.example;
-
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.impl.MessengerImpl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Example/test of the java Messenger/Message API.
- * Based closely qpid src/proton/examples/messenger/py/send.py
- * @author mberkowitz@sf.org
- * @since 8/4/2013
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class Send {
-
- private static Logger tracer = Logger.getLogger("proton.example");
- private String address = "amqp://0.0.0.0";
- private String subject;
- private String[] bodies = new String[]{"Hello World!"};
-
- private static void usage() {
- System.err.println("Usage: send [-a ADDRESS] [-s SUBJECT] MSG+");
- System.exit(2);
- }
-
- private Send(String args[]) {
- int i = 0;
- while (i < args.length) {
- String arg = args[i++];
- if (arg.startsWith("-")) {
- if ("-a".equals(arg)) {
- address = args[i++];
- } else if ("-s".equals(arg)) {
- subject = args[i++];
- } else {
- System.err.println("unknown option " + arg);
- usage();
- }
- } else {
- --i;
- break;
- }
- }
-
- if(i != args.length)
- {
- bodies = Arrays.copyOfRange(args, i, args.length);
- }
- }
-
- private void run() {
- try {
- Messenger mng = new MessengerImpl();
- mng.start();
- Message msg = new MessageImpl();
- msg.setAddress(address);
- if (subject != null) msg.setSubject(subject);
- for (String body : bodies) {
- msg.setBody(new AmqpValue(body));
- mng.put(msg);
- }
- mng.send();
- mng.stop();
- } catch (Exception e) {
- tracer.log(Level.SEVERE, "proton error", e);
- }
- }
-
- public static void main(String args[]) {
- Send o = new Send(args);
- o.run();
- }
-}
-
-
-
diff --git a/pom.xml b/pom.xml
index 7391713..7f2b443 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,6 @@
<module>proton-j</module>
<module>tests</module>
<module>examples/engine/java</module>
- <module>examples/java/messenger</module>
<module>examples/java/reactor</module>
</modules>
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
index 38f39e0..b428736 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
@@ -31,7 +31,6 @@
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.Codec;
import org.apache.qpid.proton.codec.Data;
-import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Engine;
@@ -40,10 +39,8 @@
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.reactor.Reactor;
-@SuppressWarnings("deprecation")
public final class Proton
{
@@ -96,33 +93,6 @@
applicationProperties, body, footer);
}
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated
- public static Messenger messenger()
- {
- return Messenger.Factory.create();
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated
- public static Messenger messenger(String name)
- {
- return Messenger.Factory.create(name);
- }
-
- /**
- * @deprecated Messenger and its driver will be removed from upcoming proton-j releases.
- */
- @Deprecated
- public static Driver driver() throws IOException
- {
- return Driver.Factory.create();
- }
-
public static Reactor reactor() throws IOException
{
return Reactor.Factory.create();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/Connector.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/Connector.java
deleted file mode 100644
index 619b1cf..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/Connector.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.driver;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-
-/**
- * Intermediates between a proton engine {@link Connection} and the I/O
- * layer.
- *
- * The top half of the engine can be access via {@link #getConnection()}.
- * The bottom half of the engine is used by {@link #process()}.
- * Stores application specific context using {@link #setContext(Object)}.
- *
- * Implementations are not necessarily thread-safe.
- *
- * @param <C> application supplied context
- * @deprecated Messenger and its connector will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public interface Connector<C>
-{
- /**
- * Handle any inbound data, outbound data, or timing events pending on
- * the connector.
- * Typically, applications repeatedly invoke this method
- * during the lifetime of a connection.
- */
- boolean process() throws IOException;
-
- /**
- * Access the listener which opened this connector.
- *
- * @return the listener which created this connector, or null if the
- * connector has no listener (e.g. an outbound client
- * connection).
- */
- @SuppressWarnings("rawtypes")
- Listener listener();
-
- /**
- * Access the Authentication and Security context of the connector.
- *
- * @return the Authentication and Security context for the connector,
- * or null if none.
- */
- Sasl sasl();
-
- /**
- * Access the Transport associated with the connector.
- *
- */
-
- Transport getTransport();
-
- /**
- * Access the AMQP Connection associated with the connector.
- *
- * @return the connection context for the connector, or null if none.
- */
- Connection getConnection();
-
- /**
- * Assign the AMQP Connection associated with the connector.
- *
- * @param connection the connection to associate with the connector.
- */
- void setConnection(Connection connection);
-
- /**
- * Access the application context that is associated with the connector.
- *
- * @return the application context that was passed when creating this
- * connector. See
- * {@link Driver#createConnector(String, int, Object)
- * createConnector(String, int, Object)} and
- * {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
- * createConnector(java.nio.channels.SelectableChannel, Object)}.
- */
- C getContext();
-
- /**
- * Assign a new application context to the connector.
- *
- * @param context new application context to associate with the connector
- */
- void setContext(C context);
-
- /**
- * Close the socket used by the connector.
- */
- void close();
-
- /**
- * Determine if the connector is closed.
- */
- boolean isClosed();
-
- /**
- * Destructor for the given connector.
- *
- * Assumes the connector's socket has been closed prior to call.
- */
- void destroy();
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java
deleted file mode 100644
index 3d8ada3..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.driver;
-
-import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.ServerSocketChannel;
-
-import org.apache.qpid.proton.driver.impl.DriverImpl;
-
-/**
- * A driver for the proton engine.
- *
- * Manages {@link Connector}'s and {@link Listener}'s, which act as intermediaries between
- * the proton engine and the network.
- *
- * Provides methods for the application to access the "top half" of the engine API when the state
- * of the engine may have changed due to I/O or timing events - see {@link #connector()}.
- *
- * Connectors incorporate the SASL engine in order to provide a complete network stack:
- * AMQP over SASL over TCP.
- *
- * Unless otherwise stated, methods on Driver implementations are not necessarily thread-safe.
- *
- * @deprecated Messenger and its driver will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public interface Driver
-{
- /**
- * @deprecated Messenger and its driver will be removed from upcoming proton-j releases.
- */
- @Deprecated
- public static final class Factory
- {
- public static Driver create() throws IOException {
- return new DriverImpl();
- }
- }
-
- /**
- * Force {@link #doWait(long)} to return.
- *
- * If the driver is not currently waiting then the next invocation of {@link #doWait(long)}
- * will return immediately unless the {@link #connector()} method is invoked in the meantime.
- *
- * Thread-safe.
- */
- void wakeup();
-
- /**
- * Wait for an active connector or listener, or for {@link #wakeup()} to be called.
- *
- * Thread-safe.
- *
- * @param timeout maximum time in milliseconds to wait. -1 means wait indefinitely.
- *
- * @return true if woken up
- */
- boolean doWait(long timeout);
-
- /**
- * Get the next listener with pending data in the driver.
- *
- * @return null if no active listener available
- */
- @SuppressWarnings("rawtypes")
- Listener listener();
-
- /**
- * Get the next active connector in the driver.
- *
- * Returns the next connector with pending inbound data, available capacity
- * for outbound data, or pending tick.
- *
- * Clears the wake-up status that is set by {@link #wakeup()}.
- *
- * @return null if no active connector available
- */
- @SuppressWarnings("rawtypes")
- Connector connector();
-
- /**
- * Destruct the driver and all associated listeners, connectors and other resources.
- */
- void destroy();
-
- /**
- * Construct a listener for the given address.
- *
- * @param host local host address to listen on
- * @param port local port to listen on
- * @param context application-supplied, can be accessed via
- * {@link Listener#getContext() getContext()} method on a listener.
- * @return a new listener on the given host:port, null if error
- */
- <C> Listener<C> createListener(String host, int port, C context);
-
- /**
- * Create a listener using the existing channel.
- *
- * @param c existing SocketChannel for listener to listen on
- * @param context application-supplied, can be accessed via
- * {@link Listener#getContext() getContext()} method on a listener.
- * @return a new listener on the given channel, null if error
- */
- <C> Listener<C> createListener(ServerSocketChannel c, C context);
-
- /**
- * Construct a connector to the given remote address.
- *
- * @param host remote host to connect to.
- * @param port remote port to connect to.
- * @param context application-supplied, can be accessed via
- * {@link Connector#getContext() getContext()} method on a listener.
- *
- * @return a new connector to the given remote, or null on error.
- */
- <C> Connector<C> createConnector(String host, int port, C context);
-
- /**
- * Create a connector using the existing file descriptor.
- *
- * @param fd existing SocketChannel for listener to listen on
- * @param context application-supplied, can be accessed via
- * {@link Connector#getContext() getContext()} method on a listener.
- *
- * @return a new connector to the given host:port, null if error.
- */
- <C> Connector<C> createConnector(SelectableChannel fd, C context);
-
- /**
- * Return an iterator over all listeners.
- */
- Iterable<Listener> listeners();
- /**
- * Return an iterator over all connectors.
- */
- Iterable<Connector> connectors();
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/Listener.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/Listener.java
deleted file mode 100644
index 7541d4d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/Listener.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.driver;
-
-/**
- * Server API.
- *
- * @param <C> application supplied context
- * @deprecated Messenger and its listener will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public interface Listener<C>
-{
- /**
- * Accept a connection that is pending on the listener.
- *
- * @return a new connector for the remote, or NULL on error.
- */
- Connector<C> accept();
-
- /**
- * Access the application context that is associated with the listener.
- *
- * @return the application context that was passed when creating this
- * listener. See {@link Driver#createListener(String, int, Object)
- * createListener(String, int, Object)} and
- * {@link Driver#createConnector(java.nio.channels.SelectableChannel, Object)
- * createConnector(java.nio.channels.SelectableChannel, Object)}
- */
- C getContext();
-
- /**
- * Set the application context that is associated with this listener.
- *
- */
- void setContext(C ctx);
-
- /**
- * Close the socket used by the listener.
- *
- */
- void close() throws java.io.IOException;
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
deleted file mode 100644
index 18cad9a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.driver.impl;
-
-import static org.apache.qpid.proton.driver.impl.ConnectorImpl.ConnectorState.UNINITIALIZED;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-
-@SuppressWarnings("deprecation")
-class ConnectorImpl<C> implements Connector<C>
-{
- private static int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private static int readBufferSize = Integer.getInteger
- ("pn.receive_buffer_size", DEFAULT_BUFFER_SIZE);
- private static int writeBufferSize = Integer.getInteger
- ("pn.send_buffer_size", DEFAULT_BUFFER_SIZE);
-
- enum ConnectorState {UNINITIALIZED, OPENED, EOS, CLOSED};
-
- private final DriverImpl _driver;
- private final Listener<C> _listener;
- private final SocketChannel _channel;
- private final Logger _logger = Logger.getLogger("proton.driver");
- private C _context;
-
- private Connection _connection;
- private Transport _transport = Proton.transport();
- private SelectionKey _key;
- private ConnectorState _state = UNINITIALIZED;
-
- private boolean _inputDone = false;
- private boolean _outputDone = false;
- private boolean _closed = false;
-
- private boolean _selected = false;
- private boolean _readAllowed = false;
-
- ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C context, SelectionKey key)
- {
- _driver = driver;
- _listener = listener;
- _channel = c;
- _context = context;
- _key = key;
- }
-
- void selected()
- {
- if (!_selected) {
- _selected = true;
- _driver.selectConnector(this);
- _readAllowed = true;
- }
- }
-
- void unselected()
- {
- _selected = false;
- }
-
- public boolean process() throws IOException
- {
- if (isClosed() || !_channel.finishConnect()) return false;
-
- boolean processed = false;
- if (!_inputDone)
- {
- if (read()) {
- processed = true;
- }
- }
-
- if (!_outputDone)
- {
- if (write()) {
- processed = true;
- }
- }
-
- if (_outputDone && _inputDone)
- {
- close();
- }
-
- return processed;
- }
-
- private boolean read() throws IOException
- {
- if (!_readAllowed) return false;
- _readAllowed = false;
- boolean processed = false;
-
- int interest = _key.interestOps();
- int capacity = _transport.capacity();
- if (capacity == Transport.END_OF_STREAM)
- {
- _inputDone = true;
- }
- else
- {
- ByteBuffer tail = _transport.tail();
- int bytesRead = _channel.read(tail);
- if (bytesRead < 0) {
- _transport.close_tail();
- _inputDone = true;
- } else if (bytesRead > 0) {
- try {
- _transport.process();
- } catch (TransportException e) {
- _logger.log(Level.SEVERE, this + " error processing input", e);
- }
- processed = true;
- }
- }
-
- capacity = _transport.capacity();
- if (capacity > 0) {
- interest |= SelectionKey.OP_READ;
- } else {
- interest &= ~SelectionKey.OP_READ;
- if (capacity < 0) {
- _inputDone = true;
- }
- }
- _key.interestOps(interest);
-
- return processed;
- }
-
- private boolean write() throws IOException
- {
- boolean processed = false;
-
- int interest = _key.interestOps();
- boolean writeBlocked = false;
-
- try {
- while (_transport.pending() > 0 && !writeBlocked)
- {
- ByteBuffer head = _transport.head();
- int wrote = _channel.write(head);
- if (wrote > 0) {
- processed = true;
- _transport.pop(wrote);
- } else {
- writeBlocked = true;
- }
- }
-
- int pending = _transport.pending();
- if (pending > 0) {
- interest |= SelectionKey.OP_WRITE;
- } else {
- interest &= ~SelectionKey.OP_WRITE;
- if (pending < 0) {
- _outputDone = true;
- }
- }
- } catch (TransportException e) {
- _logger.log(Level.SEVERE, this + " error", e);
- interest &= ~SelectionKey.OP_WRITE;
- _inputDone = true;
- _outputDone = true;
- }
-
- _key.interestOps(interest);
-
- return processed;
- }
-
- public Listener<C> listener()
- {
- return _listener;
- }
-
- public Sasl sasl()
- {
- if (_transport != null)
- {
- return _transport.sasl();
- }
- else
- {
- return null;
- }
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public void setConnection(Connection connection)
- {
- _connection = connection;
- _transport.bind(_connection);
- }
-
- public Transport getTransport()
- {
- return _transport;
- }
-
- public C getContext()
- {
- return _context;
- }
-
- public void setContext(C context)
- {
- _context = context;
- }
-
- public void close()
- {
- if (!isClosed())
- {
- try
- {
- _channel.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when closing connection",e);
- }
- finally
- {
- _closed = true;
- selected();
- }
- }
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void destroy()
- {
- close(); // close if not closed already
- _driver.removeConnector(this);
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("ConnectorImpl [_channel=").append(_channel).append("]");
- return builder.toString();
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
deleted file mode 100644
index 6264d7a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.driver.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
-
-@SuppressWarnings("deprecation")
-public class DriverImpl implements Driver
-{
- private Selector _selector;
- private Collection<Listener> _listeners = new LinkedList();
- private Collection<Connector> _connectors = new LinkedList();
- private Logger _logger = Logger.getLogger("proton.driver");
- private Object _wakeupLock = new Object();
- private boolean _woken = false;
- private Queue<ConnectorImpl> _selectedConnectors = new ArrayDeque<ConnectorImpl>();
- private Queue<ListenerImpl> _selectedListeners = new ArrayDeque<ListenerImpl>();
-
- public DriverImpl() throws IOException
- {
- _selector = Selector.open();
- }
-
- public void wakeup()
- {
- synchronized (_wakeupLock) {
- _woken = true;
- }
- _selector.wakeup();
- }
-
- public boolean doWait(long timeout)
- {
- try
- {
- boolean woken;
- synchronized (_wakeupLock) {
- woken = _woken;
- }
-
- if (woken || timeout == 0) {
- _selector.selectNow();
- } else if (timeout < 0) {
- _selector.select();
- } else {
- _selector.select(timeout);
- }
-
- synchronized (_wakeupLock) {
- woken = woken || _woken;
- _woken = false;
- }
-
- for (SelectionKey key : _selector.selectedKeys()) {
- if (key.isAcceptable()) {
- ListenerImpl l = (ListenerImpl) key.attachment();
- l.selected();
- } else {
- ConnectorImpl c = (ConnectorImpl) key.attachment();
- c.selected();
- }
- }
-
- _selector.selectedKeys().clear();
-
- return woken;
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when waiting for IO Event",e);
- throw new RuntimeException(e);
- }
- }
-
- void selectListener(ListenerImpl l)
- {
- _selectedListeners.add(l);
- }
-
- public Listener listener()
- {
- ListenerImpl listener = _selectedListeners.poll();
- if (listener != null) {
- listener.unselected();
- }
-
- return listener;
- }
-
- void selectConnector(ConnectorImpl c)
- {
- _selectedConnectors.add(c);
- }
-
- public Connector connector()
- {
- ConnectorImpl connector = _selectedConnectors.poll();
- if (connector != null) {
- connector.unselected();
- }
- return connector;
- }
-
- public void destroy()
- {
- try
- {
- _selector.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when closing selector",e);
- throw new RuntimeException(e);
- }
- _listeners.clear();
- _connectors.clear();
- }
-
- public <C> Listener<C> createListener(String host, int port, C context)
- {
- try
- {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverSocketChannel.socket();
- serverSocket.bind(new InetSocketAddress(host, port));
- serverSocketChannel.configureBlocking(false);
- Listener<C> listener = createListener(serverSocketChannel, context);
- _logger.fine("Created listener on " + host + ":" + port + ": " + context);
-
- return listener;
- }
- catch (ClosedChannelException e)
- {
- e.printStackTrace(); // TODO - Implement
- }
- catch (IOException e)
- {
- e.printStackTrace(); // TODO - Implement
- }
- return null;
- }
-
- public <C> Listener<C> createListener(ServerSocketChannel c, C context)
- {
- Listener<C> l = new ListenerImpl<C>(this, c, context);
- SelectionKey key = registerInterest(c,SelectionKey.OP_ACCEPT);
- key.attach(l);
- _listeners.add(l);
- return l;
- }
-
- public <C> Connector<C> createConnector(String host, int port, C context)
- {
- try
- {
- SocketChannel channel = SocketChannel.open();
- channel.configureBlocking(false);
- // Disable the Nagle algorithm on TCP connections.
- channel.socket().setTcpNoDelay(true);
- channel.connect(new InetSocketAddress(host, port));
- return createConnector(channel, context);
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- public <C> Connector<C> createConnector(SelectableChannel c, C context)
- {
- SelectionKey key = registerInterest(c, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- Connector<C> co = new ConnectorImpl<C>(this, null, (SocketChannel)c, context, key);
- key.attach(co);
- _connectors.add(co);
- return co;
- }
-
- public <C> void removeConnector(Connector<C> c)
- {
- _connectors.remove(c);
- }
-
- public Iterable<Listener> listeners()
- {
- return _listeners;
- }
-
- public Iterable<Connector> connectors()
- {
- return _connectors;
- }
-
- protected <C> Connector<C> createServerConnector(SelectableChannel c, C context, Listener<C> l)
- {
- SelectionKey key = registerInterest(c, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- Connector<C> co = new ConnectorImpl<C>(this, l, (SocketChannel)c, context, key);
- key.attach(co);
- _connectors.add(co);
- return co;
- }
-
- private <C> SelectionKey registerInterest(SelectableChannel c, int opKeys)
- {
- try
- {
- return c.register(_selector, opKeys);
- }
- catch (ClosedChannelException e)
- {
- e.printStackTrace(); // TODO - Implement
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
deleted file mode 100644
index a7dd936..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ListenerImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.driver.impl;
-
-import java.io.IOException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Listener;
-
-@SuppressWarnings("deprecation")
-class ListenerImpl<C> implements Listener<C>
-{
- private C _context;
- private final ServerSocketChannel _channel;
- private final DriverImpl _driver;
- private final Logger _logger = Logger.getLogger("proton.driver");
- private boolean _selected = false;
-
- ListenerImpl(DriverImpl driver, ServerSocketChannel c, C context)
- {
- _driver = driver;
- _channel = c;
- _context = context;
- }
-
- void selected()
- {
- if (!_selected) {
- _selected = true;
- _driver.selectListener(this);
- }
- }
-
- void unselected()
- {
- _selected = false;
- }
-
- public Connector<C> accept()
- {
- try
- {
- SocketChannel c = _channel.accept();
- if(c != null)
- {
- c.configureBlocking(false);
- return _driver.createServerConnector(c, null, this);
- }
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Exception when accepting connection",e);
- }
- return null; //TODO - we should probably throw an exception instead of returning null?
- }
-
- public C getContext()
- {
- return _context;
- }
-
- public void setContext(C context)
- {
- _context = context;
- }
-
- public void close() throws IOException
- {
- _channel.socket().close();
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
deleted file mode 100644
index bd19259..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger;
-
-import java.io.IOException;
-
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.impl.MessengerImpl;
-
-/**
- *
- * Messenger defines a high level interface for sending and receiving
- * messages. Every Messenger contains a single logical queue of
- * incoming messages and a single logical queue of outgoing
- * messages. These messages in these queues may be destined for, or
- * originate from, a variety of addresses.
- *
- * <h3>Address Syntax</h3>
- *
- * An address has the following form:
- *
- * [ amqp[s]:// ] [user[:password]@] domain [/[name]]
- *
- * Where domain can be one of:
- *
- * host | host:port | ip | ip:port | name
- *
- * The following are valid examples of addresses:
- *
- * - example.org
- * - example.org:1234
- * - amqp://example.org
- * - amqps://example.org
- * - example.org/incoming
- * - amqps://example.org/outgoing
- * - amqps://fred:trustno1@example.org
- * - 127.0.0.1:1234
- * - amqps://127.0.0.1:1234
- *
- * <h3>Sending & Receiving Messages</h3>
- *
- * The Messenger interface works in conjuction with the Message
- * class. The Message class is a mutable holder of message content.
- * The put method will encode the content in a given Message object
- * into the outgoing message queue leaving that Message object free
- * to be modified or discarded without having any impact on the
- * content in the outgoing queue.
- *
- * Similarly, the get method will decode the content in the incoming
- * message queue into the supplied Message object.
- *
- * @deprecated Messenger will be removed from upcoming proton-j releases.
-*/
-@Deprecated
-public interface Messenger
-{
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated
- public static final class Factory
- {
- public static Messenger create() {
- return new MessengerImpl();
- }
-
- public static Messenger create(String name) {
- return new MessengerImpl(name);
- }
- }
-
- /**
- * Flag for use with reject(), accept() and settle() methods.
- */
- static final int CUMULATIVE = 0x01;
-
- /**
- * Places the content contained in the message onto the outgoing
- * queue of the Messenger. This method will never block. The
- * send call may be used to block until the messages are
- * sent. Either a send() or a recv() call is neceesary at present
- * to cause the messages to actually be sent out.
- */
- void put(Message message) throws MessengerException;
-
- /**
- * Blocks until the outgoing queue is empty and, in the event that
- * an outgoing window has been set, until the messages in that
- * window have been received by the target to which they were
- * sent, or the operation times out. The timeout property
- * controls how long a Messenger will block before timing out.
- */
- void send() throws TimeoutException;
-
- void send(int n) throws TimeoutException;
-
- /**
- * Subscribes the Messenger to messages originating from the
- * specified source. The source is an address as specified in the
- * Messenger introduction with the following addition. If the
- * domain portion of the address begins with the '~' character,
- * the Messenger will interpret the domain as host/port, bind
- * to it, and listen for incoming messages. For example
- * "~0.0.0.0", "amqp://~0.0.0.0" will bind to any local interface
- * and listen for incoming messages.
- */
- void subscribe(String source) throws MessengerException;
- /**
- * Receives an arbitrary number of messages into the
- * incoming queue of the Messenger. This method will block until
- * at least one message is available or the operation times out.
- */
- void recv() throws TimeoutException;
- /**
- * Receives up to the specified number of messages into the
- * incoming queue of the Messenger. This method will block until
- * at least one message is available or the operation times out.
- */
- void recv(int count) throws TimeoutException;
- /**
- * Returns the capacity of the incoming message queue of
- * messenger. Note this count does not include those messages
- * already available on the incoming queue (see
- * incoming()). Rather it returns the number of incoming queue
- * entries available for receiving messages
- */
- int receiving();
- /**
- * Returns the message from the head of the incoming message
- * queue.
- */
- Message get();
-
- /**
- * Transitions the Messenger to an active state. A Messenger is
- * initially created in an inactive state. When inactive, a
- * Messenger will not send or receive messages from its internal
- * queues. A Messenger must be started before calling send() or
- * recv().
- */
- void start() throws IOException;
- /**
- * Transitions the Messenger to an inactive state. An inactive
- * Messenger will not send or receive messages from its internal
- * queues. A Messenger should be stopped before being discarded to
- * ensure a clean shutdown handshake occurs on any internally managed
- * connections.
- */
- void stop();
-
- boolean stopped();
-
- /** Sends or receives any outstanding messages queued for a
- * messenger. If timeout is zero, no blocking is done. A timeout
- * of -1 blocks forever, otherwise timeout is the maximum time (in
- * millisecs) to block. Returns True if work was performed.
- */
- boolean work(long timeout) throws TimeoutException;
-
- void interrupt();
-
- void setTimeout(long timeInMillis);
- long getTimeout();
-
- boolean isBlocking();
- void setBlocking(boolean b);
-
- /**
- * Returns a count of the messages currently on the outgoing queue
- * (i.e. those that have been put() but not yet actually sent
- * out).
- */
- int outgoing();
- /**
- * Returns a count of the messages available on the incoming
- * queue.
- */
- int incoming();
-
- int getIncomingWindow();
- void setIncomingWindow(int window);
-
- int getOutgoingWindow();
- void setOutgoingWindow(int window);
-
- /**
- * Returns a token which can be used to accept or reject the
- * message returned in the previous get() call.
- */
- Tracker incomingTracker();
- /**
- * Returns a token which can be used to track the status of the
- * message of the previous put() call.
- */
- Tracker outgoingTracker();
-
- /**
- * Rejects messages retrieved from the incoming message queue. The
- * tracker object for a message is obtained through a call to
- * incomingTracker() following a get(). If the flags argument
- * contains CUMULATIVE, then all message up to the one identified
- * by the tracker will be rejected.
- */
- void reject(Tracker tracker, int flags);
- /**
- * Accepts messages retrieved from the incoming message queue. The
- * tracker object for a message is obtained through a call to
- * incomingTracker() following a get(). If the flags argument
- * contains CUMULATIVE, then all message up to the one identified
- * by the tracker will be accepted.
- */
- void accept(Tracker tracker, int flags);
- void settle(Tracker tracker, int flags);
-
- /**
- * Gets the last known remote state of the delivery associated
- * with the given tracker.
- */
- Status getStatus(Tracker tracker);
-
- void route(String pattern, String address);
-
- void rewrite(String pattern, String address);
-
- /**
- * Set the path to the certificate file.
- */
- void setCertificate(String certificate);
-
- /**
- * Get the path to the certificate file.
- */
- String getCertificate();
-
- /**
- * Set the path to private key file.
- */
- void setPrivateKey(String privateKey);
-
- /**
- * Get the path to the private key file.
- */
- String getPrivateKey();
-
- /**
- * Set the password for private key file.
- */
- void setPassword(String password);
-
- /**
- * Get the password for the priate key file.
- */
- String getPassword();
-
- /**
- * Set the path to the trusted certificate database.
- */
- void setTrustedCertificates(String trusted);
-
- /**
- * Get the path to the trusted certificate database.
- */
- String getTrustedCertificates();
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
deleted file mode 100644
index c6f3570..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.messenger;
-
-import org.apache.qpid.proton.ProtonException;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class MessengerException extends ProtonException
-{
- public MessengerException()
- {
- }
-
- public MessengerException(String message)
- {
- super(message);
- }
-
- public MessengerException(String message, Throwable cause)
- {
- super(message, cause);
- }
-
- public MessengerException(Throwable cause)
- {
- super(cause);
- }
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Status.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Status.java
deleted file mode 100644
index ae7ca95..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Status.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public enum Status
-{
- UNKNOWN,
- PENDING,
- ACCEPTED,
- REJECTED,
- RELEASED,
- MODIFIED,
- ABORTED,
- SETTLED
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
deleted file mode 100644
index 974b1b6..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public interface Tracker { }
-
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
deleted file mode 100644
index e7c9d9e..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ /dev/null
@@ -1,1555 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.InterruptException;
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.SslDomain;
-import org.apache.qpid.proton.engine.Ssl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerException;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.amqp.Binary;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class MessengerImpl implements Messenger
-{
- private enum LinkCreditMode
- {
- // method for replenishing credit
- LINK_CREDIT_EXPLICIT, // recv(N)
- LINK_CREDIT_AUTO; // recv()
- }
-
- private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
- private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
- private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
- private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
-
- private final Logger _logger = Logger.getLogger("proton.messenger");
- private final String _name;
- private long _timeout = -1;
- private boolean _blocking = true;
- private long _nextTag = 1;
- private Driver _driver;
- private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
- private int _credit; // available
- private int _distributed; // outstanding credit
- private int _receivers; // total # receiver Links
- private int _draining; // # Links in drain state
- private List<Receiver> _credited = new ArrayList<Receiver>();
- private List<Receiver> _blocked = new ArrayList<Receiver>();
- private long _next_drain;
- private TrackerImpl _incomingTracker;
- private TrackerImpl _outgoingTracker;
- private Store _incomingStore = new Store();
- private Store _outgoingStore = new Store();
- private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
- private int _sendThreshold;
-
- private Transform _routes = new Transform();
- private Transform _rewrites = new Transform();
-
- private String _certificate;
- private String _privateKey;
- private String _password;
- private String _trustedDb;
-
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl()
- {
- this(java.util.UUID.randomUUID().toString());
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl(String name)
- {
- _name = name;
- }
-
- public void setTimeout(long timeInMillis)
- {
- _timeout = timeInMillis;
- }
-
- public long getTimeout()
- {
- return _timeout;
- }
-
- public boolean isBlocking()
- {
- return _blocking;
- }
-
- public void setBlocking(boolean b)
- {
- _blocking = b;
- }
-
- public void setCertificate(String certificate)
- {
- _certificate = certificate;
- }
-
- public String getCertificate()
- {
- return _certificate;
- }
-
- public void setPrivateKey(String privateKey)
- {
- _privateKey = privateKey;
- }
-
- public String getPrivateKey()
- {
- return _privateKey;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public void setTrustedCertificates(String trusted)
- {
- _trustedDb = trusted;
- }
-
- public String getTrustedCertificates()
- {
- return _trustedDb;
- }
-
- public void start() throws IOException
- {
- _driver = Proton.driver();
- }
-
- public void stop()
- {
- if (_driver != null) {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to stop");
- }
- //close all connections
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- connection.close();
- }
- //stop listeners
- for (Listener<?> l : _driver.listeners())
- {
- try
- {
- l.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.WARNING, "Error while closing listener", e);
- }
- }
- waitUntil(_allClosed);
- }
- }
-
- public boolean stopped()
- {
- return _allClosed.test();
- }
-
- public boolean work(long timeout) throws TimeoutException
- {
- if (_driver == null) { return false; }
- _worked = false;
- return waitUntil(_workPred, timeout);
- }
-
- public void interrupt()
- {
- if (_driver != null) {
- _driver.wakeup();
- }
- }
-
- private String defaultRewrite(String address) {
- if (address != null && address.contains("@")) {
- Address addr = new Address(address);
- String scheme = addr.getScheme();
- String host = addr.getHost();
- String port = addr.getPort();
- String name = addr.getName();
-
- StringBuilder sb = new StringBuilder();
- if (scheme != null) {
- sb.append(scheme).append("://");
- }
- if (host != null) {
- sb.append(host);
- }
- if (port != null) {
- sb.append(":").append(port);
- }
- if (name != null) {
- sb.append("/").append(name);
- }
- return sb.toString();
- } else {
- return address;
- }
- }
-
-
- private String _original;
-
- private void rewriteMessage(Message m)
- {
- _original = m.getAddress();
- if (_rewrites.apply(_original)) {
- m.setAddress(_rewrites.result());
- } else {
- m.setAddress(defaultRewrite(_original));
- }
- }
-
- private void restoreMessage(Message m)
- {
- m.setAddress(_original);
- }
-
- private String routeAddress(String addr)
- {
- if (_routes.apply(addr)) {
- return _routes.result();
- } else {
- return addr;
- }
- }
-
- public void put(Message m) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot put while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to put message: " + m);
- }
-
- StoreEntry entry = _outgoingStore.put( m.getAddress() );
- _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
- _outgoingStore.trackEntry(entry));
-
- String routedAddress = routeAddress(m.getAddress());
- Address address = new Address(routedAddress);
- if (address.getHost() == null)
- {
- throw new MessengerException("unable to send to address: " + routedAddress);
- }
-
- rewriteMessage(m);
-
- try {
- adjustReplyTo(m);
-
- int encoded;
- byte[] buffer = new byte[5*1024];
- while (true)
- {
- try
- {
- encoded = m.encode(buffer, 0, buffer.length);
- break;
- } catch (java.nio.BufferOverflowException e) {
- buffer = new byte[buffer.length*2];
- }
- }
- entry.setEncodedMsg( buffer, encoded );
- }
- finally
- {
- restoreMessage(m);
- }
-
- Sender sender = getLink(address, new SenderFinder(address.getName()));
- pumpOut(m.getAddress(), sender);
- }
-
- private void reclaimLink(Link link)
- {
- if (link instanceof Receiver)
- {
- int credit = link.getCredit();
- if (credit > 0)
- {
- _credit += credit;
- _distributed -= credit;
- }
- }
-
- Delivery delivery = link.head();
- while (delivery != null)
- {
- StoreEntry entry = (StoreEntry) delivery.getContext();
- if (entry != null)
- {
- entry.setDelivery(null);
- if (delivery.isBuffered()) {
- entry.setStatus(Status.ABORTED);
- }
- }
- delivery = delivery.next();
- }
- linkRemoved(link);
- }
-
- private int pumpOut( String address, Sender sender )
- {
- StoreEntry entry = _outgoingStore.get( address );
- if (entry == null) {
- sender.drained();
- return 0;
- }
-
- byte[] tag = String.valueOf(_nextTag++).getBytes();
- Delivery delivery = sender.delivery(tag);
- entry.setDelivery( delivery );
- _logger.log(Level.FINE, "Sending on delivery: " + delivery);
- int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
- if (n < 0) {
- _outgoingStore.freeEntry( entry );
- _logger.log(Level.WARNING, "Send error: " + n);
- return n;
- } else {
- sender.advance();
- _outgoingStore.freeEntry( entry );
- return 0;
- }
- }
-
- public void send() throws TimeoutException
- {
- send(-1);
- }
-
- public void send(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot send while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to send");
- }
-
- if (n == -1)
- _sendThreshold = 0;
- else
- {
- _sendThreshold = outgoing() - n;
- if (_sendThreshold < 0)
- _sendThreshold = 0;
- }
-
- waitUntil(_sentSettled);
- }
-
- public void recv(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot recv while messenger is stopped");
- }
-
- if (_logger.isLoggable(Level.FINE) && n != -1)
- {
- _logger.fine(this + " about to wait for up to " + n + " messages to be received");
- }
-
- if (n == -1)
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
- }
- else
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- if (n > _distributed)
- _credit = n - _distributed;
- else // cancel unallocated
- _credit = 0;
- }
-
- distributeCredit();
-
- waitUntil(_messageAvailable);
- }
-
- public void recv() throws TimeoutException
- {
- recv(-1);
- }
-
- public int receiving()
- {
- return _credit + _distributed;
- }
-
- public Message get()
- {
- StoreEntry entry = _incomingStore.get( null );
- if (entry != null)
- {
- Message message = Proton.message();
- message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
-
- _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
- _incomingStore.trackEntry(entry));
-
- _incomingStore.freeEntry( entry );
- return message;
- }
- return null;
- }
-
- private int pumpIn(String address, Receiver receiver)
- {
- Delivery delivery = receiver.current();
- if (delivery.isReadable() && !delivery.isPartial())
- {
- StoreEntry entry = _incomingStore.put( address );
- entry.setDelivery( delivery );
-
- _logger.log(Level.FINE, "Readable delivery found: " + delivery);
-
- int size = delivery.pending();
- byte[] buffer = new byte[size];
- int read = receiver.recv( buffer, 0, buffer.length );
- if (read != size) {
- throw new IllegalStateException();
- }
- entry.setEncodedMsg( buffer, size );
- receiver.advance();
-
- // account for the used credit, replenish if
- // low (< 20% maximum per-link batch) and
- // extra credit available
- assert(_distributed > 0);
- _distributed--;
- if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
- {
- final int max = perLinkCredit();
- final int lo_thresh = (int)(max * 0.2 + 0.5);
- if (receiver.getRemoteCredit() < lo_thresh)
- {
- final int more = Math.min(_credit, max - receiver.getRemoteCredit());
- _credit -= more;
- _distributed += more;
- receiver.flow(more);
- }
- }
- // check if blocked
- if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
- {
- _credited.remove(receiver);
- if (receiver.getDrain())
- {
- receiver.setDrain(false);
- assert( _draining > 0 );
- _draining--;
- }
- _blocked.add(receiver);
- }
- }
- return 0;
- }
-
- public void subscribe(String source) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("messenger is stopped");
- }
-
- String routed = routeAddress(source);
- Address address = new Address(routed);
-
- String hostName = address.getHost();
- if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
- int port = Integer.valueOf(address.getImpliedPort());
- if (address.isPassive())
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
- }
- ListenerContext ctx = new ListenerContext(address);
- _driver.createListener(hostName, port, ctx);
- }
- else
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source);
- }
- getLink(address, new ReceiverFinder(address.getName()));
- }
- }
-
- public int outgoing()
- {
- return _outgoingStore.size() + queued(true);
- }
-
- public int incoming()
- {
- return _incomingStore.size() + queued(false);
- }
-
- public int getIncomingWindow()
- {
- return _incomingStore.getWindow();
- }
-
- public void setIncomingWindow(int window)
- {
- _incomingStore.setWindow(window);
- }
-
- public int getOutgoingWindow()
- {
- return _outgoingStore.getWindow();
- }
-
- public void setOutgoingWindow(int window)
- {
- _outgoingStore.setWindow(window);
- }
-
- public Tracker incomingTracker()
- {
- return _incomingTracker;
- }
- public Tracker outgoingTracker()
- {
- return _outgoingTracker;
- }
-
- private Store getTrackerStore(Tracker tracker)
- {
- return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
- }
-
- @Override
- public void reject(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
- }
-
- @Override
- public void accept(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
- }
-
- @Override
- public void settle(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
- }
-
- public Status getStatus(Tracker tracker)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- StoreEntry e = getTrackerStore(tracker).getEntry(id);
- if (e != null)
- {
- return e.getStatus();
- }
- return Status.UNKNOWN;
- }
-
- @Override
- public void route(String pattern, String address)
- {
- _routes.rule(pattern, address);
- }
-
- @Override
- public void rewrite(String pattern, String address)
- {
- _rewrites.rule(pattern, address);
- }
-
- private int queued(boolean outgoing)
- {
- int count = 0;
- if (_driver != null) {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (outgoing)
- {
- if (link instanceof Sender) count += link.getQueued();
- }
- else
- {
- if (link instanceof Receiver) count += link.getQueued();
- }
- }
- }
- }
- return count;
- }
-
- private void bringDestruction()
- {
- for (Connector<?> c : _awaitingDestruction)
- {
- c.destroy();
- }
- _awaitingDestruction.clear();
- }
-
- private void processAllConnectors()
- {
- distributeCredit();
- for (Connector<?> c : _driver.connectors())
- {
- processEndpoints(c);
- try
- {
- if (c.process()) {
- _worked = true;
- }
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processActive()
- {
- //process active listeners
- for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
- {
- _worked = true;
- Connector<?> c = l.accept();
- Connection connection = Proton.connection();
- connection.setContainer(_name);
- ListenerContext ctx = (ListenerContext) l.getContext();
- connection.setContext(new ConnectionContext(ctx.getAddress(), c));
- c.setConnection(connection);
- Transport transport = c.getTransport();
- //TODO: full SASL
- Sasl sasl = c.sasl();
- if (sasl != null)
- {
- sasl.server();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
- }
- transport.ssl(ctx.getDomain());
- connection.open();
- }
- // process connectors, reclaiming credit on closed connectors
- for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
- {
- _worked = true;
- if (c.isClosed())
- {
- _awaitingDestruction.add(c);
- reclaimCredit(c.getConnection());
- }
- else
- {
- _logger.log(Level.FINE, "Processing active connector " + c);
- try
- {
- c.process();
- processEndpoints(c);
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processEndpoints(Connector c)
- {
- Connection connection = c.getConnection();
-
- if (connection.getLocalState() == EndpointState.UNINITIALIZED)
- {
- connection.open();
- }
-
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- Link link = delivery.getLink();
- if (delivery.isUpdated())
- {
- if (link instanceof Sender)
- {
- delivery.disposition(delivery.getRemoteState());
- }
- StoreEntry e = (StoreEntry) delivery.getContext();
- if (e != null) e.updated();
- }
-
- if (delivery.isReadable())
- {
- pumpIn( link.getSource().getAddress(), (Receiver)link );
- }
-
- Delivery next = delivery.getWorkNext();
- delivery.clear();
- delivery = next;
- }
-
- for (Session session : new Sessions(connection, UNINIT, ANY))
- {
- session.open();
- _logger.log(Level.FINE, "Opened session " + session);
- }
- for (Link link : new Links(connection, UNINIT, ANY))
- {
- //TODO: the following is not correct; should only copy those properties that we understand
- //TODO: is this any better:
- if (link.getRemoteSource() != null) {
- link.setSource(link.getRemoteSource().copy());
- }
- if (link.getRemoteTarget() != null) {
- link.setTarget(link.getRemoteTarget().copy());
- }
- linkAdded(link);
- link.open();
- _logger.log(Level.FINE, "Opened link " + link);
- }
-
- distributeCredit();
-
- for (Link link : new Links(connection, ACTIVE, ACTIVE))
- {
- if (link instanceof Sender)
- {
- pumpOut(link.getTarget().getAddress(), (Sender)link);
- }
- }
-
- for (Session session : new Sessions(connection, ACTIVE, CLOSED))
- {
- session.close();
- }
-
- for (Link link : new Links(connection, ANY, CLOSED))
- {
- if (link.getLocalState() == EndpointState.ACTIVE)
- {
- link.close();
- }
- else
- {
- reclaimLink(link);
- }
- }
-
- if (connection.getRemoteState() == EndpointState.CLOSED)
- {
- if (connection.getLocalState() == EndpointState.ACTIVE)
- {
- connection.close();
- }
- }
- }
-
- private boolean waitUntil(Predicate condition) throws TimeoutException
- {
- if (_blocking) {
- boolean done = waitUntil(condition, _timeout);
- if (!done) {
- _logger.log(Level.SEVERE, String.format
- ("Timeout when waiting for condition %s after %s ms",
- condition, _timeout));
- throw new TimeoutException();
- }
- return done;
- } else {
- return waitUntil(condition, 0);
- }
- }
-
- private boolean waitUntil(Predicate condition, long timeout)
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot wait while messenger is stopped");
- }
-
- processAllConnectors();
-
- // wait until timeout expires or until test is true
- long now = System.currentTimeMillis();
- final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
- boolean done = false;
-
- while (true)
- {
- done = condition.test();
- if (done) break;
-
- long remaining;
- if (timeout < 0)
- remaining = -1;
- else {
- remaining = deadline - now;
- if (remaining < 0) break;
- }
-
- // Update the credit scheduler. If the scheduler detects
- // credit imbalance on the links, wake up in time to
- // service credit drain
- distributeCredit();
- if (_next_drain != 0)
- {
- long wakeup = (_next_drain > now) ? _next_drain - now : 0;
- remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
- }
-
- boolean woken;
- woken = _driver.doWait(remaining);
- processActive();
- if (woken) {
- throw new InterruptException();
- }
- now = System.currentTimeMillis();
- }
-
- return done;
- }
-
- private Connection lookup(Address address)
- {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- ConnectionContext ctx = (ConnectionContext) connection.getContext();
- if (ctx.matches(address))
- {
- return connection;
- }
- }
- return null;
- }
-
- private void reclaimCredit(Connection connection)
- {
- for (Link link : new Links(connection, ANY, ANY))
- {
- reclaimLink(link);
- }
- }
-
- private void distributeCredit()
- {
- if (_receivers == 0) return;
-
- if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
- {
- // replenish, but limit the max total messages buffered
- final int max = _receivers * _credit_batch;
- final int used = _distributed + incoming();
- if (max > used)
- _credit = max - used;
- }
-
- // reclaim any credit left over after draining links has completed
- if (_draining > 0)
- {
- Iterator<Receiver> itr = _credited.iterator();
- while (itr.hasNext())
- {
- Receiver link = (Receiver) itr.next();
- if (link.getDrain())
- {
- if (!link.draining())
- {
- // drain completed for this link
- int drained = link.drained();
- assert(_distributed >= drained);
- _distributed -= drained;
- _credit += drained;
- link.setDrain(false);
- _draining--;
- itr.remove();
- _blocked.add(link);
- }
- }
- }
- }
-
- // distribute available credit to blocked links
- final int batch = perLinkCredit();
- while (_credit > 0 && !_blocked.isEmpty())
- {
- Receiver link = _blocked.get(0);
- _blocked.remove(0);
-
- final int more = Math.min(_credit, batch);
- _distributed += more;
- _credit -= more;
-
- link.flow(more);
- _credited.add(link);
-
- // flow changed, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
-
- if (_blocked.isEmpty())
- {
- _next_drain = 0;
- }
- else
- {
- // not enough credit for all links - start draining granted credit
- if (_draining == 0)
- {
- // don't do it too often - pace ourselves (it's expensive)
- if (_next_drain == 0)
- {
- _next_drain = System.currentTimeMillis() + 250;
- }
- else if (_next_drain <= System.currentTimeMillis())
- {
- // initiate drain, free up at most enough to satisfy blocked
- _next_drain = 0;
- int needed = _blocked.size() * batch;
-
- for (Receiver link : _credited)
- {
- if (!link.getDrain()) {
- link.setDrain(true);
- needed -= link.getRemoteCredit();
- _draining++;
- // drain requested on link, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- if (needed <= 0) break;
- }
- }
- }
- }
- }
- }
-
- private interface Predicate
- {
- boolean test();
- }
-
- private class SentSettled implements Predicate
- {
- public boolean test()
- {
- //are all sent messages settled?
- int total = _outgoingStore.size();
-
- for (Connector<?> c : _driver.connectors())
- {
- // TBD
- // check if transport is done generating output
- // pn_transport_t *transport = pn_connector_transport(ctor);
- // if (transport) {
- // if (!pn_transport_quiesced(transport)) {
- // pn_connector_process(ctor);
- // return false;
- // }
- // }
-
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (link instanceof Sender)
- {
- total += link.getQueued();
- }
- }
-
- // TBD: there is no per-link unsettled
- // deliveries iterator, so for now get the
- // deliveries by walking the outgoing trackers
- Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
- while (entries.hasNext() && total <= _sendThreshold)
- {
- StoreEntry e = (StoreEntry) entries.next();
- if (e != null )
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getRemoteState() == null && !d.remotelySettled())
- {
- total++;
- }
- }
- }
- }
- }
- return total <= _sendThreshold;
- }
- }
-
- private class MessageAvailable implements Predicate
- {
- public boolean test()
- {
- //do we have at least one pending message?
- if (_incomingStore.size() > 0) return true;
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.isReadable() && !delivery.isPartial())
- {
- return true;
- }
- else
- {
- delivery = delivery.getWorkNext();
- }
- }
- }
- // if no connections, or not listening, exit as there won't ever be a message
- if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
- return true;
-
- return false;
- }
- }
-
- private class AllClosed implements Predicate
- {
- public boolean test()
- {
- if (_driver == null) {
- return true;
- }
-
- for (Connector<?> c : _driver.connectors()) {
- if (!c.isClosed()) {
- return false;
- }
- }
-
- _driver.destroy();
- _driver = null;
-
- return true;
- }
- }
-
- private boolean _worked = false;
-
- private class WorkPred implements Predicate
- {
- public boolean test()
- {
- return _worked;
- }
- }
-
- private final SentSettled _sentSettled = new SentSettled();
- private final MessageAvailable _messageAvailable = new MessageAvailable();
- private final AllClosed _allClosed = new AllClosed();
- private final WorkPred _workPred = new WorkPred();
-
- private interface LinkFinder<C extends Link>
- {
- C test(Link link);
- C create(Session session);
- }
-
- private class SenderFinder implements LinkFinder<Sender>
- {
- private final String _path;
-
- SenderFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Sender test(Link link)
- {
- if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path))
- {
- return (Sender) link;
- }
- else
- {
- return null;
- }
- }
-
- public Sender create(Session session)
- {
- Sender sender = session.sender(_path);
- Target target = new Target();
- target.setAddress(_path);
- sender.setTarget(target);
- // the C implemenation does this:
- Source source = new Source();
- source.setAddress(_path);
- sender.setSource(source);
- if (getOutgoingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
- }
- return sender;
- }
- }
-
- private class ReceiverFinder implements LinkFinder<Receiver>
- {
- private final String _path;
-
- ReceiverFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Receiver test(Link link)
- {
- if (link instanceof Receiver && matchSource((Source) link.getSource(), _path))
- {
- return (Receiver) link;
- }
- else
- {
- return null;
- }
- }
-
- public Receiver create(Session session)
- {
- Receiver receiver = session.receiver(_path);
- Source source = new Source();
- source.setAddress(_path);
- receiver.setSource(source);
- // the C implemenation does this:
- Target target = new Target();
- target.setAddress(_path);
- receiver.setTarget(target);
- if (getIncomingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
- receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
- }
- return receiver;
- }
- }
-
- private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
- {
- Connection connection = lookup(address);
- if (connection == null)
- {
- String host = address.getHost();
- int port = Integer.valueOf(address.getImpliedPort());
- Connector<?> connector = _driver.createConnector(host, port, null);
- _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
- connection = Proton.connection();
- connection.setContainer(_name);
- connection.setHostname(host);
- connection.setContext(new ConnectionContext(address, connector));
- connector.setConnection(connection);
- Sasl sasl = connector.sasl();
- if (sasl != null)
- {
- sasl.client();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- }
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- Transport transport = connector.getTransport();
- SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
- if (_trustedDb != null) {
- domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
- //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
- } else {
- domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
- }
- Ssl ssl = transport.ssl(domain);
- //ssl.setPeerHostname(host);
- }
- connection.open();
- }
-
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- C result = finder.test(link);
- if (result != null) return result;
- }
- Session session = connection.session();
- session.open();
- C link = finder.create(session);
- linkAdded(link);
- link.open();
- return link;
- }
-
- private static class Links implements Iterable<Link>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Link> iterator()
- {
- return new LinkIterator(_connection, _local, _remote);
- }
- }
-
- private static class LinkIterator implements java.util.Iterator<Link>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Link _next;
-
- LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.linkHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Link next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class Sessions implements Iterable<Session>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Session> iterator()
- {
- return new SessionIterator(_connection, _local, _remote);
- }
- }
-
- private static class SessionIterator implements java.util.Iterator<Session>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Session _next;
-
- SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.sessionHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Session next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private void adjustReplyTo(Message m)
- {
- String original = m.getReplyTo();
- if (original != null) {
- if (original.startsWith("~/"))
- {
- m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
- }
- else if (original.equals("~"))
- {
- m.setReplyTo("amqp://" + _name);
- }
- }
- }
-
- private static boolean matchTarget(Target target, String path)
- {
- if (target == null) return path.isEmpty();
- else return path.equals(target.getAddress());
- }
-
- private static boolean matchSource(Source source, String path)
- {
- if (source == null) return path.isEmpty();
- else return path.equals(source.getAddress());
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("MessengerImpl [_name=").append(_name).append("]");
- return builder.toString();
- }
-
- // compute the maximum amount of credit each receiving link is
- // entitled to. The actual credit given to the link depends on
- // what amount of credit is actually available.
- private int perLinkCredit()
- {
- if (_receivers == 0) return 0;
- int total = _credit + _distributed;
- return Math.max(total/_receivers, 1);
- }
-
- // a new link has been created, account for it.
- private void linkAdded(Link link)
- {
- if (link instanceof Receiver)
- {
- _receivers++;
- _blocked.add((Receiver)link);
- link.setContext(Boolean.TRUE);
- }
- }
-
- // a link is being removed, account for it.
- private void linkRemoved(Link _link)
- {
- if (_link instanceof Receiver && (Boolean) _link.getContext())
- {
- _link.setContext(Boolean.FALSE);
- Receiver link = (Receiver)_link;
- assert _receivers > 0;
- _receivers--;
- if (link.getDrain())
- {
- link.setDrain(false);
- assert _draining > 0;
- _draining--;
- }
- if (_blocked.contains(link))
- _blocked.remove(link);
- else if (_credited.contains(link))
- _credited.remove(link);
- else
- assert(false);
- }
- }
-
- private static class ConnectionContext
- {
- private Address _address;
- private Connector _connector;
-
- public ConnectionContext(Address address, Connector connector)
- {
- _address = address;
- _connector = connector;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- public boolean matches(Address address)
- {
- String host = address.getHost();
- String port = address.getImpliedPort();
- Connection conn = _connector.getConnection();
- return host.equals(conn.getRemoteContainer()) ||
- (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
- }
-
- public Connector getConnector()
- {
- return _connector;
- }
- }
-
- private SslDomain makeDomain(Address address, SslDomain.Mode mode)
- {
- SslDomain domain = Proton.sslDomain();
- domain.init(mode);
- if (_certificate != null) {
- domain.setCredentials(_certificate, _privateKey, _password);
- }
- if (_trustedDb != null) {
- domain.setTrustedCaDb(_trustedDb);
- }
-
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- domain.allowUnsecuredClient(false);
- } else {
- domain.allowUnsecuredClient(true);
- }
-
- return domain;
- }
-
-
- private class ListenerContext
- {
- private Address _address;
- private SslDomain _domain;
-
- public ListenerContext(Address address)
- {
- _address = address;
- _domain = makeDomain(address, SslDomain.Mode.SERVER);
- }
-
- public SslDomain getDomain()
- {
- return _domain;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
deleted file mode 100644
index b60e8ed..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Store
-{
- private static final Accepted ACCEPTED = Accepted.getInstance();
- private static final Rejected REJECTED = new Rejected();
-
- private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>();
- private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>();
-
- // for incoming/outgoing window tracking
- int _window;
- int _lwm;
- int _hwm;
- private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>();
-
- Store()
- {
- }
-
- private boolean isTracking( Integer id )
- {
- return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0);
- }
-
- int size()
- {
- return _store.size();
- }
-
- int getWindow()
- {
- return _window;
- }
-
- void setWindow(int window)
- {
- _window = window;
- }
-
- StoreEntry put(String address)
- {
- if (address == null) address = "";
- StoreEntry entry = new StoreEntry(this, address);
- _store.add( entry );
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) {
- list.add( entry );
- } else {
- list = new LinkedList<StoreEntry>();
- list.add( entry );
- _stream.put( address, list );
- }
- entry.stored();
- return entry;
- }
-
- StoreEntry get(String address)
- {
- if (address != null) {
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) return list.peekFirst();
- } else {
- return _store.peekFirst();
- }
- return null;
- }
-
- StoreEntry getEntry(int id)
- {
- return _tracked.get(id);
- }
-
- Iterator<StoreEntry> trackedEntries()
- {
- return _tracked.values().iterator();
- }
-
- void freeEntry(StoreEntry entry)
- {
- if (entry.isStored()) {
- _store.remove( entry );
- LinkedList<StoreEntry> list = _stream.get( entry.getAddress() );
- if (list != null) list.remove( entry );
- entry.notStored();
- }
- // note well: may still be in _tracked map if still in window!
- }
-
- public int trackEntry(StoreEntry entry)
- {
- assert( entry.getStore() == this );
- entry.setId(_hwm++);
- _tracked.put(entry.getId(), entry);
- slideWindow();
- return entry.getId();
- }
-
- private void slideWindow()
- {
- if (_window >= 0)
- {
- while (_hwm - _lwm > _window)
- {
- StoreEntry old = getEntry(_lwm);
- if (old != null)
- {
- _tracked.remove( old.getId() );
- Delivery d = old.getDelivery();
- if (d != null) {
- if (d.getLocalState() == null)
- d.disposition(ACCEPTED);
- d.settle();
- }
- }
- _lwm++;
- }
- }
- }
-
- int update(int id, Status status, int flags, boolean settle, boolean match )
- {
- if (!isTracking(id)) return 0;
-
- int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id;
- for (int i = start; (id - i) >= 0; i++)
- {
- StoreEntry e = getEntry(i);
- if (e != null)
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getLocalState() == null)
- {
- if (match)
- {
- d.disposition(d.getRemoteState());
- }
- else
- {
- switch (status)
- {
- case ACCEPTED:
- d.disposition(ACCEPTED);
- break;
- case REJECTED:
- d.disposition(REJECTED);
- break;
- default:
- break;
- }
- }
- e.updated();
- }
- }
- if (settle)
- {
- if (d != null)
- {
- d.settle();
- }
- _tracked.remove(e.getId());
- }
- }
- }
-
- while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm))
- {
- _lwm++;
- }
-
- return 0;
- }
-}
-
-
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
deleted file mode 100644
index 1687b94..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Received;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class StoreEntry
-{
- private Store _store;
- private Integer _id;
- private String _address;
- private byte[] _encodedMsg;
- private int _encodedLength;
- private Delivery _delivery;
- private Status _status = Status.UNKNOWN;
- private Object _context;
- private boolean _inStore = false;
-
- public StoreEntry(Store store, String address)
- {
- _store = store;
- _address = address;
- }
-
- public Store getStore()
- {
- return _store;
- }
-
- public boolean isStored()
- {
- return _inStore;
- }
-
- public void stored()
- {
- _inStore = true;
- }
-
- public void notStored()
- {
- _inStore = false;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public byte[] getEncodedMsg()
- {
- return _encodedMsg;
- }
-
- public int getEncodedLength()
- {
- return _encodedLength;
- }
-
- public void setEncodedMsg( byte[] encodedMsg, int length )
- {
- _encodedMsg = encodedMsg;
- _encodedLength = length;
- }
-
- public void setId(int id)
- {
- _id = new Integer(id);
- }
-
- public Integer getId()
- {
- return _id;
- }
-
- public void setDelivery( Delivery d )
- {
- if (_delivery != null)
- {
- _delivery.setContext(null);
- }
- _delivery = d;
- if (_delivery != null)
- {
- _delivery.setContext(this);
- }
- updated();
- }
-
- public Delivery getDelivery()
- {
- return _delivery;
- }
-
- public Status getStatus()
- {
- return _status;
- }
-
- public void setStatus(Status status)
- {
- _status = status;
- }
-
- private static Status _disp2status(DeliveryState disp)
- {
- if (disp == null) return Status.PENDING;
-
- if (disp instanceof Received)
- return Status.PENDING;
- if (disp instanceof Accepted)
- return Status.ACCEPTED;
- if (disp instanceof Rejected)
- return Status.REJECTED;
- if (disp instanceof Released)
- return Status.RELEASED;
- if (disp instanceof Modified)
- return Status.MODIFIED;
- assert(false);
- return null;
- }
-
- public void updated()
- {
- if (_delivery != null)
- {
- if (_delivery.getRemoteState() != null)
- {
- _status = _disp2status(_delivery.getRemoteState());
- }
- else if (_delivery.remotelySettled())
- {
- DeliveryState disp = _delivery.getLocalState();
- if (disp == null) {
- _status = Status.SETTLED;
- } else {
- _status = _disp2status(_delivery.getLocalState());
- }
- }
- else
- {
- _status = Status.PENDING;
- }
- }
- }
-
- public void setContext(Object context)
- {
- _context = context;
- }
-
- public Object getContext()
- {
- return _context;
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
deleted file mode 100644
index 2d8b584..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class TrackerImpl implements Tracker
-{
- public enum Type {
- OUTGOING,
- INCOMING
- }
-
- private Type _type;
- private int _sequence;
-
- TrackerImpl(Type type, int sequence)
- {
- _type = type;
- _sequence = sequence;
- }
-
- boolean isOutgoing()
- {
- return _type == Type.OUTGOING;
- }
-
- int getSequence()
- {
- return _sequence;
- }
-
- public String toString()
- {
- return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence);
- }
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
deleted file mode 100644
index c3a08ea..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * Transform
- *
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Transform
-{
-
- private static class Rule {
-
- String _pattern;
- String _substitution;
-
- Pattern _compiled;
- StringBuilder _sb = new StringBuilder();
- boolean _matched = false;
- String _result = null;
-
- Rule(String pattern, String substitution)
- {
- _pattern = pattern;
- _substitution = substitution;
- _compiled = Pattern.compile(_pattern.replace("*", "(.*)").replace("%", "([^/]*)"));
- }
-
- boolean apply(String src) {
- _matched = false;
- _result = null;
- Matcher m = _compiled.matcher(src);
- if (m.matches()) {
- _matched = true;
- if (_substitution != null) {
- _sb.setLength(0);
- int limit = _substitution.length();
- int idx = 0;
- while (idx < limit) {
- char c = _substitution.charAt(idx);
- switch (c) {
- case '$':
- idx++;
- if (idx < limit) {
- c = _substitution.charAt(idx);
- } else {
- throw new IllegalStateException("substition index truncated");
- }
-
- if (c == '$') {
- _sb.append(c);
- idx++;
- } else {
- int num = 0;
- while (Character.isDigit(c)) {
- num *= 10;
- num += c - '0';
- idx++;
- c = idx < limit ? _substitution.charAt(idx) : '\0';
- }
- if (num > 0) {
- _sb.append(m.group(num));
- } else {
- throw new IllegalStateException
- ("bad substitution index at character[" +
- idx + "]: " + _substitution);
- }
- }
- break;
- default:
- _sb.append(c);
- idx++;
- break;
- }
- }
- _result = _sb.toString();
- }
- }
-
- return _matched;
- }
-
- boolean matched() {
- return _matched;
- }
-
- String result() {
- return _result;
- }
-
- }
-
- private List<Rule> _rules = new ArrayList<Rule>();
- private Rule _matched = null;
-
- public void rule(String pattern, String substitution)
- {
- _rules.add(new Rule(pattern, substitution));
- }
-
- public boolean apply(String src)
- {
- _matched = null;
-
- for (Rule rule: _rules) {
- if (rule.apply(src)) {
- _matched = rule;
- break;
- }
- }
-
- return _matched != null;
- }
-
- public boolean matched()
- {
- return _matched != null;
- }
-
- public String result()
- {
- return _matched != null ? _matched.result() : null;
- }
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index c5abbd8..b9fd1de 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -40,7 +40,6 @@
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class AcceptorImpl implements Acceptor {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
similarity index 97%
rename from proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
rename to proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
index 27b0d39..619912f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
@@ -18,13 +18,12 @@
* under the License.
*
*/
-package org.apache.qpid.proton.messenger.impl;
+package org.apache.qpid.proton.reactor.impl;
/**
* Address
*
- * @deprecated Messenger will be removed from upcoming proton-j releases.
*/
public class Address
{
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index 2dd7e1a..1282083 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -45,7 +45,6 @@
import org.apache.qpid.proton.reactor.Selector;
import org.apache.qpid.proton.reactor.Acceptor;
import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class IOHandler extends BaseHandler {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 30c8df9..9d38b85 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -51,7 +51,6 @@
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selector;
import org.apache.qpid.proton.reactor.Task;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class ReactorImpl implements Reactor, Extendable {
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
similarity index 98%
rename from proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java
rename to proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
index 77154b6..6b2da05 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AddressTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.qpid.proton.messenger.impl;
+package org.apache.qpid.proton.reactor.impl;
import static org.junit.Assert.*;
diff --git a/tests/java/pythonTests.ignore b/tests/java/pythonTests.ignore
index 7911176..1fafa02 100644
--- a/tests/java/pythonTests.ignore
+++ b/tests/java/pythonTests.ignore
@@ -1,4 +1 @@
proton_tests.reactor_interop.*
-proton_tests.soak.*
-proton_tests.ssl.SslTest.test_defaults_messenger_app
-proton_tests.ssl.SslTest.test_server_authentication_messenger_app
diff --git a/tests/java/shim/binding/proton/__init__.py b/tests/java/shim/binding/proton/__init__.py
index d3f6922..ecb480f 100644
--- a/tests/java/shim/binding/proton/__init__.py
+++ b/tests/java/shim/binding/proton/__init__.py
@@ -23,7 +23,6 @@
The proton APIs consist of the following classes:
- - L{Messenger} -- A messaging endpoint.
- L{Message} -- A class for creating and/or accessing AMQP message content.
- L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
data.
@@ -148,13 +147,6 @@
"""
pass
-class MessengerException(ProtonException):
- """
- The root of the messenger exception hierarchy. All exceptions
- generated by the messenger class derive from this exception.
- """
- pass
-
class MessageException(ProtonException):
"""
The MessageException class is the root of the message exception
@@ -168,618 +160,10 @@
PN_INTR: Interrupt
}
-PENDING = Constant("PENDING")
-ACCEPTED = Constant("ACCEPTED")
-REJECTED = Constant("REJECTED")
-RELEASED = Constant("RELEASED")
-MODIFIED = Constant("MODIFIED")
-ABORTED = Constant("ABORTED")
-SETTLED = Constant("SETTLED")
-
-STATUSES = {
- PN_STATUS_ABORTED: ABORTED,
- PN_STATUS_ACCEPTED: ACCEPTED,
- PN_STATUS_REJECTED: REJECTED,
- PN_STATUS_RELEASED: RELEASED,
- PN_STATUS_MODIFIED: MODIFIED,
- PN_STATUS_PENDING: PENDING,
- PN_STATUS_SETTLED: SETTLED,
- PN_STATUS_UNKNOWN: None
- }
AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
-class Messenger(object):
- """
- The L{Messenger} class defines a high level interface for sending
- and receiving L{Messages<Message>}. Every L{Messenger} contains a
- single logical queue of incoming messages and a single logical queue
- of outgoing messages. These messages in these queues may be destined
- for, or originate from, a variety of addresses.
-
- The messenger interface is single-threaded. All methods
- except one (L{interrupt}) are intended to be used from within
- the messenger thread.
-
-
- Address Syntax
- ==============
-
- An address has the following form::
-
- [ amqp[s]:// ] [user[:password]@] domain [/[name]]
-
- Where domain can be one of::
-
- host | host:port | ip | ip:port | name
-
- The following are valid examples of addresses:
-
- - example.org
- - example.org:1234
- - amqp://example.org
- - amqps://example.org
- - example.org/incoming
- - amqps://example.org/outgoing
- - amqps://fred:trustno1@example.org
- - 127.0.0.1:1234
- - amqps://127.0.0.1:1234
-
- Sending & Receiving Messages
- ============================
-
- The L{Messenger} class works in conjuction with the L{Message} class. The
- L{Message} class is a mutable holder of message content.
-
- The L{put} method copies its L{Message} to the outgoing queue, and may
- send queued messages if it can do so without blocking. The L{send}
- method blocks until it has sent the requested number of messages,
- or until a timeout interrupts the attempt.
-
-
- >>> message = Message()
- >>> for i in range(3):
- ... message.address = "amqp://host/queue"
- ... message.subject = "Hello World %i" % i
- ... messenger.put(message)
- >>> messenger.send()
-
- Similarly, the L{recv} method receives messages into the incoming
- queue, and may block as it attempts to receive the requested number
- of messages, or until timeout is reached. It may receive fewer
- than the requested number. The L{get} method pops the
- eldest L{Message} off the incoming queue and copies it into the L{Message}
- object that you supply. It will not block.
-
-
- >>> message = Message()
- >>> messenger.recv(10):
- >>> while messenger.incoming > 0:
- ... messenger.get(message)
- ... print message.subject
- Hello World 0
- Hello World 1
- Hello World 2
-
- The blocking flag allows you to turn off blocking behavior entirely,
- in which case L{send} and L{recv} will do whatever they can without
- blocking, and then return. You can then look at the number
- of incoming and outgoing messages to see how much outstanding work
- still remains.
- """
-
- def __init__(self, name=None):
- """
- Construct a new L{Messenger} with the given name. The name has
- global scope. If a NULL name is supplied, a UUID based name will
- be chosen.
-
- @type name: string
- @param name: the name of the messenger or None
-
- """
- self._mng = pn_messenger(name)
- self._selectables = {}
-
- def __del__(self):
- """
- Destroy the L{Messenger}. This will close all connections that
- are managed by the L{Messenger}. Call the L{stop} method before
- destroying the L{Messenger}.
- """
- if hasattr(self, "_mng"):
- pn_messenger_free(self._mng)
- del self._mng
-
- def _check(self, err):
- if err < 0:
- if (err == PN_INPROGRESS):
- return
- exc = EXCEPTIONS.get(err, MessengerException)
- raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
- else:
- return err
-
- @property
- def name(self):
- """
- The name of the L{Messenger}.
- """
- return pn_messenger_name(self._mng)
-
- def _get_certificate(self):
- return pn_messenger_get_certificate(self._mng)
-
- def _set_certificate(self, value):
- self._check(pn_messenger_set_certificate(self._mng, value))
-
- certificate = property(_get_certificate, _set_certificate,
- doc="""
-Path to a certificate file for the L{Messenger}. This certificate is
-used when the L{Messenger} accepts or establishes SSL/TLS connections.
-This property must be specified for the L{Messenger} to accept
-incoming SSL/TLS connections and to establish client authenticated
-outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
-connections do not require this property.
-""")
-
- def _get_private_key(self):
- return pn_messenger_get_private_key(self._mng)
-
- def _set_private_key(self, value):
- self._check(pn_messenger_set_private_key(self._mng, value))
-
- private_key = property(_get_private_key, _set_private_key,
- doc="""
-Path to a private key file for the L{Messenger's<Messenger>}
-certificate. This property must be specified for the L{Messenger} to
-accept incoming SSL/TLS connections and to establish client
-authenticated outgoing SSL/TLS connection. Non client authenticated
-SSL/TLS connections do not require this property.
-""")
-
- def _get_password(self):
- return pn_messenger_get_password(self._mng)
-
- def _set_password(self, value):
- self._check(pn_messenger_set_password(self._mng, value))
-
- password = property(_get_password, _set_password,
- doc="""
-This property contains the password for the L{Messenger.private_key}
-file, or None if the file is not encrypted.
-""")
-
- def _get_trusted_certificates(self):
- return pn_messenger_get_trusted_certificates(self._mng)
-
- def _set_trusted_certificates(self, value):
- self._check(pn_messenger_set_trusted_certificates(self._mng, value))
-
- trusted_certificates = property(_get_trusted_certificates,
- _set_trusted_certificates,
- doc="""
-A path to a database of trusted certificates for use in verifying the
-peer on an SSL/TLS connection. If this property is None, then the peer
-will not be verified.
-""")
-
- def _get_timeout(self):
- t = pn_messenger_get_timeout(self._mng)
- if t == -1:
- return None
- else:
- return millis2secs(t)
-
- def _set_timeout(self, value):
- if value is None:
- t = -1
- else:
- t = secs2millis(value)
- self._check(pn_messenger_set_timeout(self._mng, t))
-
- timeout = property(_get_timeout, _set_timeout,
- doc="""
-The timeout property contains the default timeout for blocking
-operations performed by the L{Messenger}.
-""")
-
- def _is_blocking(self):
- return pn_messenger_is_blocking(self._mng)
-
- def _set_blocking(self, b):
- self._check(pn_messenger_set_blocking(self._mng, b))
-
- blocking = property(_is_blocking, _set_blocking,
- doc="""
-Enable or disable blocking behavior during L{Message} sending
-and receiving. This affects every blocking call, with the
-exception of L{work}. Currently, the affected calls are
-L{send}, L{recv}, and L{stop}.
-""")
-
- def _is_passive(self):
- return pn_messenger_is_passive(self._mng)
-
- def _set_passive(self, b):
- self._check(pn_messenger_set_passive(self._mng, b))
-
- passive = property(_is_passive, _set_passive,
- doc="""
-When passive is set to true, Messenger will not attempt to perform I/O
-internally. In this mode it is necessary to use the selectables API to
-drive any I/O needed to perform requested actions. In this mode
-Messenger will never block.
-""")
-
- def _get_incoming_window(self):
- return pn_messenger_get_incoming_window(self._mng)
-
- def _set_incoming_window(self, window):
- self._check(pn_messenger_set_incoming_window(self._mng, window))
-
- incoming_window = property(_get_incoming_window, _set_incoming_window,
- doc="""
-The incoming tracking window for the messenger. The messenger will
-track the remote status of this many incoming deliveries after they
-have been accepted or rejected. Defaults to zero.
-
-L{Messages<Message>} enter this window only when you take them into your application
-using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
-without explicitly accepting or rejecting the oldest message, then the
-message that passes beyond the edge of the incoming window will be assigned
-the default disposition of its link.
-""")
-
- def _get_outgoing_window(self):
- return pn_messenger_get_outgoing_window(self._mng)
-
- def _set_outgoing_window(self, window):
- self._check(pn_messenger_set_outgoing_window(self._mng, window))
-
- outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
- doc="""
-The outgoing tracking window for the messenger. The messenger will
-track the remote status of this many outgoing deliveries after calling
-send. Defaults to zero.
-
-A L{Message} enters this window when you call the put() method with the
-message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
-times, status information will no longer be available for the
-first message.
-""")
-
- def start(self):
- """
- Currently a no-op placeholder.
- For future compatibility, do not L{send} or L{recv} messages
- before starting the L{Messenger}.
- """
- self._check(pn_messenger_start(self._mng))
-
- def stop(self):
- """
- Transitions the L{Messenger} to an inactive state. An inactive
- L{Messenger} will not send or receive messages from its internal
- queues. A L{Messenger} should be stopped before being discarded to
- ensure a clean shutdown handshake occurs on any internally managed
- connections.
- """
- self._check(pn_messenger_stop(self._mng))
-
- @property
- def stopped(self):
- """
- Returns true iff a L{Messenger} is in the stopped state.
- This function does not block.
- """
- return pn_messenger_stopped(self._mng)
-
- def subscribe(self, source):
- """
- Subscribes the L{Messenger} to messages originating from the
- specified source. The source is an address as specified in the
- L{Messenger} introduction with the following addition. If the
- domain portion of the address begins with the '~' character, the
- L{Messenger} will interpret the domain as host/port, bind to it,
- and listen for incoming messages. For example "~0.0.0.0",
- "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
- local interface and listen for incoming messages with the last
- variant only permitting incoming SSL connections.
-
- @type source: string
- @param source: the source of messages to subscribe to
- """
- sub_impl = pn_messenger_subscribe(self._mng, source)
- if not sub_impl:
- self._check(pn_error_code(pn_messenger_error(self._mng)))
- raise MessengerException("Cannot subscribe to %s"%source)
- return Subscription(sub_impl)
-
- def put(self, message):
- """
- Places the content contained in the message onto the outgoing
- queue of the L{Messenger}. This method will never block, however
- it will send any unblocked L{Messages<Message>} in the outgoing
- queue immediately and leave any blocked L{Messages<Message>}
- remaining in the outgoing queue. The L{send} call may be used to
- block until the outgoing queue is empty. The L{outgoing} property
- may be used to check the depth of the outgoing queue.
-
- When the content in a given L{Message} object is copied to the outgoing
- message queue, you may then modify or discard the L{Message} object
- without having any impact on the content in the outgoing queue.
-
- This method returns an outgoing tracker for the L{Message}. The tracker
- can be used to determine the delivery status of the L{Message}.
-
- @type message: Message
- @param message: the message to place in the outgoing queue
- @return: a tracker
- """
- message._pre_encode()
- self._check(pn_messenger_put(self._mng, message._msg))
- return pn_messenger_outgoing_tracker(self._mng)
-
- def status(self, tracker):
- """
- Gets the last known remote state of the delivery associated with
- the given tracker.
-
- @type tracker: tracker
- @param tracker: the tracker whose status is to be retrieved
-
- @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
- """
- disp = pn_messenger_status(self._mng, tracker);
- return STATUSES.get(disp, disp)
-
- def buffered(self, tracker):
- """
- Checks if the delivery associated with the given tracker is still
- waiting to be sent.
-
- @type tracker: tracker
- @param tracker: the tracker whose status is to be retrieved
-
- @return: true if delivery is still buffered
- """
- return pn_messenger_buffered(self._mng, tracker);
-
- def settle(self, tracker=None):
- """
- Frees a L{Messenger} from tracking the status associated with a given
- tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
- to the most recent will be settled.
- """
- if tracker is None:
- tracker = pn_messenger_outgoing_tracker(self._mng)
- flags = PN_CUMULATIVE
- else:
- flags = 0
- self._check(pn_messenger_settle(self._mng, tracker, flags))
-
- def send(self, n=-1):
- """
- This call will block until the indicated number of L{messages<Message>}
- have been sent, or until the operation times out. If n is -1 this call will
- block until all outgoing L{messages<Message>} have been sent. If n is 0 then
- this call will send whatever it can without blocking.
- """
- self._check(pn_messenger_send(self._mng, n))
-
- def recv(self, n=None):
- """
- Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
- for I{n} is supplied, this call will receive as many L{messages<Message>} as it
- can buffer internally. If the L{Messenger} is in blocking mode, this
- call will block until at least one L{Message} is available in the
- incoming queue.
- """
- if n is None:
- n = -1
- self._check(pn_messenger_recv(self._mng, n))
-
- def work(self, timeout=None):
- """
- Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
- This will block for the indicated timeout.
- This method may also do I/O work other than sending and receiving
- L{messages<Message>}. For example, closing connections after messenger.L{stop}()
- has been called.
- """
- if timeout is None:
- t = -1
- else:
- t = secs2millis(timeout)
- err = pn_messenger_work(self._mng, t)
- if (err == PN_TIMEOUT):
- return False
- else:
- self._check(err)
- return True
-
- @property
- def receiving(self):
- return pn_messenger_receiving(self._mng)
-
- def interrupt(self):
- """
- The L{Messenger} interface is single-threaded.
- This is the only L{Messenger} function intended to be called
- from outside of the L{Messenger} thread.
- Call this from a non-messenger thread to interrupt
- a L{Messenger} that is blocking.
- This will cause any in-progress blocking call to throw
- the L{Interrupt} exception. If there is no currently blocking
- call, then the next blocking call will be affected, even if it
- is within the same thread that interrupt was called from.
- """
- self._check(pn_messenger_interrupt(self._mng))
-
- def get(self, message=None):
- """
- Moves the message from the head of the incoming message queue into
- the supplied message object. Any content in the message will be
- overwritten.
-
- A tracker for the incoming L{Message} is returned. The tracker can
- later be used to communicate your acceptance or rejection of the
- L{Message}.
-
- If None is passed in for the L{Message} object, the L{Message}
- popped from the head of the queue is discarded.
-
- @type message: Message
- @param message: the destination message object
- @return: a tracker
- """
- if message is None:
- impl = None
- else:
- impl = message._msg
- self._check(pn_messenger_get(self._mng, impl))
- if message is not None:
- message._post_decode()
- return pn_messenger_incoming_tracker(self._mng)
-
- def accept(self, tracker=None):
- """
- Signal the sender that you have acted on the L{Message}
- pointed to by the tracker. If no tracker is supplied,
- then all messages that have been returned by the L{get}
- method are accepted, except those that have already been
- auto-settled by passing beyond your incoming window size.
-
- @type tracker: tracker
- @param tracker: a tracker as returned by get
- """
- if tracker is None:
- tracker = pn_messenger_incoming_tracker(self._mng)
- flags = PN_CUMULATIVE
- else:
- flags = 0
- self._check(pn_messenger_accept(self._mng, tracker, flags))
-
- def reject(self, tracker=None):
- """
- Rejects the L{Message} indicated by the tracker. If no tracker
- is supplied, all messages that have been returned by the L{get}
- method are rejected, except those that have already been auto-settled
- by passing beyond your outgoing window size.
-
- @type tracker: tracker
- @param tracker: a tracker as returned by get
- """
- if tracker is None:
- tracker = pn_messenger_incoming_tracker(self._mng)
- flags = PN_CUMULATIVE
- else:
- flags = 0
- self._check(pn_messenger_reject(self._mng, tracker, flags))
-
- @property
- def outgoing(self):
- """
- The outgoing queue depth.
- """
- return pn_messenger_outgoing(self._mng)
-
- @property
- def incoming(self):
- """
- The incoming queue depth.
- """
- return pn_messenger_incoming(self._mng)
-
- def route(self, pattern, address):
- """
- Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
-
- The route procedure may be used to influence how a L{Messenger} will
- internally treat a given address or class of addresses. Every call
- to the route procedure will result in L{Messenger} appending a routing
- rule to its internal routing table.
-
- Whenever a L{Message} is presented to a L{Messenger} for delivery, it
- will match the address of this message against the set of routing
- rules in order. The first rule to match will be triggered, and
- instead of routing based on the address presented in the message,
- the L{Messenger} will route based on the address supplied in the rule.
-
- The pattern matching syntax supports two types of matches, a '%'
- will match any character except a '/', and a '*' will match any
- character including a '/'.
-
- A routing address is specified as a normal AMQP address, however it
- may additionally use substitution variables from the pattern match
- that triggered the rule.
-
- Any message sent to "foo" will be routed to "amqp://foo.com":
-
- >>> messenger.route("foo", "amqp://foo.com");
-
- Any message sent to "foobar" will be routed to
- "amqp://foo.com/bar":
-
- >>> messenger.route("foobar", "amqp://foo.com/bar");
-
- Any message sent to bar/<path> will be routed to the corresponding
- path within the amqp://bar.com domain:
-
- >>> messenger.route("bar/*", "amqp://bar.com/$1");
-
- Route all L{messages<Message>} over TLS:
-
- >>> messenger.route("amqp:*", "amqps:$1")
-
- Supply credentials for foo.com:
-
- >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
-
- Supply credentials for all domains:
-
- >>> messenger.route("amqp://*", "amqp://user:password@$1");
-
- Route all addresses through a single proxy while preserving the
- original destination:
-
- >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
-
- Route any address through a single broker:
-
- >>> messenger.route("*", "amqp://user:password@broker/$1");
- """
- self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
-
- def rewrite(self, pattern, address):
- """
- Similar to route(), except that the destination of
- the L{Message} is determined before the message address is rewritten.
-
- The outgoing address is only rewritten after routing has been
- finalized. If a message has an outgoing address of
- "amqp://0.0.0.0:5678", and a rewriting rule that changes its
- outgoing address to "foo", it will still arrive at the peer that
- is listening on "amqp://0.0.0.0:5678", but when it arrives there,
- the receiver will see its outgoing address as "foo".
-
- The default rewrite rule removes username and password from addresses
- before they are transmitted.
- """
- self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
-
- def selectable(self):
- return Selectable.wrap(pn_messenger_selectable(self._mng))
-
- @property
- def deadline(self):
- tstamp = pn_messenger_deadline(self._mng)
- if tstamp:
- return millis2secs(tstamp)
- else:
- return None
class Message(object):
"""The L{Message} class is a mutable holder of message content.
@@ -4259,8 +3643,6 @@
"Link",
"Message",
"MessageException",
- "Messenger",
- "MessengerException",
"ProtonException",
"VERSION_MAJOR",
"VERSION_MINOR",
diff --git a/tests/java/shim/cmessenger.py b/tests/java/shim/cmessenger.py
deleted file mode 100644
index 249e0dc..0000000
--- a/tests/java/shim/cmessenger.py
+++ /dev/null
@@ -1,225 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from org.apache.qpid.proton import Proton
-from org.apache.qpid.proton.messenger import Messenger, Status
-from org.apache.qpid.proton import InterruptException, TimeoutException
-
-from cerror import *
-
-# from proton/messenger.h
-PN_STATUS_UNKNOWN = 0
-PN_STATUS_PENDING = 1
-PN_STATUS_ACCEPTED = 2
-PN_STATUS_REJECTED = 3
-PN_STATUS_RELEASED = 4
-PN_STATUS_MODIFIED = 5
-PN_STATUS_ABORTED = 6
-PN_STATUS_SETTLED = 7
-
-PN_CUMULATIVE = 1
-
-class pn_messenger_wrapper:
-
- def __init__(self, impl):
- self.impl = impl
- self.error = pn_error(0, None)
-
-def pn_messenger(name):
- if name is None:
- return pn_messenger_wrapper(Proton.messenger())
- else:
- return pn_messenger_wrapper(Proton.messenger(name))
-
-def pn_messenger_error(m):
- return m.error
-
-def pn_messenger_set_timeout(m, t):
- m.impl.setTimeout(t)
- return 0
-
-def pn_messenger_set_blocking(m, b):
- m.impl.setBlocking(b)
- return 0
-
-def pn_messenger_set_certificate(m, c):
- m.impl.setCertificate(c)
- return 0
-
-def pn_messenger_set_private_key(m, p):
- m.impl.setPrivateKey(p)
- return 0
-
-def pn_messenger_set_password(m, p):
- m.impl.setPassword(p)
- return 0
-
-def pn_messenger_set_trusted_certificates(m, t):
- m.impl.setTrustedCertificates(t)
- return 0
-
-def pn_messenger_set_incoming_window(m, w):
- m.impl.setIncomingWindow(w)
- return 0
-
-def pn_messenger_set_outgoing_window(m, w):
- m.impl.setOutgoingWindow(w)
- return 0
-
-def pn_messenger_start(m):
- m.impl.start()
- return 0
-
-# XXX: ???
-def pn_messenger_work(m, t):
- try:
- if m.impl.work(t):
- return 1
- else:
- return PN_TIMEOUT
- except InterruptException, e:
- return PN_INTR
-
-class pn_subscription:
-
- def __init__(self):
- pass
-
-def pn_messenger_subscribe(m, source):
- m.impl.subscribe(source)
- return pn_subscription()
-
-def pn_messenger_route(m, pattern, address):
- m.impl.route(pattern, address)
- return 0
-
-def pn_messenger_rewrite(m, pattern, address):
- m.impl.rewrite(pattern, address)
- return 0
-
-def pn_messenger_interrupt(m):
- m.impl.interrupt()
- return 0
-
-def pn_messenger_buffered(m, t):
- raise Skipped()
-
-from org.apache.qpid.proton.engine import TransportException
-
-def pn_messenger_stop(m):
- m.impl.stop()
- return 0
-
-def pn_messenger_stopped(m):
- return m.impl.stopped()
-
-def pn_messenger_put(m, msg):
- msg.pre_encode()
- m.impl.put(msg.impl)
- return 0
-
-def pn_messenger_outgoing_tracker(m):
- return m.impl.outgoingTracker()
-
-def pn_messenger_send(m, n):
- try:
- m.impl.send(n)
- return 0
- except InterruptException, e:
- return PN_INTR
- except TimeoutException, e:
- return PN_TIMEOUT
-
-def pn_messenger_recv(m, n):
- try:
- m.impl.recv(n)
- return 0
- except InterruptException, e:
- return PN_INTR
- except TimeoutException, e:
- return PN_TIMEOUT
-
-def pn_messenger_receiving(m):
- return m.impl.receiving()
-
-def pn_messenger_incoming(m):
- return m.impl.incoming()
-
-def pn_messenger_outgoing(m):
- return m.impl.outgoing()
-
-def pn_messenger_get(m, msg):
- mimpl = m.impl.get()
- if msg:
- msg.decode(mimpl)
- return 0
-
-def pn_messenger_incoming_tracker(m):
- return m.impl.incomingTracker()
-
-def pn_messenger_accept(m, tracker, flags):
- if flags:
- m.impl.accept(tracker, Messenger.CUMULATIVE)
- else:
- m.impl.accept(tracker, 0)
- return 0
-
-def pn_messenger_reject(m, tracker, flags):
- if flags:
- m.impl.reject(tracker, Messenger.CUMULATIVE)
- else:
- m.impl.reject(tracker, 0)
- return 0
-
-def pn_messenger_settle(m, tracker, flags):
- if flags:
- m.impl.settle(tracker, Messenger.CUMULATIVE)
- else:
- m.impl.settle(tracker, 0)
- return 0
-
-STATUS_P2J = {
- PN_STATUS_UNKNOWN: Status.UNKNOWN,
- PN_STATUS_PENDING: Status.PENDING,
- PN_STATUS_ACCEPTED: Status.ACCEPTED,
- PN_STATUS_REJECTED: Status.REJECTED,
- PN_STATUS_RELEASED: Status.RELEASED,
- PN_STATUS_MODIFIED: Status.MODIFIED,
- PN_STATUS_ABORTED: Status.ABORTED,
- PN_STATUS_SETTLED: Status.SETTLED
-}
-
-STATUS_J2P = {
- Status.UNKNOWN: PN_STATUS_UNKNOWN,
- Status.PENDING: PN_STATUS_PENDING,
- Status.ACCEPTED: PN_STATUS_ACCEPTED,
- Status.REJECTED: PN_STATUS_REJECTED,
- Status.RELEASED: PN_STATUS_RELEASED,
- Status.MODIFIED: PN_STATUS_MODIFIED,
- Status.ABORTED: PN_STATUS_ABORTED,
- Status.SETTLED: PN_STATUS_SETTLED
-}
-
-def pn_messenger_status(m, tracker):
- return STATUS_J2P[m.impl.getStatus(tracker)]
-
-def pn_messenger_set_passive(m, passive):
- raise Skipped()
-
-def pn_messenger_selectable(m):
- raise Skipped()
diff --git a/tests/java/shim/cproton.py b/tests/java/shim/cproton.py
index d5ed574..ec7feec 100644
--- a/tests/java/shim/cproton.py
+++ b/tests/java/shim/cproton.py
@@ -35,7 +35,6 @@
from cengine import *
from csasl import *
from cssl import *
-from cmessenger import *
from cmessage import *
from curl import *
from creactor import *
diff --git a/tests/java/shim/curl.py b/tests/java/shim/curl.py
index d4d3d37..2b8c9c8 100644
--- a/tests/java/shim/curl.py
+++ b/tests/java/shim/curl.py
@@ -17,7 +17,7 @@
# under the License
#
-from org.apache.qpid.proton.messenger.impl import Address
+from org.apache.qpid.proton.reactor.impl import Address
def pn_url():
return Address()
diff --git a/tests/python/proton_tests/__init__.py b/tests/python/proton_tests/__init__.py
index 66ce650..647cbf5 100644
--- a/tests/python/proton_tests/__init__.py
+++ b/tests/python/proton_tests/__init__.py
@@ -22,11 +22,9 @@
import proton_tests.message
import proton_tests.handler
import proton_tests.reactor
-import proton_tests.messenger
import proton_tests.sasl
import proton_tests.transport
import proton_tests.ssl
import proton_tests.interop
-import proton_tests.soak
import proton_tests.url
import proton_tests.utils
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index aaefccd..02de4ff 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -264,308 +264,3 @@
def on_delivery(self, event):
event.delivery.settle()
-#
-# Classes that wrap the messenger applications msgr-send and msgr-recv.
-# These applications reside in the tests/tools/apps directory
-#
-
-class MessengerApp(object):
- """ Interface to control a MessengerApp """
- def __init__(self):
- self._cmdline = None
- # options common to Receivers and Senders:
- self.ca_db = None
- self.certificate = None
- self.privatekey = None
- self.password = None
- self._output = None
-
- def start(self, verbose=False):
- """ Begin executing the test """
- cmd = self.cmdline()
- self._verbose = verbose
- if self._verbose:
- print("COMMAND='%s'" % str(cmd))
- #print("ENV='%s'" % str(os.environ.copy()))
- try:
- # Handle python launch by replacing script 'filename' with
- # 'python abspath-to-filename' in cmdline arg list.
- if cmd[0].endswith('.py'):
- foundfile = findfileinpath(cmd[0], os.getenv('PATH'))
- if foundfile is None:
- msg = "Unable to locate file '%s' in PATH" % cmd[0]
- raise Skipped("Skipping test - %s" % msg)
-
- del cmd[0:1]
- cmd.insert(0, foundfile)
- cmd.insert(0, sys.executable)
- self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT,
- bufsize=4096, universal_newlines=True)
- except OSError:
- e = sys.exc_info()[1]
- print("ERROR: '%s'" % e)
- msg = "Unable to execute command '%s', is it in your PATH?" % cmd[0]
-
- # NOTE(flaper87): Skip the test if the command is not found.
- if e.errno == 2:
- raise Skipped("Skipping test - %s" % msg)
- assert False, msg
-
- self._ready() # wait for it to initialize
-
- def stop(self):
- """ Signal the client to start clean shutdown """
- pass
-
- def wait(self):
- """ Wait for client to complete """
- self._output = self._process.communicate()
- if self._verbose:
- print("OUTPUT='%s'" % self.stdout())
-
- def status(self):
- """ Return status from client process """
- return self._process.returncode
-
- def stdout(self):
- #self._process.communicate()[0]
- if not self._output or not self._output[0]:
- return "*** NO STDOUT ***"
- return self._output[0]
-
- def stderr(self):
- if not self._output or not self._output[1]:
- return "*** NO STDERR ***"
- return self._output[1]
-
- def cmdline(self):
- if not self._cmdline:
- self._build_command()
- return self._cmdline
-
- def _build_command(self):
- assert False, "_build_command() needs override"
-
- def _ready(self):
- assert False, "_ready() needs override"
-
- def _do_common_options(self):
- """ Common option handling """
- if self.ca_db is not None:
- self._cmdline.append("-T")
- self._cmdline.append(str(self.ca_db))
- if self.certificate is not None:
- self._cmdline.append("-C")
- self._cmdline.append(str(self.certificate))
- if self.privatekey is not None:
- self._cmdline.append("-K")
- self._cmdline.append(str(self.privatekey))
- if self.password is not None:
- self._cmdline.append("-P")
- self._cmdline.append("pass:" + str(self.password))
-
-
-class MessengerSender(MessengerApp):
- """ Interface to configure a sending MessengerApp """
- def __init__(self):
- MessengerApp.__init__(self)
- self._command = None
- # @todo make these properties
- self.targets = []
- self.send_count = None
- self.msg_size = None
- self.send_batch = None
- self.outgoing_window = None
- self.report_interval = None
- self.get_reply = False
- self.timeout = None
- self.incoming_window = None
- self.recv_count = None
- self.name = None
-
- # command string?
- def _build_command(self):
- self._cmdline = self._command
- self._do_common_options()
- assert self.targets, "Missing targets, required for sender!"
- self._cmdline.append("-a")
- self._cmdline.append(",".join(self.targets))
- if self.send_count is not None:
- self._cmdline.append("-c")
- self._cmdline.append(str(self.send_count))
- if self.msg_size is not None:
- self._cmdline.append("-b")
- self._cmdline.append(str(self.msg_size))
- if self.send_batch is not None:
- self._cmdline.append("-p")
- self._cmdline.append(str(self.send_batch))
- if self.outgoing_window is not None:
- self._cmdline.append("-w")
- self._cmdline.append(str(self.outgoing_window))
- if self.report_interval is not None:
- self._cmdline.append("-e")
- self._cmdline.append(str(self.report_interval))
- if self.get_reply:
- self._cmdline.append("-R")
- if self.timeout is not None:
- self._cmdline.append("-t")
- self._cmdline.append(str(self.timeout))
- if self.incoming_window is not None:
- self._cmdline.append("-W")
- self._cmdline.append(str(self.incoming_window))
- if self.recv_count is not None:
- self._cmdline.append("-B")
- self._cmdline.append(str(self.recv_count))
- if self.name is not None:
- self._cmdline.append("-N")
- self._cmdline.append(str(self.name))
-
- def _ready(self):
- pass
-
-
-class MessengerReceiver(MessengerApp):
- """ Interface to configure a receiving MessengerApp """
- def __init__(self):
- MessengerApp.__init__(self)
- self._command = None
- # @todo make these properties
- self.subscriptions = []
- self.receive_count = None
- self.recv_count = None
- self.incoming_window = None
- self.timeout = None
- self.report_interval = None
- self.send_reply = False
- self.outgoing_window = None
- self.forwards = []
- self.name = None
-
- # command string?
- def _build_command(self):
- self._cmdline = self._command
- self._do_common_options()
- self._cmdline += ["-X", "READY"]
- assert self.subscriptions, "Missing subscriptions, required for receiver!"
- self._cmdline.append("-a")
- self._cmdline.append(",".join(self.subscriptions))
- if self.receive_count is not None:
- self._cmdline.append("-c")
- self._cmdline.append(str(self.receive_count))
- if self.recv_count is not None:
- self._cmdline.append("-b")
- self._cmdline.append(str(self.recv_count))
- if self.incoming_window is not None:
- self._cmdline.append("-w")
- self._cmdline.append(str(self.incoming_window))
- if self.timeout is not None:
- self._cmdline.append("-t")
- self._cmdline.append(str(self.timeout))
- if self.report_interval is not None:
- self._cmdline.append("-e")
- self._cmdline.append(str(self.report_interval))
- if self.send_reply:
- self._cmdline.append("-R")
- if self.outgoing_window is not None:
- self._cmdline.append("-W")
- self._cmdline.append(str(self.outgoing_window))
- if self.forwards:
- self._cmdline.append("-F")
- self._cmdline.append(",".join(self.forwards))
- if self.name is not None:
- self._cmdline.append("-N")
- self._cmdline.append(str(self.name))
-
- def _ready(self):
- """ wait for subscriptions to complete setup. """
- r = self._process.stdout.readline()
- assert r.strip() == "READY", "Unexpected input while waiting for receiver to initialize: %s" % r
-
-class MessengerSenderC(MessengerSender):
- def __init__(self):
- MessengerSender.__init__(self)
- self._command = ["msgr-send"]
-
-class MessengerSenderValgrind(MessengerSenderC):
- """ Run the C sender under Valgrind
- """
- def __init__(self, suppressions=None):
- if "VALGRIND" not in os.environ:
- raise Skipped("Skipping test - $VALGRIND not set.")
- MessengerSenderC.__init__(self)
- if not suppressions:
- suppressions = os.path.join(os.path.dirname(__file__),
- "valgrind.supp" )
- self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
- "--trace-children=yes", "--leak-check=full",
- "--suppressions=%s" % suppressions] + self._command
-
-class MessengerReceiverC(MessengerReceiver):
- def __init__(self):
- MessengerReceiver.__init__(self)
- self._command = ["msgr-recv"]
-
-class MessengerReceiverValgrind(MessengerReceiverC):
- """ Run the C receiver under Valgrind
- """
- def __init__(self, suppressions=None):
- if "VALGRIND" not in os.environ:
- raise Skipped("Skipping test - $VALGRIND not set.")
- MessengerReceiverC.__init__(self)
- if not suppressions:
- suppressions = os.path.join(os.path.dirname(__file__),
- "valgrind.supp" )
- self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
- "--trace-children=yes", "--leak-check=full",
- "--suppressions=%s" % suppressions] + self._command
-
-class MessengerSenderPython(MessengerSender):
- def __init__(self):
- MessengerSender.__init__(self)
- self._command = ["msgr-send.py"]
-
-
-class MessengerReceiverPython(MessengerReceiver):
- def __init__(self):
- MessengerReceiver.__init__(self)
- self._command = ["msgr-recv.py"]
-
-
-
-class ReactorSenderC(MessengerSender):
- def __init__(self):
- MessengerSender.__init__(self)
- self._command = ["reactor-send"]
-
-class ReactorSenderValgrind(ReactorSenderC):
- """ Run the C sender under Valgrind
- """
- def __init__(self, suppressions=None):
- if "VALGRIND" not in os.environ:
- raise Skipped("Skipping test - $VALGRIND not set.")
- ReactorSenderC.__init__(self)
- if not suppressions:
- suppressions = os.path.join(os.path.dirname(__file__),
- "valgrind.supp" )
- self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
- "--trace-children=yes", "--leak-check=full",
- "--suppressions=%s" % suppressions] + self._command
-
-class ReactorReceiverC(MessengerReceiver):
- def __init__(self):
- MessengerReceiver.__init__(self)
- self._command = ["reactor-recv"]
-
-class ReactorReceiverValgrind(ReactorReceiverC):
- """ Run the C receiver under Valgrind
- """
- def __init__(self, suppressions=None):
- if "VALGRIND" not in os.environ:
- raise Skipped("Skipping test - $VALGRIND not set.")
- ReactorReceiverC.__init__(self)
- if not suppressions:
- suppressions = os.path.join(os.path.dirname(__file__),
- "valgrind.supp" )
- self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
- "--trace-children=yes", "--leak-check=full",
- "--suppressions=%s" % suppressions] + self._command
diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py
deleted file mode 100644
index 8da068e..0000000
--- a/tests/python/proton_tests/messenger.py
+++ /dev/null
@@ -1,1091 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, sys, traceback
-from . import common
-from proton import *
-from threading import Thread, Event
-from time import sleep, time
-from .common import Skipped
-
-class Test(common.Test):
-
- def setUp(self):
- self.server_credit = 10
- self.server_received = 0
- self.server_finite_credit = False
- self.server = Messenger("server")
- self.server.timeout = self.timeout
- self.server.start()
- self.port = common.free_tcp_port()
- self.server.subscribe("amqp://~127.0.0.1:%d" % self.port)
- self.server_thread = Thread(name="server-thread", target=self.run_server)
- self.server_thread.daemon = True
- self.server_is_running_event = Event()
- self.running = True
- self.server_thread_started = False
-
- self.client = Messenger("client")
- self.client.timeout = self.timeout
-
- def start(self):
- self.server_thread_started = True
- self.server_thread.start()
- self.server_is_running_event.wait(self.timeout)
- self.client.start()
-
- def _safelyStopClient(self):
- self.server.interrupt()
- self.client.stop()
- self.client = None
-
- def tearDown(self):
- try:
- if self.running:
- if not self.server_thread_started: self.start()
- # send a message to cause the server to promptly exit
- self.running = False
- self._safelyStopClient()
- finally:
- self.server_thread.join(self.timeout)
- self.server = None
-
-REJECT_ME = "*REJECT-ME*"
-
-class MessengerTest(Test):
-
- def run_server(self):
- if self.server_finite_credit:
- self._run_server_finite_credit()
- else:
- self._run_server_recv()
-
- def _run_server_recv(self):
- """ Use recv() to replenish credit each time the server waits
- """
- msg = Message()
- try:
- while self.running:
- self.server_is_running_event.set()
- try:
- self.server.recv(self.server_credit)
- self.process_incoming(msg)
- except Interrupt:
- pass
- finally:
- self.server.stop()
- self.running = False
-
- def _run_server_finite_credit(self):
- """ Grant credit once, process until credit runs out
- """
- msg = Message()
- self.server_is_running_event.set()
- try:
- self.server.recv(self.server_credit)
- while self.running:
- try:
- # do not grant additional credit (eg. call recv())
- self.process_incoming(msg)
- self.server.work()
- except Interrupt:
- break
- finally:
- self.server.stop()
- self.running = False
-
- def process_incoming(self, msg):
- while self.server.incoming:
- self.server.get(msg)
- self.server_received += 1
- if msg.body == REJECT_ME:
- self.server.reject()
- else:
- self.server.accept()
- self.dispatch(msg)
-
- def dispatch(self, msg):
- if msg.reply_to:
- msg.address = msg.reply_to
- self.server.put(msg)
- self.server.settle()
-
- def testSendReceive(self, size=None, address_size=None):
- self.start()
- msg = Message()
- if address_size:
- msg.address="amqp://127.0.0.1:%d/%s" % (self.port, "x"*address_size)
- else:
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.reply_to = "~"
- msg.subject="Hello World!"
- body = "First the world, then the galaxy!"
- if size is not None:
- while len(body) < size:
- body = 2*body
- body = body[:size]
- msg.body = body
- self.client.put(msg)
- self.client.send()
-
- reply = Message()
- self.client.recv(1)
- assert self.client.incoming == 1, self.client.incoming
- self.client.get(reply)
-
- assert reply.subject == "Hello World!"
- rbod = reply.body
- assert rbod == body, (rbod, body)
-
- def testSendReceive1K(self):
- self.testSendReceive(1024)
-
- def testSendReceive2K(self):
- self.testSendReceive(2*1024)
-
- def testSendReceive4K(self):
- self.testSendReceive(4*1024)
-
- def testSendReceive10K(self):
- self.testSendReceive(10*1024)
-
- def testSendReceive100K(self):
- self.testSendReceive(100*1024)
-
- def testSendReceive1M(self):
- self.testSendReceive(1024*1024)
-
- def testSendReceiveLargeAddress(self):
- self.testSendReceive(address_size=2048)
-
- # PROTON-285 - prevent continually failing test
- def xtestSendBogus(self):
- self.start()
- msg = Message()
- msg.address="totally-bogus-address"
- try:
- self.client.put(msg)
- assert False, "Expecting MessengerException"
- except MessengerException:
- exc = sys.exc_info()[1]
- err = str(exc)
- assert "unable to send to address: totally-bogus-address" in err, err
-
- def testOutgoingWindow(self):
- self.server.incoming_window = 10
- self.start()
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.subject="Hello World!"
-
- trackers = []
- for i in range(10):
- trackers.append(self.client.put(msg))
-
- self.client.send()
-
- for t in trackers:
- assert self.client.status(t) is None
-
- # reduce outgoing_window to 5 and then try to send 10 messages
- self.client.outgoing_window = 5
-
- trackers = []
- for i in range(10):
- trackers.append(self.client.put(msg))
-
- for i in range(5):
- t = trackers[i]
- assert self.client.status(t) is None, (t, self.client.status(t))
-
- for i in range(5, 10):
- t = trackers[i]
- assert self.client.status(t) is PENDING, (t, self.client.status(t))
-
- self.client.send()
-
- for i in range(5):
- t = trackers[i]
- assert self.client.status(t) is None
-
- for i in range(5, 10):
- t = trackers[i]
- assert self.client.status(t) is ACCEPTED
-
- def testReject(self, process_incoming=None):
- if process_incoming:
- self.process_incoming = process_incoming
- self.server.incoming_window = 10
- self.start()
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.subject="Hello World!"
-
- self.client.outgoing_window = 10
- trackers = []
- rejected = []
- for i in range(10):
- if i == 5:
- msg.body = REJECT_ME
- else:
- msg.body = "Yay!"
- trackers.append(self.client.put(msg))
- if msg.body == REJECT_ME:
- rejected.append(trackers[-1])
-
- self.client.send()
-
- for t in trackers:
- if t in rejected:
- assert self.client.status(t) is REJECTED, (t, self.client.status(t))
- else:
- assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
- def testRejectIndividual(self):
- self.testReject(self.reject_individual)
-
- def reject_individual(self, msg):
- if self.server.incoming < 10:
- self.server.work(0)
- return
- while self.server.incoming:
- t = self.server.get(msg)
- if msg.body == REJECT_ME:
- self.server.reject(t)
- self.dispatch(msg)
- self.server.accept()
-
-
- def testIncomingWindow(self):
- self.server.incoming_window = 10
- self.server.outgoing_window = 10
- self.start()
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.reply_to = "~"
- msg.subject="Hello World!"
-
- self.client.outgoing_window = 10
- trackers = []
- for i in range(10):
- trackers.append(self.client.put(msg))
-
- self.client.send()
-
- for t in trackers:
- assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
- self.client.incoming_window = 10
- remaining = 10
-
- trackers = []
- while remaining:
- self.client.recv(remaining)
- while self.client.incoming:
- t = self.client.get()
- trackers.append(t)
- self.client.accept(t)
- remaining -= 1
- for t in trackers:
- assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
- def testIncomingQueueBiggerThanWindow(self, size=10):
- self.server.outgoing_window = size
- self.client.incoming_window = size
- self.start()
-
- msg = Message()
- msg.address = "amqp://127.0.0.1:%d" % self.port
- msg.reply_to = "~"
- msg.subject = "Hello World!"
-
- for i in range(2*size):
- self.client.put(msg)
-
- trackers = []
- while len(trackers) < 2*size:
- self.client.recv(2*size - len(trackers))
- while self.client.incoming:
- t = self.client.get(msg)
- assert self.client.status(t) is SETTLED, (t, self.client.status(t))
- trackers.append(t)
-
- for t in trackers[:size]:
- assert self.client.status(t) is None, (t, self.client.status(t))
- for t in trackers[size:]:
- assert self.client.status(t) is SETTLED, (t, self.client.status(t))
-
- self.client.accept()
-
- for t in trackers[:size]:
- assert self.client.status(t) is None, (t, self.client.status(t))
- for t in trackers[size:]:
- assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
-
- def testIncomingQueueBiggerThanSessionWindow(self):
- self.testIncomingQueueBiggerThanWindow(2048)
-
- def testBuffered(self):
- self.client.outgoing_window = 1000
- self.client.incoming_window = 1000
- self.start();
- assert self.server_received == 0
- buffering = 0
- count = 100
- for i in range(count):
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.subject="Hello World!"
- msg.body = "First the world, then the galaxy!"
- t = self.client.put(msg)
- buffered = self.client.buffered(t)
- # allow transition from False to True, but not back
- if buffered:
- buffering += 1
- else:
- assert not buffering, ("saw %s buffered deliveries before?" % buffering)
-
- while self.client.outgoing:
- last = self.client.outgoing
- self.client.send()
- #print "sent ", last - self.client.outgoing
-
- assert self.server_received == count
-
- def test_proton222(self):
- self.start()
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.subject="Hello World!"
- msg.body = "First the world, then the galaxy!"
- assert self.server_received == 0
- self.client.put(msg)
- self.client.send()
- # ensure the server got the message without requiring client to stop first
- deadline = time() + 10
- while self.server_received == 0:
- assert time() < deadline, "Server did not receive message!"
- sleep(.1)
- assert self.server_received == 1
-
- def testUnlimitedCredit(self):
- """ Bring up two links. Verify credit is granted to each link by
- transferring a message over each.
- """
- self.server_credit = -1
- self.start()
-
- msg = Message()
- msg.address="amqp://127.0.0.1:%d/XXX" % self.port
- msg.reply_to = "~"
- msg.subject="Hello World!"
- body = "First the world, then the galaxy!"
- msg.body = body
- self.client.put(msg)
- self.client.send()
-
- reply = Message()
- self.client.recv(1)
- assert self.client.incoming == 1
- self.client.get(reply)
-
- assert reply.subject == "Hello World!"
- rbod = reply.body
- assert rbod == body, (rbod, body)
-
- msg = Message()
- msg.address="amqp://127.0.0.1:%d/YYY" % self.port
- msg.reply_to = "~"
- msg.subject="Hello World!"
- body = "First the world, then the galaxy!"
- msg.body = body
- self.client.put(msg)
- self.client.send()
-
- reply = Message()
- self.client.recv(1)
- assert self.client.incoming == 1
- self.client.get(reply)
-
- assert reply.subject == "Hello World!"
- rbod = reply.body
- assert rbod == body, (rbod, body)
-
- def _DISABLE_test_proton268(self):
- """ Reproducer for JIRA Proton-268 """
- """ DISABLED: Causes failure on Jenkins, appears to be unrelated to fix """
- self.server_credit = 2048
- self.start()
-
- msg = Message()
- msg.address="amqp://127.0.0.1:%d" % self.port
- msg.body = "X" * 1024
-
- for x in range( 100 ):
- self.client.put( msg )
- self.client.send()
-
- try:
- self.client.stop()
- except Timeout:
- assert False, "Timeout waiting for client stop()"
-
- # need to restart client, as tearDown() uses it to stop server
- self.client.start()
-
- def testRoute(self):
- # anonymous cipher not supported on Windows
- if os.name == "nt" or not common.isSSLPresent():
- domain = "amqp"
- else:
- domain = "amqps"
- port = common.free_tcp_port()
- self.server.subscribe(domain + "://~0.0.0.0:%d" % port)
- self.start()
- self.client.route("route1", "amqp://127.0.0.1:%d" % self.port)
- self.client.route("route2", domain + "://127.0.0.1:%d" % port)
-
- msg = Message()
- msg.address = "route1"
- msg.reply_to = "~"
- msg.body = "test"
- self.client.put(msg)
- self.client.recv(1)
-
- reply = Message()
- self.client.get(reply)
-
- msg = Message()
- msg.address = "route2"
- msg.reply_to = "~"
- msg.body = "test"
- self.client.put(msg)
- self.client.recv(1)
-
- self.client.get(reply)
- assert reply.body == "test"
-
- def testDefaultRoute(self):
- self.start()
- self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
-
- msg = Message()
- msg.address = "asdf"
- msg.body = "test"
- msg.reply_to = "~"
-
- self.client.put(msg)
- self.client.recv(1)
-
- reply = Message()
- self.client.get(reply)
- assert reply.body == "test"
-
- def testDefaultRouteSubstitution(self):
- self.start()
- self.client.route("*", "amqp://127.0.0.1:%d/$1" % self.port)
-
- msg = Message()
- msg.address = "asdf"
- msg.body = "test"
- msg.reply_to = "~"
-
- self.client.put(msg)
- self.client.recv(1)
-
- reply = Message()
- self.client.get(reply)
- assert reply.body == "test"
-
- def testIncomingRoute(self):
- self.start()
- port = common.free_tcp_port()
- self.client.route("in", "amqp://~0.0.0.0:%d" % port)
- self.client.subscribe("in")
-
- msg = Message()
- msg.address = "amqp://127.0.0.1:%d" %self.port
- msg.reply_to = "amqp://127.0.0.1:%d" % port
- msg.body = "test"
-
- self.client.put(msg)
- self.client.recv(1)
- reply = Message()
- self.client.get(reply)
- assert reply.body == "test"
-
- def echo_address(self, msg):
- while self.server.incoming:
- self.server.get(msg)
- msg.body = msg.address
- self.dispatch(msg)
-
- def _testRewrite(self, original, rewritten):
- self.start()
- self.process_incoming = self.echo_address
- self.client.route("*", "amqp://127.0.0.1:%d" % self.port)
-
- msg = Message()
- msg.address = original
- msg.body = "test"
- msg.reply_to = "~"
-
- self.client.put(msg)
- assert msg.address == original
- self.client.recv(1)
- assert self.client.incoming == 1
-
- echo = Message()
- self.client.get(echo)
- assert echo.body == rewritten, (echo.body, rewritten)
- assert msg.address == original
-
- def testDefaultRewriteH(self):
- self._testRewrite("original", "original")
-
- def testDefaultRewriteUH(self):
- self._testRewrite("user@original", "original")
-
- def testDefaultRewriteUPH(self):
- self._testRewrite("user:pass@original", "original")
-
- def testDefaultRewriteHP(self):
- self._testRewrite("original:123", "original:123")
-
- def testDefaultRewriteUHP(self):
- self._testRewrite("user@original:123", "original:123")
-
- def testDefaultRewriteUPHP(self):
- self._testRewrite("user:pass@original:123", "original:123")
-
- def testDefaultRewriteHN(self):
- self._testRewrite("original/name", "original/name")
-
- def testDefaultRewriteUHN(self):
- self._testRewrite("user@original/name", "original/name")
-
- def testDefaultRewriteUPHN(self):
- self._testRewrite("user:pass@original/name", "original/name")
-
- def testDefaultRewriteHPN(self):
- self._testRewrite("original:123/name", "original:123/name")
-
- def testDefaultRewriteUHPN(self):
- self._testRewrite("user@original:123/name", "original:123/name")
-
- def testDefaultRewriteUPHPN(self):
- self._testRewrite("user:pass@original:123/name", "original:123/name")
-
- def testDefaultRewriteSH(self):
- self._testRewrite("amqp://original", "amqp://original")
-
- def testDefaultRewriteSUH(self):
- self._testRewrite("amqp://user@original", "amqp://original")
-
- def testDefaultRewriteSUPH(self):
- self._testRewrite("amqp://user:pass@original", "amqp://original")
-
- def testDefaultRewriteSHP(self):
- self._testRewrite("amqp://original:123", "amqp://original:123")
-
- def testDefaultRewriteSUHP(self):
- self._testRewrite("amqp://user@original:123", "amqp://original:123")
-
- def testDefaultRewriteSUPHP(self):
- self._testRewrite("amqp://user:pass@original:123", "amqp://original:123")
-
- def testDefaultRewriteSHN(self):
- self._testRewrite("amqp://original/name", "amqp://original/name")
-
- def testDefaultRewriteSUHN(self):
- self._testRewrite("amqp://user@original/name", "amqp://original/name")
-
- def testDefaultRewriteSUPHN(self):
- self._testRewrite("amqp://user:pass@original/name", "amqp://original/name")
-
- def testDefaultRewriteSHPN(self):
- self._testRewrite("amqp://original:123/name", "amqp://original:123/name")
-
- def testDefaultRewriteSUHPN(self):
- self._testRewrite("amqp://user@original:123/name", "amqp://original:123/name")
-
- def testDefaultRewriteSUPHPN(self):
- self._testRewrite("amqp://user:pass@original:123/name", "amqp://original:123/name")
-
- def testRewriteSupress(self):
- self.client.rewrite("*", None)
- self._testRewrite("asdf", None)
-
- def testRewrite(self):
- self.client.rewrite("a", "b")
- self._testRewrite("a", "b")
-
- def testRewritePattern(self):
- self.client.rewrite("amqp://%@*", "amqp://$2")
- self._testRewrite("amqp://foo@bar", "amqp://bar")
-
- def testRewriteToAt(self):
- self.client.rewrite("amqp://%/*", "$2@$1")
- self._testRewrite("amqp://domain/name", "name@domain")
-
- def testRewriteOverrideDefault(self):
- self.client.rewrite("*", "$1")
- self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
-
- def testCreditBlockingRebalance(self):
- """ The server is given a fixed amount of credit, and runs until that
- credit is exhausted.
- """
- self.server_finite_credit = True
- self.server_credit = 11
- self.start()
-
- # put one message out on "Link1" - since there are no other links, it
- # should get all the credit (10 after sending)
- msg = Message()
- msg.address="amqp://127.0.0.1:%d/Link1" % self.port
- msg.subject="Hello World!"
- body = "First the world, then the galaxy!"
- msg.body = body
- msg.reply_to = "~"
- self.client.put(msg)
- self.client.send()
- self.client.recv(1)
- assert self.client.incoming == 1
-
- # Now attempt to exhaust credit using a different link
- for i in range(10):
- msg.address="amqp://127.0.0.1:%d/Link2" % self.port
- self.client.put(msg)
- self.client.send()
-
- deadline = time() + self.timeout
- count = 0
- while count < 11 and time() < deadline:
- self.client.recv(-1)
- while self.client.incoming:
- self.client.get(msg)
- count += 1
- assert count == 11, count
-
- # now attempt to send one more. There isn't enough credit, so it should
- # not be sent
- self.client.timeout = 1
- msg.address="amqp://127.0.0.1:%d/Link2" % self.port
- self.client.put(msg)
- try:
- self.client.send()
- assert False, "expected client to time out in send()"
- except Timeout:
- pass
- assert self.client.outgoing == 1
-
-
-class NBMessengerTest(common.Test):
-
- def setUp(self):
- self.client = Messenger("client")
- self.client2 = Messenger("client2")
- self.server = Messenger("server")
- self.messengers = [self.client, self.client2, self.server]
- self.client.blocking = False
- self.client2.blocking = False
- self.server.blocking = False
- self.server.start()
- self.client.start()
- self.client2.start()
- port = common.free_tcp_port()
- self.address = "amqp://127.0.0.1:%d" % port
- self.server.subscribe("amqp://~0.0.0.0:%d" % port)
-
- def _pump(self, timeout, work_triggers_exit):
- for msgr in self.messengers:
- if msgr.work(timeout) and work_triggers_exit:
- return True
- return False
-
- def pump(self, timeout=0):
- while self._pump(0, True): pass
- self._pump(timeout, False)
- while self._pump(0, True): pass
-
- def tearDown(self):
- self.server.stop()
- self.client.stop()
- self.client2.stop()
- self.pump()
- assert self.server.stopped
- assert self.client.stopped
- assert self.client2.stopped
-
- def testSmoke(self, count=1):
- self.server.recv()
-
- msg = Message()
- msg.address = self.address
- for i in range(count):
- msg.body = "Hello %s" % i
- self.client.put(msg)
-
- msg2 = Message()
- for i in range(count):
- if self.server.incoming == 0:
- self.pump()
- assert self.server.incoming > 0, self.server.incoming
- self.server.get(msg2)
- assert msg2.body == "Hello %s" % i, (msg2.body, i)
-
- assert self.client.outgoing == 0, self.client.outgoing
- assert self.server.incoming == 0, self.client.incoming
-
- def testSmoke1024(self):
- self.testSmoke(1024)
-
- def testSmoke4096(self):
- self.testSmoke(4096)
-
- def testPushback(self):
- self.server.recv()
-
- msg = Message()
- msg.address = self.address
- for i in range(16):
- for i in range(1024):
- self.client.put(msg)
- self.pump()
- if self.client.outgoing > 0:
- break
-
- assert self.client.outgoing > 0
-
- def testRecvBeforeSubscribe(self):
- self.client.recv()
- self.client.subscribe(self.address + "/foo")
-
- self.pump()
-
- msg = Message()
- msg.address = "amqp://client/foo"
- msg.body = "Hello World!"
- self.server.put(msg)
-
- assert self.client.incoming == 0
- self.pump(self.delay)
- assert self.client.incoming == 1
-
- msg2 = Message()
- self.client.get(msg2)
- assert msg2.address == msg.address
- assert msg2.body == msg.body
-
- def testCreditAutoBackpressure(self):
- """ Verify that use of automatic credit (pn_messenger_recv(-1)) does not
- fill the incoming queue indefinitely. If the receiver does not 'get' the
- message, eventually the sender will block. See PROTON-350 """
- self.server.recv()
- msg = Message()
- msg.address = self.address
- deadline = time() + self.timeout
- while time() < deadline:
- old = self.server.incoming
- for j in range(1001):
- self.client.put(msg)
- self.pump()
- if old == self.server.incoming:
- break;
- assert old == self.server.incoming, "Backpressure not active!"
-
- def testCreditRedistribution(self):
- """ Verify that a fixed amount of credit will redistribute to new
- links.
- """
- self.server.recv( 5 )
-
- # first link will get all credit
- msg1 = Message()
- msg1.address = self.address + "/msg1"
- self.client.put(msg1)
- self.pump()
- assert self.server.incoming == 1, self.server.incoming
- assert self.server.receiving == 4, self.server.receiving
-
- # no credit left over for this link
- msg2 = Message()
- msg2.address = self.address + "/msg2"
- self.client.put(msg2)
- self.pump()
- assert self.server.incoming == 1, self.server.incoming
- assert self.server.receiving == 4, self.server.receiving
-
- # eventually, credit will rebalance and the new link will send
- deadline = time() + self.timeout
- while time() < deadline:
- sleep(.1)
- self.pump()
- if self.server.incoming == 2:
- break;
- assert self.server.incoming == 2, self.server.incoming
- assert self.server.receiving == 3, self.server.receiving
-
- def testCreditReclaim(self):
- """ Verify that credit is reclaimed when a link with outstanding credit is
- torn down.
- """
- self.server.recv( 9 )
-
- # first link will get all credit
- msg1 = Message()
- msg1.address = self.address + "/msg1"
- self.client.put(msg1)
- self.pump()
- assert self.server.incoming == 1, self.server.incoming
- assert self.server.receiving == 8, self.server.receiving
-
- # no credit left over for this link
- msg2 = Message()
- msg2.address = self.address + "/msg2"
- self.client.put(msg2)
- self.pump()
- assert self.server.incoming == 1, self.server.incoming
- assert self.server.receiving == 8, self.server.receiving
-
- # and none for this new client
- msg3 = Message()
- msg3.address = self.address + "/msg3"
- self.client2.put(msg3)
- self.pump()
-
- # eventually, credit will rebalance and all links will
- # send a message
- deadline = time() + self.timeout
- while time() < deadline:
- sleep(.1)
- self.pump()
- if self.server.incoming == 3:
- break;
- assert self.server.incoming == 3, self.server.incoming
- assert self.server.receiving == 6, self.server.receiving
-
- # now tear down client two, this should cause its outstanding credit to be
- # made available to the other links
- self.client2.stop()
- self.pump()
-
- for i in range(4):
- self.client.put(msg1)
- self.client.put(msg2)
-
- # should exhaust all credit
- deadline = time() + self.timeout
- while time() < deadline:
- sleep(.1)
- self.pump()
- if self.server.incoming == 9:
- break;
- assert self.server.incoming == 9, self.server.incoming
- assert self.server.receiving == 0, self.server.receiving
-
- def testCreditReplenish(self):
- """ When extra credit is available it should be granted to the first
- link that can use it.
- """
- # create three links
- msg = Message()
- for i in range(3):
- msg.address = self.address + "/%d" % i
- self.client.put(msg)
-
- self.server.recv( 50 ) # 50/3 = 16 per link + 2 extra
-
- self.pump()
- assert self.server.incoming == 3, self.server.incoming
- assert self.server.receiving == 47, self.server.receiving
-
- # 47/3 = 15 per link, + 2 extra
-
- # verify one link can send 15 + the two extra (17)
- for i in range(17):
- msg.address = self.address + "/0"
- self.client.put(msg)
- self.pump()
- assert self.server.incoming == 20, self.server.incoming
- assert self.server.receiving == 30, self.server.receiving
-
- # now verify that the remaining credit (30) will eventually rebalance
- # across all links (10 per link)
- for j in range(10):
- for i in range(3):
- msg.address = self.address + "/%d" % i
- self.client.put(msg)
-
- deadline = time() + self.timeout
- while time() < deadline:
- sleep(.1)
- self.pump()
- if self.server.incoming == 50:
- break
- assert self.server.incoming == 50, self.server.incoming
- assert self.server.receiving == 0, self.server.receiving
-
-from select import select
-
-class Pump:
-
- def __init__(self, *messengers):
- self.messengers = messengers
- self.selectables = []
-
- def pump_once(self):
- for m in self.messengers:
- while True:
- sel = m.selectable()
- if sel:
- self.selectables.append(sel)
- else:
- break
-
- reading = []
- writing = []
-
- for sel in self.selectables[:]:
- if sel.is_terminal:
- sel.release()
- self.selectables.remove(sel)
- else:
- if sel.reading:
- reading.append(sel)
- if sel.writing:
- writing.append(sel)
-
- readable, writable, _ = select(reading, writing, [], 0.1)
-
- count = 0
- for s in readable:
- s.readable()
- count += 1
- for s in writable:
- s.writable()
- count += 1
- return count
-
- def pump(self):
- while self.pump_once(): pass
-
-class SelectableMessengerTest(common.Test):
-
- def testSelectable(self, count = 1):
- if os.name=="nt":
- # Conflict between native OS select() in Pump and IOCP based pn_selector_t
- # makes this fail on Windows (see PROTON-668).
- raise Skipped("Invalid test on Windows with IOCP.")
-
- mrcv = Messenger()
- mrcv.passive = True
- port = common.free_tcp_port()
- mrcv.subscribe("amqp://~0.0.0.0:%d" % port)
-
- msnd = Messenger()
- msnd.passive = True
- m = Message()
- m.address = "amqp://127.0.0.1:%d" % port
-
- for i in range(count):
- m.body = u"Hello World! %s" % i
- msnd.put(m)
-
- p = Pump(msnd, mrcv)
- p.pump()
-
- assert msnd.outgoing == count
- assert mrcv.incoming == 0
-
- mrcv.recv()
-
- mc = Message()
-
- try:
- for i in range(count):
- while mrcv.incoming == 0:
- p.pump()
- assert mrcv.incoming > 0, (count, msnd.outgoing, mrcv.incoming)
- mrcv.get(mc)
- assert mc.body == u"Hello World! %s" % i, (i, mc.body)
- finally:
- mrcv.stop()
- msnd.stop()
- assert not mrcv.stopped
- assert not msnd.stopped
- p.pump()
- assert mrcv.stopped
- assert msnd.stopped
-
- def testSelectable16(self):
- self.testSelectable(count=16)
-
- def testSelectable1024(self):
- self.testSelectable(count=1024)
-
- def testSelectable4096(self):
- self.testSelectable(count=4096)
-
-
-class IdleTimeoutTest(common.Test):
-
- def testIdleTimeout(self):
- """
- Verify that a Messenger connection is kept alive using empty idle frames
- when a idle_timeout is advertised by the remote peer.
- """
- if "java" in sys.platform:
- raise Skipped()
- idle_timeout_secs = self.delay
-
- try:
- idle_server = common.TestServer(idle_timeout=idle_timeout_secs)
- idle_server.timeout = self.timeout
- idle_server.start()
-
- idle_client = Messenger("idle_client")
- idle_client.timeout = self.timeout
- idle_client.start()
-
- idle_client.subscribe("amqp://%s:%s/foo" %
- (idle_server.host, idle_server.port))
- idle_client.work(idle_timeout_secs/10)
-
- # wait up to 3x the idle timeout and hence verify that everything stays
- # connected during that time by virtue of no Exception being raised
- duration = 3 * idle_timeout_secs
- deadline = time() + duration
- while time() <= deadline:
- idle_client.work(idle_timeout_secs/10)
- continue
-
- # confirm link is still active
- assert not idle_server.conditions, idle_server.conditions
- finally:
- try:
- idle_client.stop()
- except:
- pass
- try:
- idle_server.stop()
- except:
- pass
diff --git a/tests/python/proton_tests/soak.py b/tests/python/proton_tests/soak.py
deleted file mode 100644
index 52382ba..0000000
--- a/tests/python/proton_tests/soak.py
+++ /dev/null
@@ -1,368 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import os
-import sys
-from .common import Test, Skipped, free_tcp_ports, \
- MessengerReceiverC, MessengerSenderC, \
- MessengerReceiverValgrind, MessengerSenderValgrind, \
- MessengerReceiverPython, MessengerSenderPython, \
- ReactorReceiverC, ReactorSenderC, \
- ReactorReceiverValgrind, ReactorSenderValgrind, \
- isSSLPresent
-from proton import *
-
-#
-# Tests that run the apps
-#
-
-class AppTests(Test):
-
- def __init__(self, *args):
- Test.__init__(self, *args)
- self.is_valgrind = False
-
- def default(self, name, value, **kwargs):
- if self.is_valgrind:
- default = kwargs.get("valgrind", value)
- else:
- default = value
- return Test.default(self, name, default, **kwargs)
-
- @property
- def iterations(self):
- return int(self.default("iterations", 2, fast=1, valgrind=2))
-
- @property
- def send_count(self):
- return int(self.default("send_count", 17, fast=1, valgrind=2))
-
- @property
- def target_count(self):
- return int(self.default("target_count", 5, fast=1, valgrind=2))
-
- @property
- def send_batch(self):
- return int(self.default("send_batch", 7, fast=1, valgrind=2))
-
- @property
- def forward_count(self):
- return int(self.default("forward_count", 5, fast=1, valgrind=2))
-
- @property
- def port_count(self):
- return int(self.default("port_count", 3, fast=1, valgrind=2))
-
- @property
- def sender_count(self):
- return int(self.default("sender_count", 3, fast=1, valgrind=2))
-
- def valgrind_test(self):
- self.is_valgrind = True
-
- def setUp(self):
- self.senders = []
- self.receivers = []
-
- def tearDown(self):
- pass
-
- def _do_test(self, iterations=1):
- verbose = self.verbose
-
- for R in self.receivers:
- R.start( verbose )
-
- for j in range(iterations):
- for S in self.senders:
- S.start( verbose )
-
- for S in self.senders:
- S.wait()
- #print("SENDER OUTPUT:")
- #print( S.stdout() )
- assert S.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
- % (str(S.cmdline()),
- S.status(),
- S.stdout(),
- S.stderr()))
-
- for R in self.receivers:
- R.wait()
- #print("RECEIVER OUTPUT")
- #print( R.stdout() )
- assert R.status() == 0, ("Command '%s' failed status=%d: '%s' '%s'"
- % (str(R.cmdline()),
- R.status(),
- R.stdout(),
- R.stderr()))
-
-#
-# Traffic passing tests based on the Messenger apps
-#
-
-class MessengerTests(AppTests):
-
- _timeout = 60
-
- def _ssl_check(self):
- if not isSSLPresent():
- raise Skipped("No SSL libraries found.")
- if os.name=="nt":
- raise Skipped("Windows SChannel lacks anonymous cipher support.")
-
- def __init__(self, *args):
- AppTests.__init__(self, *args)
-
- def _do_oneway_test(self, receiver, sender, domain="amqp"):
- """ Send N messages to a receiver.
- Parameters:
- iterations - repeat the senders this many times
- target_count = # of targets to send to.
- send_count = # messages sent to each target
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- sender.targets = ["%s://0.0.0.0:%s/X%d" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def _do_echo_test(self, receiver, sender, domain="amqp"):
- """ Send N messages to a receiver, which responds to each.
- Parameters:
- iterations - repeat the senders this many times
- target_count - # targets to send to
- send_count = # messages sent to each target
- send_batch - wait for replies after this many messages sent
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
- send_batch = self.send_batch
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- sender.targets = ["%s://0.0.0.0:%s/%dY" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.get_reply = True
- sender.send_batch = send_batch
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def _do_relay_test(self, receiver, relay, sender, domain="amqp"):
- """ Send N messages to a receiver, which replies to each and forwards
- each of them to different receiver.
- Parameters:
- iterations - repeat the senders this many times
- target_count - # targets to send to
- send_count = # messages sent to each target
- send_batch - wait for replies after this many messages sent
- forward_count - forward to this many targets
- """
- iterations = self.iterations
- send_count = self.send_count
- target_count = self.target_count
- send_batch = self.send_batch
- forward_count = self.forward_count
-
- send_total = send_count * target_count
- receive_total = send_total * iterations
-
- port = free_tcp_ports()[0]
-
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port)]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- # forward to 'relay' - uses two links
- # ## THIS FAILS:
- # receiver.forwards = ["amqp://Relay/%d" % j for j in range(forward_count)]
- receiver.forwards = ["%s://Relay" % domain]
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- relay.subscriptions = ["%s://0.0.0.0:%s" % (domain, port)]
- relay.name = "Relay"
- relay.receive_count = receive_total
- relay.timeout = MessengerTests._timeout
- self.receivers.append( relay )
-
- # send to 'receiver'
- sender.targets = ["%s://0.0.0.0:%s/X%dY" % (domain, port, j) for j in range(target_count)]
- sender.send_count = send_total
- sender.get_reply = True
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
-
- def _do_star_topology_test(self, r_factory, s_factory, domain="amqp"):
- """
- A star-like topology, with a central receiver at the hub, and senders at
- the spokes. Each sender will connect to each of the ports the receiver is
- listening on. Each sender will then create N links per each connection.
- Each sender will send X messages per link, waiting for a response.
- Parameters:
- iterations - repeat the senders this many times
- port_count - # of ports the receiver will listen on. Each sender connects
- to all ports.
- sender_count - # of senders
- target_count - # of targets per connection
- send_count - # of messages sent to each target
- send_batch - # of messages to send before waiting for response
- """
- iterations = self.iterations
- port_count = self.port_count
- sender_count = self.sender_count
- target_count = self.target_count
- send_count = self.send_count
- send_batch = self.send_batch
-
- send_total = port_count * target_count * send_count
- receive_total = send_total * sender_count * iterations
-
- ports = free_tcp_ports(port_count)
-
- receiver = r_factory()
- receiver.subscriptions = ["%s://~0.0.0.0:%s" % (domain, port) for port in ports]
- receiver.receive_count = receive_total
- receiver.send_reply = True
- receiver.timeout = MessengerTests._timeout
- self.receivers.append( receiver )
-
- for i in range(sender_count):
- sender = s_factory()
- sender.targets = ["%s://0.0.0.0:%s/%d" % (domain, port, j) for port in ports for j in range(target_count)]
- sender.send_count = send_total
- sender.send_batch = send_batch
- sender.get_reply = True
- sender.timeout = MessengerTests._timeout
- self.senders.append( sender )
-
- self._do_test(iterations)
-
- def test_oneway_C(self):
- self._do_oneway_test(MessengerReceiverC(), MessengerSenderC())
-
- def test_oneway_C_SSL(self):
- self._ssl_check()
- self._do_oneway_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_oneway_valgrind(self):
- self.valgrind_test()
- self._do_oneway_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_oneway_Python(self):
- self._do_oneway_test(MessengerReceiverPython(), MessengerSenderPython())
-
- def test_oneway_C_Python(self):
- self._do_oneway_test(MessengerReceiverC(), MessengerSenderPython())
-
- def test_oneway_Python_C(self):
- self._do_oneway_test(MessengerReceiverPython(), MessengerSenderC())
-
- def test_echo_C(self):
- self._do_echo_test(MessengerReceiverC(), MessengerSenderC())
-
- def test_echo_C_SSL(self):
- self._ssl_check()
- self._do_echo_test(MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_echo_valgrind(self):
- self.valgrind_test()
- self._do_echo_test(MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_echo_Python(self):
- self._do_echo_test(MessengerReceiverPython(), MessengerSenderPython())
-
- def test_echo_C_Python(self):
- self._do_echo_test(MessengerReceiverC(), MessengerSenderPython())
-
- def test_echo_Python_C(self):
- self._do_echo_test(MessengerReceiverPython(), MessengerSenderC())
-
- def test_relay_C(self):
- self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC())
-
- def test_relay_C_SSL(self):
- self._ssl_check()
- self._do_relay_test(MessengerReceiverC(), MessengerReceiverC(), MessengerSenderC(), "amqps")
-
- def test_relay_valgrind(self):
- self.valgrind_test()
- self._do_relay_test(MessengerReceiverValgrind(), MessengerReceiverValgrind(), MessengerSenderValgrind())
-
- def test_relay_C_Python(self):
- self._do_relay_test(MessengerReceiverC(), MessengerReceiverPython(), MessengerSenderPython())
-
- def test_relay_Python(self):
- self._do_relay_test(MessengerReceiverPython(), MessengerReceiverPython(), MessengerSenderPython())
-
- def test_star_topology_C(self):
- self._do_star_topology_test( MessengerReceiverC, MessengerSenderC )
-
- def test_star_topology_C_SSL(self):
- self._ssl_check()
- self._do_star_topology_test( MessengerReceiverC, MessengerSenderC, "amqps" )
-
- def test_star_topology_valgrind(self):
- self.valgrind_test()
- self._do_star_topology_test( MessengerReceiverValgrind, MessengerSenderValgrind )
-
- def test_star_topology_Python(self):
- self._do_star_topology_test( MessengerReceiverPython, MessengerSenderPython )
-
- def test_star_topology_Python_C(self):
- self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC )
-
- def test_star_topology_C_Python(self):
- self._do_star_topology_test( MessengerReceiverPython, MessengerSenderC )
-
- def test_oneway_reactor(self):
- self._do_oneway_test(ReactorReceiverC(), ReactorSenderC())
-
- def test_oneway_reactor_valgrind(self):
- self.valgrind_test()
- self._do_oneway_test(ReactorReceiverValgrind(), ReactorSenderValgrind())
diff --git a/tests/python/proton_tests/ssl.py b/tests/python/proton_tests/ssl.py
index 89fe828..225afa7 100644
--- a/tests/python/proton_tests/ssl.py
+++ b/tests/python/proton_tests/ssl.py
@@ -832,93 +832,6 @@
assert server.connection.state & Endpoint.REMOTE_UNINIT
self.tearDown()
- def test_defaults_messenger_app(self):
- """ Test an SSL connection using the Messenger apps (no certificates)
- """
- if os.name=="nt":
- raise Skipped("Windows SChannel lacks anonymous cipher support.")
- port = common.free_tcp_ports()[0]
-
- receiver = common.MessengerReceiverC()
- receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
- receiver.receive_count = 1
- receiver.timeout = self.timeout
- receiver.start()
-
- sender = common.MessengerSenderC()
- sender.targets = ["amqps://0.0.0.0:%s/X" % port]
- sender.send_count = 1
- sender.timeout = self.timeout
- sender.start()
- sender.wait()
- assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
-
- receiver.wait()
- assert receiver.status() == 0, "Command '%s' failed" % str(receiver.cmdline())
-
- def test_server_authentication_messenger_app(self):
- """ Test an SSL authentication using the Messenger apps.
- """
- port = common.free_tcp_ports()[0]
-
- receiver = common.MessengerReceiverC()
- receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
- receiver.receive_count = 1
- receiver.timeout = self.timeout
- # Note hack - by default we use the client-certificate for the
- # _server_ because the client-certificate's common name field
- # is "127.0.0.1", which will match the target address used by
- # the sender.
- receiver.certificate = self._testpath("client-certificate.pem")
- receiver.privatekey = self._testpath("client-private-key.pem")
- receiver.password = "client-password"
- receiver.start()
-
- sender = common.MessengerSenderC()
- sender.targets = ["amqps://127.0.0.1:%s/X" % port]
- sender.send_count = 1
- sender.timeout = self.timeout
- sender.ca_db = self._testpath("ca-certificate.pem")
- sender.start()
- sender.wait()
- assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
-
- receiver.wait()
- assert receiver.status() == 0, "Command '%s' failed" % str(receiver.cmdline())
-
- def DISABLED_test_defaults_valgrind(self):
- """ Run valgrind over a simple SSL connection (no certificates)
- """
- # the openssl libraries produce far too many valgrind errors to be
- # useful. AFAIK, there is no way to wriate a valgrind suppression
- # expression that will ignore all errors from a given library.
- # Until we can, skip this test.
- port = common.free_tcp_ports()[0]
-
- receiver = common.MessengerReceiverValgrind()
- receiver.subscriptions = ["amqps://~127.0.0.1:%s" % port]
- receiver.receive_count = 1
- receiver.timeout = self.timeout
- receiver.start()
-
- sender = common.MessengerSenderValgrind()
- sender.targets = ["amqps://127.0.0.1:%s/X" % port]
- sender.send_count = 1
- sender.timeout = self.timeout
- sender.start()
- sender.wait()
- assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
-
- receiver.wait()
- assert receiver.status() == 0, "Command '%s' failed" % str(receiver.cmdline())
-
- # self.server_domain.set_credentials(self._testpath("client-certificate.pem"),
- # self._testpath("client-private-key.pem"),
- # "client-password")
-
- # self.client_domain.set_trusted_ca_db(self._testpath("ca-certificate.pem"))
- # self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER )
-
def test_singleton(self):
"""Verify that only a single instance of SSL can exist per Transport"""
transport = Transport()
@@ -938,127 +851,3 @@
except SSLException:
pass
-class MessengerSSLTests(common.Test):
-
- def setUp(self):
- if not common.isSSLPresent():
- raise Skipped("No SSL libraries found.")
- self.server = Messenger()
- self.client = Messenger()
- self.server.blocking = False
- self.client.blocking = False
-
- def tearDown(self):
- self.server.stop()
- self.client.stop()
- self.pump()
- assert self.server.stopped
- assert self.client.stopped
-
- def pump(self, timeout=0):
- while self.client.work(0) or self.server.work(0): pass
- self.client.work(timeout)
- self.server.work(timeout)
- while self.client.work(0) or self.server.work(0): pass
-
- def test_server_credentials(self,
- cert="server-certificate.pem",
- key="server-private-key.pem",
- password="server-password",
- exception=None):
- import sys
- # java doesn't do validation in the same way (yet)
- if exception and "java" in sys.platform:
- raise Skipped()
- self.server.certificate = _testpath(cert)
- self.server.private_key = _testpath(key)
- self.server.password = password
- port = common.free_tcp_ports()[0]
- try:
- self.server.start()
- self.server.subscribe("amqps://~0.0.0.0:%s" % port)
- if exception is not None:
- assert False, "expected failure did not occur"
- except MessengerException:
- e = sys.exc_info()[1]
- if exception:
- assert exception in str(e), str(e)
- else:
- raise e
-
- def test_server_credentials_bad_cert(self):
- self.test_server_credentials(cert="bad",
- exception="invalid credentials")
-
- def test_server_credentials_bad_key(self):
- self.test_server_credentials(key="bad",
- exception="invalid credentials")
-
- def test_server_credentials_bad_password(self):
- self.test_server_credentials(password="bad",
- exception="invalid credentials")
-
- def test_client_credentials(self,
- trusted="ca-certificate.pem",
- cert="client-certificate.pem",
- key="client-private-key.pem",
- password="client-password",
- altserv=False,
- fail=False):
- if altserv:
- self.server.certificate = _testpath("bad-server-certificate.pem")
- self.server.private_key = _testpath("bad-server-private-key.pem")
- self.server.password = "server-password"
- else:
- self.server.certificate = _testpath("client-certificate.pem")
- self.server.private_key = _testpath("client-private-key.pem")
- self.server.password = "client-password"
- self.server.start()
- port = common.free_tcp_ports()[0]
- self.server.subscribe("amqps://~0.0.0.0:%s" % port)
- self.server.incoming_window = 10
-
- self.client.trusted_certificates = _testpath(trusted)
- self.client.certificate = _testpath(cert)
- self.client.private_key = _testpath(key)
- self.client.password = password
- self.client.outgoing_window = 10
- self.client.start()
-
- self.server.recv()
-
- msg = Message()
- msg.address = "amqps://127.0.0.1:%s" % port
- # make sure a large, uncompressible message body works!
- msg.body = "".join(random.choice(string.ascii_letters)
- for x in range(10099))
- trk = self.client.put(msg)
- self.client.send()
-
- self.pump()
-
- if fail:
- assert self.server.incoming == 0, self.server.incoming
- assert self.client.status(trk) == ABORTED, self.client.status(trk)
- else:
- assert self.server.incoming == 1, self.server.incoming
-
- rmsg = Message()
- self.server.get(rmsg)
- assert rmsg.body == msg.body
- self.server.accept()
- self.pump()
-
- assert self.client.status(trk) == ACCEPTED, self.client.status(trk)
-
- def test_client_credentials_bad_cert(self):
- self.test_client_credentials(cert="bad", fail=True)
-
- def test_client_credentials_bad_trusted(self):
- self.test_client_credentials(trusted="bad", fail=True)
-
- def test_client_credentials_bad_password(self):
- self.test_client_credentials(password="bad", fail=True)
-
- def test_client_credentials_untrusted(self):
- self.test_client_credentials(altserv=True, fail=True)