blob: 73ef653ddf956f0ac4719a4a170a372884061d7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include "ICalculator.h"
#include "celix_shell_command.h"
#include "celix/FrameworkFactory.h"
class RemoteServicesIntegrationTestSuite : public ::testing::Test {
public:
RemoteServicesIntegrationTestSuite() {
celix::Properties clientConfig{
{"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"},
{celix::FRAMEWORK_CACHE_DIR, ".clientCache"},
};
clientFw = celix::createFramework(clientConfig);
clientCtx = clientFw->getFrameworkBundleContext();
celix::Properties serverConfig{
{"CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL", "info"},
{celix::FRAMEWORK_CACHE_DIR, ".serverCache"},
};
serverFw = celix::createFramework(serverConfig);
serverCtx = serverFw->getFrameworkBundleContext();
}
static void installSharedBundles(std::shared_ptr<celix::BundleContext>& ctx) {
auto sharedBundles = {
RS_DISCOVERY_BUNDLE_LOC,
RS_FACTORY_BUNDLE_LOC ,
RS_RSA_BUNDLE_LOC};
for (const auto& bndLoc : sharedBundles) {
auto bndId = ctx->installBundle(bndLoc);
EXPECT_GE(bndId, 0);
}
}
void installProviderBundles() {
installSharedBundles(serverCtx);
auto bndId = serverCtx->installBundle(RS_PROVIDER_BUNDLE_LOC);
EXPECT_GE(bndId, 0);
}
void installConsumerBundles() {
installSharedBundles(clientCtx);
auto bndId = clientCtx->installBundle(RS_CONSUMER_BUNDLE_LOC);
EXPECT_GE(bndId, 0);
}
static void printTopicSendersAndReceivers(celix::BundleContext& ctx) {
ctx.useService<celix_shell_command>(CELIX_SHELL_COMMAND_SERVICE_NAME)
.setFilter((std::string{"("}.append(CELIX_SHELL_COMMAND_NAME).append("=celix::psa_zmq)")))
.addUseCallback([](auto& cmd) {
cmd.executeCommand(cmd.handle, "psa_zmq", stdout, stdout);
})
.build();
}
void invokeRemoteCalcService() {
//If a calculator provider bundle is installed I expect a exported calculator interface
auto count = serverCtx->useService<ICalculator>()
.setFilter("(service.exported.interfaces=*)")
.build();
EXPECT_EQ(count, 1);
//If a calculator consumer bundle is installed and also the needed remote services bundle, I expect an import calculator interface
count = clientCtx->useService<ICalculator>()
.setTimeout(std::chrono::seconds{1})
.setFilter("(service.imported=*)")
.build();
EXPECT_EQ(count, 1);
//When I call the calculator service from the client, I expect an answer
std::atomic<int> streamCount = 0;
std::atomic<double> lastStreamValue = 0.0;
std::atomic<bool> promiseSuccessful = false;
std::atomic<double> promiseValue = 0.0;
/*
* Testing the remote service in a while loop till it is successful or 10 seconds has passed.
* Note that because mq does not guarantee a connection when used, it is possible - and likely -
* that the first remote test iteration fails due to not yet completely connected mq.
*/
auto start = std::chrono::system_clock::now();
auto now = std::chrono::system_clock::now();
int iter = 1;
while ((streamCount == 0 || !promiseSuccessful) && (now - start) < std::chrono::seconds{10}) {
clientCtx->logInfo("Testing remote C++ iteration %i.", iter++);
count = clientCtx->useService<ICalculator>()
.addUseCallback([&](auto& calc) {
//testing remote stream
auto stream = calc.result();
auto streamEnded = stream->forEach([&](double event){
lastStreamValue = event;
streamCount++;
});
streamEnded.onResolve([ctx = clientCtx, i = iter]() {
ctx->logInfo("Stream for iteration %i closed", i);
});
//testing remote promise
auto promise = calc.add(2, 4).onFailure([ctx = clientCtx, i = iter](const auto& e) {
ctx->logInfo("Got remote promise exception for iteration %i: %s", i, e.what());
});
promise.wait();
promiseSuccessful = promise.isSuccessfullyResolved();
if (promiseSuccessful) {
promiseValue = promise.getValue();
}
})
.build();
now = std::chrono::system_clock::now();
}
EXPECT_EQ(count, 1);
EXPECT_GE(streamCount, 1);
EXPECT_GE(lastStreamValue, 0.0);
EXPECT_TRUE(promiseSuccessful);
EXPECT_EQ(6, promiseValue);
if (streamCount == 0 || !promiseSuccessful) {
//extra debug info
printTopicSendersAndReceivers(*clientCtx);
printTopicSendersAndReceivers(*serverCtx);
}
}
std::shared_ptr<celix::Framework> clientFw{};
std::shared_ptr<celix::BundleContext> clientCtx{};
std::shared_ptr<celix::Framework> serverFw{};
std::shared_ptr<celix::BundleContext> serverCtx{};
};
TEST_F(RemoteServicesIntegrationTestSuite, StartStopFrameworks) {
installProviderBundles();
installConsumerBundles();
}
TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
installProviderBundles();
installConsumerBundles();
invokeRemoteCalcService();
}