blob: 13825f000c4a86f51803d882b034fb6f7cc98c9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <framework/Cluster.h>
#include <framework/Framework.h>
#include <framework/Gfsh.h>
#include <chrono>
#include <future>
#include <geode/Cache.hpp>
#include <geode/EntryEvent.hpp>
#include <geode/PoolManager.hpp>
#include <geode/RegionFactory.hpp>
#include <geode/RegionShortcut.hpp>
#include <geode/TypeRegistry.hpp>
#include "Order.hpp"
namespace {
using apache::geode::client::Cache;
using apache::geode::client::Cacheable;
using apache::geode::client::CacheableKey;
using apache::geode::client::CacheableString;
using apache::geode::client::CacheListener;
using apache::geode::client::EntryEvent;
using apache::geode::client::Pool;
using apache::geode::client::Region;
using apache::geode::client::RegionShortcut;
using apache::geode::client::Serializable;
using std::chrono::minutes;
using std::chrono::seconds;
using WanDeserialization::Order;
class GeodeCacheListener : public CacheListener {
private:
std::promise<
std::pair<std::shared_ptr<CacheableKey>, std::shared_ptr<Serializable>>>
promise_;
public:
void afterCreate(const EntryEvent& event) override {
promise_.set_value({event.getKey(), event.getNewValue()});
}
decltype(promise_)& getPromise() { return promise_; }
};
const std::string regionName = "region";
Cache createCache(std::string durableClientId) {
using apache::geode::client::CacheFactory;
auto cache = CacheFactory()
.set("log-level", "none")
.set("statistic-sampling-enabled", "false")
.set("durable-client-id", durableClientId)
.set("durable-timeout", "300s")
.create();
return cache;
}
std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
std::string poolName) {
auto poolFactory = cache.getPoolManager().createFactory();
cluster.applyLocators(poolFactory);
poolFactory.setPRSingleHopEnabled(true);
poolFactory.setSubscriptionEnabled(true);
return poolFactory.create(poolName);
}
std::shared_ptr<Region> setupRegion(
Cache& cache, const std::shared_ptr<Pool>& pool,
std::shared_ptr<GeodeCacheListener> cacheListener) {
auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
.setPoolName(pool->getName())
.setCacheListener(cacheListener)
.create(regionName);
region->registerAllKeys(true, true, true);
return region;
}
TEST(WanDeserializationTest, testEventsAreDeserializedCorrectly) {
const auto& hostname = Framework::getHostname();
const auto locatorAddressA =
LocatorAddress{hostname, Framework::getAvailablePort()};
const auto locatorAddressB =
LocatorAddress{hostname, Framework::getAvailablePort()};
Cluster clusterA{InitialLocators{{locatorAddressA}},
InitialServers{{{hostname, 0}}},
RemoteLocators{{locatorAddressB}}, DistributedSystemId(1)};
clusterA.start();
Cluster clusterB{InitialLocators{{locatorAddressB}},
InitialServers{{{hostname, 0}}},
RemoteLocators{{locatorAddressB}}, DistributedSystemId(2)};
clusterB.start();
// Create gw receivers
clusterA.getGfsh().create().gatewayReceiver().execute();
clusterB.getGfsh().create().gatewayReceiver().execute();
// Create gw senders
clusterA.getGfsh()
.create()
.gatewaySender()
.withId("A-to-B")
.withRemoteDSId("2")
.execute();
clusterB.getGfsh()
.create()
.gatewaySender()
.withId("B-to-A")
.withRemoteDSId("1")
.execute();
// Create regions
clusterA.getGfsh()
.create()
.region()
.withName(regionName)
.withType("PARTITION")
.withGatewaySenderId("A-to-B")
.execute();
clusterB.getGfsh()
.create()
.region()
.withName(regionName)
.withType("PARTITION")
.withGatewaySenderId("B-to-A")
.execute();
auto cacheA = createCache("clientA");
auto poolA = createPool(clusterA, cacheA, "poolSiteA");
std::shared_ptr<GeodeCacheListener> cacheListenerA =
std::make_shared<GeodeCacheListener>();
auto regionA = setupRegion(cacheA, poolA, cacheListenerA);
auto cacheB = createCache("clientB");
auto poolB = createPool(clusterB, cacheB, "poolSiteB");
std::shared_ptr<GeodeCacheListener> cacheListenerB =
std::make_shared<GeodeCacheListener>();
auto regionB = setupRegion(cacheB, poolB, cacheListenerB);
cacheA.getTypeRegistry().registerPdxType(Order::createDeserializable);
cacheA.readyForEvents();
cacheB.getTypeRegistry().registerPdxType(Order::createDeserializable);
cacheB.readyForEvents();
auto key = std::make_shared<CacheableString>("order");
auto order = std::make_shared<Order>(2, "product y", 37);
regionA->put(key, order);
{
auto future = cacheListenerA->getPromise().get_future();
ASSERT_EQ(std::future_status::ready, future.wait_for(seconds(10)));
auto kv = future.get();
auto eventKey = kv.first;
auto eventValue = kv.second;
auto eventOrder = std::dynamic_pointer_cast<Order>(eventValue);
ASSERT_NE(eventKey, nullptr);
EXPECT_EQ(*eventKey, *key);
ASSERT_NE(eventValue, nullptr);
ASSERT_NE(eventOrder, nullptr);
EXPECT_EQ(*eventOrder, *order);
}
{
auto future = cacheListenerB->getPromise().get_future();
ASSERT_EQ(std::future_status::ready, future.wait_for(seconds(10)));
auto kv = future.get();
auto eventKey = kv.first;
auto eventValue = kv.second;
auto eventOrder = std::dynamic_pointer_cast<Order>(eventValue);
ASSERT_NE(eventKey, nullptr);
EXPECT_EQ(*eventKey, *key);
ASSERT_NE(eventValue, nullptr);
ASSERT_NE(eventOrder, nullptr);
EXPECT_EQ(*eventOrder, *order);
}
auto valueB = regionB->get(key);
ASSERT_NE(valueB, nullptr);
auto orderB = std::dynamic_pointer_cast<Order>(valueB);
ASSERT_NE(valueB, nullptr);
EXPECT_EQ(*order, *orderB);
}
} // namespace