Reformat code
diff --git a/remoting-core/remoting-api/pom.xml b/remoting-core/remoting-api/pom.xml
index a5d1d0b..ad7676f 100644
--- a/remoting-core/remoting-api/pom.xml
+++ b/remoting-core/remoting-api/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-xxx</artifactId>
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
index 7ef01db..7179c91 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java
@@ -51,6 +51,15 @@
}
/**
+ * Return the detail message, including the message from the nested exception
+ * if there is one.
+ */
+ @Override
+ public String getMessage() {
+ return getMessageWithCause(super.getMessage(), getCause());
+ }
+
+ /**
* Build a message for the given base message and root cause.
*
* @param message the base message
@@ -71,15 +80,6 @@
}
/**
- * Return the detail message, including the message from the nested exception
- * if there is one.
- */
- @Override
- public String getMessage() {
- return getMessageWithCause(super.getMessage(), getCause());
- }
-
- /**
* Retrieve the innermost cause of this exception, if any.
*
* @return the innermost exception, or {@code null} if none
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
index 97ec2e6..c7f7a9b 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -58,6 +58,11 @@
this.request = request;
}
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+ }
+
public RemotingCommand getResponse() {
return response;
}
@@ -65,9 +70,4 @@
public void setResponse(RemotingCommand response) {
this.response = response;
}
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
- }
}
diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml
index da1b8df..e4887cd 100644
--- a/remoting-core/remoting-impl/pom.xml
+++ b/remoting-core/remoting-impl/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-xxx</artifactId>
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
index 5a50089..a4a7487 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java
@@ -49,7 +49,21 @@
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
}
- public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+ public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
+ return new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
+ thread.setDaemon(isDaemon);
+ return thread;
+ }
+ };
+ }
+
+ public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName,
+ boolean isDaemon) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
@@ -80,19 +94,6 @@
return newGenericThreadFactory(processName, threads, false);
}
- public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
- return new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
- thread.setDaemon(isDaemon);
- return thread;
- }
- };
- }
-
public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
final boolean isDaemon) {
return new ThreadFactory() {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
index 5a71452..d4fc15c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java
@@ -49,13 +49,13 @@
}
@Override
- public void writeShort(final short value) {
- buffer.writeShort(value);
+ public void writeInt(int data) {
+ buffer.writeInt(data);
}
@Override
- public void writeInt(int data) {
- buffer.writeInt(data);
+ public void writeShort(final short value) {
+ buffer.writeShort(value);
}
@Override
@@ -69,12 +69,12 @@
}
@Override
- public void readBytes(final ByteBuffer dst) {
+ public void readBytes(byte[] dst) {
buffer.readBytes(dst);
}
@Override
- public void readBytes(byte[] dst) {
+ public void readBytes(final ByteBuffer dst) {
buffer.readBytes(dst);
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
index b90afc1..c7640ce 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
@@ -56,11 +56,6 @@
}
@Override
- protected void deallocate() {
- chunkRegion.release();
- }
-
- @Override
public FileRegion retain() {
super.retain();
return this;
@@ -78,6 +73,11 @@
}
@Override
+ protected void deallocate() {
+ chunkRegion.release();
+ }
+
+ @Override
public FileRegion touch(Object hint) {
return this;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
index ba4a969..4427cd7 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
@@ -70,6 +70,11 @@
}
@Override
+ public int hashCode() {
+ return channel != null ? channel.hashCode() : 0;
+ }
+
+ @Override
public boolean equals(final Object o) {
if (this == o)
return true;
@@ -83,11 +88,6 @@
}
@Override
- public int hashCode() {
- return channel != null ? channel.hashCode() : 0;
- }
-
- @Override
public String toString() {
return "NettyChannelImpl [channel=" + channel + "]";
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index bfc536b..df88504 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -28,10 +28,9 @@
// ProtocolMagic(1) + TotalLength(4) + CmdCode(2) + CmdVersion(2) + RequestID(4) + TrafficType(1) + OpCode(2)
// + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
+ public final static byte PROTOCOL_MAGIC = 0x14;
private final static char PROPERTY_SEPARATOR = '\n';
private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
-
- public final static byte PROTOCOL_MAGIC = 0x14;
private final static int REMARK_MAX_LEN = Short.MAX_VALUE;
private final static int PROPERTY_MAX_LEN = 524288; // 512KB
private final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
@@ -41,7 +40,7 @@
out.writeByte(PROTOCOL_MAGIC);
short remarkLen = 0;
- byte [] remark = null;
+ byte[] remark = null;
if (command.remark() != null) {
remark = command.remark().getBytes(REMOTING_CHARSET);
if (remark.length > REMARK_MAX_LEN) {
@@ -124,7 +123,6 @@
cmd.trafficType(TrafficType.parse(in.readByte()));
cmd.opCode(in.readShort());
-
short remarkLen = in.readShort();
if (remarkLen > 0) {
byte[] bytes = new byte[remarkLen];
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
index b405eda..3a67ead 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -42,6 +42,26 @@
}
@Override
+ public short cmdCode() {
+ return this.cmdCode;
+ }
+
+ @Override
+ public void cmdCode(short code) {
+ this.cmdCode = code;
+ }
+
+ @Override
+ public short cmdVersion() {
+ return this.cmdVersion;
+ }
+
+ @Override
+ public void cmdVersion(short version) {
+ this.cmdVersion = version;
+ }
+
+ @Override
public int requestID() {
return requestId;
}
@@ -102,26 +122,6 @@
}
@Override
- public short cmdCode() {
- return this.cmdCode;
- }
-
- @Override
- public void cmdCode(short code) {
- this.cmdCode = code;
- }
-
- @Override
- public short cmdVersion() {
- return this.cmdVersion;
- }
-
- @Override
- public void cmdVersion(short version) {
- this.cmdVersion = version;
- }
-
- @Override
public byte[] payload() {
return this.payload;
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index d54c71a..b351445 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -100,6 +100,31 @@
}, 3000, 1000, TimeUnit.MICROSECONDS);
}
+ void scanResponseTable() {
+ /*
+ Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, ResponseResult> next = iterator.next();
+ ResponseResult result = next.getValue();
+
+ if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
+ iterator.remove();
+ try {
+ long timeoutMillis = result.getTimeoutMillis();
+ long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
+ result.onTimeout(timeoutMillis, costTimeMillis);
+ LOG.error("scan response table command {} failed", result.getRequestId());
+ } catch (Throwable e) {
+ LOG.warn("Error occurred when execute timeout callback !", e);
+ } finally {
+ result.release();
+ LOG.warn("Removed timeout request {} ", result);
+ }
+ }
+ }
+ */
+ }
+
@Override
public void start() {
if (this.channelEventListenerGroup.size() > 0) {
@@ -158,52 +183,6 @@
}
}
- @NotNull
- private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
- final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
- extractRemoteAddress(ctx.channel()), cmd));
-
- RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
-
- interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
- extractRemoteAddress(ctx.channel()), cmd, response));
-
- handleResponse(response, cmd, ctx);
- } catch (Throwable e) {
- LOG.error(String.format("Process request %s error !", cmd.toString()), e);
-
- handleException(e, cmd, ctx);
- }
- }
- };
- }
-
- private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
- if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
- //FiXME Exception interceptor can not throw exception
- interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
- }
- }
-
- private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
- if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
- if (response != null) {
- try {
- writeAndFlush(ctx.channel(), response);
- } catch (Throwable e) {
- LOG.error(String.format("Process request %s success, but transfer response %s failed !",
- cmd.toString(), response.toString()), e);
- }
- }
- }
-
- }
-
private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final ResponseResult responseResult = ackTables.get(cmd.requestID());
if (responseResult != null) {
@@ -254,8 +233,33 @@
}
}
- private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) {
- channel.writeAndFlush(msg).addListener(listener);
+ @NotNull
+ private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
+ final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
+ extractRemoteAddress(ctx.channel()), cmd));
+
+ RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
+
+ interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
+ extractRemoteAddress(ctx.channel()), cmd, response));
+
+ handleResponse(response, cmd, ctx);
+ } catch (Throwable e) {
+ LOG.error(String.format("Process request %s error !", cmd.toString()), e);
+
+ handleException(e, cmd, ctx);
+ }
+ }
+ };
+ }
+
+ protected String extractRemoteAddress(Channel channel) {
+ return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
}
private void writeAndFlush(final Channel channel, final Object msg) {
@@ -266,29 +270,25 @@
return this.publicExecutor;
}
- void scanResponseTable() {
- /*
- Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, ResponseResult> next = iterator.next();
- ResponseResult result = next.getValue();
-
- if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
- iterator.remove();
+ private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
+ if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+ if (response != null) {
try {
- long timeoutMillis = result.getTimeoutMillis();
- long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
- result.onTimeout(timeoutMillis, costTimeMillis);
- LOG.error("scan response table command {} failed", result.getRequestId());
+ writeAndFlush(ctx.channel(), response);
} catch (Throwable e) {
- LOG.warn("Error occurred when execute timeout callback !", e);
- } finally {
- result.release();
- LOG.warn("Removed timeout request {} ", result);
+ LOG.error(String.format("Process request %s success, but transfer response %s failed !",
+ cmd.toString(), response.toString()), e);
}
}
}
- */
+
+ }
+
+ private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
+ if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+ //FiXME Exception interceptor can not throw exception
+ interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
+ }
}
public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request,
@@ -358,6 +358,10 @@
}
}
+ private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) {
+ channel.writeAndFlush(msg).addListener(listener);
+ }
+
public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request,
final AsyncHandler invokeCallback, long timeoutMillis) {
request.trafficType(TrafficType.REQUEST_ASYNC);
@@ -486,13 +490,9 @@
}
}
- public String getRemotingInstanceId() {
- return remotingInstanceId;
- }
-
@Override
- public RemotingCommandFactory commandFactory() {
- return this.remotingCommandFactory;
+ public void registerInterceptor(Interceptor interceptor) {
+ this.interceptorGroup.registerInterceptor(interceptor);
}
@Override
@@ -514,13 +514,22 @@
}
@Override
+ public Pair<RequestProcessor, ExecutorService> processor(short requestCode) {
+ return processorTables.get(requestCode);
+ }
+
+ @Override
public String remotingInstanceId() {
return this.getRemotingInstanceId();
}
+ public String getRemotingInstanceId() {
+ return remotingInstanceId;
+ }
+
@Override
- public void registerInterceptor(Interceptor interceptor) {
- this.interceptorGroup.registerInterceptor(interceptor);
+ public RemotingCommandFactory commandFactory() {
+ return this.remotingCommandFactory;
}
@Override
@@ -528,15 +537,6 @@
this.channelEventListenerGroup.registerChannelEventListener(listener);
}
- @Override
- public Pair<RequestProcessor, ExecutorService> processor(short requestCode) {
- return processorTables.get(requestCode);
- }
-
- protected String extractRemoteAddress(Channel channel) {
- return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
- }
-
class ChannelEventExecutor extends Thread {
private final static int MAX_SIZE = 10000;
private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>();
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index faead7f..3dab3db 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -88,6 +88,25 @@
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
}
+ @Override
+ public void start() {
+ super.start();
+
+ this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
+ clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
+ new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
+ }
+ });
+
+ applyOptions(clientBootstrap);
+
+ startUpHouseKeepingService();
+ }
+
private void applyOptions(Bootstrap bootstrap) {
if (null != clientConfig) {
if (clientConfig.getTcpSoLinger() > 0) {
@@ -111,25 +130,6 @@
}
@Override
- public void start() {
- super.start();
-
- this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
- clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
- new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
- }
- });
-
- applyOptions(clientBootstrap);
-
- startUpHouseKeepingService();
- }
-
- @Override
public void stop() {
// try {
ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
@@ -200,74 +200,6 @@
}
}
- private Channel createIfAbsent(final String addr) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isActive()) {
- return cw.getChannel();
- }
- return this.createChannel(addr);
- }
-
- //FIXME need test to verify
- private Channel createChannel(final String addr) {
- ChannelWrapper cw = null;
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean createNewConnection;
- cw = this.channelTables.get(addr);
- if (cw != null) {
- if (cw.isActive()) {
- return cw.getChannel();
- } else if (!cw.getChannelFuture().isDone()) {
- createNewConnection = false;
- } else {
- this.channelTables.remove(addr);
- createNewConnection = true;
- }
- } else {
- createNewConnection = true;
- }
-
- if (createNewConnection) {
- String[] s = addr.split(":");
- SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
- ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
- LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
- cw = new ChannelWrapper(channelFuture);
- this.channelTables.put(addr, cw);
- }
- } catch (Exception e) {
- LOG.error("createChannel: create channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (cw != null) {
- ChannelFuture channelFuture = cw.getChannelFuture();
- if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
- if (cw.isActive()) {
- LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
- return cw.getChannel();
- } else {
- LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
- this.closeChannel(addr, cw.getChannel());
- }
- } else {
- LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
- channelFuture.toString());
- this.closeChannel(addr, cw.getChannel());
- }
- }
- return null;
- }
-
private void closeChannel(final Channel channel) {
if (null == channel)
return;
@@ -344,6 +276,74 @@
}
+ private Channel createIfAbsent(final String addr) {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw != null && cw.isActive()) {
+ return cw.getChannel();
+ }
+ return this.createChannel(addr);
+ }
+
+ //FIXME need test to verify
+ private Channel createChannel(final String addr) {
+ ChannelWrapper cw = null;
+ try {
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean createNewConnection;
+ cw = this.channelTables.get(addr);
+ if (cw != null) {
+ if (cw.isActive()) {
+ return cw.getChannel();
+ } else if (!cw.getChannelFuture().isDone()) {
+ createNewConnection = false;
+ } else {
+ this.channelTables.remove(addr);
+ createNewConnection = true;
+ }
+ } else {
+ createNewConnection = true;
+ }
+
+ if (createNewConnection) {
+ String[] s = addr.split(":");
+ SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
+ ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
+ LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+ cw = new ChannelWrapper(channelFuture);
+ this.channelTables.put(addr, cw);
+ }
+ } catch (Exception e) {
+ LOG.error("createChannel: create channel exception", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ if (cw != null) {
+ ChannelFuture channelFuture = cw.getChannelFuture();
+ if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+ if (cw.isActive()) {
+ LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+ return cw.getChannel();
+ } else {
+ LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
+ this.closeChannel(addr, cw.getChannel());
+ }
+ } else {
+ LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+ channelFuture.toString());
+ this.closeChannel(addr, cw.getChannel());
+ }
+ }
+ return null;
+ }
+
@Override
public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
final long timeoutMillis) {
@@ -403,12 +403,6 @@
private class ClientConnectionHandler extends ChannelDuplexHandler {
@Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
- ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
- }
-
- @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise)
throws Exception {
@@ -455,6 +449,12 @@
}
@Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
+ ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 0d6a2cc..ec8a243 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -94,36 +94,6 @@
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
}
- private void applyOptions(ServerBootstrap bootstrap) {
- //option() is for the NioServerSocketChannel that accepts incoming connections.
- //childOption() is for the Channels accepted by the parent ServerChannel, which is NioServerSocketChannel in this case
- if (null != serverConfig) {
- if (serverConfig.getTcpSoBacklogSize() > 0) {
- bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize());
- }
-
- if (serverConfig.getTcpSoLinger() > 0) {
- bootstrap.option(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger());
- }
-
- if (serverConfig.getTcpSoSndBufSize() > 0) {
- bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize());
- }
- if (serverConfig.getTcpSoRcvBufSize() > 0) {
- bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize());
- }
-
- bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
- childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
- childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
- option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
- }
-
- if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
- bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- }
- }
-
@Override
public void start() {
super.start();
@@ -157,6 +127,36 @@
startUpHouseKeepingService();
}
+ private void applyOptions(ServerBootstrap bootstrap) {
+ //option() is for the NioServerSocketChannel that accepts incoming connections.
+ //childOption() is for the Channels accepted by the parent ServerChannel, which is NioServerSocketChannel in this case
+ if (null != serverConfig) {
+ if (serverConfig.getTcpSoBacklogSize() > 0) {
+ bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize());
+ }
+
+ if (serverConfig.getTcpSoLinger() > 0) {
+ bootstrap.option(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger());
+ }
+
+ if (serverConfig.getTcpSoSndBufSize() > 0) {
+ bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize());
+ }
+ if (serverConfig.getTcpSoRcvBufSize() > 0) {
+ bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize());
+ }
+
+ bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
+ childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
+ childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
+ option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
+ }
+
+ if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+ bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ }
+ }
+
@Override
public void stop() {
try {
@@ -203,12 +203,6 @@
private class ServerConnectionHandler extends ChannelDuplexHandler {
@Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
- ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
- }
-
- @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@@ -249,6 +243,12 @@
}
@Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
+ ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
+ }
+
+ @Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
index c298ce7..0688dfa 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
@@ -34,6 +34,26 @@
}
/**
+ * Compare two two-dimensional byte arrays. No null checks are performed.
+ *
+ * @param left the first byte array
+ * @param right the second byte array
+ * @return the result of the comparison
+ */
+ public static boolean equals(byte[][] left, byte[][] right) {
+ if (left.length != right.length) {
+ return false;
+ }
+
+ boolean result = true;
+ for (int i = left.length - 1; i >= 0; i--) {
+ result &= ByteUtils.equals(left[i], right[i]);
+ }
+
+ return result;
+ }
+
+ /**
* Compare two byte arrays (perform null checks beforehand).
*
* @param left the first byte array
@@ -59,26 +79,6 @@
}
/**
- * Compare two two-dimensional byte arrays. No null checks are performed.
- *
- * @param left the first byte array
- * @param right the second byte array
- * @return the result of the comparison
- */
- public static boolean equals(byte[][] left, byte[][] right) {
- if (left.length != right.length) {
- return false;
- }
-
- boolean result = true;
- for (int i = left.length - 1; i >= 0; i--) {
- result &= ByteUtils.equals(left[i], right[i]);
- }
-
- return result;
- }
-
- /**
* Compare two three-dimensional byte arrays. No null checks are performed.
*
* @param left the first byte array
@@ -104,16 +104,16 @@
}
/**
- * Computes a hashcode based on the contents of a one-dimensional byte array
- * rather than its identity.
+ * Computes a hashcode based on the contents of a three-dimensional byte
+ * array rather than its identity.
*
* @param array the array to compute the hashcode of
* @return the hashcode
*/
- public static int deepHashCode(byte[] array) {
+ public static int deepHashCode(byte[][][] array) {
int result = 1;
for (int i = 0; i < array.length; i++) {
- result = 31 * result + array[i];
+ result = 31 * result + deepHashCode(array[i]);
}
return result;
}
@@ -134,16 +134,16 @@
}
/**
- * Computes a hashcode based on the contents of a three-dimensional byte
- * array rather than its identity.
+ * Computes a hashcode based on the contents of a one-dimensional byte array
+ * rather than its identity.
*
* @param array the array to compute the hashcode of
* @return the hashcode
*/
- public static int deepHashCode(byte[][][] array) {
+ public static int deepHashCode(byte[] array) {
int result = 1;
for (int i = 0; i < array.length; i++) {
- result = 31 * result + deepHashCode(array[i]);
+ result = 31 * result + array[i];
}
return result;
}
@@ -281,7 +281,7 @@
*
* @param x1 the first array
* @param x2 the second array
- * @return (x2||x1) (little-endian order, i.e. x1 is at lower memory
+ * @return (x2 | | x1) (little-endian order, i.e. x1 is at lower memory
* addresses)
*/
public static byte[] concatenate(byte[] x1, byte[] x2) {
@@ -339,6 +339,18 @@
*
* @param input the input byte array
* @param start the start index
+ * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> to
+ * the end of the array
+ */
+ public static byte[] subArray(byte[] input, int start) {
+ return subArray(input, start, input.length);
+ }
+
+ /**
+ * Generate a subarray of a given byte array.
+ *
+ * @param input the input byte array
+ * @param start the start index
* @param end the end index
* @return a subarray of <tt>input</tt>, ranging from <tt>start</tt>
* (inclusively) to <tt>end</tt> (exclusively)
@@ -350,18 +362,6 @@
}
/**
- * Generate a subarray of a given byte array.
- *
- * @param input the input byte array
- * @param start the start index
- * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> to
- * the end of the array
- */
- public static byte[] subArray(byte[] input, int start) {
- return subArray(input, start, input.length);
- }
-
- /**
* Rewrite a byte array as a char array
*
* @param input -
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
index 6386ca0..dc17d05 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
@@ -28,25 +28,6 @@
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
/**
- * <p>Gets the stack trace from a Throwable as a String.</p>
- *
- * <p>The result of this method vary by JDK version as this method
- * uses {@link Throwable#printStackTrace(java.io.PrintWriter)}.
- * On JDK1.3 and earlier, the cause exception will not be shown
- * unless the specified throwable alters printStackTrace.</p>
- *
- * @param throwable the <code>Throwable</code> to be examined
- * @return the stack trace as generated by the exception's
- * <code>printStackTrace(PrintWriter)</code> method
- */
- public static String getStackTrace(final Throwable throwable) {
- final StringWriter sw = new StringWriter();
- final PrintWriter pw = new PrintWriter(sw, true);
- throwable.printStackTrace(pw);
- return sw.getBuffer().toString();
- }
-
- /**
* <p>Produces a <code>List</code> of stack frames - the message
* is not included. Only the trace of the specified exception is
* returned, any caused by trace is stripped.</p>
@@ -77,4 +58,23 @@
}
return list;
}
+
+ /**
+ * <p>Gets the stack trace from a Throwable as a String.</p>
+ *
+ * <p>The result of this method vary by JDK version as this method
+ * uses {@link Throwable#printStackTrace(java.io.PrintWriter)}.
+ * On JDK1.3 and earlier, the cause exception will not be shown
+ * unless the specified throwable alters printStackTrace.</p>
+ *
+ * @param throwable the <code>Throwable</code> to be examined
+ * @return the stack trace as generated by the exception's
+ * <code>printStackTrace(PrintWriter)</code> method
+ */
+ public static String getStackTrace(final Throwable throwable) {
+ final StringWriter sw = new StringWriter();
+ final PrintWriter pw = new PrintWriter(sw, true);
+ throwable.printStackTrace(pw);
+ return sw.getBuffer().toString();
+ }
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
index a4b1293..b64ab10 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
@@ -55,21 +55,13 @@
counter = 0;
}
- public static UIDGenerator instance() {
- return generatorLocal.get();
- }
-
- public String createUID() {
- long current = System.currentTimeMillis();
- if (current >= nextStartTime) {
- setStartTime(current);
- }
- buffer.position(0);
- sb.setLength(basePos);
- buffer.putInt((int) (System.currentTimeMillis() - startTime));
- buffer.putShort(counter++);
- sb.append(ByteUtils.toHexString(buffer.array()));
- return sb.toString();
+ public byte[] createFakeIP() {
+ ByteBuffer bb = ByteBuffer.allocate(8);
+ bb.putLong(System.currentTimeMillis());
+ bb.position(4);
+ byte[] fakeIP = new byte[4];
+ bb.get(fakeIP);
+ return fakeIP;
}
private void setStartTime(long millis) {
@@ -85,13 +77,21 @@
nextStartTime = cal.getTimeInMillis();
}
- public byte[] createFakeIP() {
- ByteBuffer bb = ByteBuffer.allocate(8);
- bb.putLong(System.currentTimeMillis());
- bb.position(4);
- byte[] fakeIP = new byte[4];
- bb.get(fakeIP);
- return fakeIP;
+ public static UIDGenerator instance() {
+ return generatorLocal.get();
+ }
+
+ public String createUID() {
+ long current = System.currentTimeMillis();
+ if (current >= nextStartTime) {
+ setStartTime(current);
+ }
+ buffer.position(0);
+ sb.setLength(basePos);
+ buffer.putInt((int) (System.currentTimeMillis() - startTime));
+ buffer.putShort(counter++);
+ sb.append(ByteUtils.toHexString(buffer.array()));
+ return sb.toString();
}
}