blob: 45cd68a1f008fc4e5d28402c8f70cfa89080cff9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_UTIL_H_
#define LIB_COMMON_UTIL_H_
#include "hdfspp/status.h"
#include "common/logging.h"
#include <mutex>
#include <memory>
#include <string>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/error_code.hpp>
#include <openssl/rand.h>
#include <google/protobuf/io/coded_stream.h>
namespace google {
namespace protobuf {
class MessageLite;
}
}
namespace hdfs {
// typedefs based on code that's repeated everywhere
typedef std::lock_guard<std::mutex> mutex_guard;
Status ToStatus(const boost::system::error_code &ec);
// Determine size of buffer that needs to be allocated in order to serialize msg
// in delimited format
size_t DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
// Construct msg from the input held in the CodedInputStream
// return false on failure, otherwise return true
bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
::google::protobuf::MessageLite *msg);
// Serialize msg into a delimited form (java protobuf compatible)
// err, if not null, will be set to false on failure
std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageLite *msg,
bool *err);
std::string Base64Encode(const std::string &src);
// Return a new high-entropy client name
std::shared_ptr<std::string> GetRandomClientName();
// Returns true if _someone_ is holding the lock (not necessarily this thread,
// but a std::mutex doesn't track which thread is holding the lock)
template<class T>
bool lock_held(T & mutex) {
bool result = !mutex.try_lock();
if (!result)
mutex.unlock();
return result;
}
// Shutdown and close a socket safely; will check if the socket is open and
// catch anything thrown by asio.
// Returns a string containing error message on failure, otherwise an empty string.
std::string SafeDisconnect(boost::asio::ip::tcp::socket *sock);
// The following helper function is used for classes that look like the following:
//
// template <typename socket_like_object>
// class ObjectThatHoldsSocket {
// socket_like_object sock_;
// void DoSomethingWithAsioTcpSocket();
// }
//
// The trick here is that ObjectThatHoldsSocket may be templated on a mock socket
// in mock tests. If you have a method that explicitly needs to call some asio
// method unrelated to the mock test you need a way of making sure socket_like_object
// is, in fact, an asio::ip::tcp::socket. Otherwise the mocks need to implement
// lots of no-op boilerplate. This will return the value of the input param if
// it's a asio socket, and nullptr if it's anything else.
template <typename sock_t>
inline boost::asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) {
(void)s;
return nullptr;
}
template<>
inline boost::asio::ip::tcp::socket *get_asio_socket_ptr<boost::asio::ip::tcp::socket>
(boost::asio::ip::tcp::socket *s) {
return s;
}
//Check if the high bit is set
bool IsHighBitSet(uint64_t num);
// Provide a way to do an atomic swap on a callback.
// SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
// AtomicSwapCallback and GetCallback must only be called after SetCallback.
//
// We can't throw on error, and since the callback is templated it's tricky to
// generate generic dummy callbacks. Complain loudly in the log and get good
// test coverage. It shouldn't be too hard to avoid invalid states.
template <typename CallbackType>
class SwappableCallbackHolder {
private:
std::mutex state_lock_;
CallbackType callback_;
bool callback_set_ = false;
bool callback_swapped_ = false;
bool callback_accessed_ = false;
public:
bool IsCallbackSet() {
mutex_guard swap_lock(state_lock_);
return callback_set_;
}
bool IsCallbackAccessed() {
mutex_guard swap_lock(state_lock_);
return callback_accessed_;
}
bool SetCallback(const CallbackType& callback) {
mutex_guard swap_lock(state_lock_);
if(callback_set_ || callback_swapped_ || callback_accessed_) {
LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
return false;
}
callback_ = callback;
callback_set_ = true;
return true;
}
CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
mutex_guard swap_lock(state_lock_);
if(!callback_set_ || callback_swapped_) {
LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
swapped = false;
} else if (callback_accessed_) {
// Common case where callback has been invoked but caller may not know
LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
return callback_;
}
CallbackType old = callback_;
callback_ = replacement;
callback_swapped_ = true;
swapped = true;
return old;
}
CallbackType GetCallback() {
mutex_guard swap_lock(state_lock_);
if(!callback_set_ || callback_accessed_) {
LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
}
callback_accessed_ = true;
return callback_;
}
};
}
#endif