Merge pull request #17 from duhenglucky/auth
removed ons-auth4client dependency
diff --git a/ons-core/ons-client/pom.xml b/ons-core/ons-client/pom.xml
index b9558d8..fde3dc5 100644
--- a/ons-core/ons-client/pom.xml
+++ b/ons-core/ons-client/pom.xml
@@ -33,6 +33,11 @@
<version>${rocketmq.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
@@ -45,12 +50,18 @@
<groupId>${project.groupId}</groupId>
<artifactId>ons-trace-core</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>ons-auth4client</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>ons-auth4client</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>${project.groupId}</groupId>-->
+ <!--<artifactId>ons-auth4client</artifactId>-->
+ <!--<version>${project.version}</version>-->
+ <!--</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
index a82c405..2a82a0b 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
@@ -21,4 +21,6 @@
public static final String TRANSACTION_ID = "__transactionId__";
public static final String TOPIC_PARTITION_SEPARATOR = "#";
+
+ public static final String ONS_CHANNEL_KEY = "OnsChannel";
}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java
new file mode 100644
index 0000000..b4c57d6
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.impl.authority.exception;
+
+public class AuthenticationException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private String status;
+ private int code;
+
+
+ public AuthenticationException(String status, int code) {
+ super();
+ this.status = status;
+ this.code = code;
+ }
+
+
+ public AuthenticationException(String status, int code, String message) {
+ super(message);
+ this.status = status;
+ this.code = code;
+ }
+
+
+ public AuthenticationException(String status, int code, Throwable throwable) {
+ super(throwable);
+ this.status = status;
+ this.code = code;
+ }
+
+
+ public AuthenticationException(String status, int code, String message, Throwable throwable) {
+ super(message, throwable);
+ this.status = status;
+ this.code = code;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java
new file mode 100644
index 0000000..e4acc99
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.ons.api.impl.rocketmq;
+
+
+public enum ONSChannel {
+
+ CLOUD,
+
+ ALIYUN,
+
+ ALL
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
index 6b78740..984ba6f 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
-
import io.openmessaging.api.Credentials;
import io.openmessaging.api.LifeCycle;
import java.util.Properties;
@@ -28,14 +27,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Generated;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
import org.apache.rocketmq.ons.api.impl.util.NameAddrUtils;
import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
@@ -74,27 +74,17 @@
public ONSClientAbstract(Properties properties) {
this.properties = properties;
this.sessionCredentials.updateContent(properties);
- if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
- (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey()))) {
- throw new ONSClientException("please set access key");
- }
-
- if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
- (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey()))) {
- throw new ONSClientException("please set secret key");
- }
-
- if (null == this.sessionCredentials.getOnsChannel()) {
- throw new ONSClientException("please set ons channel");
- }
+ ONSChannel onsChannle = ONSChannel.valueOf(this.properties.getProperty(Constants.ONS_CHANNEL_KEY, "ALIYUN"));
this.nameServerAddr = getNameSrvAddrFromProperties();
if (nameServerAddr != null) {
return;
}
- if (nameServerAddr == null && !this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN)) {
+
+ if (nameServerAddr == null && !onsChannle.equals(ONSChannel.ALIYUN)) {
return;
}
+
this.nameServerAddr = fetchNameServerAddr();
if (null == nameServerAddr) {
throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED));
@@ -239,17 +229,6 @@
@Override
public void updateCredential(Properties credentialProperties) {
- if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
- (null == credentialProperties.getProperty(SessionCredentials.AccessKey)
- || "".equals(credentialProperties.getProperty(SessionCredentials.AccessKey)))) {
- throw new ONSClientException("update credential failed. please set access key.");
- }
-
- if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
- (null == credentialProperties.getProperty(SessionCredentials.SecretKey)
- || "".equals(credentialProperties.getProperty(SessionCredentials.SecretKey)))) {
- throw new ONSClientException("update credential failed. please set secret key");
- }
this.sessionCredentials.updateContent(credentialProperties);
}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
index d82feeb..6cc4220 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
@@ -17,21 +17,22 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
-
import io.openmessaging.api.MessageSelector;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsConsumeMessageHookImpl;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class ONSConsumerAbstract extends ONSClientAbstract {
@@ -55,8 +56,8 @@
}
this.defaultMQPushConsumer =
- new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials));
-
+ new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials,
+ properties.getProperty(Constants.ONS_CHANNEL_KEY)));
String maxReconsumeTimes = properties.getProperty(PropertyKeyConst.MaxReconsumeTimes);
if (!UtilAll.isBlank(maxReconsumeTimes)) {
@@ -117,15 +118,13 @@
} else {
try {
Properties tempProperties = new Properties();
- tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
- tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials));
dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
index a145c25..b117a00 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
@@ -17,23 +17,39 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.impl.MQClientInfo;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
+import org.apache.rocketmq.ons.api.impl.authority.exception.AuthenticationException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class OnsClientRPCHook extends ClientRPCHook {
+public class OnsClientRPCHook extends AclClientRPCHook {
+ private static final int CAL_SIGNATURE_FAILED = 10015;
+ private final ONSChannel onsChannel;
- public OnsClientRPCHook(SessionCredentials sessionCredentials) {
+// public OnsClientRPCHook(SessionCredentials sessionCredentials) {
+// super(sessionCredentials);
+// this.onsChannel = ONSChannel.ALIYUN;
+// }
+
+ public OnsClientRPCHook(SessionCredentials sessionCredentials, String channel) {
super(sessionCredentials);
+ this.onsChannel = ONSChannel.valueOf(channel);
}
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
- super.doBeforeRequest(remoteAddr, request);
+ try {
+ super.doBeforeRequest(remoteAddr, request);
+ } catch (AclException aclException) {
+ throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, aclException.getMessage(), aclException);
+ }
+ request.addExtField(Constants.ONS_CHANNEL_KEY, onsChannel.name());
request.setVersion(MQClientInfo.versionCode);
}
-
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
super.doAfterResponse(remoteAddr, request, response);
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
index c840bb2..1164ad1 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
@@ -17,25 +17,25 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
-
import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.order.OrderProducer;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
-
+import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class OrderProducerImpl extends ONSClientAbstract implements OrderProducer {
@@ -50,8 +50,8 @@
}
this.defaultMQProducer =
- new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
-
+ new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+ properties.getProperty(Constants.ONS_CHANNEL_KEY)));
this.defaultMQProducer.setProducerGroup(producerGroup);
@@ -78,15 +78,13 @@
} else {
try {
Properties tempProperties = new Properties();
- tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
- tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials));
dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index cccdf05..97107ed 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -32,13 +32,14 @@
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
@@ -56,7 +57,8 @@
}
this.defaultMQProducer =
- new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
+ new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+ properties.getProperty(Constants.ONS_CHANNEL_KEY)));
this.defaultMQProducer.setProducerGroup(producerGroup);
@@ -96,7 +98,8 @@
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
+ new OnsClientRPCHook(sessionCredentials, properties.getProperty(Constants.ONS_CHANNEL_KEY)));
dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
index d364b88..233d637 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -63,7 +63,8 @@
}
this.litePullConsumer =
- new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials));
+ new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials,
+ properties.getProperty(Constants.ONS_CHANNEL_KEY)));
String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
this.litePullConsumer.setMessageModel(MessageModel.valueOf(messageModel));
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index 66b3014..45a6839 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
-
import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
@@ -25,6 +24,7 @@
import io.openmessaging.api.transaction.TransactionStatus;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
@@ -33,11 +33,11 @@
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.ons.api.Constants;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class TransactionProducerImpl extends ONSClientAbstract implements TransactionProducer {
@@ -53,7 +53,8 @@
producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP";
}
transactionMQProducer =
- new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
+ new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+ properties.getProperty(Constants.ONS_CHANNEL_KEY)));
boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false"));
transactionMQProducer.setVipChannelEnabled(isVipChannelEnabled);
@@ -75,15 +76,13 @@
} else {
try {
Properties tempProperties = new Properties();
- tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
- tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
- AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
+ AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials));
dispatcher.setHostProducer(transactionMQProducer.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.transactionMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-trace-core/pom.xml b/ons-core/ons-trace-core/pom.xml
index 422b192..32317fc 100644
--- a/ons-core/ons-trace-core/pom.xml
+++ b/ons-core/ons-trace-core/pom.xml
@@ -29,11 +29,6 @@
<name>ons-trace-core ${project.version}</name>
<dependencies>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>ons-auth4client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
index 0877f84..f92ec8d 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.ons.open.trace.core.common;
import javax.annotation.Generated;
-
import org.apache.rocketmq.common.MixAll;
@Generated("ons-client")
@@ -42,16 +41,16 @@
public static final String MaxMsgSize = "MaxMsgSize";
-
public static final String groupName = "_INNER_TRACE_PRODUCER";
public static final String traceTopic = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
-
public static final String default_region = MixAll.DEFAULT_TRACE_REGION_ID;
- public static final char CONTENT_SPLITOR = (char)1;
- public static final char FIELD_SPLITOR = (char)2;
+ public static final char CONTENT_SPLITOR = (char) 1;
+ public static final char FIELD_SPLITOR = (char) 2;
public static final String TraceDispatcherType = "DispatcherType";
+
+ public static final String CustomizedTraceTopic = "customizedTraceTopic";
}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
index 87ead88..9156680 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
@@ -30,4 +30,6 @@
void flush() throws IOException;
void shutdown();
+
+ void start(String nameServerAddresses) throws MQClientException;
}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 4f02bbd..601f6df 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -45,20 +46,20 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceTransferBean;
import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.remoting.RPCHook;
public class AsyncArrayDispatcher implements AsyncDispatcher {
private final static InternalLogger CLIENT_LOG = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
- private final DefaultMQProducer traceProducer;
- private final ThreadPoolExecutor traceExecuter;
+ private DefaultMQProducer traceProducer;
+ private final ThreadPoolExecutor traceExecutor;
private AtomicLong discardCount;
private Thread worker;
@@ -71,9 +72,17 @@
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
+ private String customizedTraceTopic;
- public AsyncArrayDispatcher(Properties properties) throws MQClientException {
+ /**
+ * Create AsyncArrayDispatcher with acl RPC hook.
+ *
+ * @param properties
+ * @param rpcHook RPC hook only can be set with AclRPCHook
+ */
+ public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) {
dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
+ this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic);
int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048"));
queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
this.queueSize = queueSize;
@@ -82,34 +91,14 @@
traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
- this.traceExecuter = new ThreadPoolExecutor(//
+ this.traceExecutor = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
- traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties);
- }
-
- public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) throws MQClientException {
- dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
- int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048"));
- queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
- this.queueSize = queueSize;
- batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1"));
- this.discardCount = new AtomicLong(0L);
- traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
- appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
-
- this.traceExecuter = new ThreadPoolExecutor(
- 10,
- 20,
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.appenderQueue,
- new ThreadFactoryImpl("MQTraceSendThread_"));
- traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials);
+ traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
}
public DefaultMQProducerImpl getHostProducer() {
@@ -138,6 +127,12 @@
}
@Override
+ public void start(String nameServerAddresses) throws MQClientException {
+ this.traceProducer.setNamesrvAddr(nameServerAddresses);
+ this.start();
+ }
+
+ @Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((OnsTraceContext) ctx);
if (!result) {
@@ -162,7 +157,7 @@
@Override
public void shutdown() {
this.stopped = true;
- this.traceExecuter.shutdown();
+ this.traceExecutor.shutdown();
TraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
this.removeShutdownHook();
}
@@ -171,6 +166,7 @@
if (shutDownHook == null) {
shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable() {
private volatile boolean hasShutdown = false;
+
@Override
public void run() {
synchronized (this) {
@@ -215,7 +211,7 @@
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
- traceExecuter.submit(request);
+ traceExecutor.submit(request);
} else if (AsyncArrayDispatcher.this.stopped) {
this.stopped = true;
}
@@ -291,9 +287,13 @@
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic,
String currentRegionId) {
- String topic = OnsTraceConstants.traceTopic + currentRegionId;
+ String topic = customizedTraceTopic;
+ if (StringUtils.isBlank(topic)) {
+ topic = OnsTraceConstants.traceTopic + currentRegionId;
+ }
final Message message = new Message(topic, data.getBytes());
message.setKeys(keySet);
+
try {
Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic);
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
@@ -305,7 +305,7 @@
@Override
public void onException(Throwable e) {
- CLIENT_LOG.info("send trace data ,the traceData is " + data);
+ CLIENT_LOG.info("send trace data ,the traceData is: {} ", data, e);
}
};
if (dataBrokerSet.isEmpty()) {
@@ -333,7 +333,7 @@
}
} catch (Exception e) {
- CLIENT_LOG.info("send trace data,the traceData is" + data);
+ CLIENT_LOG.info("send trace data,the traceData is: {}", data, e);
}
}
@@ -379,4 +379,16 @@
return brokerSet;
}
}
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
+
+ public DefaultMQProducer getTraceProducer() {
+ return traceProducer;
+ }
}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
index 60e65cb..4a75f7b 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
@@ -21,51 +21,21 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.namesrv.TopAddressing;
-
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
-import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.remoting.RPCHook;
public class TraceProducerFactory {
private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
private static AtomicBoolean isStarted = new AtomicBoolean(false);
private static DefaultMQProducer traceProducer;
- public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
+ public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) {
- SessionCredentials sessionCredentials = new SessionCredentials();
- Properties sessionProperties = new Properties();
- String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
- String secretKey = properties.getProperty(OnsTraceConstants.SecretKey);
- sessionProperties.put(OnsTraceConstants.AccessKey, accessKey);
- sessionProperties.put(OnsTraceConstants.SecretKey, secretKey);
- sessionCredentials.updateContent(sessionProperties);
- traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
- traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName);
- traceProducer.setSendMsgTimeout(5000);
- traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis())));
- String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR);
- if (nameSrv == null) {
- TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL));
- nameSrv = topAddressing.fetchNSAddr();
- }
- traceProducer.setNamesrvAddr(nameSrv);
- traceProducer.setVipChannelEnabled(false);
- int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000"));
- traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
- }
- return traceProducer;
- }
-
- public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, SessionCredentials sessionCredentials) {
- if (traceProducer == null) {
- String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
- traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
- traceProducer.setProducerGroup(accessKey.replace('.', '-') + OnsTraceConstants.groupName);
+ traceProducer = new DefaultMQProducer(rpcHook); //RPC hook only can be set with AclRPCHook
+ traceProducer.setProducerGroup(OnsTraceConstants.groupName);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis())));
String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR);
@@ -86,12 +56,14 @@
if (traceProducer != null && isStarted.compareAndSet(false, true)) {
traceProducer.start();
}
+
}
public static void unregisterTraceDispatcher(String dispatcherId) {
dispatcherTable.remove(dispatcherId);
- if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) {
+ if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.compareAndSet(true, false)) {
traceProducer.shutdown();
}
}
+
}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java
deleted file mode 100644
index 9e78880..0000000
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.ons.open.trace.core.hook;
-
-import java.lang.reflect.Field;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey;
-import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey;
-
-public abstract class AbstractRPCHook implements RPCHook {
- protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
- new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
-
-
- protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String onsChannel) {
- CommandCustomHeader header = request.readCustomHeader();
- // sort property
- SortedMap<String, String> map = new TreeMap<String, String>();
- map.put(AccessKey, ak);
- map.put(ONSChannelKey, onsChannel);
- try {
- // add header properties
- if (null != header) {
- Field[] fields = fieldCache.get(header.getClass());
- if (null == fields) {
- fields = header.getClass().getDeclaredFields();
- for (Field field : fields) {
- field.setAccessible(true);
- }
- Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
- if (null != tmp) {
- fields = tmp;
- }
- }
-
- for (Field field : fields) {
- Object value = field.get(header);
- if (null != value && !field.isSynthetic()) {
- map.put(field.getName(), value.toString());
- }
- }
- }
- return map;
- }
- catch (Exception e) {
- throw new RuntimeException("incompatible exception.", e);
- }
- }
-
-}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java
similarity index 98%
rename from ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java
rename to ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java
index 6385ad9..c7b7e4f 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.ons.api.impl.tracehook;
+package org.apache.rocketmq.ons.open.trace.core.hook;
import java.util.ArrayList;
import org.apache.rocketmq.client.hook.SendMessageContext;
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java
similarity index 98%
rename from ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java
rename to ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java
index d5a0782..e08cac1 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.ons.api.impl.tracehook;
+package org.apache.rocketmq.ons.open.trace.core.hook;
import java.util.ArrayList;
import java.util.List;
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index cd82803..dcd2f46 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -246,6 +246,11 @@
<version>${rocketmq.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>ons-auth4client</artifactId>
<version>${project.version}</version>