[maven-release-plugin] copy for tag 6.0.3
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
index cc87269..3f1d858 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/BrokerMessages.java
@@ -45,56 +45,111 @@
private static Locale _currentLocale = BrokerProperties.getLocale();
public static final String BROKER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker";
- public static final String CONFIG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.config";
- public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_data";
- public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stopped";
- public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs";
- public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening";
- public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive";
- public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active";
- public static final String MAX_MEMORY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.max_memory";
- public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform";
- public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.operation";
- public static final String PROCESS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.process";
- public static final String SHUTTING_DOWN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.shutting_down";
- public static final String MANAGEMENT_MODE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.management_mode";
- public static final String STARTUP_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.startup";
- public static final String FATAL_ERROR_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.fatal_error";
public static final String READY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.ready";
+ public static final String FAILED_CHILDREN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.failed_children";
+ public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_active";
+ public static final String LISTENING_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.listening";
+ public static final String STARTUP_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.startup";
+ public static final String MANAGEMENT_MODE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.management_mode";
+ public static final String FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.flow_to_disk_inactive";
+ public static final String STATS_MSGS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_msgs";
+ public static final String PLATFORM_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.platform";
+ public static final String CONFIG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.config";
+ public static final String SHUTTING_DOWN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.shutting_down";
+ public static final String STATS_DATA_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stats_data";
+ public static final String FATAL_ERROR_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.fatal_error";
+ public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.operation";
+ public static final String STOPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.stopped";
+ public static final String PROCESS_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.process";
+ public static final String MAX_MEMORY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "broker.max_memory";
static
{
LoggerFactory.getLogger(BROKER_LOG_HIERARCHY);
- LoggerFactory.getLogger(CONFIG_LOG_HIERARCHY);
- LoggerFactory.getLogger(STATS_DATA_LOG_HIERARCHY);
- LoggerFactory.getLogger(STOPPED_LOG_HIERARCHY);
- LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
- LoggerFactory.getLogger(LISTENING_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
- LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
- LoggerFactory.getLogger(MAX_MEMORY_LOG_HIERARCHY);
- LoggerFactory.getLogger(PLATFORM_LOG_HIERARCHY);
- LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
- LoggerFactory.getLogger(PROCESS_LOG_HIERARCHY);
- LoggerFactory.getLogger(SHUTTING_DOWN_LOG_HIERARCHY);
- LoggerFactory.getLogger(MANAGEMENT_MODE_LOG_HIERARCHY);
- LoggerFactory.getLogger(STARTUP_LOG_HIERARCHY);
- LoggerFactory.getLogger(FATAL_ERROR_LOG_HIERARCHY);
LoggerFactory.getLogger(READY_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FAILED_CHILDREN_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(LISTENING_LOG_HIERARCHY);
+ LoggerFactory.getLogger(STARTUP_LOG_HIERARCHY);
+ LoggerFactory.getLogger(MANAGEMENT_MODE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(STATS_MSGS_LOG_HIERARCHY);
+ LoggerFactory.getLogger(PLATFORM_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CONFIG_LOG_HIERARCHY);
+ LoggerFactory.getLogger(SHUTTING_DOWN_LOG_HIERARCHY);
+ LoggerFactory.getLogger(STATS_DATA_LOG_HIERARCHY);
+ LoggerFactory.getLogger(FATAL_ERROR_LOG_HIERARCHY);
+ LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
+ LoggerFactory.getLogger(STOPPED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(PROCESS_LOG_HIERARCHY);
+ LoggerFactory.getLogger(MAX_MEMORY_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages", _currentLocale);
}
/**
* Log a Broker message of the Format:
- * <pre>BRK-1006 : Using configuration : {0}</pre>
+ * <pre>BRK-1004 : Qpid Broker Ready</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage CONFIG(String param1)
+ public static LogMessage READY()
{
- String rawMessage = _messages.getString("CONFIG");
+ String rawMessage = _messages.getString("READY");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return READY_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1019 : WARNING - some services were unable to start. The following components are in the ERRORed state {0}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FAILED_CHILDREN(String param1)
+ {
+ String rawMessage = _messages.getString("FAILED_CHILDREN");
final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
@@ -112,292 +167,7 @@
public String getLogHierarchy()
{
- return CONFIG_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage STATS_DATA(Number param1, Number param2, Number param3)
- {
- String rawMessage = _messages.getString("STATS_DATA");
-
- final Object[] messageArguments = {param1, param2, param3};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return STATS_DATA_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1005 : Stopped</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage STOPPED()
- {
- String rawMessage = _messages.getString("STOPPED");
-
- final String message = rawMessage;
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return STOPPED_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage STATS_MSGS(Number param1, Number param2, Number param3)
- {
- String rawMessage = _messages.getString("STATS_MSGS");
-
- final Object[] messageArguments = {param1, param2, param3};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return STATS_MSGS_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1002 : Starting : Listening on {0} port {1,number,#}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage LISTENING(String param1, Number param2)
- {
- String rawMessage = _messages.getString("LISTENING");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return LISTENING_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
- {
- String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
+ return FAILED_CHILDREN_LOG_HIERARCHY;
}
@Override
@@ -488,14 +258,14 @@
/**
* Log a Broker message of the Format:
- * <pre>BRK-1011 : Maximum Memory : Heap : {0,number} bytes Direct : {1,number} bytes</pre>
+ * <pre>BRK-1002 : Starting : Listening on {0} port {1,number,#}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage MAX_MEMORY(Number param1, Number param2)
+ public static LogMessage LISTENING(String param1, Number param2)
{
- String rawMessage = _messages.getString("MAX_MEMORY");
+ String rawMessage = _messages.getString("LISTENING");
final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
@@ -513,297 +283,7 @@
public String getLogHierarchy()
{
- return MAX_MEMORY_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1010 : Platform : JVM : {0} version: {1} OS : {2} version: {3} arch: {4} cores: {5}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage PLATFORM(String param1, String param2, String param3, String param4, String param5, String param6)
- {
- String rawMessage = _messages.getString("PLATFORM");
-
- final Object[] messageArguments = {param1, param2, param3, param4, param5, param6};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return PLATFORM_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1018 : Operation : {0}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage OPERATION(String param1)
- {
- String rawMessage = _messages.getString("OPERATION");
-
- final Object[] messageArguments = {param1};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return OPERATION_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1017 : Process : PID : {0}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage PROCESS(String param1)
- {
- String rawMessage = _messages.getString("PROCESS");
-
- final Object[] messageArguments = {param1};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return PROCESS_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1003 : Shutting down : {0} port {1,number,#}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage SHUTTING_DOWN(String param1, Number param2)
- {
- String rawMessage = _messages.getString("SHUTTING_DOWN");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return SHUTTING_DOWN_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Broker message of the Format:
- * <pre>BRK-1012 : Management Mode : User Details : {0} / {1}</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage MANAGEMENT_MODE(String param1, String param2)
- {
- String rawMessage = _messages.getString("MANAGEMENT_MODE");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return MANAGEMENT_MODE_LOG_HIERARCHY;
+ return LISTENING_LOG_HIERARCHY;
}
@Override
@@ -894,6 +374,412 @@
/**
* Log a Broker message of the Format:
+ * <pre>BRK-1012 : Management Mode : User Details : {0} / {1}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage MANAGEMENT_MODE(String param1, String param2)
+ {
+ String rawMessage = _messages.getString("MANAGEMENT_MODE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return MANAGEMENT_MODE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1015 : Message flow to disk inactive : Message memory use {0,number,#}KB within threshold {1,number,#.##}KB</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FLOW_TO_DISK_INACTIVE(Number param1, Number param2)
+ {
+ String rawMessage = _messages.getString("FLOW_TO_DISK_INACTIVE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FLOW_TO_DISK_INACTIVE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage STATS_MSGS(Number param1, Number param2, Number param3)
+ {
+ String rawMessage = _messages.getString("STATS_MSGS");
+
+ final Object[] messageArguments = {param1, param2, param3};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return STATS_MSGS_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1010 : Platform : JVM : {0} version: {1} OS : {2} version: {3} arch: {4} cores: {5}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage PLATFORM(String param1, String param2, String param3, String param4, String param5, String param6)
+ {
+ String rawMessage = _messages.getString("PLATFORM");
+
+ final Object[] messageArguments = {param1, param2, param3, param4, param5, param6};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return PLATFORM_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1006 : Using configuration : {0}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CONFIG(String param1)
+ {
+ String rawMessage = _messages.getString("CONFIG");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CONFIG_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1003 : Shutting down : {0} port {1,number,#}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage SHUTTING_DOWN(String param1, Number param2)
+ {
+ String rawMessage = _messages.getString("SHUTTING_DOWN");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return SHUTTING_DOWN_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage STATS_DATA(Number param1, Number param2, Number param3)
+ {
+ String rawMessage = _messages.getString("STATS_DATA");
+
+ final Object[] messageArguments = {param1, param2, param3};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return STATS_DATA_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
* <pre>BRK-1016 : Fatal error : {0} : See log file for more information</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
@@ -952,14 +838,72 @@
/**
* Log a Broker message of the Format:
- * <pre>BRK-1004 : Qpid Broker Ready</pre>
+ * <pre>BRK-1018 : Operation : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage READY()
+ public static LogMessage OPERATION(String param1)
{
- String rawMessage = _messages.getString("READY");
+ String rawMessage = _messages.getString("OPERATION");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return OPERATION_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1005 : Stopped</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage STOPPED()
+ {
+ String rawMessage = _messages.getString("STOPPED");
final String message = rawMessage;
@@ -972,7 +916,123 @@
public String getLogHierarchy()
{
- return READY_LOG_HIERARCHY;
+ return STOPPED_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1017 : Process : PID : {0}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage PROCESS(String param1)
+ {
+ String rawMessage = _messages.getString("PROCESS");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return PROCESS_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Broker message of the Format:
+ * <pre>BRK-1011 : Maximum Memory : Heap : {0,number} bytes Direct : {1,number} bytes</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage MAX_MEMORY(Number param1, Number param2)
+ {
+ String rawMessage = _messages.getString("MAX_MEMORY");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return MAX_MEMORY_LOG_HIERARCHY;
}
@Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
index 52d849c..b74ad36 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
@@ -61,3 +61,5 @@
# 0 - operation name
OPERATION = BRK-1018 : Operation : {0}
+
+FAILED_CHILDREN = BRK-1019 : WARNING - some services were unable to start. The following components are in the ERRORed state {0}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
index 8b8a84e..01be864 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java
@@ -45,42 +45,49 @@
private static Locale _currentLocale = BrokerProperties.getLocale();
public static final String PORT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port";
- public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.open";
- public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
public static final String DELETE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.delete";
- public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
- public static final String CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_closed";
- public static final String CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_too_many";
- public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
+ public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create";
public static final String UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.unsupported_protocol_header";
+ public static final String CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_closed";
+ public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn";
+ public static final String BIND_FAILED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.bind_failed";
+ public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close";
+ public static final String CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected_too_many";
+ public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.open";
static
{
LoggerFactory.getLogger(PORT_LOG_HIERARCHY);
- LoggerFactory.getLogger(OPEN_LOG_HIERARCHY);
- LoggerFactory.getLogger(CREATE_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETE_LOG_HIERARCHY);
- LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
- LoggerFactory.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CREATE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY);
LoggerFactory.getLogger(CONNECTION_REJECTED_CLOSED_LOG_HIERARCHY);
LoggerFactory.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY);
- LoggerFactory.getLogger(UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY);
+ LoggerFactory.getLogger(BIND_FAILED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CLOSE_LOG_HIERARCHY);
+ LoggerFactory.getLogger(CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY);
+ LoggerFactory.getLogger(OPEN_LOG_HIERARCHY);
_messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Port_logmessages", _currentLocale);
}
/**
* Log a Port message of the Format:
- * <pre>PRT-1002 : Open</pre>
+ * <pre>PRT-1006 : Delete {0} Port "{1}"</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage OPEN()
+ public static LogMessage DELETE(String param1, String param2)
{
- String rawMessage = _messages.getString("OPEN");
+ String rawMessage = _messages.getString("DELETE");
- final String message = rawMessage;
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
return new LogMessage()
{
@@ -91,7 +98,7 @@
public String getLogHierarchy()
{
- return OPEN_LOG_HIERARCHY;
+ return DELETE_LOG_HIERARCHY;
}
@Override
@@ -182,16 +189,16 @@
/**
* Log a Port message of the Format:
- * <pre>PRT-1006 : Delete {0} Port "{1}"</pre>
+ * <pre>PRT-1007 : Unsupported protocol header received, replying with {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage DELETE(String param1, String param2)
+ public static LogMessage UNSUPPORTED_PROTOCOL_HEADER(String param1)
{
- String rawMessage = _messages.getString("DELETE");
+ String rawMessage = _messages.getString("UNSUPPORTED_PROTOCOL_HEADER");
- final Object[] messageArguments = {param1, param2};
+ final Object[] messageArguments = {param1};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -207,118 +214,7 @@
public String getLogHierarchy()
{
- return DELETE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Port message of the Format:
- * <pre>PRT-1003 : Close</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage CLOSE()
- {
- String rawMessage = _messages.getString("CLOSE");
-
- final String message = rawMessage;
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return CLOSE_LOG_HIERARCHY;
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
-
- final LogMessage that = (LogMessage) o;
-
- return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
-
- }
-
- @Override
- public int hashCode()
- {
- int result = toString().hashCode();
- result = 31 * result + getLogHierarchy().hashCode();
- return result;
- }
- };
- }
-
- /**
- * Log a Port message of the Format:
- * <pre>PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.</pre>
- * Optional values are contained in [square brackets] and are numbered
- * sequentially in the method call.
- *
- */
- public static LogMessage CONNECTION_REJECTED_TOO_MANY(String param1, Number param2)
- {
- String rawMessage = _messages.getString("CONNECTION_REJECTED_TOO_MANY");
-
- final Object[] messageArguments = {param1, param2};
- // Create a new MessageFormat to ensure thread safety.
- // Sharing a MessageFormat and using applyPattern is not thread safe
- MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
-
- final String message = formatter.format(messageArguments);
-
- return new LogMessage()
- {
- public String toString()
- {
- return message;
- }
-
- public String getLogHierarchy()
- {
- return CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY;
+ return UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY;
}
@Override
@@ -467,16 +363,16 @@
/**
* Log a Port message of the Format:
- * <pre>PRT-1007 : Unsupported protocol header received, replying with {0}</pre>
+ * <pre>PRT-1009 : FAILED to bind {0} service to {1,number,#} - port in use</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage UNSUPPORTED_PROTOCOL_HEADER(String param1)
+ public static LogMessage BIND_FAILED(String param1, Number param2)
{
- String rawMessage = _messages.getString("UNSUPPORTED_PROTOCOL_HEADER");
+ String rawMessage = _messages.getString("BIND_FAILED");
- final Object[] messageArguments = {param1};
+ final Object[] messageArguments = {param1, param2};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -492,7 +388,7 @@
public String getLogHierarchy()
{
- return UNSUPPORTED_PROTOCOL_HEADER_LOG_HIERARCHY;
+ return BIND_FAILED_LOG_HIERARCHY;
}
@Override
@@ -523,6 +419,171 @@
};
}
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1003 : Close</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CLOSE()
+ {
+ String rawMessage = _messages.getString("CLOSE");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CLOSE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1005 : Connection from {0} rejected. Maximum connection count ({1, number}) for this port already reached.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage CONNECTION_REJECTED_TOO_MANY(String param1, Number param2)
+ {
+ String rawMessage = _messages.getString("CONNECTION_REJECTED_TOO_MANY");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return CONNECTION_REJECTED_TOO_MANY_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Port message of the Format:
+ * <pre>PRT-1002 : Open</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage OPEN()
+ {
+ String rawMessage = _messages.getString("OPEN");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return OPEN_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+
private PortMessages()
{
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
index 341c229..7df73db 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties
@@ -28,3 +28,5 @@
UNSUPPORTED_PROTOCOL_HEADER = PRT-1007 : Unsupported protocol header received, replying with {0}
CONNECTION_REJECTED_CLOSED = PRT-1008 : Connection from {0} rejected. Port closed.
+BIND_FAILED = PRT-1009 : FAILED to bind {0} service to {1,number,#} - port in use
+
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index d19f127..a920d89 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -43,6 +43,8 @@
long getTimestamp();
+ long getNotValidBefore();
+
String getType();
String getReplyTo();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 680eb41..6b31bac 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -94,7 +94,7 @@
public abstract class EntryState
{
- private EntryState()
+ protected EntryState()
{
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 75c5a41..734a71b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -43,19 +43,24 @@
private final String _encoding;
private final byte _priority;
private final long _timestamp;
+ private final long _notValidBefore;
private final String _type;
private final String _replyTo;
private long _arrivalTime;
public InternalMessageHeader(final Map<String, Object> headers,
- final String correlationId,
- final long expiration,
- final String userId,
- final String appId,
- final String messageId,
- final String mimeType,
- final String encoding,
- final byte priority, final long timestamp, final String type, final String replyTo)
+ final String correlationId,
+ final long expiration,
+ final String userId,
+ final String appId,
+ final String messageId,
+ final String mimeType,
+ final String encoding,
+ final byte priority,
+ final long timestamp,
+ final long notValidBefore,
+ final String type,
+ final String replyTo)
{
_headers = headers == null ? new LinkedHashMap<String, Object>()
: new LinkedHashMap<String, Object>(headers);
@@ -69,6 +74,7 @@
_encoding = encoding;
_priority = priority;
_timestamp = timestamp;
+ _notValidBefore = notValidBefore;
_type = type;
_replyTo = replyTo;
_arrivalTime = System.currentTimeMillis();
@@ -85,6 +91,7 @@
_encoding = header.getEncoding();
_priority = header.getPriority();
_timestamp = header.getTimestamp();
+ _notValidBefore = header.getNotValidBefore();
_type = header.getType();
_replyTo = header.getReplyTo();
_headers = new LinkedHashMap<String, Object>();
@@ -150,6 +157,12 @@
}
@Override
+ public long getNotValidBefore()
+ {
+ return _notValidBefore;
+ }
+
+ @Override
public String getType()
{
return _type;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 9a7528f..6737c75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -1744,7 +1744,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]";
+ return getCategoryClass().getSimpleName() + "[id=" + _id + ", name=" + getName() + ", type=" + getType() + "]";
}
public final ConfiguredObjectRecord asObjectRecord()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 69ad83e..46d45ab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -27,6 +27,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.MessageInfo;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -60,6 +61,8 @@
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
String DEFAULT_FILTERS = "defaultFilters";
String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
+ String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
+
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -173,6 +176,13 @@
@ManagedAttribute
Map<String, Map<String,List<String>>> getDefaultFilters();
+
+ @ManagedContextDefault( name = "queue.holdOnPublishEnabled")
+ boolean DEFAULT_HOLD_ON_PUBLISH_ENABLED = false;
+
+ @ManagedAttribute( defaultValue = "${queue.holdOnPublishEnabled}")
+ boolean isHoldOnPublishEnabled();
+
//children
Collection<? extends Binding> getBindings();
@@ -259,4 +269,6 @@
@ManagedOperation(nonModifying = true)
MessageInfo getMessageInfoById(@Param(name = "messageId") long messageId);
+
+ boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 8ac97d0..ab21a7a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -341,6 +341,7 @@
{
boolean hasBrokerAnyErroredChildren = false;
+ List<ConfiguredObject<?>> failedChildren = new ArrayList<>();
for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))
{
final Collection<? extends ConfiguredObject> children = getChildren(childClass);
@@ -352,11 +353,17 @@
hasBrokerAnyErroredChildren = true;
LOGGER.warn("Broker child object '{}' of type '{}' is {}",
child.getName(), childClass.getSimpleName(), State.ERRORED);
+ failedChildren.add(child);
}
}
}
}
+ if(!failedChildren.isEmpty())
+ {
+ getEventLogger().message(BrokerMessages.FAILED_CHILDREN(failedChildren.toString()));
+ }
+
final boolean brokerShutdownOnErroredChild = getContextValue(Boolean.class,
BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD);
if (!_parent.isManagementMode() && brokerShutdownOnErroredChild && hasBrokerAnyErroredChildren)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index f2aa933..47e2798 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -328,7 +328,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " [id=" + getId() + ", name=" + getName() + ", port=" + getPort() + "]";
+ return getCategoryClass().getSimpleName() + "[id=" + getId() + ", name=" + getName() + ", type=" + getType() + ", port=" + getPort() + "]";
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index f2c6d89..d141a91 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -69,6 +69,7 @@
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.transport.PortBindFailureException;
import org.apache.qpid.server.transport.TransportProvider;
import org.apache.qpid.server.util.PortUtil;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -260,20 +261,28 @@
_sslContext = createSslContext();
}
Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
-
- _transport = transportProvider.createTransport(transportSet,
- _sslContext,
- this,
- getProtocols(),
- defaultSupportedProtocolReply);
-
- _transport.start();
- for (Transport transport : getTransports())
+ try
{
- _broker.getEventLogger().message(BrokerMessages.LISTENING(String.valueOf(transport), _transport.getAcceptingPort()));
- }
+ _transport = transportProvider.createTransport(transportSet,
+ _sslContext,
+ this,
+ getProtocols(),
+ defaultSupportedProtocolReply);
- return State.ACTIVE;
+ _transport.start();
+ for (Transport transport : getTransports())
+ {
+ _broker.getEventLogger()
+ .message(BrokerMessages.LISTENING(String.valueOf(transport),
+ _transport.getAcceptingPort()));
+ }
+ return State.ACTIVE;
+ }
+ catch (PortBindFailureException e)
+ {
+ _broker.getEventLogger().message(PortMessages.BIND_FAILED(getType().toUpperCase(),getPort()));
+ throw e;
+ }
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 26ecf77..f48b0b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -269,6 +269,9 @@
private long _maximumMessageTtl;
@ManagedAttributeField
private boolean _ensureNondestructiveConsumers;
+ @ManagedAttributeField
+ private volatile boolean _holdOnPublishEnabled;
+
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
@@ -282,6 +285,12 @@
private final QueueRunner _queueRunner;
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
+ private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+
+ private interface HoldMethod // one pluggable rule consulted by isHeld(QueueEntry, long)
+ {
+ boolean isHeld(MessageReference<?> message, long evaluationTime); // true => message counts as held at evaluationTime
+ }
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -513,6 +522,18 @@
}
}
+ if(isHoldOnPublishEnabled())
+ {
+ _holdMethods.add(new HoldMethod()
+ {
+ @Override
+ public boolean isHeld(final MessageReference<?> messageReference, final long evaluationTime)
+ {
+ return messageReference.getMessage().getMessageHeader().getNotValidBefore() >= evaluationTime;
+ }
+ });
+ }
+
updateAlertChecks();
}
@@ -645,7 +666,11 @@
return _ensureNondestructiveConsumers;
}
-
+ @Override
+ public boolean isHoldOnPublishEnabled()
+ {
+ return _holdOnPublishEnabled;
+ }
@Override
public Collection<String> getAvailableAttributes()
@@ -2464,6 +2489,8 @@
final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
_flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
+ final long currentTime = System.currentTimeMillis();
+
long cumulativeQueueSize = 0;
while (queueListIterator.advance())
{
@@ -2492,6 +2519,8 @@
}
else
{
+ node.checkHeld(currentTime);
+
// There is a chance that the node could be deleted by
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
@@ -2686,6 +2715,42 @@
return _persistentMessageDequeueCount.get();
}
+ @Override
+ public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime) // true if any registered HoldMethod holds the entry's message
+ {
+ if(!_holdMethods.isEmpty()) // no hold rules registered => nothing can be held, skip taking a reference
+ {
+ ServerMessage message = queueEntry.getMessage();
+ try
+ {
+ MessageReference<?> ref = message.newReference(); // wildcard type matches HoldMethod.isHeld's parameter (was a raw type)
+ try
+ {
+ for(HoldMethod method : _holdMethods)
+ {
+ if(method.isHeld(ref, evaluationTime))
+ {
+ return true; // first rule that holds the message decides the outcome
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ ref.release(); // always release the reference taken above, on every exit path
+ }
+ }
+ catch (MessageDeletedException e) // message was deleted before a reference could be taken: report it as not held
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+
+ }
@Override
public String toString()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index cb48a50..4156882 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -414,9 +414,8 @@
public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(this) || entry.checkHeld(System.currentTimeMillis()))
{
-
return false;
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 794206a..bb2be7f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -43,4 +43,6 @@
void setExpiration(long calculatedExpiration);
MessageReference newMessageReference();
+
+ boolean checkHeld(final long evaluationTime);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index a128c8b..db5497d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -56,6 +56,21 @@
private Set<Long> _rejectedBy = null;
+ private static final EntryState HELD_STATE = new EntryState()
+ {
+ @Override
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HELD";
+ }
+ };
+
private volatile EntryState _state = AVAILABLE_STATE;
private static final
@@ -196,7 +211,7 @@
public boolean isAvailable()
{
- return _state == AVAILABLE_STATE;
+ return _state.getState() == State.AVAILABLE;
}
public boolean isAcquired()
@@ -268,7 +283,17 @@
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+ boolean acquired = false;
+
+ EntryState currentState;
+
+ while((currentState = _state).getState() == State.AVAILABLE)
+ {
+ if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
+ {
+ break;
+ }
+ }
if(acquired && _stateChangeListeners != null)
{
@@ -405,7 +430,7 @@
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
{
notifyStateChange(State.ACQUIRED, State.AVAILABLE);
}
@@ -418,6 +443,38 @@
}
@Override
+ public boolean checkHeld(final long evaluationTime) // re-evaluates the hold on this entry; returns what the queue's isHeld reported
+ {
+ EntryState state;
+ while((state = _state).getState() == State.AVAILABLE) // only entries whose state reports AVAILABLE (AVAILABLE_STATE or HELD_STATE) are examined
+ {
+ boolean isHeld = getQueue().isHeld(this, evaluationTime);
+ if(state == AVAILABLE_STATE && isHeld)
+ {
+ if(!_stateUpdater.compareAndSet(this, state, HELD_STATE)) // available -> held
+ {
+ continue; // the state changed concurrently: re-read it and evaluate again
+ }
+ }
+ else if(state == HELD_STATE && !isHeld)
+ {
+ // the hold no longer applies: switch back to AVAILABLE_STATE and run postRelease
+ if(_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ {
+ postRelease(state);
+ }
+ else
+ {
+ continue; // the state changed concurrently: re-read it and evaluate again
+ }
+ }
+ return isHeld;
+
+ }
+ return false; // the state does not report AVAILABLE, so the entry is not treated as held
+ }
+
+ @Override
public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index ea41f51..7cd7940 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -165,7 +165,7 @@
InternalMessageHeader header = new InternalMessageHeader(Collections.<String,Object>emptyMap(),
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte)4, System.currentTimeMillis(),
- null, null);
+ 0L, null, null);
return InternalMessage.createListMessage(_virtualHost.getMessageStore(), header, messageList);
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java b/broker-core/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java
index a811806..85a562c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java
@@ -95,6 +95,10 @@
{
throw new SaslException("Unable to obtain data from callback handler: " + e, e);
}
+ catch (IllegalArgumentException e)
+ {
+ throw new SaslException("Error processing SASL response: " + e.getMessage(), e);
+ }
if (passwordCb.isAuthenticated())
{
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
index 2178473..27d7cab 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.transport;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
@@ -35,8 +36,6 @@
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.AggregateTicker;
-import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
@@ -86,7 +85,18 @@
_serverSocket = ServerSocketChannel.open();
_serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _serverSocket.bind(_address, acceptBacklog);
+ try
+ {
+ _serverSocket.bind(_address, acceptBacklog);
+ }
+ catch (BindException e)
+ {
+ // Translate to the broker's bind-failure type, but keep the BindException as the
+ // cause so the underlying reason remains visible in stack traces.
+ final PortBindFailureException bindFailure = new PortBindFailureException(_address);
+ bindFailure.initCause(e);
+ throw bindFailure;
+ }
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
_scheduler = scheduler;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/PortBindFailureException.java b/broker-core/src/main/java/org/apache/qpid/server/transport/PortBindFailureException.java
new file mode 100644
index 0000000..06dfed6
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/PortBindFailureException.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Unchecked exception raised when a listening socket cannot be bound to its address.
+ * The address is retained so that handlers can report which port was affected.
+ */
+public class PortBindFailureException extends RuntimeException
+{
+ private final InetSocketAddress _address;
+
+ /**
+ * Creates the exception without a cause; one may be attached later via initCause.
+ *
+ * @param address the address that could not be bound
+ */
+ public PortBindFailureException(final InetSocketAddress address)
+ {
+ super("Unable to bind to address " + address);
+ _address = address;
+ }
+
+ /**
+ * Creates the exception and records the low-level failure that triggered it.
+ *
+ * @param address the address that could not be bound
+ * @param cause the exception thrown by the failed bind attempt
+ */
+ public PortBindFailureException(final InetSocketAddress address, final Throwable cause)
+ {
+ super("Unable to bind to address " + address, cause);
+ _address = address;
+ }
+
+ /**
+ * @return the address that could not be bound
+ */
+ public InetSocketAddress getAddress()
+ {
+ return _address;
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
index 5721991..a1b8368 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
@@ -84,7 +84,7 @@
InternalMessageHeader header = new InternalMessageHeader(headers,
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte) 4, System.currentTimeMillis(),
- null, null);
+ 0L, null, null);
final InternalMessage message =
InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), header, new byte[0]);
message.setInitialRoutingAddress(getName());
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 153d49c..21a03a0 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -97,6 +97,12 @@
return 0;
}
+ @Override
+ public long getNotValidBefore()
+ {
+ return 0;
+ }
+
public String getType()
{
return null;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 33fb517..e260e20 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -71,7 +71,7 @@
abstract class AbstractQueueTestBase extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
-
+ private static final long QUEUE_RUNNER_WAIT_TIME = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
private AMQQueue<?> _queue;
private VirtualHostImpl _virtualHost;
@@ -235,6 +235,97 @@
_consumer.getQueueContext().getReleasedEntry());
}
+ /**
+ * A message whose not-valid-before time lies in the future must not reach a newly added consumer until it becomes valid.
+ */
+ public void testMessageHeldIfNotYetValidWhenConsumerAdded() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+ _queue = (AMQQueue<?>) _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(Long.valueOf(24L));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+ _queue.checkMessageStatus();
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+ assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
+ }
+
+ /**
+ * With HOLD_ON_PUBLISH_ENABLED set to false the not-valid-before time must be ignored and the message delivered.
+ */
+ public void testMessageHoldingDependentOnQueueProperty() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.FALSE);
+
+ _queue = (AMQQueue<?>) _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(Long.valueOf(24L));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
+
+ }
+
+ /**
+ * A later message that is not held must be delivered ahead of an earlier held one; the held one follows once valid.
+ */
+ public void testUnheldMessageOvertakesHeld() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+ _queue = (AMQQueue<?>) _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(Long.valueOf(24L));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ ServerMessage messageB = createMessage(Long.valueOf(25L));
+ _queue.enqueue(messageB, null, null);
+
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
+ assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
+
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+ _queue.checkMessageStatus();
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+ assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
+ assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
+
+ }
+
/**
* Tests that a released queue entry is resent to the subscriber. Verifies also that the
* QueueContext._releasedEntry is reset to null after the entry has been reset.
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index c4da2a5..b09000d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -136,6 +136,12 @@
}
@Override
+ public long getNotValidBefore()
+ {
+ return _delegate.getNotValidBefore();
+ }
+
+ @Override
public String getType()
{
return _delegate.getType();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 1b506d9..81df444 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -100,6 +100,19 @@
return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp();
}
+
+ /**
+ * Reads the "x-qpid-not-valid-before" application header.
+ *
+ * @return the header as a long when it is present and numeric, otherwise 0
+ */
+ @Override
+ public long getNotValidBefore()
+ {
+ Object header = getHeader("x-qpid-not-valid-before");
+ return header instanceof Number ? ((Number)header).longValue() : 0L;
+ }
+
public String getType()
{
Object type = getHeader(JMS_TYPE);
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 5bb9811..9131376 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -185,6 +185,12 @@
}
@Override
+ public long getNotValidBefore()
+ {
+ return _delegate.getNotValidBefore();
+ }
+
+ @Override
public String getType()
{
return _delegate.getType();
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 55e967b..6448e0d 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -315,6 +315,13 @@
return getProperties().getReplyToAsString();
}
+ @Override
+ public long getNotValidBefore()
+ {
+ Object header = getHeader("x-qpid-not-valid-before");
+ return header instanceof Number ? ((Number)header).longValue() : 0L;
+ }
+
public Object getHeader(String name)
{
FieldTable ft = getProperties().getHeaders();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index c54eab8..985442e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -412,8 +412,14 @@
{
public void postCommit()
{
- //_link.getEndpoint().settle(_deliveryTag);
- _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true);
+ if(Boolean.TRUE.equals(settled))
+ {
+ _link.getEndpoint().settle(_deliveryTag);
+ }
+ else
+ {
+ _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+ }
_link.getEndpoint().sendFlowConditional();
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
index 0a6fec1..cdebdee 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -57,7 +57,7 @@
@Override
protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
- final SectionEncoder sectionEncoder)
+ final Section bodySection, final SectionEncoder sectionEncoder)
{
List<Section> sections = new ArrayList<Section>(3);
Header header = new Header();
@@ -89,6 +89,10 @@
ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
sections.add(applicationProperties);
}
+ if(bodySection != null)
+ {
+ sections.add(bodySection);
+ }
return new MessageMetaData_1_0(sections, sectionEncoder);
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index bc44a66..a9a2f1c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -74,11 +74,15 @@
private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder)
{
- final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, sectionEncoder);
- return convertServerMessage(metaData, serverMessage, sectionEncoder);
+ Section bodySection = getBodySection(serverMessage);
+
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, bodySection, sectionEncoder);
+ return convertServerMessage(metaData, serverMessage);
}
- abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage, SectionEncoder sectionEncoder);
+ abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage,
+ final Section bodySection,
+ SectionEncoder sectionEncoder);
private static Section convertMessageBody(String mimeType, byte[] data)
@@ -203,26 +207,19 @@
}
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final M serverMessage,
- SectionEncoder sectionEncoder)
+ final M serverMessage)
{
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data));
- byte[] uncompressed;
- if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding())
- && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null)
+ final QpidByteBuffer allData = QpidByteBuffer.allocateDirect(metaData.getStorableSize());
+ metaData.writeToBuffer(allData);
+ allData.rewind();
+
+ if(metaData.getPropertiesSection() != null)
{
- data = uncompressed;
metaData.getPropertiesSection().setContentEncoding(null);
}
- Section bodySection = convertMessageBody(mimeType, data);
-
- final QpidByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
return new StoredMessage<MessageMetaData_1_0>()
{
@Override
@@ -283,17 +280,30 @@
};
}
- private QpidByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
+ /**
+ * Builds the body section for the given message from its raw content,
+ * un-gzipping the content first when the message header declares gzip encoding
+ * and the content can actually be uncompressed.
+ *
+ * @param serverMessage the message whose content is converted
+ * @return the section produced by convertMessageBody for the (possibly uncompressed) content
+ */
+ private Section getBodySection(final M serverMessage)
{
- int headerSize = (int) metaData.getStorableSize();
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data));
+ byte[] uncompressed;
- sectionEncoder.reset();
- sectionEncoder.encodeObject(bodySection);
- Binary dataEncoding = sectionEncoder.getEncoding();
+ // Compare as Strings: the header encoding is a String, and a Symbol never equals a String,
+ // so wrapping the constant in Symbol.valueOf(..) would make this test always false.
+ // NOTE(review): relies on getMessageHeader().getEncoding() returning String - confirm.
+ if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(serverMessage.getMessageHeader().getEncoding())
+ && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null)
+ {
+ data = uncompressed;
+ }
- final QpidByteBuffer allData = QpidByteBuffer.allocateDirect(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(allData);
- allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
- return allData;
+ return convertMessageBody(mimeType, data);
}
+
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 7c8433d..730afb1 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -528,6 +528,21 @@
}
+
+ /**
+ * Reads the "x-qpid-not-valid-before" message annotation.
+ *
+ * @return the annotation as a long when it is present and numeric, otherwise 0
+ */
+ @Override
+ public long getNotValidBefore()
+ {
+ final Object annotation = _messageAnnotations == null
+ ? null
+ : _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"));
+ return annotation instanceof Number ? ((Number) annotation).longValue() : 0L;
+ }
+
public String getType()
{
String subject = getSubject();
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
index 506e6b1..7cc4adc 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
@@ -55,7 +55,7 @@
@Override
protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage,
- SectionEncoder sectionEncoder)
+ final Section bodySection, SectionEncoder sectionEncoder)
{
List<Section> sections = new ArrayList<Section>(3);
final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
@@ -143,6 +143,10 @@
}
}
+ if(bodySection != null)
+ {
+ sections.add(bodySection);
+ }
return new MessageMetaData_1_0(sections, sectionEncoder);
}
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
index c90d4dc..82a89a3 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
@@ -53,7 +53,9 @@
return AMQMessage.class;
}
- protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
+ protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage,
+ final Section bodySection,
+ SectionEncoder sectionEncoder)
{
List<Section> sections = new ArrayList<>(3);
@@ -164,7 +166,10 @@
applicationProperties.remove("qpid.subject");
}
sections.add(new ApplicationProperties(applicationProperties));
-
+ if(bodySection != null)
+ {
+ sections.add(bodySection);
+ }
return new MessageMetaData_1_0(sections, sectionEncoder);
}
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 3639118..3f4996e 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1301,6 +1301,7 @@
private String _encoding;
private byte _priority;
private long _timestamp;
+ private long _notValidBefore;
private String _type;
private String _replyTo;
@@ -1349,6 +1350,11 @@
_timestamp = timestamp;
}
+ public void setNotValidBefore(final long notValidBefore)
+ {
+ _notValidBefore = notValidBefore;
+ }
+
public void setType(final String type)
{
_type = type;
@@ -1404,6 +1410,12 @@
return _timestamp;
}
+ @Override
+ public long getNotValidBefore()
+ {
+ return _notValidBefore;
+ }
+
public String getType()
{
return _type;
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index cd6439f..b6e9c17 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.Writer;
+import java.net.BindException;
+import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,6 +47,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.management.plugin.filter.ExceptionHandlingFilter;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
@@ -87,6 +90,7 @@
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
import org.apache.qpid.server.model.port.HttpPort;
import org.apache.qpid.server.model.port.PortManager;
+import org.apache.qpid.server.transport.PortBindFailureException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -165,6 +169,11 @@
_server.start();
logOperationalListenMessages();
}
+ catch (PortBindFailureException e)
+ {
+ getBroker().getEventLogger().message(PortMessages.BIND_FAILED("HTTP", e.getAddress().getPort()));
+ throw e;
+ }
catch (Exception e)
{
throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e);
@@ -324,6 +333,25 @@
super.customize(endpoint, request);
request.setAttribute(PORT_SERVLET_ATTRIBUTE, thePort);
}
+
+ @Override
+ public void open() throws IOException
+ {
+ try
+ {
+ super.open();
+ }
+ catch (BindException e)
+ {
+ InetSocketAddress addr = getHost() == null ? new InetSocketAddress(getPort())
+ : new InetSocketAddress(getHost(), getPort());
+ // Report the failure with the broker's bind-failure type, keeping the
+ // BindException as the cause so the underlying reason is not lost.
+ final PortBindFailureException bindFailure = new PortBindFailureException(addr);
+ bindFailure.initCause(e);
+ throw bindFailure;
+ }
+ }
};
}
else if (transports.contains(Transport.SSL))
@@ -469,6 +492,25 @@
super.customize(endpoint, request);
request.setAttribute(PORT_SERVLET_ATTRIBUTE, port);
}
+
+ @Override
+ public void open() throws IOException
+ {
+ try
+ {
+ super.open();
+ }
+ catch (BindException e)
+ {
+ InetSocketAddress addr = getHost() == null ? new InetSocketAddress(getPort())
+ : new InetSocketAddress(getHost(), getPort());
+ // Report the failure with the broker's bind-failure type, keeping the
+ // BindException as the cause so the underlying reason is not lost.
+ final PortBindFailureException bindFailure = new PortBindFailureException(addr);
+ bindFailure.initCause(e);
+ throw bindFailure;
+ }
+ }
}
: new SslSelectChannelConnector(factory)
{
@@ -478,6 +515,25 @@
super.customize(endpoint, request);
request.setAttribute(PORT_SERVLET_ATTRIBUTE, port);
}
+
+ @Override
+ public void open() throws IOException
+ {
+ try
+ {
+ super.open();
+ }
+ catch (BindException e)
+ {
+ InetSocketAddress addr = getHost() == null ? new InetSocketAddress(getPort())
+ : new InetSocketAddress(getHost(), getPort());
+ // Report the failure with the broker's bind-failure type, keeping the
+ // BindException as the cause so the underlying reason is not lost.
+ final PortBindFailureException bindFailure = new PortBindFailureException(addr);
+ bindFailure.initCause(e);
+ throw bindFailure;
+ }
+ }
};
return connector;
}
diff --git a/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 6adeb24..ae1c69f 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -99,6 +99,7 @@
public static final int UNKNOWN_TYPE = 3;
private boolean _sendEncrypted;
private String _encryptedRecipients;
+ private long _deliveryDelay;
protected void setExclusive(boolean exclusive)
{
@@ -130,6 +131,11 @@
return _encryptedRecipients;
}
+ public long getDeliveryDelay()
+ {
+ return _deliveryDelay;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -314,6 +320,8 @@
_consumerArguments = binding.getConsumerOptions();
_sendEncrypted = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_SEND_ENCRYPTED));
_encryptedRecipients = binding.getOption(BindingURL.OPTION_ENCRYPTED_RECIPIENTS);
+ String deliveryDelayVal = binding.getOption(BindingURL.OPTION_DELIVERY_DELAY);
+ _deliveryDelay = deliveryDelayVal == null ? 0L : Long.parseLong(deliveryDelayVal);
}
protected AMQDestination(String exchangeName, String exchangeClass, String routingKey, String queueName)
@@ -957,7 +965,7 @@
_addressType = _addrHelper.getNodeType();
_node = _addrHelper.getNode();
_link = _addrHelper.getLink();
-
+ _deliveryDelay = _link.getDelay();
_sendEncrypted = _addrHelper.getSendEncrypted();
_encryptedRecipients = _addrHelper.getEncryptedRecipients();
}
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index c4a64e6..e7e29ba 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -100,6 +100,8 @@
private String _userID; // ref user id used in the connection.
+ private long _deliveryDelay;
+
/**
* The default value for immediate flag used this producer is false. That is, a consumer does
@@ -150,6 +152,11 @@
: mandatory;
_userID = connection.isPopulateUserId() ? connection.getUsername() : null;
+
+ if(destination != null && destination.getDeliveryDelay() != 0L)
+ {
+ _deliveryDelay = destination.getDeliveryDelay();
+ }
setPublishMode();
}
@@ -323,7 +330,8 @@
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+ _deliveryDelay);
}
}
@@ -334,7 +342,8 @@
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+ _deliveryDelay);
}
}
@@ -344,7 +353,8 @@
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate,
+ _deliveryDelay);
}
}
@@ -354,7 +364,7 @@
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate, _deliveryDelay);
}
}
@@ -365,13 +375,15 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
- _immediate);
+ _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -383,13 +395,15 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
- _immediate);
+ _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -401,7 +415,9 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -413,7 +429,9 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -498,10 +516,17 @@
* @param mandatory
* @param immediate
*
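+ * @param deliveryDelay delay in milliseconds; when non-zero it is added to the current time to set the x-qpid-not-valid-before header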
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ protected void sendImpl(AMQDestination destination,
+ Message origMessage,
+ int deliveryMode,
+ int priority,
+ long timeToLive,
+ boolean mandatory,
+ boolean immediate,
+ long deliveryDelay) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
@@ -521,7 +546,8 @@
try
{
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate,
+ deliveryDelay);
}
catch (TransportException e)
{
@@ -549,7 +575,7 @@
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException;
+ boolean immediate, final long deliveryDelay) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws InvalidDestinationException
{
@@ -724,6 +750,17 @@
_publishMode = publishMode;
}
+ public long getDeliveryDelay()
+ {
+ return _deliveryDelay;
+ }
+
+ @Override
+ public void setDeliveryDelay(final long deliveryDelay)
+ {
+ _deliveryDelay = deliveryDelay;
+ }
+
Logger getLogger()
{
return _logger;
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index d25fe06..a387fba 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -113,7 +113,7 @@
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ boolean immediate, final long deliveryDelay) throws JMSException
{
message.prepareForSending();
@@ -138,7 +138,7 @@
}
long currentTime = 0;
- if (timeToLive > 0 || !isDisableTimestamps())
+ if (timeToLive > 0 || !isDisableTimestamps() || deliveryDelay != 0L)
{
currentTime = System.currentTimeMillis();
}
@@ -189,12 +189,13 @@
deliveryProp.setRoutingKey(routingKey);
}
- if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
- (destination.getSubject() != null ||
+ Map<String,Object> appProps = messageProps.getApplicationHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null ||
(messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
)
{
- Map<String,Object> appProps = messageProps.getApplicationHeaders();
if (appProps == null)
{
appProps = new HashMap<String,Object>();
@@ -214,6 +215,18 @@
}
}
+
+ if(deliveryDelay != 0L && (appProps == null || appProps.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null))
+ {
+ if (appProps == null)
+ {
+ appProps = new HashMap<String,Object>();
+ messageProps.setApplicationHeaders(appProps);
+ }
+
+ appProps.put(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+ }
+
ByteBuffer data = message.getData();
boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted();
if(encrypt)
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 9f869e2..48656dc 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -109,8 +109,8 @@
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, final long deliveryDelay) throws JMSException
{
@@ -171,9 +171,11 @@
.getHeaders()
.setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
+ long currentTime;
if (!isDisableTimestamps())
{
- final long currentTime = System.currentTimeMillis();
+
+ currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
if (timeToLive > 0)
@@ -194,6 +196,20 @@
contentHeaderProperties.setExpiration(0);
}
}
+ else
+ {
+ currentTime = 0L;
+ }
+
+ if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null)
+ {
+ if(currentTime == 0L)
+ {
+ currentTime = System.currentTimeMillis();
+ }
+ headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+ }
+
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
diff --git a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 433fb1a..71801d0 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -135,7 +135,12 @@
dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
_deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
}
-
+
+ if(messageProps != null && messageProps.getApplicationHeaders() != null)
+ {
+ messageProps.getApplicationHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+ }
+
setJMSDestination(dest);
}
diff --git a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index a4d61c8..5f13b9d 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -97,15 +97,20 @@
}
// Used when generating a received message object
- protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, String exchange,
- String routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
- int addressType)
+ protected AMQMessageDelegate_0_8(long deliveryTag,
+ BasicContentHeaderProperties contentHeader,
+ String exchange,
+ String routingKey,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
{
this(contentHeader, deliveryTag);
Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ contentHeader.getHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+
AMQDestination dest = null;
if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
diff --git a/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java b/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
index 254980d..ad6cdfb 100644
--- a/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
+++ b/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
@@ -32,7 +32,10 @@
public static final String QPID_SUBJECT = "qpid.subject";
public static final String QPID_SUBJECT_JMS_PROPERTY = "JMS_qpid_subject";
public static final String QPID_SUBJECT_JMS_PROPER = QPID_SUBJECT_JMS_PROPERTY.substring(4);
-
+
+ public static final String QPID_NOT_VALID_BEFORE = "x-qpid-not-valid-before";
+
+
// AMQP 0-10 related properties
public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id";
public static final String AMQP_0_10_ROUTING_KEY = "x-amqp-0-10.routing-key";
diff --git a/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 68bb7ee..84ddea1 100644
--- a/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -245,7 +245,11 @@
reliability + "' is not yet supported");
}
}
-
+ Long delay = _linkPropAccess.getLong("delay");
+ if(delay != null)
+ {
+ link.setDelay(delay);
+ }
if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
diff --git a/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index b40abf3..639cebf 100644
--- a/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -38,6 +38,7 @@
private FilterType _filterType = FilterType.SUBJECT;
private boolean _isNoLocal;
private boolean _isDurable;
+ private long _delay;
private int _consumerCapacity = -1;
private int _producerCapacity = -1;
private Subscription subscription;
@@ -145,6 +146,16 @@
_bindings = bindings;
}
+ public long getDelay()
+ {
+ return _delay;
+ }
+
+ public void setDelay(final long delay)
+ {
+ _delay = delay;
+ }
+
public SubscriptionQueue getSubscriptionQueue()
{
return _subscriptionQueue;
diff --git a/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
index 82c2b88..2ae27f8 100644
--- a/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
+++ b/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
@@ -36,4 +36,6 @@
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
+ void setDeliveryDelay(long delay);
+
}
diff --git a/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/common/src/main/java/org/apache/qpid/configuration/Accessor.java
index b73f08f..eba4db3 100644
--- a/common/src/main/java/org/apache/qpid/configuration/Accessor.java
+++ b/common/src/main/java/org/apache/qpid/configuration/Accessor.java
@@ -120,7 +120,7 @@
}
else
{
- return Long.parseLong((String)source.get(name));
+ return Long.parseLong(source.get(name).toString());
}
}
else
diff --git a/common/src/main/java/org/apache/qpid/url/BindingURL.java b/common/src/main/java/org/apache/qpid/url/BindingURL.java
index da9e8e3..07df69d 100644
--- a/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -43,6 +43,7 @@
String OPTION_EXCHANGE_INTERNAL = "exchangeinternal";
String OPTION_SEND_ENCRYPTED = "sendencrypted";
String OPTION_ENCRYPTED_RECIPIENTS = "encryptedrecipients";
+ String OPTION_DELIVERY_DELAY = "deliveryDelay";
/**
@@ -66,7 +67,8 @@
OPTION_EXCHANGE_DURABLE,
OPTION_REJECT_BEHAVIOUR,
OPTION_SEND_ENCRYPTED,
- OPTION_ENCRYPTED_RECIPIENTS)));
+ OPTION_ENCRYPTED_RECIPIENTS,
+ OPTION_DELIVERY_DELAY)));
String getURL();