BAHIR-59[AMQ] Fix constructor visibility and error messages (rmetzger)
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
index 86254ff..e10c3c8 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSinkConfig.java
@@ -34,9 +34,9 @@
public AMQSinkConfig(ActiveMQConnectionFactory connectionFactory, String queueName,
SerializationSchema<IN> serializationSchema, boolean persistentDelivery,
DestinationType destinationType) {
- this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
- this.queueName = Preconditions.checkNotNull(queueName, "destinationName");
- this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+ this.queueName = Preconditions.checkNotNull(queueName, "destinationName not set");
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
this.persistentDelivery = persistentDelivery;
this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
}
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
index 49f2cf7..e64b8fd 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
@@ -92,7 +92,7 @@
*
* @param config AMQSource configuration
*/
- AMQSource(AMQSourceConfig<OUT> config) {
+ public AMQSource(AMQSourceConfig<OUT> config) {
super(String.class);
this.connectionFactory = config.getConnectionFactory();
this.destinationName = config.getDestinationName();
@@ -218,7 +218,7 @@
Message message = consumer.receive(1000);
if (! (message instanceof BytesMessage)) {
- LOG.warn("Active MQ source received non bytes message: {}");
+ LOG.warn("Active MQ source received non bytes message: {}", message);
return;
}
BytesMessage bytesMessage = (BytesMessage) message;
diff --git a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
index 2dcb2cb..dd73b0e 100644
--- a/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
+++ b/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSourceConfig.java
@@ -37,11 +37,11 @@
AMQSourceConfig(ActiveMQConnectionFactory connectionFactory, String destinationName,
DeserializationSchema<OUT> deserializationSchema, RunningChecker runningChecker,
DestinationType destinationType) {
- this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory");
- this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName");
- this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
- this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker");
- this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType");
+ this.connectionFactory = Preconditions.checkNotNull(connectionFactory, "connectionFactory not set");
+ this.destinationName = Preconditions.checkNotNull(destinationName, "destinationName not set");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema not set");
+ this.runningChecker = Preconditions.checkNotNull(runningChecker, "runningChecker not set");
+ this.destinationType = Preconditions.checkNotNull(destinationType, "destinationType not set");
}
public ActiveMQConnectionFactory getConnectionFactory() {