Merge branch 'EDGENT-448' of https://github.com/yanghua/incubator-edgent into develop
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 9190cfa..edc2e5b 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -49,6 +49,7 @@
<module>websocket-jetty</module>
<module>websocket-misc</module>
<module>websocket-server</module>
+ <module>rabbitmq</module>
</modules>
</project>
diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml
new file mode 100644
index 0000000..a648b61
--- /dev/null
+++ b/connectors/rabbitmq/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>edgent-connectors</artifactId>
+ <groupId>org.apache.edgent</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>edgent-connectors-rabbitmq</artifactId>
+
+ <name>Apache Edgent (Java 8): Connectors: RabbitMQ</name>
+
+ <properties>
+ <remote-resources-maven-plugin.remote-resources.dir>../../src/main/ibm-remote-resources</remote-resources-maven-plugin.remote-resources.dir>
+ <edgent.version></edgent.version>
+ <rabbitmq.version>5.1.2</rabbitmq.version>
+ </properties>
+
+ <dependencies>
+ <!-- edgent start -->
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-api-topology</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+ <!-- edgent end -->
+
+ <!-- edgent test start -->
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-providers-direct</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-providers-direct</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-connectors-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-api-topology</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- edgent test end -->
+
+ <!-- rabbitmq start -->
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq.version}</version>
+ </dependency>
+ <!-- rabbitmq end -->
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
new file mode 100644
index 0000000..f35a8fe
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+/**
+ * Defines all RabbitMQ config key constant items.
+ */
+public class RabbitmqConfigKeyConstants {
+
+ /**
+ * config key for connection URI, eg: amqp://userName:password@hostName:portNumber/virtualHost
+ */
+ public static final String RABBITMQ_CONFIG_KEY_URI = "rabbitmq.connection.uri";
+
+ /**
+ * config key for RabbitMQ server host
+ */
+ public static final String RABBITMQ_CONFIG_KEY_HOST = "rabbitmq.connection.host";
+
+ /**
+ * config key for RabbitMQ server port, default port is : 5672
+ */
+ public static final String RABBITMQ_CONFIG_KEY_PORT = "rabbitmq.connection.port";
+
+ /**
+ * config key for virtual host which used to split multi-users
+ */
+ public static final String RABBITMQ_CONFIG_KEY_VIRTUAL_HOST = "rabbitmq.connection.virtualHost";
+
+ /**
+ * config key for authorization (user name)
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTH_NAME = "rabbitmq.connection.authUsername";
+
+ /**
+ * config key for authorization (password)
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTH_PASSWORD = "rabbitmq.connection.authPassword";
+
+ /**
+ * config key for specifying whether enable auto recovery or not.
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTO_RECOVERY = "rabbitmq.connection.autoRecovery";
+
+ /**
+ * config key for connection timeout
+ */
+ public static final String RABBITMQ_CONFIG_KEY_TIMEOUT = "rabbitmq.connection.timeout";
+
+ /**
+ * config key for connection network recovery interval
+ */
+ public static final String RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL = "rabbitmq.connection.networkRecoveryInterval";
+
+ /**
+ * config key for connection requested-heartbeat
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT = "rabbitmq.connection.requestedHeartbeat";
+
+ /**
+ * config key for specifying whether enable topology recovery or not.
+ */
+ public static final String RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.connection.topologyRecoveryEnabled";
+
+ /**
+ * config key for connection requested channel max num
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX = "rabbitmq.connection.requestedChannelMax";
+
+ /**
+ * config key for connection requested frame max
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX = "rabbitmq.connection.requestedFrameMax";
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
new file mode 100644
index 0000000..4c2e881
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqSubscriber;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqConsumer} is a consumer to consume messages from a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Smaple use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = ...
+ *
+ * RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+ * TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+ *
+ * //...
+ * }
+ * </pre>
+ */
+public class RabbitmqConsumer {
+
+ private final RabbitmqConnector connector;
+ private final Topology topology;
+
+ /**
+ * Create a consumer connector for consuming tuples from a RabbitMQ queue.
+ * <p>
+ * See the RabbitMQ java client document :
+ * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+ * The full config option please see RabbitMQ java client API Reference :
+ * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+ * </p>
+ * @param topology topology to add to
+ * @param config RabbitmqProducer configuration information.
+ */
+ public RabbitmqConsumer(Topology topology, Supplier<Map<String, Object>> config) {
+ this.topology = topology;
+ this.connector = new RabbitmqConnector(config);
+ }
+
+ /**
+ * Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
+ *
+ * @param toTupleFn A function that yields a tuple from a byte array,
+ * @param queue the specified RabbitMQ queue
+ * @param <T> A function that yields a tuple from a
+ * @return stream of tuples
+ */
+ public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
+ return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
+ }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
new file mode 100644
index 0000000..8437285
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqPublisher;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqProducer} is a producer to produce messages to a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Sample use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = newTopology("testSimple");
+ * RabbitmqProducer producer = new RabbitmqProducer(t, () -> config);
+ *
+ * //TStream<String> stream = ...
+ *
+ * TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+ * }
+ * </pre>
+ */
+public class RabbitmqProducer {
+
+ private final RabbitmqConnector connector;
+ private final Topology topology;
+
+ /**
+ * Create a producer connector for publishing tuples to a RabbitMQ queue.
+ * <p>
+ * See the RabbitMQ java client document :
+ * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+ * The full config option please see RabbitMQ java client API Reference :
+ * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+ * </p>
+ * @param topology topology to add to
+ * @param config RabbitmqProducer configuration information.
+ */
+ public RabbitmqProducer(Topology topology, Supplier<Map<String, Object>> config) {
+ this.topology = topology;
+ this.connector = new RabbitmqConnector(config);
+ }
+
+ /**
+ * Publish the stream of tuples to the specified queue.
+ * @param stream The stream to publish
+ * @param queue The specified queue of RabbitMQ
+ * @param msgFn A function that yields the byte[] records from the tuple
+ * @param <T> Tuple type
+ * @return {@link TSink}
+ */
+ public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
+ return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
+ }
+
+ /**
+ * Publish the stream of tuples to the specified queue.
+ * @param stream The stream to publish
+ * @param queue The specified queue of RabbitMQ
+ * @param msg The string message to publish
+ * @return
+ */
+ public TSink<String> publish(TStream<String> stream, String queue, String msg) {
+ return publish(stream, queue, (String m) -> msg.getBytes());
+ }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
new file mode 100644
index 0000000..49dafd2
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * RabbitMQ stream connector.
+ * <P>
+ * Stream tuples may be published to RabbitMQ queues
+ * and created by subscribing to queues.
+ * For more information about RabbitMQ see
+ * <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ */
+package org.apache.edgent.connectors.rabbitmq;
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
new file mode 100644
index 0000000..b83cfb6
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.edgent.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_NAME;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_PASSWORD;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTO_RECOVERY;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TIMEOUT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_URI;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_VIRTUAL_HOST;
+
+/**
+ * A connector to an RabbitMQ server.
+ */
+public class RabbitmqConnector implements AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqConnector.class);
+
+ private final Supplier<Map<String, Object>> configFn;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+ private String id;
+
+ public RabbitmqConnector(Supplier<Map<String, Object>> configFn) {
+ this.configFn = configFn;
+ initConnection();
+ }
+
+ public synchronized Channel channel() {
+ if (channel == null) {
+ if (connection != null) {
+ try {
+ channel = connection.createChannel();
+ } catch (IOException e) {
+ logger.error("IOExcetion occurs when create connection channel {}", e);
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ logger.error("Unknown Exception : {}", e);
+ }
+ } else {
+ logger.error("Inner statue inconformity : the rabbitmq connection is null.");
+ throw new RuntimeException("Inner statue inconformity : the rabbitmq connection is null.");
+ }
+ }
+
+ return channel;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (channel != null) {
+ channel.close();
+ }
+
+ if (connection != null) {
+ connection.close();
+ }
+
+ if (connectionFactory != null) {
+ connectionFactory = null;
+ }
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = "RabbitMQ " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+
+ private void initConnection() {
+ try {
+ this.connectionFactory = getConnectionFactory();
+ this.connection = connectionFactory.newConnection();
+ } catch (Exception e) {
+ logger.error("{}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ConnectionFactory getConnectionFactory() throws Exception {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ Map<String, Object> configMap = configFn.get();
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_URI)) {
+ connectionFactory.setUri(configMap.get(RABBITMQ_CONFIG_KEY_URI).toString());
+ } else {
+ if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_HOST)) {
+ throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_HOST);
+ }
+
+ connectionFactory.setHost(configMap.get(RABBITMQ_CONFIG_KEY_HOST).toString());
+
+ if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_PORT)) {
+ throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_PORT);
+ }
+
+ connectionFactory.setPort(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_PORT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST)) {
+ connectionFactory.setVirtualHost(configMap.get(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_NAME)) {
+ connectionFactory.setUsername(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_NAME).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD)) {
+ connectionFactory.setPassword(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY)) {
+ connectionFactory.setAutomaticRecoveryEnabled(
+ Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TIMEOUT)) {
+ connectionFactory.setConnectionTimeout(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TIMEOUT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL)) {
+ connectionFactory.setNetworkRecoveryInterval(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT)) {
+ connectionFactory.setRequestedHeartbeat(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED)) {
+ connectionFactory.setTopologyRecoveryEnabled(
+ Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX)) {
+ connectionFactory.setRequestedChannelMax(
+ Integer.parseInt(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX)) {
+ connectionFactory.setRequestedChannelMax(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX).toString()));
+ }
+
+ return connectionFactory;
+ }
+
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
new file mode 100644
index 0000000..67c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * a publisher for RabbitMQ connector
+ */
+public class RabbitmqPublisher<T> implements Consumer<T>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqPublisher.class);
+
+ private final RabbitmqConnector connector;
+ private final Function<T, byte[]> msgFn;
+ private final String queue;
+ private String id;
+
+ public RabbitmqPublisher(RabbitmqConnector connector, String queue, Function<T, byte[]> msgFn) {
+ this.connector = connector;
+ this.queue = queue;
+ this.msgFn = msgFn;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ logger.info("{} is closing.", id());
+ connector.close();
+ logger.info("{} is closed.", id());
+ }
+
+ @Override
+ public void accept(T value) {
+ byte[] msg = msgFn.apply(value);
+ try {
+ connector.channel().basicPublish("", queue, null, msg);
+ } catch (IOException e) {
+ logger.error("publish exception : {}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+
+ return id;
+ }
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
new file mode 100644
index 0000000..40c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a subscriber for RabbitMQ connector
+ */
+public class RabbitmqSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqSubscriber.class);
+
+ private final RabbitmqConnector connector;
+ private Function<byte[], T> toTupleFn;
+ private Consumer<T> eventSubmitter;
+ private ExecutorService executor;
+ private String queue;
+ private String id;
+
+ public RabbitmqSubscriber(RabbitmqConnector connector, String queue, Function<byte[], T> toTupleFn) {
+ this.connector = connector;
+ this.queue = queue;
+ this.toTupleFn = toTupleFn;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ logger.info("{} is closing.", id());
+ if (executor != null && !executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+
+ connector.close();
+ logger.info("{} is closed", id());
+ }
+
+ @Override
+ public void accept(Consumer<T> eventSubmitter) {
+ this.eventSubmitter = eventSubmitter;
+
+ executor = Executors.newFixedThreadPool(1);
+
+ executor.submit(() -> {
+ boolean autoAck = false;
+ try {
+ connector.channel().basicConsume(queue, autoAck,
+ new DefaultConsumer(connector.channel()) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+ long deliveryTag = envelope.getDeliveryTag();
+
+ acceptCallback(body);
+
+ connector.channel().basicAck(deliveryTag, false);
+ }
+ });
+ } catch (IOException e) {
+ logger.error("Consumer exception : {}", e);
+ }
+ });
+
+ }
+
+ private void acceptCallback(byte[] msg) {
+ T tuple = toTupleFn.apply(msg);
+ eventSubmitter.accept(tuple);
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " SUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+
+ return id;
+ }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
new file mode 100644
index 0000000..9565bf9
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+/**
+ * RabbitmqStreams connector globalization tests.
+ */
+public class RabbitmqStreamsGlobalTestManual extends RabbitmqStreamsTestManual {
+
+ private static final String globalMsg1 = "你好";
+ private static final String globalMsg2 = "你在嗎";
+
+ public String getMsg1() {
+ return globalMsg1;
+ }
+
+ public String getMsg2() {
+ return globalMsg2;
+ }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
new file mode 100644
index 0000000..02ea0bc
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConsumer;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A {@link RabbitmqConsumer} manual test case.
+ * Please follow there steps
+ *
+ * step 1 :
+ * Install RabbitMQ server.
+ * For Mac os x, you can use homebrew to install it, the simple command is : `brew install rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ *
+ * step 2 :
+ * Start the RabbitMQ server.
+ * For Mac os x, if you install it with homebrew, you can start it with : `brew services start rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ * Note : the default port which RabbitMQ server listen is 5672
+ * and it will start a web console which listen port 15672
+ * so you can access it with your web browser, just enter : http://localhost:15672/
+ * you will see a authorization page, the default user name and password are both `guest`
+ *
+ * step 3 :
+ * Create a test queue use the web console.
+ * after starting the server, you can access `http://localhost:15672/#/queues`
+ * then you will find a button named `Add a new queue`, click it and input `testQueue` into `name` field
+ * and click `Add queue` button to create the queue.
+ *
+ * step 4 :
+ * run this test case to test the publish and subscribe method.
+ */
+public class RabbitmqStreamsTestManual extends ConnectorTestBase {
+
+ private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+ private static final int SEC_TIMEOUT = 20;
+
+ private final String msg1 = "Hello";
+ private final String msg2 = "Are you there?";
+
+ public String getMsg1() {
+ return msg1;
+ }
+
+ public String getMsg2() {
+ return msg2;
+ }
+
+ private Map<String, Object> initRabbitmqConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+
+ return config;
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ Topology t = newTopology("testSimple");
+ Map<String, Object> configMap = initRabbitmqConfig();
+ MsgGenerator generator = new MsgGenerator(t.getName());
+ String queue = "testQueue";
+
+ List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
+
+ TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+
+ TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+
+ RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
+
+ TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+
+ completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+ assertNotNull(sink);
+ }
+
+
+}
diff --git a/pom.xml b/pom.xml
index bbe6867..6450e14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -505,11 +505,28 @@
<version>1.0.0-SNAPSHOT</version>
<executions>
<execution>
- <id>filter-test-jars</id>
+ <id>filter-deploy-artifacts</id>
<phase>install</phase>
<goals>
- <goal>filter-test-jars</goal>
+ <goal>filter-deploy-artifacts</goal>
</goals>
+ <configuration>
+ <filterRules>
+ <!-- Filter out all test-jars -->
+ <filterRule>
+ <type>test-jar</type>
+ </filterRule>
+ <!-- Filter out the signatures of all test-jars -->
+ <filterRule>
+ <type>jar.asc</type>
+ <classifier>tests</classifier>
+ </filterRule>
+ <!-- Filter out any source release archives -->
+ <filterRule>
+ <classifier>source-release</classifier>
+ </filterRule>
+ </filterRules>
+ </configuration>
</execution>
</executions>
</plugin>
diff --git a/utils/edgent-deployment-filter-maven-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/pom.xml
index 64bbf7e..312a7c3 100644
--- a/utils/edgent-deployment-filter-maven-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/pom.xml
@@ -59,6 +59,13 @@
<artifactId>plexus-utils</artifactId>
<version>3.0.8</version>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
index 26dc3cf..466bd6f 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
@@ -70,11 +70,28 @@
<version>@project.version@</version>
<executions>
<execution>
- <id>filter-test-jars</id>
+ <id>filter-deploy-artifacts</id>
<phase>install</phase>
<goals>
- <goal>filter-test-jars</goal>
+ <goal>filter-deploy-artifacts</goal>
</goals>
+ <configuration>
+ <filterRules>
+ <!-- Filter out all test-jars -->
+ <filterRule>
+ <type>test-jar</type>
+ </filterRule>
+ <!-- Filter out the signatures of all test-jars -->
+ <filterRule>
+ <type>jar.asc</type>
+ <classifier>tests</classifier>
+ </filterRule>
+ <!-- Filter out any source release archives -->
+ <filterRule>
+ <classifier>source-release</classifier>
+ </filterRule>
+ </filterRules>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -116,6 +133,33 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.apache.resources</groupId>
+ <artifactId>apache-source-release-assembly-descriptor</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>source-release-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
+ <descriptorRefs>
+ <descriptorRef>${sourceReleaseAssemblyDescriptor}</descriptorRef>
+ </descriptorRefs>
+ <tarLongFileMode>posix</tarLongFileMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
index c376a30..58286d4 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
@@ -21,33 +21,66 @@
def jarFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT.jar")
def testJarFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-tests.jar")
+def testJarAscFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-tests.jar.asc")
+def sourceReleaseFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-source-release.zip")
+def sourceReleaseAscFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-source-release.zip.asc")
// The jar file should exist
assert jarFile.exists() && jarFile.isFile()
-// The test-jar should also exist
+// The test-jar and it's signature should also exist
assert testJarFile.exists() && testJarFile.isFile()
+assert testJarAscFile.exists() && testJarAscFile.isFile()
-// The local repo should contain the test-jar.
+// The source release zip and it's signature should exist
+assert sourceReleaseFile.exists() && sourceReleaseFile.isFile()
+assert sourceReleaseAscFile.exists() && sourceReleaseAscFile.isFile()
+
+// The local repo should contain all files.
def jarLocalRepo = new File("target/maven-repos/local/org/apache/edgent/plugins/it/with-plugin/1.0-SNAPSHOT")
assert jarLocalRepo.exists()
def foundTestJarInLocal = false
+def foundTestJarAscInLocal = false
+def foundSourceReleaseZipInLocal = false
+def foundSourceReleaseZipAscInLocal = false
jarLocalRepo.eachFileRecurse (FileType.FILES) { file ->
println file.name
if(file.name.endsWith("tests.jar")) {
foundTestJarInLocal = true
}
+ if(file.name.endsWith("tests.jar.asc")) {
+ foundTestJarAscInLocal = true
+ }
+ if(file.name.endsWith("source-release.zip")) {
+ foundSourceReleaseZipInLocal = true
+ }
+ if(file.name.endsWith("source-release.zip.asc")) {
+ foundSourceReleaseZipAscInLocal = true
+ }
}
-assert foundTestJarInLocal
+assert foundTestJarInLocal && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
-// The remote repo shouldn't contain it.
+// The remote repo shouldn't not contain test-jar and source-release and their corresponding signatures.
def jarRemoteRepo = new File("target/maven-repos/remote/org/apache/edgent/plugins/it/with-plugin/1.0-SNAPSHOT")
assert jarRemoteRepo.exists()
def foundTestJarInRemote = false
+def foundTestJarAscInRemote = false
+def foundSourceReleaseZipInRemote = false
+def foundSourceReleaseZipAscInRemote = false
jarRemoteRepo.eachFileRecurse (FileType.FILES) { file ->
println file.name
if(file.name.endsWith("tests.jar")) {
foundTestJarInRemote = true
}
+ if(file.name.endsWith("tests.jar.asc")) {
+ foundTestJarAscInRemote = true
+ }
+ if(file.name.endsWith("source-release.zip")) {
+ foundSourceReleaseZipInRemote = true
+ }
+ if(file.name.endsWith("source-release.zip.asc")) {
+ foundSourceReleaseZipAscInRemote = true
+ }
}
-assert !foundTestJarInRemote
+assert !foundTestJarInRemote && !foundTestJarAscInRemote && !foundSourceReleaseZipInRemote && !foundSourceReleaseZipAscInRemote
+
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
index e90f322..002017b 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
@@ -101,6 +101,33 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.apache.resources</groupId>
+ <artifactId>apache-source-release-assembly-descriptor</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>source-release-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
+ <descriptorRefs>
+ <descriptorRef>${sourceReleaseAssemblyDescriptor}</descriptorRef>
+ </descriptorRefs>
+ <tarLongFileMode>posix</tarLongFileMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
index e294108..ea7d39b 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
@@ -21,33 +21,65 @@
def jarFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT.jar")
def testJarFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-tests.jar")
+def testJarAscFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-tests.jar.asc")
+def sourceReleaseFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-source-release.zip")
+def sourceReleaseAscFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-source-release.zip.asc")
// The jar file should exist
assert jarFile.exists() && jarFile.isFile()
-// The test-jar should also exist
+// The test-jar and it's signature should also exist
assert testJarFile.exists() && testJarFile.isFile()
+assert testJarAscFile.exists() && testJarAscFile.isFile()
-// The local repo should contain the test-jar.
+// The source release zip and it's signature should exist
+assert sourceReleaseFile.exists() && sourceReleaseFile.isFile()
+assert sourceReleaseAscFile.exists() && sourceReleaseAscFile.isFile()
+
+// The local repo should contain all expected files.
def jarLocalRepo = new File("target/maven-repos/local/org/apache/edgent/plugins/it/without-plugin/1.0-SNAPSHOT")
assert jarLocalRepo.exists()
def foundTestJarInLocal = false
+def foundTestJarAscInLocal = false
+def foundSourceReleaseZipInLocal = false
+def foundSourceReleaseZipAscInLocal = false
jarLocalRepo.eachFileRecurse (FileType.FILES) { file ->
println file.name
if(file.name.endsWith("tests.jar")) {
foundTestJarInLocal = true
}
+ if(file.name.endsWith("tests.jar.asc")) {
+ foundTestJarAscInLocal = true
+ }
+ if(file.name.endsWith("source-release.zip")) {
+ foundSourceReleaseZipInLocal = true
+ }
+ if(file.name.endsWith("source-release.zip.asc")) {
+ foundSourceReleaseZipAscInLocal = true
+ }
}
-assert foundTestJarInLocal
+assert foundTestJarInLocal && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
-// The remote repo should contain it too.
+// The remote repo should also contain all of them.
def jarRemoteRepo = new File("target/maven-repos/remote/org/apache/edgent/plugins/it/without-plugin/1.0-SNAPSHOT")
assert jarRemoteRepo.exists()
def foundTestJarInRemote = false
+def foundTestJarAscInRemote = false
+def foundSourceReleaseZipInRemote = false
+def foundSourceReleaseZipAscInRemote = false
jarRemoteRepo.eachFileRecurse (FileType.FILES) { file ->
println file.name
if(file.name.endsWith("tests.jar")) {
foundTestJarInRemote = true
}
+ if(file.name.endsWith("tests.jar.asc")) {
+ foundTestJarAscInRemote = true
+ }
+ if(file.name.endsWith("source-release.zip")) {
+ foundSourceReleaseZipInRemote = true
+ }
+ if(file.name.endsWith("source-release.zip.asc")) {
+ foundSourceReleaseZipAscInRemote = true
+ }
}
-assert foundTestJarInRemote
+assert foundTestJarInRemote && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java
new file mode 100644
index 0000000..13b427e
--- /dev/null
+++ b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.plugins.deploymentfilter;
+
+import org.apache.edgent.plugins.deploymentfilter.model.FilterRule;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.project.MavenProject;
+import org.hamcrest.Matcher;
+import org.hamcrest.beans.HasPropertyWithValue;
+import org.hamcrest.core.AllOf;
+import org.hamcrest.core.AnyOf;
+import org.hamcrest.core.IsEqual;
+import org.sonatype.aether.util.StringUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Goal which filters all 'test-jar' artifacts from installation and deployment.
+ * The goal is added to the 'install' phase as this way it is executed after the install plugin,
+ * but before the deploy, which is the phase we don't want the artifact to be handled.
+ */
+@Mojo( name = "filter-deploy-artifacts", defaultPhase = LifecyclePhase.INSTALL )
+public class FilterDeployArtifactsMojo
+ extends AbstractMojo
+{
+
+ @Parameter(defaultValue="${project}")
+ private MavenProject project;
+
+ @Parameter
+ private List<FilterRule> filterRules;
+
+ public void execute()
+ throws MojoExecutionException
+ {
+ List<Matcher<? super Artifact>> filter = new LinkedList<Matcher<? super Artifact>>();
+ for (FilterRule filterRule : filterRules) {
+ List<Matcher<? super Artifact>> curFilter = new LinkedList<Matcher<? super Artifact>>();
+ if(!StringUtils.isEmpty(filterRule.getType())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("type",
+ IsEqual.equalTo(filterRule.getType())));
+ }
+ if(!StringUtils.isEmpty(filterRule.getClassifier())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("classifier",
+ IsEqual.equalTo(filterRule.getClassifier())));
+ }
+ if(!StringUtils.isEmpty(filterRule.getGroupId())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("groupId",
+ IsEqual.equalTo(filterRule.getGroupId())));
+ }
+ if(!StringUtils.isEmpty(filterRule.getArtifactId())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("artifactId",
+ IsEqual.equalTo(filterRule.getArtifactId())));
+ }
+ if(!StringUtils.isEmpty(filterRule.getVersion())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("version",
+ IsEqual.equalTo(filterRule.getVersion())));
+ }
+ if(!StringUtils.isEmpty(filterRule.getScope())) {
+ curFilter.add(HasPropertyWithValue.hasProperty("scope",
+ IsEqual.equalTo(filterRule.getScope())));
+ }
+ if(!curFilter.isEmpty()) {
+ filter.add(new AllOf<Artifact>(curFilter));
+ }
+ }
+ AnyOf<Artifact> matcher = new AnyOf<Artifact>(filter);
+
+ // Find all 'test-jar' artifacts.
+ // (This has to be done in separate loops in order to prevent
+ // concurrent modification exceptions.
+ List<Artifact> toBeRemovedArtifacts = new LinkedList<Artifact>();
+ for(Artifact artifact : project.getAttachedArtifacts()) {
+ if(matcher.matches(artifact)) {
+ toBeRemovedArtifacts.add(artifact);
+ }
+ }
+
+ // Remove all of them from the list of attached artifacts.
+ if(!toBeRemovedArtifacts.isEmpty()) {
+ for (Artifact toBeRemovedArtifact : toBeRemovedArtifacts) {
+ getLog().info(" - Excluding artifact " + toBeRemovedArtifact.getArtifactId() +
+ " from deployment.");
+ project.getAttachedArtifacts().remove(toBeRemovedArtifact);
+ }
+ }
+ }
+}
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java
deleted file mode 100644
index 0315f19..0000000
--- a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.plugins.deploymentfilter;
-
-import org.apache.maven.artifact.Artifact;
-import org.apache.maven.plugin.AbstractMojo;
-import org.apache.maven.plugin.MojoExecutionException;
-
-import org.apache.maven.plugins.annotations.LifecyclePhase;
-import org.apache.maven.plugins.annotations.Mojo;
-import org.apache.maven.plugins.annotations.Parameter;
-import org.apache.maven.project.MavenProject;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Goal which filters all 'test-jar' artifacts from installation and deployment.
- * The goal is added to the 'install' phase as this way it is executed after the install plugin,
- * but before the deploy, which is the phase we don't want the artifact to be handled.
- */
-@Mojo( name = "filter-test-jars", defaultPhase = LifecyclePhase.INSTALL )
-public class FilterTestJarsMojo
- extends AbstractMojo
-{
-
- @Parameter(defaultValue="${project}")
- private MavenProject project;
-
- public void execute()
- throws MojoExecutionException
- {
- // Find all 'test-jar' artifacts.
- // (This has to be done in separate loops in order to prevent
- // concurrent modification exceptions.
- List<Artifact> toBeRemovedArtifacts = new LinkedList<Artifact>();
- for(Artifact artifact : project.getAttachedArtifacts()) {
- if("test-jar".equals(artifact.getType()) ||
- ("jar.asc".equals(artifact.getType()) && "tests".equals(artifact.getClassifier()))) {
- toBeRemovedArtifacts.add(artifact);
- }
- }
-
- // Remove all of them from the list of attached artifacts.
- if(!toBeRemovedArtifacts.isEmpty()) {
- for (Artifact toBeRemovedArtifact : toBeRemovedArtifacts) {
- getLog().info(" - Excluding test-jar artifact " + toBeRemovedArtifact.getArtifactId() +
- " from deployment.");
- project.getAttachedArtifacts().remove(toBeRemovedArtifact);
- }
- }
- }
-}
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java
new file mode 100644
index 0000000..9f12813
--- /dev/null
+++ b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.plugins.deploymentfilter.model;
+
+public class FilterRule {
+ private String groupId;
+ private String artifactId;
+ private String version;
+ private String type;
+ private String classifier;
+ private String scope;
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public String getArtifactId() {
+ return artifactId;
+ }
+
+ public void setArtifactId(String artifactId) {
+ this.artifactId = artifactId;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getClassifier() {
+ return classifier;
+ }
+
+ public void setClassifier(String classifier) {
+ this.classifier = classifier;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public void setScope(String scope) {
+ this.scope = scope;
+ }
+}