blob: 25b0aa29367520caf958f4ee656be1018f0346e2 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.messaging.netty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SaslStormClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
private final long start_time;
private final ISaslClient client;
* Used for client or server's token to send or receive from each other.
private byte[] token;
private String name;
/**
 * Creates a handler bound to the given SASL client and records the construction time.
 *
 * @param client source of the SASL name and secret key
 * @throws IOException if loading the SASL credentials fails
 */
public SaslStormClientHandler(ISaslClient client) throws IOException {
    this.client = client;
    start_time = System.currentTimeMillis();
    // name and token are consumed in channelActive, so they have to be loaded before the
    // handler is put to work. This call is also the only thing in the constructor that can
    // raise the declared IOException.
    // NOTE(review): the call was absent from the reviewed text (lines were lost in extraction);
    // confirm against the upstream file.
    getSASLCredentials();
}
public void channelActive(ChannelHandlerContext ctx) {
Channel channel =;"Connection established from " + channel.localAddress()
+ " to " + channel.remoteAddress());
try {
SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
LOG.debug("Creating saslNettyClient now " + "for channel: "
+ channel);
saslNettyClient = new SaslNettyClient(name, token);
channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, channel.voidPromise());
} catch (Exception e) {
LOG.error("Failed to authenticate with server " + "due to error: ",
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
LOG.debug("send/recv time (ms): {}",
(System.currentTimeMillis() - start_time));
// examine the response message from server
if (message instanceof ControlMessage) {
handleControlMessage(ctx, (ControlMessage) message);
} else if (message instanceof SaslMessageToken) {
handleSaslMessageToken(ctx, (SaslMessageToken) message);
} else {
LOG.error("Unexpected message from server: {}", message);
private SaslNettyClient getChannelSaslNettyClient(Channel channel) throws Exception {
// Generate SASL response to server using Channel-local SASL client.
SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
throw new Exception("saslNettyClient was unexpectedly "
+ "null for channel: " + channel);
return saslNettyClient;
/**
 * Handles a ControlMessage received from the server during the SASL handshake.
 *
 * For ControlMessage.SASL_COMPLETE_REQUEST the channel's SaslNettyClient must already
 * report isComplete(); otherwise an error is logged and an Exception is thrown.
 * Any other control message is only logged as a warning.
 *
 * NOTE(review): this block is incomplete in the reviewed text. Closing braces, the argument
 * on the first statement, and the statement(s) announced by the "fireMessageRead" comment
 * are missing. Restore them from the upstream file before relying on this code.
 *
 * @param ctx            context of the channel the message arrived on
 * @param controlMessage the control message sent by the server
 * @throws Exception if the server signals completion while the local SASL client is not complete,
 *                   or if the channel has no SASL client attached
 */
private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
// NOTE(review): the argument is missing here; presumably the channel obtained from ctx -- confirm.
SaslNettyClient saslNettyClient = getChannelSaslNettyClient(;
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
LOG.debug("Server has sent us the SaslComplete "
+ "message. Allowing normal work to proceed.");
// The server claims the handshake is done; refuse to continue if our side disagrees.
if (!saslNettyClient.isComplete()) {
LOG.error("Server returned a Sasl-complete message, "
+ "but as far as we can tell, we are not authenticated yet.");
throw new Exception("Server returned a "
+ "Sasl-complete message, but as far as "
+ "we can tell, we are not authenticated yet.");
// We call fireMessageRead since the client is allowed to
// perform this request. The client's request will now proceed
// to the next pipeline component namely StormClientHandler.
// NOTE(review): no such call is present after this comment in the reviewed text.
} else {
LOG.warn("Unexpected control message: {}", controlMessage);
/**
 * Handles a SaslMessageToken received from the server.
 *
 * A response is generated with the channel's SaslNettyClient. A null response is treated as
 * the end of the handshake: if the SASL client does not report isComplete() at that point, a
 * warning is logged and an Exception is thrown. A non-null response is wrapped in a new
 * SaslMessageToken and written back to the server on the same channel.
 *
 * NOTE(review): this block is incomplete in the reviewed text. Closing braces, the
 * right-hand side of the channel assignment, and the call that produces responseToServer
 * are missing. Restore them from the upstream file before relying on this code.
 *
 * @param ctx              context of the channel the token arrived on
 * @param saslMessageToken the SASL token sent by the server
 * @throws Exception if the generated response is null while authentication is not complete,
 *                   or if the channel has no SASL client attached
 */
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
// NOTE(review): the right-hand side is missing here; presumably the channel obtained from ctx -- confirm.
Channel channel =;
SaslNettyClient saslNettyClient = getChannelSaslNettyClient(channel);
LOG.debug("Responding to server's token of length: "
+ saslMessageToken.getSaslToken().length);
// Generate SASL response (but we only actually send the response if
// it's non-null).
// NOTE(review): the method call on saslNettyClient is missing from the next line.
byte[] responseToServer = saslNettyClient
if (responseToServer == null) {
// If we generate a null response, then authentication has completed
// (if not, warn), and return without sending a response back to the
// server.
LOG.debug("Response to server is null: "
+ "authentication should now be complete.");
if (!saslNettyClient.isComplete()) {
LOG.warn("Generated a null response, "
+ "but authentication is not complete.");
throw new Exception("Server response is null, but as far as "
+ "we can tell, we are not authenticated yet.");
} else {
LOG.debug("Response to server token has length:"
+ responseToServer.length);
// Construct a message containing the SASL response and send it to the
// server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
// voidPromise(): no future is kept for this write.
channel.writeAndFlush(saslResponse, channel.voidPromise());
private void getSASLCredentials() throws IOException {
String secretKey;
name =;
secretKey = client.secretKey();
if (secretKey != null) {
token = secretKey.getBytes();
LOG.debug("SASL credentials for storm topology " + name
+ " is " + secretKey);