| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "rpc_engine.h" |
| #include "rpc_connection.h" |
| #include "common/logging.h" |
| #include "common/optional_wrapper.h" |
| #include "x-platform/syscall.h" |
| |
| #include "sasl_engine.h" |
| #include "sasl_protocol.h" |
| |
| #if defined USE_SASL |
| #if defined USE_CYRUS_SASL |
| #include "cyrus_sasl_engine.h" // CySaslEngine() |
| #elif defined USE_GSASL |
| #include "gsasl_engine.h" // GSaslEngine() |
| #else |
| #error USE_SASL defined but no engine (USE_GSASL) defined |
| #endif |
| #endif |
| |
| namespace hdfs { |
| |
| using namespace hadoop::common; |
| using namespace google::protobuf; |
| |
| /***** |
| * Threading model: all entry points need to acquire the sasl_lock before accessing |
| * members of the class |
| * |
| * Lifecycle model: asio may have outstanding callbacks into this class for arbitrary |
| * amounts of time, so any references to the class must be shared_ptr's. The |
| * SASLProtocol keeps a weak_ptr to the owning RpcConnection, which might go away, |
| * so the weak_ptr should be locked only long enough to make callbacks into the |
| * RpcConnection. |
| */ |
| |
| SaslProtocol::SaslProtocol(const std::string & cluster_name, |
| const AuthInfo & auth_info, |
| std::shared_ptr<RpcConnection> connection) : |
| state_(kUnstarted), |
| cluster_name_(cluster_name), |
| auth_info_(auth_info), |
| connection_(connection) |
| { |
| } |
| |
| SaslProtocol::~SaslProtocol() |
| { |
| assert(state_ != kNegotiate); |
| } |
| |
| void SaslProtocol::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) { |
| std::lock_guard<std::mutex> state_lock(sasl_state_lock_); |
| event_handlers_ = event_handlers; |
| } // SetEventHandlers() method |
| |
| void SaslProtocol::Authenticate(std::function<void(const Status & status, const AuthInfo new_auth_info)> callback) |
| { |
| std::lock_guard<std::mutex> state_lock(sasl_state_lock_); |
| |
| callback_ = callback; |
| state_ = kNegotiate; |
| event_handlers_->call("SASL Start", cluster_name_.c_str(), 0); |
| |
| std::shared_ptr<RpcSaslProto> req_msg = std::make_shared<RpcSaslProto>(); |
| req_msg->set_state(RpcSaslProto_SaslState_NEGOTIATE); |
| |
| // We cheat here since this is always called while holding the RpcConnection's lock |
| std::shared_ptr<RpcConnection> connection = connection_.lock(); |
| if (!connection) { |
| AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo()); |
| return; |
| } |
| |
| std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>(); |
| auto self(shared_from_this()); |
| connection->AsyncRpc_locked(SASL_METHOD_NAME, req_msg.get(), resp_msg, |
| [self, req_msg, resp_msg, connection] (const Status & status) { |
| assert(connection); |
| self->OnServerResponse(status, resp_msg.get()); |
| }); |
| } // authenticate() method |
| |
| AuthInfo::AuthMethod ParseMethod(const std::string & method) { |
| if (XPlatform::Syscall::StringCompareIgnoreCase(method, "SIMPLE")) { |
| return AuthInfo::kSimple; |
| } |
| if (XPlatform::Syscall::StringCompareIgnoreCase(method, "KERBEROS")) { |
| return AuthInfo::kKerberos; |
| } |
| if (XPlatform::Syscall::StringCompareIgnoreCase(method, "TOKEN")) { |
| return AuthInfo::kToken; |
| } |
| return AuthInfo::kUnknownAuth; |
| } // ParseMethod() |
| |
| // build_init_msg(): |
| // Helper function for Start(), to keep |
| // these ProtoBuf-RPC calls out of the sasl_engine code. |
| |
| std::pair<Status, RpcSaslProto> |
| SaslProtocol::BuildInitMessage(std::string & token, const hadoop::common::RpcSaslProto * negotiate_msg) |
| { |
| // The init message needs one of the RpcSaslProto_SaslAuth structures that |
| // was sent in the negotiate message as our chosen mechanism |
| // Map the chosen_mech name back to the RpcSaslProto_SaslAuth from the negotiate |
| // message that it corresponds to |
| SaslMethod & chosenMech = sasl_engine_->chosen_mech_; |
| auto auths = negotiate_msg->auths(); |
| auto pb_auth_it = std::find_if(auths.begin(), auths.end(), |
| [chosenMech](const RpcSaslProto_SaslAuth & data) |
| { |
| return data.mechanism() == chosenMech.mechanism; |
| }); |
| if (pb_auth_it == auths.end()) |
| return std::make_pair(Status::Error("Couldn't find mechanism in negotiate msg"), RpcSaslProto()); |
| auto & pb_auth = *pb_auth_it; |
| |
| // Prepare INITIATE message |
| RpcSaslProto initiate = RpcSaslProto(); |
| |
| initiate.set_state(RpcSaslProto_SaslState_INITIATE); |
| // initiate message will contain: |
| // token_ (binary data), and |
| // auths_[ ], an array of objects just like pb_auth. |
| // In our case, we want the auths array |
| // to hold just the single element from pb_auth: |
| |
| RpcSaslProto_SaslAuth * respAuth = initiate.add_auths(); |
| respAuth->CopyFrom(pb_auth); |
| |
| // Mostly, an INITIATE message contains a "Token". |
| // For GSSAPI, the token is a Kerberos AP_REQ, aka |
| // "Authenticated application request," comprising |
| // the client's application ticket & and an encrypted |
| // message that Kerberos calls an "authenticator". |
| |
| if (token.empty()) { |
| const char * errmsg = "SaslProtocol::build_init_msg(): No token available."; |
| LOG_ERROR(kRPC, << errmsg); |
| return std::make_pair(Status::Error(errmsg), RpcSaslProto()); |
| } |
| |
| // add challenge token to the INITIATE message: |
| initiate.set_token(token); |
| |
| // the initiate message is ready to send: |
| return std::make_pair(Status::OK(), initiate); |
| } // build_init_msg() |
| |
| // Converts the RpcSaslProto.auths ararray from RpcSaslProto_SaslAuth PB |
| // structures to SaslMethod structures |
| static bool |
| extract_auths(std::vector<SaslMethod> & resp_auths, |
| const hadoop::common::RpcSaslProto * response) { |
| |
| bool simple_avail = false; |
| auto pb_auths = response->auths(); |
| |
| // For our GSSAPI case, an element of pb_auths contains: |
| // method_ = "KERBEROS" |
| // mechanism_ = "GSSAPI" |
| // protocol_ = "nn" /* "name node", AKA "hdfs" |
| // serverid_ = "foobar1.acmecorp.com" |
| // challenge_ = "" |
| // _cached_size_ = 0 |
| // _has_bits_ = 15 |
| |
| for (int i = 0; i < pb_auths.size(); ++i) { |
| auto pb_auth = pb_auths.Get(i); |
| AuthInfo::AuthMethod method = ParseMethod(pb_auth.method()); |
| |
| switch(method) { |
| case AuthInfo::kToken: |
| case AuthInfo::kKerberos: { |
| SaslMethod new_method; |
| new_method.mechanism = pb_auth.mechanism(); |
| new_method.protocol = pb_auth.protocol(); |
| new_method.serverid = pb_auth.serverid(); |
| new_method.challenge = pb_auth.has_challenge() ? |
| pb_auth.challenge() : ""; |
| resp_auths.push_back(new_method); |
| } |
| break; |
| case AuthInfo::kSimple: |
| simple_avail = true; |
| break; |
| case AuthInfo::kUnknownAuth: |
| LOG_WARN(kRPC, << "Unknown auth method " << pb_auth.method() << "; ignoring"); |
| break; |
| default: |
| LOG_WARN(kRPC, << "Invalid auth type: " << method << "; ignoring"); |
| break; |
| } |
| } // for |
| return simple_avail; |
| } // extract_auths() |
| |
| void SaslProtocol::ResetEngine() { |
| #if defined USE_SASL |
| #if defined USE_CYRUS_SASL |
| sasl_engine_.reset(new CySaslEngine()); |
| #elif defined USE_GSASL |
| sasl_engine_.reset(new GSaslEngine()); |
| #else |
| #error USE_SASL defined but no engine (USE_GSASL) defined |
| #endif |
| #endif |
| return; |
| } // Reset_Engine() method |
| |
| void SaslProtocol::Negotiate(const hadoop::common::RpcSaslProto * response) |
| { |
| this->ResetEngine(); // get a new SaslEngine |
| |
| if (auth_info_.getToken()) { |
| sasl_engine_->SetPasswordInfo(auth_info_.getToken().value().identifier, |
| auth_info_.getToken().value().password); |
| } |
| sasl_engine_->SetKerberosInfo(auth_info_.getUser()); // TODO: map to principal? |
| |
| // Copy the response's auths list to an array of SaslMethod objects. |
| // SaslEngine shouldn't need to know about the protobuf classes. |
| std::vector<SaslMethod> resp_auths; |
| bool simple_available = extract_auths(resp_auths, response); |
| bool mech_chosen = sasl_engine_->ChooseMech(resp_auths); |
| |
| if (mech_chosen) { |
| |
| // Prepare an INITIATE message, |
| // later on we'll send it to the hdfs server: |
| auto start_result = sasl_engine_->Start(); |
| Status status = start_result.first; |
| if (! status.ok()) { |
| // start() failed, simple isn't avail, |
| // so give up & stop authentication: |
| AuthComplete(status, auth_info_); |
| return; |
| } |
| // token.second is a binary buffer, containing |
| // client credentials that will prove the |
| // client's identity to the application server. |
| // Put the token into an INITIATE message: |
| auto init = BuildInitMessage(start_result.second, response); |
| |
| // If all is OK, send the INITIATE msg to the hdfs server; |
| // Otherwise, if possible, fail over to simple authentication: |
| status = init.first; |
| if (status.ok()) { |
| SendSaslMessage(init.second); |
| return; |
| } |
| if (!simple_available) { |
| // build_init_msg() failed, simple isn't avail, |
| // so give up & stop authentication: |
| AuthComplete(status, auth_info_); |
| return; |
| } |
| // If simple IS available, fall through to below, |
| // but without build_init_msg()'s failure-status. |
| } |
| |
| // There were no resp_auths, or the SaslEngine couldn't make one work |
| if (simple_available) { |
| // Simple was the only one we could use. That's OK. |
| AuthComplete(Status::OK(), auth_info_); |
| return; |
| } else { |
| // We didn't understand any of the resp_auths; |
| // Give back some information |
| std::stringstream ss; |
| ss << "Client cannot authenticate via: "; |
| |
| auto pb_auths = response->auths(); |
| for (int i = 0; i < pb_auths.size(); ++i) { |
| const RpcSaslProto_SaslAuth & pb_auth = pb_auths.Get(i); |
| ss << pb_auth.mechanism() << ", "; |
| } |
| |
| AuthComplete(Status::Error(ss.str().c_str()), auth_info_); |
| return; |
| } |
| } // Negotiate() method |
| |
| void SaslProtocol::Challenge(const hadoop::common::RpcSaslProto * challenge) |
| { |
| if (!sasl_engine_) { |
| AuthComplete(Status::Error("Received challenge before negotiate"), auth_info_); |
| return; |
| } |
| |
| RpcSaslProto response; |
| response.CopyFrom(*challenge); |
| response.set_state(RpcSaslProto_SaslState_RESPONSE); |
| |
| std::string challenge_token = challenge->has_token() ? challenge->token() : ""; |
| auto sasl_response = sasl_engine_->Step(challenge_token); |
| |
| if (sasl_response.first.ok()) { |
| response.set_token(sasl_response.second); |
| |
| std::shared_ptr<RpcSaslProto> return_msg = std::make_shared<RpcSaslProto>(); |
| SendSaslMessage(response); |
| } else { |
| AuthComplete(sasl_response.first, auth_info_); |
| return; |
| } |
| } // Challenge() method |
| |
| bool SaslProtocol::SendSaslMessage(RpcSaslProto & message) |
| { |
| assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling |
| |
| // RpcConnection might have been freed when we weren't looking. Lock it |
| // to make sure it's there long enough for us |
| std::shared_ptr<RpcConnection> connection = connection_.lock(); |
| if (!connection) { |
| LOG_DEBUG(kRPC, << "Tried sending a SASL Message but the RPC connection was gone"); |
| AuthComplete(Status::AuthenticationFailed("Lost RPC Connection"), AuthInfo()); |
| return false; |
| } |
| |
| std::shared_ptr<RpcSaslProto> resp_msg = std::make_shared<RpcSaslProto>(); |
| auto self(shared_from_this()); |
| connection->AsyncRpc(SASL_METHOD_NAME, &message, resp_msg, |
| [self, resp_msg, connection] (const Status & status) { |
| assert(connection); |
| self->OnServerResponse(status, resp_msg.get()); |
| }); |
| |
| return true; |
| } // SendSaslMessage() method |
| |
| // AuthComplete(): stop the auth effort, successful ot not: |
| bool SaslProtocol::AuthComplete(const Status & status, const AuthInfo & auth_info) |
| { |
| assert(lock_held(sasl_state_lock_)); // Must be holding lock before calling |
| state_ = kComplete; |
| event_handlers_->call("SASL End", cluster_name_.c_str(), 0); |
| |
| // RpcConnection might have been freed when we weren't looking. Lock it |
| // to make sure it's there long enough for us |
| std::shared_ptr<RpcConnection> connection = connection_.lock(); |
| if (!connection) { |
| LOG_DEBUG(kRPC, << "Tried sending an AuthComplete but the RPC connection was gone: " << status.ToString()); |
| return false; |
| } |
| |
| LOG_TRACE(kRPC, << "Received SASL response" << status.ToString()); |
| connection->AuthComplete(status, auth_info); |
| |
| return true; |
| } // AuthComplete() method |
| |
| void SaslProtocol::OnServerResponse(const Status & status, const hadoop::common::RpcSaslProto * response) |
| { |
| |
| std::lock_guard<std::mutex> state_lock(sasl_state_lock_); |
| |
| LOG_TRACE(kRPC, << "Received SASL response: " << status.ToString()); |
| |
| if (status.ok()) { |
| switch(response->state()) { |
| case RpcSaslProto_SaslState_NEGOTIATE: |
| Negotiate(response); |
| break; |
| case RpcSaslProto_SaslState_CHALLENGE: |
| Challenge(response); |
| break; |
| case RpcSaslProto_SaslState_SUCCESS: |
| if (sasl_engine_) { |
| sasl_engine_->Finish(); |
| } |
| AuthComplete(Status::OK(), auth_info_); |
| break; |
| |
| case RpcSaslProto_SaslState_INITIATE: // Server side only |
| case RpcSaslProto_SaslState_RESPONSE: // Server side only |
| case RpcSaslProto_SaslState_WRAP: |
| LOG_ERROR(kRPC, << "Invalid client-side SASL state: " << response->state()); |
| AuthComplete(Status::Error("Invalid client-side state"), auth_info_); |
| break; |
| default: |
| LOG_ERROR(kRPC, << "Unknown client-side SASL state: " << response->state()); |
| AuthComplete(Status::Error("Unknown client-side state"), auth_info_); |
| break; |
| } |
| } else { |
| AuthComplete(status, auth_info_); |
| } |
| } // OnServerResponse() method |
| |
| } // namespace hdfs |