blob: 9e15465fbd7b52ea001a03d5598f3aa7c21a57e5 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstring>
#include <cstddef>
#include <cstdlib>
#include <iterator>
#include <algorithm>
#include <ignite/network/codec_data_filter.h>
#include <ignite/network/length_prefix_codec.h>
#include <ignite/network/network.h>
#include <ignite/network/utils.h>
#include <ignite/network/ssl/secure_data_filter.h>
#include "impl/utility.h"
#include "impl/data_router.h"
#include "impl/message.h"
#include "impl/response_status.h"
#include "impl/remote_type_updater.h"
namespace ignite
{
namespace impl
{
namespace thin
{
DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) :
config(cfg),
userThreadPool(cfg.GetUserThreadPoolSize())
{
srand(common::GetRandSeed());
typeUpdater.reset(new net::RemoteTypeUpdater(*this));
typeMgr.SetUpdater(typeUpdater.get());
CollectAddresses(config.GetEndPoints(), ranges);
}
DataRouter::~DataRouter()
{
Close();
}
void DataRouter::Connect()
{
using ignite::thin::SslMode;
if (ranges.empty())
throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "No valid address to connect.");
if (!asyncPool.IsValid())
{
std::vector<network::SP_DataFilter> filters;
if (config.GetSslMode() == SslMode::REQUIRE)
{
network::ssl::EnsureSslLoaded();
network::ssl::SecureConfiguration sslCfg;
sslCfg.caPath = config.GetSslCaFile();
sslCfg.keyPath = config.GetSslKeyFile();
sslCfg.certPath = config.GetSslCertFile();
network::ssl::SP_SecureDataFilter secureFilter(new network::ssl::SecureDataFilter(sslCfg));
filters.push_back(secureFilter);
}
network::SP_CodecFactory codecFactory(new network::LengthPrefixCodecFactory());
network::SP_CodecDataFilter codecFilter(new network::CodecDataFilter(codecFactory));
filters.push_back(codecFilter);
asyncPool = network::MakeAsyncClientPool(filters);
if (!asyncPool.IsValid())
throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Can not create async connection pool");
asyncPool.Get()->SetHandler(this);
}
userThreadPool.Start();
asyncPool.Get()->Start(ranges, config.GetConnectionsLimit());
bool connected = EnsureConnected(config.GetConnectionTimeout());
if (!connected)
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
"Failed to establish connection with any host.");
}
void DataRouter::Close()
{
if (asyncPool.IsValid())
{
asyncPool.Get()->SetHandler(0);
asyncPool.Get()->Stop();
}
userThreadPool.Stop();
}
bool DataRouter::EnsureConnected(int32_t timeout)
{
common::concurrent::CsLockGuard lock(channelsMutex);
if (!connectedChannels.empty())
return true;
CheckHandshakeErrorLocked();
channelsWaitPoint.WaitFor(channelsMutex, timeout);
CheckHandshakeErrorLocked();
return !connectedChannels.empty();
}
void DataRouter::CheckHandshakeErrorLocked()
{
if (!lastHandshakeError.get())
return;
IgniteError err = *lastHandshakeError;
lastHandshakeError.reset();
throw IgniteError(err);
}
void DataRouter::OnConnectionSuccess(const network::EndPoint& addr, uint64_t id)
{
SP_DataChannel channel(new DataChannel(id, addr, asyncPool, config, typeMgr, *this, userThreadPool));
{
common::concurrent::CsLockGuard lock(channelsMutex);
channels[id] = channel;
}
channel.Get()->StartHandshake();
}
void DataRouter::OnConnectionError(const network::EndPoint& addr, const IgniteError& err)
{
IGNITE_UNUSED(addr);
if (!connectedChannels.empty())
return;
if (err.GetCode() != IgniteError::IGNITE_ERR_SECURE_CONNECTION_FAILURE)
return;
common::concurrent::CsLockGuard lock(channelsMutex);
lastHandshakeError.reset(new IgniteError(err));
channelsWaitPoint.NotifyAll();
}
void DataRouter::OnConnectionClosed(uint64_t id, const IgniteError* err)
{
SP_DataChannel channel;
{
common::concurrent::CsLockGuard lock(channelsMutex);
channel = FindChannelLocked(id);
connectedChannels.erase(id);
InvalidateChannelLocked(channel);
}
if (channel.IsValid())
channel.Get()->FailPendingRequests(err);
}
void DataRouter::OnMessageReceived(uint64_t id, const network::DataBuffer& msg)
{
SP_DataChannel channel = FindChannel(id);
if (channel.IsValid())
channel.Get()->ProcessMessage(msg);
}
void DataRouter::OnMessageSent(uint64_t id)
{
IGNITE_UNUSED(id);
// No-op.
}
void DataRouter::OnHandshakeSuccess(uint64_t id)
{
common::concurrent::CsLockGuard lock(channelsMutex);
connectedChannels.insert(id);
channelsWaitPoint.NotifyAll();
SP_DataChannel channel = FindChannelLocked(id);
if (channel.IsValid())
{
const IgniteNode& node = channel.Get()->GetNode();
if (!node.IsLegacy())
partChannels[node.GetGuid()] = channel;
}
}
void DataRouter::OnHandshakeError(uint64_t id, const IgniteError& err)
{
IGNITE_UNUSED(id);
common::concurrent::CsLockGuard lock(channelsMutex);
lastHandshakeError.reset(new IgniteError(err));
channelsWaitPoint.NotifyAll();
}
void DataRouter::OnNotificationHandlingError(uint64_t id, const IgniteError &err)
{
SP_DataChannel channel = FindChannel(id);
if (channel.IsValid())
channel.Get()->Close(&err);
}
SP_DataChannel DataRouter::SyncMessage(Request &req, Response &rsp)
{
SP_DataChannel channel = GetRandomChannel();
int32_t metaVer = typeMgr.GetVersion();
channel = SyncMessagePreferredChannelNoMetaUpdate(req, rsp, channel);
ProcessMeta(metaVer);
return channel;
}
SP_DataChannel DataRouter::SyncMessage(Request &req, Response &rsp, const Guid &hint)
{
SP_DataChannel channel = GetBestChannel(hint);
int32_t metaVer = typeMgr.GetVersion();
channel = SyncMessagePreferredChannelNoMetaUpdate(req, rsp, channel);
ProcessMeta(metaVer);
return channel;
}
SP_DataChannel DataRouter::SyncMessageNoMetaUpdate(Request &req, Response &rsp)
{
SP_DataChannel channel = GetRandomChannel();
channel = SyncMessagePreferredChannelNoMetaUpdate(req, rsp, channel);
return channel;
}
void DataRouter::ProcessMeta(int32_t metaVer)
{
if (typeMgr.IsUpdatedSince(metaVer))
{
IgniteError err;
if (!typeMgr.ProcessPendingUpdates(err))
throw IgniteError(err);
}
}
void DataRouter::CheckAffinity(Response &rsp)
{
const AffinityTopologyVersion* ver = rsp.GetAffinityTopologyVersion();
if (ver != 0 && config.IsPartitionAwareness())
affinityManager.UpdateAffinity(*ver);
}
SP_DataChannel DataRouter::SyncMessagePreferredChannelNoMetaUpdate(Request &req, Response &rsp,
const SP_DataChannel &preferred)
{
SP_DataChannel channel(preferred);
if (!channel.IsValid())
channel = GetRandomChannel();
if (!channel.IsValid())
{
bool connected = EnsureConnected(config.GetConnectionTimeout());
if (!connected)
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
"Failed to establish connection with any host.");
channel = GetRandomChannel();
if (!channel.IsValid())
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE,
"Failed to establish connection with any host.");
}
try
{
channel.Get()->SyncMessage(req, rsp, config.GetConnectionTimeout());
}
catch (IgniteError& err)
{
InvalidateChannel(channel);
std::string msg("Connection failure during command processing. Please re-run command. Cause: ");
msg += err.GetText();
throw IgniteError(IgniteError::IGNITE_ERR_NETWORK_FAILURE, msg.c_str());
}
CheckAffinity(rsp);
return channel;
}
void DataRouter::RefreshAffinityMapping(int32_t cacheId)
{
std::vector<int32_t> ids(1, cacheId);
RefreshAffinityMapping(ids);
}
void DataRouter::RefreshAffinityMapping(const std::vector<int32_t>& cacheIds)
{
std::vector<PartitionAwarenessGroup> groups;
CachePartitionsRequest req(cacheIds);
CachePartitionsResponse rsp(groups);
SyncMessageNoMetaUpdate(req, rsp);
if (rsp.GetStatus() != ResponseStatus::SUCCESS)
throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str());
affinityManager.UpdateAffinity(rsp.GetGroups(), rsp.GetVersion());
}
affinity::SP_AffinityAssignment DataRouter::GetAffinityAssignment(int32_t cacheId) const
{
return affinityManager.GetAffinityAssignment(cacheId);
}
void DataRouter::InvalidateChannel(SP_DataChannel &channel)
{
if (!channel.IsValid())
return;
common::concurrent::CsLockGuard lock(channelsMutex);
InvalidateChannelLocked(channel);
}
void DataRouter::InvalidateChannelLocked(SP_DataChannel &channel)
{
if (!channel.IsValid())
return;
DataChannel& channel0 = *channel.Get();
channels.erase(channel0.GetId());
partChannels.erase(channel0.GetNode().GetGuid());
}
SP_DataChannel DataRouter::GetRandomChannel()
{
common::concurrent::CsLockGuard lock(channelsMutex);
return GetRandomChannelLocked();
}
SP_DataChannel DataRouter::GetRandomChannelLocked()
{
if (connectedChannels.empty())
return SP_DataChannel();
int r = rand();
size_t idx = r % connectedChannels.size();
ChannelsIdSet::iterator it = connectedChannels.begin();
std::advance(it, idx);
return channels[*it];
}
SP_DataChannel DataRouter::GetBestChannel(const Guid& hint)
{
common::concurrent::CsLockGuard lock(channelsMutex);
ChannelsGuidMap::iterator itChannel = partChannels.find(hint);
if (itChannel != partChannels.end())
return itChannel->second;
return GetRandomChannelLocked();
}
void DataRouter::CollectAddresses(const std::string& str, std::vector<network::TcpRange>& ranges)
{
ranges.clear();
utility::ParseAddress(str, ranges, DEFAULT_PORT);
std::random_shuffle(ranges.begin(), ranges.end());
}
SP_DataChannel DataRouter::FindChannel(uint64_t id)
{
common::concurrent::CsLockGuard lock(channelsMutex);
return FindChannelLocked(id);
}
SP_DataChannel DataRouter::FindChannelLocked(uint64_t id)
{
ChannelsIdMap::iterator it = channels.find(id);
if (it != channels.end())
return it->second;
return SP_DataChannel();
}
}
}
}