[runtime && dist]Configuration and packaging modifications (#165)
Configuration and packaging modifications
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
index 3ea21fa..225ab5c 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/common/CirculatorContext.java
@@ -241,7 +241,9 @@
int endIndex = targetRunnerConfig.getComponents().size() - 1;
TargetKeyValue targetKeyValue = new TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
- pusherTaskMap.put(runnerName, sinkTask);
+ if(sinkTask != null) {
+ pusherTaskMap.put(runnerName, sinkTask);
+ }
if (!pusherExecutorMap.containsKey(runnerName)) {
pusherExecutorMap.put(runnerName, initDefaultThreadPoolExecutor(runnerName));
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/ConfigLoader.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/ConfigLoader.java
new file mode 100644
index 0000000..d8f2f35
--- /dev/null
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/ConfigLoader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * support loading from runtime.properties and spring properties
+ */
+@Component
+@Slf4j
+public class ConfigLoader {
+
+ @Autowired
+ private PropertiesResolveService propertiesResolveService;
+
+ private Properties properties;
+
+ public ConfigLoader() {
+ try {
+ this.properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
+ } catch (IOException e) {
+ log.warn("init runtime properties failed ", e);
+ }
+ }
+
+ public String getString(String name, String defaultValue){
+ String result = getValue(name);
+ if (StringUtils.isBlank(result)) {
+ result = defaultValue;
+ }
+ log.debug("Load property '" + name + "' = " + result);
+ return result;
+ }
+
+ public String getString(String name){
+ String result = getValue(name);
+ log.debug("Load property '" + name + "' = " + result);
+ return result;
+ }
+
+ private String getValue(String name){
+ if (StringUtils.isBlank(name)) {
+ return null;
+ }
+ String value = null;
+ try {
+ if (propertiesResolveService != null) {
+ value = propertiesResolveService.getPropertiesValue(name);
+ }
+ } catch (IllegalArgumentException e) {
+ log.warn("Failed load property '" + name + "' from spring.");
+ }
+ if (properties != null && StringUtils.isBlank((value))) {
+ value = properties.getProperty(name);
+ }
+ return value;
+ }
+}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/PropertiesResolveService.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/PropertiesResolveService.java
new file mode 100644
index 0000000..81e175b
--- /dev/null
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/PropertiesResolveService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.runtimer;
+
+import org.apache.rocketmq.eventbridge.config.CommonConstants;
+import org.springframework.context.EmbeddedValueResolverAware;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.PriorityOrdered;
+import org.springframework.util.StringValueResolver;
+
+/**
+ * spring props load service
+ */
+@Configuration
+public class PropertiesResolveService implements EmbeddedValueResolverAware, PriorityOrdered {
+
+ private StringValueResolver stringValueResolver;
+
+ @Override
+ public void setEmbeddedValueResolver(StringValueResolver resolver) {
+ stringValueResolver = resolver;
+ }
+
+ public String getPropertiesValue(String name) {
+ if (!name.contains(CommonConstants.V_PREFIXE)) {
+ name = CommonConstants.V_PREFIXE + name + CommonConstants.V_POSTFIX;
+ }
+ return stringValueResolver.resolveStringValue(name);
+ }
+
+ @Override
+ public int getOrder() {
+ return PriorityOrdered.HIGHEST_PRECEDENCE;
+ }
+}
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index 58dda61..b88fcfb 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -58,7 +57,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
-import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -86,6 +84,9 @@
@Autowired
private TargetRunnerConfigObserver runnerConfigObserver;
+ @Autowired
+ private ConfigLoader configLoader;
+
private final BlockingQueue<MessageExt> messageBuffer = new LinkedBlockingQueue<>(50000);
private Integer pullTimeOut;
@@ -180,17 +181,16 @@
private void initMqProperties() {
try {
ClientConfig clientConfig = new ClientConfig();
- Properties properties = PropertiesLoaderUtils.loadAllProperties("runtime.properties");
- String namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
- pullTimeOut = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
- pullBatchSize = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
- String accessChannel = properties.getProperty("rocketmq.accessChannel");
- String namespace = properties.getProperty("rocketmq.namespace");
- String accessKey = properties.getProperty("rocketmq.consumer.accessKey");
- String secretKey = properties.getProperty("rocketmq.consumer.secretKey");
- String socks5UserName = properties.getProperty("rocketmq.consumer.socks5UserName");
- String socks5Password = properties.getProperty("rocketmq.consumer.socks5Password");
- String socks5Endpoint = properties.getProperty("rocketmq.consumer.socks5Endpoint");
+ String namesrvAddr = configLoader.getString("rocketmq.namesrvAddr");
+ pullTimeOut = Integer.valueOf(configLoader.getString("rocketmq.consumer.pullTimeOut"));
+ pullBatchSize = Integer.valueOf(configLoader.getString("rocketmq.consumer.pullBatchSize"));
+ String accessChannel = configLoader.getString("rocketmq.accessChannel");
+ String namespace = configLoader.getString("rocketmq.namespace");
+ String accessKey = configLoader.getString("rocketmq.consumer.accessKey");
+ String secretKey = configLoader.getString("rocketmq.consumer.secretKey");
+ String socks5UserName = configLoader.getString("rocketmq.consumer.socks5UserName");
+ String socks5Password = configLoader.getString("rocketmq.consumer.socks5Password");
+ String socks5Endpoint = configLoader.getString("rocketmq.consumer.socks5Endpoint");
clientConfig.setNameSrvAddr(namesrvAddr);
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/config/CommonConstants.java b/common/src/main/java/org/apache/rocketmq/eventbridge/config/CommonConstants.java
new file mode 100644
index 0000000..b93eb4a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/config/CommonConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.eventbridge.config;
+
+public interface CommonConstants {
+ String V_PREFIXE = "${";
+
+ String V_POSTFIX = "}";
+}
diff --git a/dist/src/main/assembly/assembly.xml b/dist/src/main/assembly/assembly.xml
index a56bf42..5cfdd12 100644
--- a/dist/src/main/assembly/assembly.xml
+++ b/dist/src/main/assembly/assembly.xml
@@ -50,5 +50,10 @@
<outputDirectory>config/</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
+ <fileSet>
+ <directory>src/main/plugin</directory>
+ <outputDirectory>plugin/</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
</fileSets>
</assembly>
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..6d86ea5
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM eclipse-temurin:8-jdk-centos7 AS builder
+
+RUN set -eux \
+ && yum -y update \
+ && yum -y install curl gnupg unzip \
+ && yum clean all -y
+
+ENV EVENTBRIDGE_USER=apps \
+ EVENTBRIDGE_HOME=/home/apps/rocketmq-eventbridge \
+ EVENTBRIDGE_LOG_DIR=/home/apps/logs \
+ EVENTBRIDGE_LOG_FILE=/home/apps/logs/rocketmq-eventbridge.log
+
+RUN set -x \
+ && adduser "$EVENTBRIDGE_USER" \
+ && mkdir -p "$EVENTBRIDGE_LOG_DIR" \
+ && touch "$EVENTBRIDGE_LOG_FILE"
+
+WORKDIR /home/apps
+ARG version
+# Rocketmq eventbridge version
+ENV ROCKETMQ_EVENTBRIDGE_VERSION ${version}
+
+ADD rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip /home/apps
+RUN cd /home/apps && unzip rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip && rm -rf rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip
+
+RUN chown -R "$EVENTBRIDGE_USER:$EVENTBRIDGE_USER" "$EVENTBRIDGE_HOME" "$EVENTBRIDGE_LOG_DIR"
+COPY docker-entrypoint.sh /home/apps/rocketmq-eventbridge/bin
+RUN chmod 755 /home/apps/rocketmq-eventbridge/bin/docker-entrypoint.sh
+
+EXPOSE 8083 7001 7002
+USER apps
+WORKDIR /home/apps/rocketmq-eventbridge
+
+ENTRYPOINT ["./bin/docker-entrypoint.sh"]
diff --git a/docker/build-image.sh b/docker/build-image.sh
new file mode 100755
index 0000000..0c6a37f
--- /dev/null
+++ b/docker/build-image.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+checkVersion() {
+ echo "Version = $1"
+ echo $1 |grep -E "^[0-9]+\.[0-9]+\.[0-9]+" > /dev/null
+ if [ $? = 0 ]; then
+ return 1
+ fi
+
+ echo "Version $1 illegal, it should be X.X.X format(e.g. 4.5.0), please check released versions in 'https://archive.apache.org/dist/rocketmq/'"
+ exit -1
+}
+
+checkZipFile() {
+
+ file="../dist/target/rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip"
+
+ if [ -e "$file" ]; then
+ return 1
+ fi
+
+ echo "the installation package rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip does not exist in dist/target, please check released package"
+ exit -1
+}
+
+if [ $# -lt 1 ]; then
+ echo -e "Usage: sh $0 Version"
+ exit -1
+fi
+ROCKETMQ_EVENTBRIDGE_VERSION=$1
+checkVersion $ROCKETMQ_EVENTBRIDGE_VERSION
+checkZipFile
+# copy package to docker directory
+cp ../dist/target/rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip ./
+
+# Build rocketmq-eventbridge
+
+docker build --no-cache -f Dockerfile -t apache/rocketmq-eventbridge:${ROCKETMQ_EVENTBRIDGE_VERSION} --build-arg version=${ROCKETMQ_EVENTBRIDGE_VERSION} .
+rm -rf rocketmq-eventbridge-dist-${ROCKETMQ_EVENTBRIDGE_VERSION}.zip
+
diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh
new file mode 100644
index 0000000..5316741
--- /dev/null
+++ b/docker/docker-entrypoint.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+START_EVENTBRIDGE="./bin/eventbridge.sh start"
+echo "${START_EVENTBRIDGE}" && source ${START_EVENTBRIDGE}
+
+START_CMD="tail -f /home/apps/logs/rocketmq-eventbridge.log"
+#START_TEST="tail -f /etc/hosts"
+#echo "${START_TEST}" && exec ${START_TEST}
+echo "${START_CMD}" && exec ${START_CMD}
\ No newline at end of file
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index 35547c9..8daf91c 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -32,7 +32,7 @@
runtime.config.mode=DB
runtime.storage.mode=ROCKETMQ
rumtime.name=eventbridge-runtimer
-runtime.pluginpath=~/eventbridge/plugin
+runtime.pluginpath=./plugin
## log