| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * @file UdpIpc.cpp |
| * @since 1.0 |
| * @version 1.0 |
| * @see |
| * |
| */ |
| |
| // ---------------------------------------------------------------------------- |
| |
| #include <geode/internal/geode_globals.hpp> |
| |
| #include "fwklib/UDPIpc.hpp" |
| #include "fwk/UdpIpc.hpp" |
| #include "fwklib/FwkLog.hpp" |
| #include "fwklib/PerfFwk.hpp" |
| |
| #include "fwklib/FwkExport.hpp" |
| |
| #include <mutex> |
| #include <util/concurrent/spinlock_mutex.hpp> |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| namespace testframework { |
| |
| using util::concurrent::spinlock_mutex; |
| |
| static UdpIpc *g_test = nullptr; |
| |
| // ---------------------------------------------------------------------------- |
| |
| TESTTASK initialize(const char *initArgs) { |
| int32_t result = FWK_SUCCESS; |
| if (!g_test) { |
| FWKINFO("Initializing Fwk library."); |
| try { |
| g_test = new UdpIpc(initArgs); |
| } catch (const FwkException &ex) { |
| FWKSEVERE("initialize: caught exception: " << ex.what()); |
| result = FWK_SEVERE; |
| } |
| } |
| return result; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| TESTTASK finalize() { |
| int32_t result = FWK_SUCCESS; |
| FWKINFO("Finalizing Fwk library."); |
| if (g_test) { |
| g_test->cacheFinalize(); |
| delete g_test; |
| g_test = nullptr; |
| } |
| return result; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| void UdpIpc::checkTest(const char *taskId) { |
| std::lock_guard<spinlock_mutex> guard(m_lck); |
| setTask(taskId); |
| if (m_cache == nullptr) { |
| auto pp = Properties::create(); |
| |
| cacheInitialize(pp); |
| |
| // UdpIpc specific initialization |
| // none |
| } |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| TESTTASK doShowEndPoints(const char *taskId) { |
| int32_t result = FWK_SUCCESS; |
| g_test->checkTest(taskId); |
| std::string bb("GFE_BB"); |
| std::string key("EndPoints"); |
| std::string epts = g_test->bbGetString(bb, key); |
| if (epts.empty()) { |
| FWKSEVERE("EndPoints are not set in BB."); |
| result = FWK_SEVERE; |
| } else { |
| FWKINFO("EndPoints are: " << epts); |
| } |
| |
| return result; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| TESTTASK doService(const char *taskId) { |
| int32_t result = FWK_SUCCESS; |
| g_test->checkTest(taskId); |
| |
| g_test->doService(); |
| |
| return result; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| TESTTASK doClient(const char *taskId) { |
| int32_t result = FWK_SUCCESS; |
| g_test->checkTest(taskId); |
| |
| g_test->doClient(); |
| |
| return result; |
| } |
| |
| // ---------------------------------------------------------------------------- |
| |
| void UdpIpc::doService() { |
| bool expectResponse = g_test->getBoolValue("expectResponse"); |
| |
| int32_t port = g_test->getIntValue("port"); |
| port = (port < 0) ? 3212 : port; |
| |
| int32_t totThreads = g_test->getIntValue("totThreads"); |
| totThreads = (totThreads < 0) ? 10 : totThreads; |
| |
| int32_t inThreads = g_test->getIntValue("inThreads"); |
| inThreads = (inThreads < 0) ? (totThreads / 3) : inThreads; |
| |
| int32_t procThreads = g_test->getIntValue("procThreads"); |
| procThreads = (procThreads < 0) ? (totThreads / 3) : procThreads; |
| |
| int32_t outThreads = g_test->getIntValue("outThreads"); |
| outThreads = (outThreads < 0) ? (totThreads / 3) : outThreads; |
| |
| std::string label = g_test->getStringValue("label"); |
| if (label.empty()) { |
| label = "BBqueues"; |
| } |
| |
| int32_t timedInterval = getTimeValue("timedInterval"); |
| if (timedInterval <= 0) { |
| timedInterval = 120; |
| } else { |
| timedInterval += 60; |
| } |
| |
| char *fqdn = ACE_OS::getenv("GF_FQDN"); |
| if (!fqdn) { |
| FWKEXCEPTION("GF_FQDN not set in the environment."); |
| } |
| |
| UDPMessageQueues *shared = new UDPMessageQueues(label); |
| Receiver recv(shared, port); |
| TestProcessor serv(shared, expectResponse); |
| Responder resp(shared, port); |
| |
| Service farm(totThreads); |
| |
| try { |
| uint32_t thrds = farm.runThreaded(&recv, inThreads); |
| FWKINFO("Server running recv on " << thrds << " threads."); |
| |
| thrds = farm.runThreaded(&serv, procThreads); |
| FWKINFO("Server running serv on " << thrds << " threads."); |
| |
| thrds = farm.runThreaded(&resp, outThreads); |
| FWKINFO("Server running resp on " << thrds << " threads."); |
| |
| char buff[512]; |
| sprintf(buff, "%s:%d", fqdn, port); |
| std::string key("ServerAddr"); |
| std::string val(buff); |
| g_test->bbSet(label, key, val); |
| |
| perf::sleepSeconds(timedInterval); |
| |
| FWKINFO("Time to stop."); |
| farm.stopThreads(); |
| } catch (FwkException &ex) { |
| FWKSEVERE("Caught exception " << ex.what()); |
| } |
| |
| FWKINFO("Server stopped."); |
| perf::sleepSeconds(1); |
| delete shared; |
| } |
| |
| void UdpIpc::doClient() { |
| bool expectResponse = g_test->getBoolValue("expectResponse"); |
| |
| std::string label = g_test->getStringValue("label"); |
| if (label.empty()) { |
| label = "BBqueues"; |
| } |
| |
| int32_t timedInterval = getTimeValue("timedInterval"); |
| if (timedInterval <= 0) timedInterval = 60; |
| |
| std::string serverAddr; |
| int32_t tries = 60; |
| std::string key("ServerAddr"); |
| while (serverAddr.empty() && (--tries > 0)) { |
| perf::sleepSeconds(1); |
| serverAddr = g_test->bbGetString(label, key); |
| } |
| if (serverAddr.empty()) { |
| FWKEXCEPTION("Server address is not set in BB " << label << "."); |
| } else { |
| FWKINFO("Server address is: " << serverAddr); |
| } |
| |
| int32_t msgCnt = 0; |
| try { |
| UDPMessageClient clnt(serverAddr); |
| UDPMessage msg; |
| |
| const ACE_Time_Value wait(10); |
| FWKINFO("Start"); |
| ACE_Time_Value now = ACE_OS::gettimeofday(); |
| ACE_Time_Value interval(timedInterval); |
| ACE_Time_Value end = now + interval; |
| std::string str("Just junk"); |
| while (end > now) { |
| msg.setMessage(str); |
| msg.setCmd(ACK_REQUEST); |
| msg.sendTo(clnt.getConn(), clnt.getServer()); |
| msgCnt++; |
| if (expectResponse) { |
| msg.receiveFrom(clnt.getConn(), &wait); |
| if (msg.length() == 0) { |
| FWKINFO("NULL response."); |
| } else { |
| std::string reply = msg.what(); |
| if (reply == "A result for you.") { |
| // nevermind |
| } else { |
| FWKWARN("Reply was not as expected: " << reply); |
| } |
| } |
| } |
| now = ACE_OS::gettimeofday(); |
| } |
| } catch (FwkException &ex) { |
| FWKSEVERE("Caught exception " << ex.what()); |
| } |
| FWKINFO("Stop"); |
| FWKINFO("Client sent " << msgCnt << " messages"); |
| } |
| } |
| } |
| } |
| } |