blob: 5d967c1c6b2663071a718cb98da2fa7a6f8d212e [file] [log] [blame]
/*
* Copyright 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: yeputons@google.com (Egor Suvorov)
#ifndef PAGESPEED_SYSTEM_REDIS_CACHE_H_
#define PAGESPEED_SYSTEM_REDIS_CACHE_H_
#include <stdarg.h>
#include <memory>
#include <initializer_list>
#include <map>
#include <vector>
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/shared_string.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/thread_annotations.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/cache/cache_interface.h"
#include "pagespeed/kernel/thread/thread_synchronizer.h"
#include "pagespeed/system/external_server_spec.h"
#include "third_party/hiredis/src/hiredis.h"
namespace net_instaweb {
// Interface to Redis using hiredis library. This implementation is blocking and
// thread-safe. One should call StartUp() before making any requests and
// ShutDown() after finishing work. Connecting will be initiated on StartUp()
// call. If it fails there or is dropped after any request, the following
// reconnection strategy is used:
// 1. If an operation fails because of communication or protocol error, try
// reconnecting on the next Get/Put/Delete (without delay).
// 2. If (re-)connection attempt is unsuccessfull, try again on next
// Get/Put/Delete operation, but not until at least reconnection_delay_ms_
// have passed from the previous attempt.
// That ensures that we do not try to connect to unreachable server a lot, but
// still allows us to reconnect quickly in case of network glitches.
//
// See redis_cache.cc for details on different locks that the class has.
//
// When using clustering, to be efficient we need to know what redis cluster
// machine to talk to for a given key. There are two steps:
// 1) hash the key in a redis-specific way determine the hash slot.
// 2) consult our cached copy of the slot-to-server mapping to see which
// server to send to.
// If we get a redirection after following that approach, our cached mapping
// is invalid and should be refreshed.
//
// http://redis.io/topics/cluster-spec explains this all.
//
// TODO(yeputons): consider extracting a common interface with AprMemCache.
// TODO(yeputons): consider making Redis-reported errors treated as failures.
// TODO(yeputons): add redis AUTH command support.
class RedisCache : public CacheInterface {
public:
// Uses ThreadSystem to generate several mutexes in constructor only.
// Does not take ownership of MessageHandler, Timer, and assumes
// that these pointers are valid throughout full lifetime of RedisCache.
RedisCache(StringPiece host, int port, ThreadSystem* thread_system,
MessageHandler* message_handler, Timer* timer,
int64 reconnection_delay_ms, int64 timeout_us,
Statistics* stats, int database_index);
~RedisCache() override { ShutDown(); }
static void InitStats(Statistics* stats);
GoogleString ServerDescription() const;
void StartUp(bool connect_now = true);
// CacheInterface implementations.
GoogleString Name() const override { return FormatName(); }
bool IsBlocking() const override { return true; }
bool IsHealthy() const override;
void ShutDown() override;
static GoogleString FormatName() { return "RedisCache"; }
// CacheInterface implementations.
void Get(const GoogleString& key, Callback* callback) override;
void Put(const GoogleString& key, const SharedString& value) override;
void Delete(const GoogleString& key) override;
// Appends detailed status for each server to a string. If a server fails to
// report a status, then for that server we append an error message instead.
void GetStatus(GoogleString* status_string);
// Redis spec defined hasher for keys. Static, since it's a pure function.
static int HashSlot(StringPiece key);
// Total number of times we hit one server and were redirected to a different
// one.
int64 Redirections() {
return redirections_->Get();
}
// Total number of times we looked up the cluster slots mappings to avoid
// redirections.
int64 ClusterSlotsFetches() {
return cluster_slots_fetches_->Get();
}
private:
struct RedisReplyDeleter {
void operator()(redisReply* ptr) {
if (ptr != nullptr) { // hiredis 0.11 does not check.
freeReplyObject(ptr);
}
}
};
typedef std::unique_ptr<redisReply, RedisReplyDeleter> RedisReply;
struct RedisContextDeleter {
void operator()(redisContext* ptr) {
if (ptr != nullptr) { // hiredis 0.11 does not check.
redisFree(ptr);
}
}
};
typedef std::unique_ptr<redisContext, RedisContextDeleter> RedisContext;
class Connection {
public:
Connection(RedisCache* redis_cache, StringPiece host, int port,
int database_index);
void StartUp(bool connect_now = true)
LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
bool IsHealthy() const LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
void ShutDown() LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
GoogleString ToString() {
return StrCat(host_, ":", IntegerToString(port_));
}
AbstractMutex* GetOperationMutex() const LOCK_RETURNED(redis_mutex_) {
return redis_mutex_.get();
}
// Must be followed by ValidateRedisReply() under the same lock.
RedisReply RedisCommand(const char* format, va_list args)
EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
RedisReply RedisCommand(const char* format, ...)
EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
bool ValidateRedisReply(const RedisReply& reply,
std::initializer_list<int> valid_types,
const char* command_executed)
EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
private:
enum State {
kShutDown,
kDisconnected,
kConnecting,
kConnected
};
bool IsHealthyLockHeld() const EXCLUSIVE_LOCKS_REQUIRED(state_mutex_);
void UpdateState() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_, state_mutex_);
// connects with redis as well as selects redis database
bool EnsureConnectionAndDatabaseSelection()
EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
LOCKS_EXCLUDED(state_mutex_);
bool EnsureConnection() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
LOCKS_EXCLUDED(state_mutex_);
bool EnsureDatabaseSelection() EXCLUSIVE_LOCKS_REQUIRED(state_mutex_)
LOCKS_EXCLUDED(state_mutex_);
RedisContext TryConnect() LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
void LogRedisContextError(redisContext* redis, const char* cause);
const RedisCache* redis_cache_;
const GoogleString host_;
const int port_;
const scoped_ptr<AbstractMutex> redis_mutex_;
const scoped_ptr<AbstractMutex> state_mutex_;
RedisContext redis_ GUARDED_BY(redis_mutex_);
State state_ GUARDED_BY(state_mutex_);
int64 next_reconnect_at_ms_ GUARDED_BY(state_mutex_);
// selected database is a property of the connection,
// should re-select it on reconnection
const int database_index_;
DISALLOW_COPY_AND_ASSIGN(Connection);
};
typedef std::map<GoogleString, std::unique_ptr<Connection>> ConnectionsMap;
struct ClusterMapping {
// We only ever add connections, so it's ok for us to save raw pointers.
ClusterMapping(int start_slot_range,
int end_slot_range,
Connection* connection) :
start_slot_range_(start_slot_range),
end_slot_range_(end_slot_range),
connection_(connection) {}
int start_slot_range_;
int end_slot_range_;
Connection* connection_;
};
// Performs specified command, handling all reconnections, redirections
// from Redis Cluster, and other stuff. Then it checks that Redis' reply
// has expected type and returns the reply object. If anything fails,
// returns nullptr.
//
// TODO(yeputons): it's expected that destruction of RedisReply is
// thread-safe, which is true as of hiredis 0.13. Otherwise it's wrong to
// return RedisReply here because lock is released on exit.
// See https://github.com/redis/hiredis/issues/465
RedisReply RedisCommand(Connection* connection, const char* format,
std::initializer_list<int> valid_reply_types, ...);
ThreadSynchronizer* GetThreadSynchronizerForTesting() const {
return thread_synchronizer_.get();
}
ExternalServerSpec ParseRedirectionError(StringPiece error);
// Must not be called under Connection::GetOperationLock(), that will cause
// lock inversion and potential theoretical deadlock.
Connection* GetOrCreateConnection(ExternalServerSpec spec,
const int database_index);
// Ask redis what keys should go to which servers.
void FetchClusterSlotMapping(Connection* connection)
LOCKS_EXCLUDED(cluster_map_lock_);
// If we have a cached copy of the connection slot mapping from
// LookupClusterSlots then return the connection this key goes with. If we
// don't have it, or if this slot isn't in the mapping, return
// main_connection_.
Connection* LookupConnection(StringPiece key)
LOCKS_EXCLUDED(cluster_map_lock_);
const GoogleString main_host_;
const int main_port_;
ThreadSystem* thread_system_;
MessageHandler* message_handler_;
Timer* timer_;
const int64 reconnection_delay_ms_;
const int64 timeout_us_;
const scoped_ptr<ThreadSynchronizer> thread_synchronizer_;
const scoped_ptr<ThreadSystem::RWLock> connections_lock_;
const scoped_ptr<ThreadSystem::RWLock> cluster_map_lock_;
Variable* redirections_;
Variable* cluster_slots_fetches_;
// It's expected that connections are only added to the map. That way we can
// safely use raw pointers to them during RedisCache lifetime.
ConnectionsMap connections_ GUARDED_BY(connections_lock_);
std::vector<ClusterMapping> cluster_mappings_ GUARDED_BY(cluster_map_lock_);
// Not guarded, but should only be modified in StartUp().
Connection* main_connection_;
const int database_index_;
friend class RedisCacheTest;
DISALLOW_COPY_AND_ASSIGN(RedisCache);
};
} // namespace net_instaweb
#endif // PAGESPEED_SYSTEM_REDIS_CACHE_H_