Merge pull request #2258 from imaffe/affe-0819-prtemplate
doc(rocketmq) add a recommendation to PR template
diff --git a/README.md b/README.md
index 9b28078..91d195c 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@
It offers a variety of features:
-* Messageing patterns including publish/subscribe, request/reply and streaming
+* Messaging patterns including publish/subscribe, request/reply and streaming
* Financial grade transactional message
* Built-in fault tolerance and high availability configuration options base on [DLedger](https://github.com/openmessaging/openmessaging-storage-dledger)
* A variety of cross language clients, such as Java, C/C++, Python, Go
diff --git a/acl/pom.xml b/acl/pom.xml
index f2862b5..ed5e709 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -13,15 +13,11 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
- <url>http://maven.apache.org</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
@@ -63,11 +59,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
index 8973320..77abe0e 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
@@ -94,7 +94,7 @@
}
}
- public static String v6ipProcess(String netaddress, String[] strArray, int index) {
+ public static String v6ipProcess(String netaddress) {
int part;
String subAddress;
boolean isAsterisk = isAsterisk(netaddress);
@@ -120,7 +120,7 @@
}
}
- public static String[] getAddreeStrArray(String netaddress, String partialAddress) {
+ public static String[] getAddresses(String netaddress, String partialAddress) {
String[] parAddStrArray = StringUtils.split(partialAddress.substring(1, partialAddress.length() - 1), ",");
String address = netaddress.substring(0, netaddress.indexOf("{"));
String[] addreeStrArray = new String[parAddStrArray.length];
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java b/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
index 0acc8e9..8ceb135 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
@@ -65,7 +65,6 @@
case "SUB":
return Permission.SUB;
case "PUB|SUB":
- return Permission.PUB | Permission.SUB;
case "SUB|PUB":
return Permission.PUB | Permission.SUB;
case "DENY":
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
index 89638f6..c182d7e 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
@@ -73,7 +73,7 @@
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
- throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
+ throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
log.info("Broker plain acl conf data is : ", plainAclConfData.toString());
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
@@ -164,14 +164,13 @@
return false;
}
- private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccoutMap, PlainAccessConfig plainAccessConfig) {
-
-
+ private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccountMap, PlainAccessConfig plainAccessConfig) {
+
Map<String, Object> newAccountsMap = null;
- if (existedAccoutMap == null) {
+ if (existedAccountMap == null) {
newAccountsMap = new LinkedHashMap<String, Object>();
} else {
- newAccountsMap = existedAccoutMap;
+ newAccountsMap = existedAccountMap;
}
if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) ||
@@ -278,7 +277,7 @@
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
- throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
+ throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
JSONArray globalWhiteAddrs = plainAclConfData.getJSONArray(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteAddrs != null && !globalWhiteAddrs.isEmpty()) {
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java
index 6931eb7..0c3548e 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyFactory.java
@@ -52,14 +52,18 @@
if (!last.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
- return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, last));
+ return new MultipleRemoteAddressStrategy(AclUtils.getAddresses(remoteAddr, last));
} else {
String[] strArray = StringUtils.split(remoteAddr, ".");
- String four = strArray[3];
- if (!four.startsWith("{")) {
+ // However a right IP String provided by user,it always can be divided into 4 parts by '.'.
+ if (strArray.length < 4) {
+ throw new AclException(String.format("MultipleRemoteAddressStrategy has got a/some wrong format IP(s) ", remoteAddr));
+ }
+ String lastStr = strArray[strArray.length - 1];
+ if (!lastStr.startsWith("{")) {
throw new AclException(String.format("MultipleRemoteAddressStrategy netaddress examine scope Exception netaddress", remoteAddr));
}
- return new MultipleRemoteAddressStrategy(AclUtils.getAddreeStrArray(remoteAddr, four));
+ return new MultipleRemoteAddressStrategy(AclUtils.getAddresses(remoteAddr, lastStr));
}
} else if (AclUtils.isComma(remoteAddr)) {
return new MultipleRemoteAddressStrategy(StringUtils.split(remoteAddr, ","));
@@ -153,7 +157,7 @@
for (int i = 1; i < strArray.length; i++) {
if (ipv6Analysis(strArray, i)) {
AclUtils.verify(remoteAddr, index - 1);
- String preAddress = AclUtils.v6ipProcess(remoteAddr, strArray, index);
+ String preAddress = AclUtils.v6ipProcess(remoteAddr);
this.index = StringUtils.split(preAddress, ":").length;
this.head = preAddress;
break;
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
index 5705b74..7df0afa 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
@@ -32,9 +32,9 @@
public class AclUtilsTest {
@Test
- public void getAddreeStrArray() {
+ public void getAddresses() {
String address = "1.1.1.{1,2,3,4}";
- String[] addressArray = AclUtils.getAddreeStrArray(address, "{1,2,3,4}");
+ String[] addressArray = AclUtils.getAddresses(address, "{1,2,3,4}");
List<String> newAddressList = new ArrayList<>();
for (String a : addressArray) {
newAddressList.add(a);
@@ -49,7 +49,7 @@
// IPv6 test
String ipv6Address = "1:ac41:9987::bb22:666:{1,2,3,4}";
- String[] ipv6AddressArray = AclUtils.getAddreeStrArray(ipv6Address, "{1,2,3,4}");
+ String[] ipv6AddressArray = AclUtils.getAddresses(ipv6Address, "{1,2,3,4}");
List<String> newIPv6AddressList = new ArrayList<>();
for (String a : ipv6AddressArray) {
newIPv6AddressList.add(a);
@@ -181,19 +181,23 @@
public void v6ipProcessTest() {
String remoteAddr = "5::7:6:1-200:*";
String[] strArray = StringUtils.split(remoteAddr, ":");
- Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0007:0006");
+ Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr), "0005:0000:0000:0000:0007:0006");
+// Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0007:0006");
remoteAddr = "5::7:6:1-200";
strArray = StringUtils.split(remoteAddr, ":");
- Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
+ Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr), "0005:0000:0000:0000:0000:0007:0006");
+// Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
remoteAddr = "5::7:6:*";
strArray = StringUtils.split(remoteAddr, ":");
- Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
+ Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr), "0005:0000:0000:0000:0000:0007:0006");
+// Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0000:0000:0000:0000:0007:0006");
remoteAddr = "5:7:6:*";
strArray = StringUtils.split(remoteAddr, ":");
- Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0007:0006");
+ Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr), "0005:0007:0006");
+// Assert.assertEquals(AclUtils.v6ipProcess(remoteAddr, strArray, 3), "0005:0007:0006");
}
@Test
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java
index 8998dd9..87eb37b 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/RemoteAddressStrategyTest.java
@@ -198,6 +198,16 @@
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
plainAccessResource.setWhiteRemoteAddress("::1,2,3}");
remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("192.168.1.{1}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("192.168.1.{1,2}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("192.168.{1}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("{192.168.1}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
+ plainAccessResource.setWhiteRemoteAddress("{192.168.1.1}");
+ remoteAddressStrategyFactory.getRemoteAddressStrategy(plainAccessResource);
}
private void multipleNetaddressStrategyTest(RemoteAddressStrategy remoteAddressStrategy) {
diff --git a/broker/pom.xml b/broker/pom.xml
index 8ddd180..8d52edb 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -55,10 +55,6 @@
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6caa235..a237bf6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,10 +16,10 @@
*/
package org.apache.rocketmq.broker.out;
-import com.google.common.collect.Lists;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -123,7 +123,7 @@
final int timeoutMills,
final boolean compressed) {
- final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
+ final List<RegisterBrokerResult> registerBrokerResultList = new Vector<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
@@ -209,7 +209,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
public void unregisterBrokerAll(
@@ -255,7 +255,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
public List<Boolean> needRegister(
@@ -338,7 +338,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
@@ -355,7 +355,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public String getAllDelayOffset(
@@ -372,7 +372,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
@@ -389,7 +389,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void registerRPCHook(RPCHook rpcHook) {
diff --git a/client/pom.xml b/client/pom.xml
index 75316a9..c775ee8 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -27,11 +27,6 @@
<artifactId>rocketmq-client</artifactId>
<name>rocketmq-client ${project.version}</name>
- <properties>
- <maven.compiler.source>1.6</maven.compiler.source>
- <maven.compiler.target>1.6</maven.compiler.target>
- </properties>
-
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6718eb5..9cc7c60 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -307,6 +307,22 @@
this.autoCommit = autoCommit;
}
+ public boolean isConnectBrokerByUser() {
+ return this.defaultLitePullConsumerImpl.getPullAPIWrapper().isConnectBrokerByUser();
+ }
+
+ public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+ this.defaultLitePullConsumerImpl.getPullAPIWrapper().setConnectBrokerByUser(connectBrokerByUser);
+ }
+
+ public long getDefaultBrokerId() {
+ return this.defaultLitePullConsumerImpl.getPullAPIWrapper().getDefaultBrokerId();
+ }
+
+ public void setDefaultBrokerId(long defaultBrokerId) {
+ this.defaultLitePullConsumerImpl.getPullAPIWrapper().setDefaultBrokerId(defaultBrokerId);
+ }
+
public int getPullThreadNums() {
return pullThreadNums;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
index e4f2c8d..f07a38b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
@@ -23,12 +23,22 @@
private static final long serialVersionUID = 5975020272601250368L;
private final int responseCode;
private final String errorMessage;
+ private final String brokerAddr;
public MQBrokerException(int responseCode, String errorMessage) {
super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
- + errorMessage));
+ + errorMessage));
this.responseCode = responseCode;
this.errorMessage = errorMessage;
+ this.brokerAddr = null;
+ }
+
+ public MQBrokerException(int responseCode, String errorMessage, String brokerAddr) {
+ super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage + (brokerAddr != null ? " BROKER: " + brokerAddr : "")));
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ this.brokerAddr = brokerAddr;
}
public int getResponseCode() {
@@ -38,4 +48,8 @@
public String getErrorMessage() {
return errorMessage;
}
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c64d7c5..7a4d556 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -390,7 +390,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
@@ -414,7 +414,7 @@
default:
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
@@ -502,7 +502,7 @@
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
- return this.processSendResponse(brokerName, msg, response);
+ return this.processSendResponse(brokerName, msg, response,addr);
}
private void sendMessageAsync(
@@ -528,7 +528,7 @@
if (null == sendCallback && response != null) {
try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
@@ -542,7 +542,7 @@
if (response != null) {
try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+ SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
@@ -641,7 +641,8 @@
private SendResult processSendResponse(
final String brokerName,
final Message msg,
- final RemotingCommand response
+ final RemotingCommand response,
+ final String addr
) throws MQBrokerException, RemotingCommandException {
SendStatus sendStatus;
switch (response.getCode()) {
@@ -662,7 +663,7 @@
break;
}
default: {
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
@@ -741,7 +742,7 @@
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
- PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
+ PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
@@ -768,11 +769,12 @@
) throws RemotingException, InterruptedException, MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
- return this.processPullResponse(response);
+ return this.processPullResponse(response, addr);
}
private PullResult processPullResponse(
- final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
+ final RemotingCommand response,
+ final String addr) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
@@ -789,7 +791,7 @@
break;
default:
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
PullMessageResponseHeader responseHeader =
@@ -822,7 +824,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
@@ -847,7 +849,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
@@ -871,7 +873,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public List<String> getConsumerIdListByGroup(
@@ -898,7 +900,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
@@ -922,7 +924,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
@@ -947,7 +949,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public long queryConsumerOffset(
@@ -971,7 +973,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void updateConsumerOffset(
@@ -992,7 +994,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void updateConsumerOffsetOneway(
@@ -1024,7 +1026,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void unregisterClient(
@@ -1050,7 +1052,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void endTransactionOneway(
@@ -1116,7 +1118,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public Set<MessageQueue> lockBatchMQ(
@@ -1138,7 +1140,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void unlockBatchMQ(
@@ -1164,7 +1166,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
@@ -1187,7 +1189,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
@@ -1217,7 +1219,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
@@ -1239,7 +1241,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
@@ -1261,7 +1263,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
@@ -1279,7 +1281,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
@@ -1301,7 +1303,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
@@ -1320,7 +1322,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public ClusterInfo getBrokerClusterInfo(
@@ -1670,7 +1672,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
@@ -1694,7 +1696,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
@@ -1745,7 +1747,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public TopicList getSystemTopicList(
@@ -2108,7 +2110,7 @@
default:
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
@@ -2127,7 +2129,7 @@
break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark());
+ throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
public void updateNameServerConfig(final Properties properties, final List<String> nameServers, long timeoutMillis)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 43dffd3..2f9146d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1115,7 +1115,7 @@
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
- throw new MQClientException("select message queue throwed exception.", e);
+ throw new MQClientException("select message queue threw exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index 7854fcb..651d043 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -64,10 +64,8 @@
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
- return mq;
- }
+ if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
+ return mq;
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 64d64f2..f762910 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -84,7 +84,7 @@
offsetStore.updateOffset(messageQueue, 1024, false);
- doThrow(new MQBrokerException(-1, ""))
+ doThrow(new MQBrokerException(-1, "", null))
.when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
diff --git a/common/pom.xml b/common/pom.xml
index 1992aec..ad1a180 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -27,11 +27,6 @@
<artifactId>rocketmq-common</artifactId>
<name>rocketmq-common ${project.version}</name>
- <properties>
- <maven.compiler.source>1.6</maven.compiler.source>
- <maven.compiler.target>1.6</maven.compiler.target>
- </properties>
-
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 2aab5b7..639a200 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -200,14 +200,20 @@
}
public static double getDiskPartitionSpaceUsedPercent(final String path) {
- if (null == path || path.isEmpty())
+ if (null == path || path.isEmpty()) {
+ log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
return -1;
+ }
+
try {
File file = new File(path);
- if (!file.exists())
+ if (!file.exists()) {
+ log.error("Error when measuring disk space usage, file doesn't exist on this path: {}", path);
return -1;
+ }
+
long totalSpace = file.getTotalSpace();
@@ -218,6 +224,7 @@
return usedSpace / (double) totalSpace;
}
} catch (Exception e) {
+ log.error("Error when measuring disk space usage, got exception: :", e);
return -1;
}
diff --git a/distribution/bin/runserver.sh b/distribution/bin/runserver.sh
index 08fef73..68fd1b4 100644
--- a/distribution/bin/runserver.sh
+++ b/distribution/bin/runserver.sh
@@ -62,12 +62,25 @@
esac
}
+choose_gc_options()
+{
+ # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
+ # '1' means releases befor Java 9
+ JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
+ if [[ "$JAVA_MAJOR_VERSION" -lt "9" ]] ; then
+ JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
+ JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
+ JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+ else
+ JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
+ JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M"
+ fi
+}
+
choose_gc_log_directory
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
-JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
-JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
+choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
diff --git a/distribution/pom.xml b/distribution/pom.xml
index acb0c52..11effc3 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md
index 0cf977e..4a22b3b 100644
--- a/docs/cn/RocketMQ_Example.md
+++ b/docs/cn/RocketMQ_Example.md
@@ -323,13 +323,6 @@
### 2.2 顺序消费消息
```java
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
index 828d8c0..b644718 100644
--- a/docs/cn/msg_trace/user_guide.md
+++ b/docs/cn/msg_trace/user_guide.md
@@ -50,7 +50,7 @@
如果用户不准备将消息轨迹的数据存储于系统级的默认TraceTopic,也可以自己定义并创建用户级的Topic来保存轨迹(即为创建普通的Topic用于保存消息轨迹数据)。下面一节会介绍Client客户端的接口如何支持用户自定义的TraceTopic。
## 4. 支持消息轨迹的Client客户端实践
-为了尽可能地减少用户业务系统使用RocketMQ消息轨迹特性的改造工作量,作者在设计时候采用对原来接口增加一个开关参数(**enableMsgTrace**)来实现消息轨迹是否开启;并新增一个自定义参(**customizedTraceTopic**)数来实现用户存储消息轨迹数据至自己创建的用户级Topic。
+为了尽可能地减少用户业务系统使用RocketMQ消息轨迹特性的改造工作量,作者在设计时候采用对原来接口增加一个开关参数(**enableMsgTrace**)来实现消息轨迹是否开启;并新增一个自定义参数(**customizedTraceTopic**)来实现用户存储消息轨迹数据至自己创建的用户级Topic。
### 4.1 发送消息时开启消息轨迹
```
diff --git a/example/pom.xml b/example/pom.xml
index cc69231..4a427d5 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -17,9 +17,9 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -37,6 +37,14 @@
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-acl</artifactId>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
@@ -44,19 +52,5 @@
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-openmessaging</artifactId>
- <version>4.7.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-acl</artifactId>
- <version>4.7.1</version>
- </dependency>
</dependencies>
</project>
diff --git a/filter/pom.xml b/filter/pom.xml
index 05d7606..94bfd16 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/logappender/pom.xml b/logappender/pom.xml
index e479679..b67d6f7 100644
--- a/logappender/pom.xml
+++ b/logappender/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
@@ -44,11 +44,6 @@
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<optional>true</optional>
</dependency>
diff --git a/logging/pom.xml b/logging/pom.xml
index e45ad3b..917e37a 100644
--- a/logging/pom.xml
+++ b/logging/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -27,11 +27,6 @@
<artifactId>rocketmq-logging</artifactId>
<name>rocketmq-logging ${project.version}</name>
- <properties>
- <maven.compiler.source>1.6</maven.compiler.source>
- <maven.compiler.target>1.6</maven.compiler.target>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 77d8cb1..9a7eb35 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -29,11 +29,11 @@
<dependencies>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-tools</artifactId>
</dependency>
<dependency>
@@ -45,10 +45,6 @@
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index 6d94c09..8a0db60 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,10 +20,10 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-openmessaging</artifactId>
<name>rocketmq-openmessaging ${project.version}</name>
@@ -33,7 +33,7 @@
<artifactId>openmessaging-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.rocketmq</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
</dependencies>
diff --git a/pom.xml b/pom.xml
index adbb335..09438d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,20 +29,16 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
- <prerequisites>
- <maven>3.2.5</maven>
- </prerequisites>
-
<scm>
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
- <tag>rocketmq-all-4.7.1</tag>
+ <tag>HEAD</tag>
</scm>
<mailingLists>
@@ -119,7 +115,6 @@
<module>namesrv</module>
<module>remoting</module>
<module>logappender</module>
- <module>example</module>
<module>srvutil</module>
<module>filter</module>
<module>test</module>
@@ -127,6 +122,7 @@
<module>openmessaging</module>
<module>logging</module>
<module>acl</module>
+ <module>example</module>
</modules>
<build>
@@ -532,12 +528,17 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>rocketmq-example</artifactId>
+ <artifactId>rocketmq-acl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>rocketmq-acl</artifactId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -551,11 +552,6 @@
<version>1.0.13</version>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>1.0.13</version>
- </dependency>
- <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 3c15472..27f44ea 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -27,11 +27,6 @@
<artifactId>rocketmq-remoting</artifactId>
<name>rocketmq-remoting ${project.version}</name>
- <properties>
- <maven.compiler.source>1.6</maven.compiler.source>
- <maven.compiler.target>1.6</maven.compiler.target>
- </properties>
-
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index ce67ef4..36fe9c0 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/store/pom.xml b/store/pom.xml
index ef72fd1..8f4b44a 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -31,7 +31,7 @@
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
- <version>0.1</version>
+ <version>0.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
@@ -56,10 +56,5 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 7891f71..c13ad4c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -148,6 +148,10 @@
private String dLegerPeers;
private String dLegerSelfId;
+ private String preferredLeaderId;
+
+ private boolean isEnableBatchPush = false;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -702,4 +706,20 @@
public void setEnableDLegerCommitLog(boolean enableDLegerCommitLog) {
this.enableDLegerCommitLog = enableDLegerCommitLog;
}
+
+ public String getPreferredLeaderId() {
+ return preferredLeaderId;
+ }
+
+ public void setPreferredLeaderId(String preferredLeaderId) {
+ this.preferredLeaderId = preferredLeaderId;
+ }
+
+ public boolean isEnableBatchPush() {
+ return isEnableBatchPush;
+ }
+
+ public void setEnableBatchPush(boolean enableBatchPush) {
+ isEnableBatchPush = enableBatchPush;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 3361b63..400ad78 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -84,6 +84,9 @@
dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);
+ dLedgerConfig.setPreferredLeaderId(defaultMessageStore.getMessageStoreConfig().getPreferredLeaderId());
+ dLedgerConfig.setEnableBatchPush(defaultMessageStore.getMessageStoreConfig().isEnableBatchPush());
+
id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;
dLedgerServer = new DLedgerServer(dLedgerConfig);
dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index 5b0ca34..5864b28 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -63,9 +63,9 @@
if (leaderId != null) {
dLegerServer.getdLedgerConfig().setEnableLeaderElector(false);
if (selfId.equals(leaderId)) {
- dLegerServer.getMemberState().changeToLeader(-1);
+ dLegerServer.getMemberState().changeToLeader(0);
} else {
- dLegerServer.getMemberState().changeToFollower(-1, leaderId);
+ dLegerServer.getMemberState().changeToFollower(0, leaderId);
}
}
diff --git a/test/pom.xml b/test/pom.xml
index 3b930f2..8fc5a71 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -31,7 +31,6 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>1.2.17</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/tools/pom.xml b/tools/pom.xml
index 4f675b2..4c1b647 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>4.7.1</version>
+ <version>4.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -57,10 +57,6 @@
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index 11a3604..11346bf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -48,11 +48,11 @@
@Override
public Options buildCommandlineOptions(final Options options) {
- Option opt = new Option("b", "brokerAddr", true, "update which broker");
+ Option opt = new Option("b", "brokerAddr", true, "get which broker");
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("c", "clusterName", true, "update which cluster");
+ opt = new Option("c", "clusterName", true, "get which cluster");
opt.setRequired(false);
options.addOption(opt);