blob: 9d169e1bb3f51ff45676b3e036ee8a38caeedeba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_INTEGRATION_TEST_THINCLIENTDURABLEFAILOVER_H_
#define GEODE_INTEGRATION_TEST_THINCLIENTDURABLEFAILOVER_H_
#include "fw_dunit.hpp"
#include "ThinClientHelper.hpp"
#include <thread>
#include <chrono>
/* Testing Parameters Param's Value
Termination : Keepalive = true/ false, Client crash
Restart Time: Before Timeout / After Timeout
Register Interest Durable/ Non Durable
Description: There is one server, one feeder and two clients. Both clients
come up ->
feeder feed -> both clients go down in same way ( keepalive = true/ false ,
crash )->
feeder feed -> Client1 comes up -> Client2 comes up after timeout -> verify ->
Shutdown
*/
// Role aliases used as the first argument of DUNIT_TASK_DEFINITION below.
// NOTE(review): s1p1/s1p2/s2p1/s2p2 are not defined in this file; presumably
// they are the process slots declared by fw_dunit.hpp -- confirm.
#define CLIENT1 s1p1
#define CLIENT2 s1p2
#define SERVER1 s2p1
#define FEEDER s2p2
namespace { // NOLINT(google-build-namespaces)
using apache::geode::client::EntryEvent;
using apache::geode::client::Exception;
using apache::geode::client::HashMapOfCacheable;
using apache::geode::client::RegionEvent;
class OperMonitor : public CacheListener {
int m_ops;
HashMapOfCacheable m_map;
void check(const EntryEvent& event) {
m_ops++;
auto key = event.getKey();
std::shared_ptr<CacheableInt32> value = nullptr;
try {
value = std::dynamic_pointer_cast<CacheableInt32>(event.getNewValue());
} catch (Exception&) {
// do nothing.
}
auto keyPtr = std::dynamic_pointer_cast<CacheableString>(key);
if (keyPtr != nullptr && value != nullptr) {
char buf[256] = {'\0'};
sprintf(buf, " Got Key: %s, Value: %d", keyPtr->toString().c_str(),
value->value());
LOG(buf);
}
if (value) {
m_map[key] = value;
}
}
public:
OperMonitor() : m_ops(0) {}
~OperMonitor() override { m_map.clear(); }
void validate(size_t keyCount, int eventcount, int durableValue,
int nonDurableValue) {
LOG("validate called");
char buf[256] = {'\0'};
sprintf(buf, "Expected %zd keys for the region, Actual = %zd", keyCount,
m_map.size());
ASSERT(m_map.size() == keyCount, buf);
sprintf(buf, "Expected %d events for the region, Actual = %d", eventcount,
m_ops);
ASSERT(m_ops == eventcount, buf);
for (const auto& item : m_map) {
auto keyPtr = std::dynamic_pointer_cast<CacheableString>(item.first);
auto valuePtr = std::dynamic_pointer_cast<CacheableInt32>(item.second);
if (keyPtr->toString().find('D') ==
std::string::npos) { /*Non Durable Key */
sprintf(buf,
"Expected final value for nonDurable Keys = %d, Actual = %d",
nonDurableValue, valuePtr->value());
ASSERT(valuePtr->value() == nonDurableValue, buf);
} else { /*Durable Key */
sprintf(buf, "Expected final value for Durable Keys = %d, Actual = %d",
durableValue, valuePtr->value());
ASSERT(valuePtr->value() == durableValue, buf);
}
}
}
void afterCreate(const EntryEvent& event) override {
LOG("afterCreate called");
check(event);
}
void afterUpdate(const EntryEvent& event) override {
LOG("afterUpdate called");
check(event);
}
void afterDestroy(const EntryEvent& event) override {
LOG("afterDestroy called");
check(event);
}
void afterRegionInvalidate(const RegionEvent&) override{};
void afterRegionDestroy(const RegionEvent&) override{};
};
/**
 * Installs `monitor` as the cache listener of the region named `regName`,
 * going through that region's attributes mutator.
 */
void setCacheListener(const char* regName,
                      std::shared_ptr<OperMonitor> monitor) {
  getHelper()
      ->getRegion(regName)
      ->getAttributesMutator()
      ->setCacheListener(monitor);
}
// Listener instances for the client roles. Both start out empty;
// initClientCache() creates the monitor it is handed when that is still null.
std::shared_ptr<OperMonitor> mon1;
std::shared_ptr<OperMonitor> mon2;
#include "ThinClientDurableInit.hpp"
#include "ThinClientTasks_C2S2.hpp"
/* Keys available to the tests (four entries). feederUpdate() writes only the
 * first two; validate() treats a key containing 'D' as durable. */
const char* mixKeys[] = {"Key-1", "D-Key-1", "L-Key", "LD-Key"};
/* Interest patterns registered by initClientCache(): index 0 is registered
 * with `true` as second argument and index 1 with `false` -- presumably the
 * durable flag of registerRegex; confirm against the Region API. */
const char* testRegex[] = {"D-Key-.*", "Key-.*"};
void initClientCache(int redundancy, int durableTimeout,
std::shared_ptr<OperMonitor>& mon, int sleepDuration = 0,
int durableIdx = 0) {
if (sleepDuration) SLEEP(sleepDuration);
if (mon == nullptr) {
mon = std::make_shared<OperMonitor>();
}
// 35 sec ack interval to ensure primary clears its Q only
// after the secondary comes up and is able to receive the QRM
// otherwise it will get the unacked events from GII causing the
// client to get 2 extra / replayed events.
initClientAndRegion(redundancy, durableIdx, std::chrono::seconds(1),
std::chrono::seconds(1),
std::chrono::seconds(durableTimeout));
setCacheListener(regionNames[0], mon);
getHelper()->cachePtr->readyForEvents();
auto regPtr0 = getHelper()->getRegion(regionNames[0]);
// for R =1 it will get a redundancy error
try {
regPtr0->registerRegex(testRegex[0], true);
} catch (Exception&) {
// do nothing.
}
try {
regPtr0->registerRegex(testRegex[1], false);
} catch (Exception&) {
// do nothing.
}
}
/**
 * Writes `value` under mixKeys[0] and mixKeys[1] in the first test region,
 * pausing 10 ms after each write.
 */
void feederUpdate(int value) {
  const auto pauseAfterPut = std::chrono::milliseconds(10);
  for (int idx = 0; idx < 2; ++idx) {
    createIntEntry(regionNames[0], mixKeys[idx], value);
    std::this_thread::sleep_for(pauseAfterPut);
  }
}
/* CLIENT1: disconnect with keep-alive = true, then clean up the process. */
DUNIT_TASK_DEFINITION(CLIENT1, CloseClient1WithKeepAlive)
  {
    // Wait 30 sec so the client's periodic acks (1 sec) can go out. This adds
    // to the 5 sec sleep after each feeder update and is tied to the
    // notify-ack-interval setting of 35 sec.
    const int ackDrainMillis = 30000;
    const bool keepAlive = true;

    SLEEP(ackDrainMillis);
    getHelper()->disconnect(keepAlive);
    cleanProc();
    LOG("CloseClient1WithKeepAlive complete: Keepalive = True");
  }
END_TASK_DEFINITION
// SERVER1: start cache server instance 1 when the test manages local servers.
DUNIT_TASK_DEFINITION(SERVER1, StartServer1)
  {
    const int serverInstance = 1;
    const char* const serverXml = "cacheserver_notify_subscription.xml";

    if (isLocalServer) {
      CacheHelper::initServer(serverInstance, serverXml, locatorsG);
    }
    LOG("SERVER started");
  }
END_TASK_DEFINITION
// SERVER1: start cache server instance 2 when the test manages local servers,
// then give the client time to notice it.
DUNIT_TASK_DEFINITION(SERVER1, StartServer2)
  {
    const int serverInstance = 2;
    const char* const serverXml = "cacheserver_notify_subscription2.xml";
    // Lets the redundancy monitor detect the new server.
    const auto redundancyDetectDelay = std::chrono::seconds(3);

    if (isLocalServer) {
      CacheHelper::initServer(serverInstance, serverXml, locatorsG);
    }
    std::this_thread::sleep_for(redundancyDetectDelay);
    LOG("SERVER started");
  }
END_TASK_DEFINITION
// FEEDER: create the feeder's pool-backed client and the first test region.
DUNIT_TASK_DEFINITION(FEEDER, FeederInit)
  {
    const char* const feederPool = "__TEST_POOL1__";

    initClientWithPool(true, feederPool, locatorsG, nullptr, nullptr, 0, true);
    getHelper()->createPooledRegion(regionNames[0], USE_ACK, locatorsG,
                                    feederPool, true, true);
    LOG("FeederInit complete.");
  }
END_TASK_DEFINITION
// CLIENT1: (re)initialise the client cache with redundancy 0 and a 300 second
// durable timeout, attaching mon1.
DUNIT_TASK_DEFINITION(CLIENT1, InitClient1NoRedundancy)
  {
    const int redundancyLevel = 0;
    const int durableTimeoutSeconds = 300;
    initClientCache(redundancyLevel, durableTimeoutSeconds, mon1);
  }
END_TASK_DEFINITION
// CLIENT1: (re)initialise the client cache with redundancy 1 and a 300 second
// durable timeout, attaching mon1.
DUNIT_TASK_DEFINITION(CLIENT1, InitClient1WithRedundancy)
  {
    const int redundancyLevel = 1;
    const int durableTimeoutSeconds = 300;
    initClientCache(redundancyLevel, durableTimeoutSeconds, mon1);
  }
END_TASK_DEFINITION
// FEEDER: first feed -- writes value 1 to the fed keys.
DUNIT_TASK_DEFINITION(FEEDER, FeederUpdate1)
  {
    // Settle time so events can be removed from the HA queues.
    const auto haQueueSettle = std::chrono::seconds(5);

    feederUpdate(1);
    std::this_thread::sleep_for(haQueueSettle);
    LOG("FeederUpdate1 complete.");
  }
END_TASK_DEFINITION
// FEEDER: second feed -- writes value 2 to the fed keys.
DUNIT_TASK_DEFINITION(FEEDER, FeederUpdate2)
  {
    // Settle time so events can be removed from the HA queues.
    const auto haQueueSettle = std::chrono::seconds(5);

    feederUpdate(2);
    std::this_thread::sleep_for(haQueueSettle);
    LOG("FeederUpdate2 complete.");
  }
END_TASK_DEFINITION
// R = 0 and client down: the intermediate events are lost, so only the first
// feed (value 1) is expected for both keys.
DUNIT_TASK_DEFINITION(CLIENT1, VerifyClientDownWithEventsLost)
  {
    const size_t expectedKeys = 2;
    const int expectedEvents = 2;
    const int expectedDurableValue = 1;
    const int expectedNonDurableValue = 1;

    LOG("Client Verify 1.");
    mon1->validate(expectedKeys, expectedEvents, expectedDurableValue,
                   expectedNonDurableValue);
  }
END_TASK_DEFINITION
// R = 1 and client down: the durable key's second event is received, the
// non-durable key stays at the first feed's value.
DUNIT_TASK_DEFINITION(CLIENT1, VerifyClientDownDurableEventsRecieved)
  {
    const size_t expectedKeys = 2;
    const int expectedEvents = 3;
    const int expectedDurableValue = 2;
    const int expectedNonDurableValue = 1;

    LOG("Client Verify 2.");
    mon1->validate(expectedKeys, expectedEvents, expectedDurableValue,
                   expectedNonDurableValue);
  }
END_TASK_DEFINITION
// Client never went down: all four events are received and both keys end at
// the second feed's value.
DUNIT_TASK_DEFINITION(CLIENT1, VeryifyNoClientDownAllEventsReceived)
  {
    const size_t expectedKeys = 2;
    const int expectedEvents = 4;
    const int expectedDurableValue = 2;
    const int expectedNonDurableValue = 2;

    LOG("Client Verify 3.");
    mon1->validate(expectedKeys, expectedEvents, expectedDurableValue,
                   expectedNonDurableValue);
  }
END_TASK_DEFINITION
// FEEDER: tear down the feeder process state via cleanProc() and log it.
DUNIT_TASK_DEFINITION(FEEDER, CloseFeeder)
{
cleanProc();
LOG("FEEDER closed");
}
END_TASK_DEFINITION
// CLIENT1: drop this file's reference to the listener, then clean up.
DUNIT_TASK_DEFINITION(CLIENT1, CloseClient1)
  {
    mon1.reset();
    cleanProc();
    LOG("CLIENT1 closed");
  }
END_TASK_DEFINITION
// SERVER1: stop cache server instance 1 and leave time for client failover.
DUNIT_TASK_DEFINITION(SERVER1, CloseServer1)
  {
    const int serverInstance = 1;
    // Grace period that allows the client to fail over.
    const auto failoverGrace = std::chrono::seconds(2);

    CacheHelper::closeServer(serverInstance);
    std::this_thread::sleep_for(failoverGrace);
    LOG("SERVER closed");
  }
END_TASK_DEFINITION
// SERVER1: stop cache server instance 2.
DUNIT_TASK_DEFINITION(SERVER1, CloseServer2)
  {
    const int serverInstance = 2;
    CacheHelper::closeServer(serverInstance);
    LOG("SERVER closed");
  }
END_TASK_DEFINITION
// Scenario: redundancy 0, client closed with keep-alive, server 1 stopped
// while the client is away. The final check (VerifyClientDownWithEventsLost)
// expects only the first feed's two events, i.e. the second feed is lost.
// NOTE(review): StartLocator / CloseLocator are not defined in this file;
// presumably they come from one of the included task headers.
void doThinClientDurableFailoverClientClosedNoRedundancy() {
// Bring up the locator, the first server, the feeder and the client.
CALL_TASK(StartLocator);
CALL_TASK(StartServer1);
CALL_TASK(FeederInit);
CALL_TASK(InitClient1NoRedundancy);
// The second server is started only after the client is already connected.
CALL_TASK(StartServer2);
CALL_TASK(FeederUpdate1);
// Client leaves (keep-alive), server 1 goes down, feeder writes again.
CALL_TASK(CloseClient1WithKeepAlive);
CALL_TASK(CloseServer1);
CALL_TASK(FeederUpdate2);
// Client returns (mon1 is reused, so counts accumulate) and is verified.
CALL_TASK(InitClient1NoRedundancy);
CALL_TASK(VerifyClientDownWithEventsLost);
// Teardown.
CALL_TASK(CloseClient1);
CALL_TASK(CloseFeeder);
CALL_TASK(CloseServer2);
CALL_TASK(CloseLocator);
}
// Scenario: redundancy 1, client stays connected while server 1 is stopped.
// The final check (VeryifyNoClientDownAllEventsReceived) expects all four
// events from both feeds.
// NOTE(review): StartLocator / CloseLocator are not defined in this file;
// presumably they come from one of the included task headers.
void doThinClientDurableFailoverClientNotClosedRedundancy() {
// Bring up the locator, the first server, the feeder and the client.
CALL_TASK(StartLocator);
CALL_TASK(StartServer1);
CALL_TASK(FeederInit);
CALL_TASK(InitClient1WithRedundancy);
// The second server is started only after the client is already connected.
CALL_TASK(StartServer2);
CALL_TASK(FeederUpdate1);
// Server 1 goes down while the client is still up; feeder writes again.
CALL_TASK(CloseServer1);
CALL_TASK(FeederUpdate2);
CALL_TASK(VeryifyNoClientDownAllEventsReceived);
// Teardown.
CALL_TASK(CloseClient1);
CALL_TASK(CloseFeeder);
CALL_TASK(CloseServer2);
CALL_TASK(CloseLocator);
}
// Scenario: redundancy 1, client closed with keep-alive, server 1 stopped
// while the client is away. The final check
// (VerifyClientDownDurableEventsRecieved) expects three events: the first
// feed's two plus the durable key's update from the second feed.
// NOTE(review): StartLocator / CloseLocator are not defined in this file;
// presumably they come from one of the included task headers.
void doThinClientDurableFailoverClientClosedRedundancy() {
// Bring up the locator, the first server, the feeder and the client.
CALL_TASK(StartLocator);
CALL_TASK(StartServer1);
CALL_TASK(FeederInit);
CALL_TASK(InitClient1WithRedundancy);
// The second server is started only after the client is already connected.
CALL_TASK(StartServer2);
CALL_TASK(FeederUpdate1);
// Client leaves (keep-alive), server 1 goes down, feeder writes again.
CALL_TASK(CloseClient1WithKeepAlive);
CALL_TASK(CloseServer1);
CALL_TASK(FeederUpdate2);
// Client returns (mon1 is reused, so counts accumulate) and is verified.
CALL_TASK(InitClient1WithRedundancy);
CALL_TASK(VerifyClientDownDurableEventsRecieved);
// Teardown.
CALL_TASK(CloseClient1);
CALL_TASK(CloseFeeder);
CALL_TASK(CloseServer2);
CALL_TASK(CloseLocator);
}
} // namespace
#endif // GEODE_INTEGRATION_TEST_THINCLIENTDURABLEFAILOVER_H_