[STORM-3763] send initial message to remote client only after authentication completes (#3390)
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8b1183b..58b0975 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -55,7 +55,8 @@
* @param stormId topology ID
* @param port port #
* @param cb The callback to deliver received messages to
- * @param newConnectionResponse Supplier of the initial message to send to new client connections
+ * @param newConnectionResponse Supplier of the initial message to send to new client connections. If authentication
+ * is required, the message will be sent after authentication is complete.
* @return server side connection
*/
IConnection bind(String stormId, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 94c5d75..0e99257 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -65,16 +65,19 @@
private final IConnectionCallback cb;
private final Supplier<Object> newConnectionResponse;
private volatile boolean closing = false;
+ private final boolean isNettyAuthRequired;
/**
* Starts Netty at the given port.
* @param topoConf The topology config
* @param port The port to start Netty at
* @param cb The callback to deliver incoming messages to
- * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+ * @param newConnectionResponse The response to send to clients when they connect. Can be null. If authentication
+ * is required, the message will be sent after authentication is complete.
*/
Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
this.topoConf = topoConf;
+ this.isNettyAuthRequired = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
this.port = port;
ser = new KryoValuesSerializer(topoConf);
this.cb = cb;
@@ -252,8 +255,9 @@
**/
@Override
public void channelActive(Channel c) {
- if (newConnectionResponse != null) {
- c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ if (!isNettyAuthRequired) {
+ //if authentication is not required, treat it as authenticated.
+ authenticated(c);
}
allChannels.add(c);
}
@@ -285,6 +289,14 @@
@Override
public void authenticated(Channel c) {
+ if (isNettyAuthRequired) {
+ LOG.debug("The channel {} is active and authenticated", c);
+ } else {
+ LOG.debug("The channel {} is active", c);
+ }
+ if (newConnectionResponse != null) {
+ c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
+ }
}
@Override