blob: ada56195b7aa47301c444db0b481c71f3cc3aa5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CqEventImpl.hpp"
#include <geode/CacheableString.hpp>
#include "TcrConnectionManager.hpp"
#include "TcrMessage.hpp"
#include "ThinClientCacheDistributionManager.hpp"
#include "ThinClientPoolHADM.hpp"
namespace apache {
namespace geode {
namespace client {
CqEventImpl::CqEventImpl(std::shared_ptr<CqQuery>& cQuery, CqOperation baseOp,
CqOperation cqOp, std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& value,
ThinClientBaseDM* tcrdm,
std::shared_ptr<CacheableBytes> deltaBytes,
std::shared_ptr<EventId> eventId)
: m_error(false) {
m_cQuery = cQuery;
m_queryOp = cqOp;
m_baseOp = baseOp;
m_key = key;
m_newValue = value;
if (m_queryOp == CqOperation::OP_TYPE_INVALID) m_error = true;
m_tcrdm = tcrdm;
m_deltaValue = deltaBytes;
m_eventId = eventId;
}
std::shared_ptr<CqQuery> CqEventImpl::getCq() const { return m_cQuery; }
CqOperation CqEventImpl::getBaseOperation() const { return m_baseOp; }
/**
* Get the the operation on the query results. Supported operations include
* update, create, and destroy.
*/
CqOperation CqEventImpl::getQueryOperation() const { return m_queryOp; }
/**
* Get the key relating to the event.
* @return Object key.
*/
std::shared_ptr<CacheableKey> CqEventImpl::getKey() const { return m_key; }
/**
* Get the new value of the modification.
* If there is no new value because this is a delete, then
* return null.
*/
std::shared_ptr<Cacheable> CqEventImpl::getNewValue() const {
if (m_deltaValue == nullptr) {
return m_newValue;
} else {
// Get full object for delta
TcrMessageRequestEventValue fullObjectMsg(
new DataOutput(
m_tcrdm->getConnectionManager().getCacheImpl()->createDataOutput()),
m_eventId);
TcrMessageReply reply(true, nullptr);
ThinClientPoolHADM* poolHADM = dynamic_cast<ThinClientPoolHADM*>(m_tcrdm);
GfErrType err = GF_NOTCON;
if (poolHADM) {
err = poolHADM->sendRequestToPrimary(fullObjectMsg, reply);
} else {
err = static_cast<ThinClientCacheDistributionManager*>(m_tcrdm)
->sendRequestToPrimary(fullObjectMsg, reply);
}
std::shared_ptr<Cacheable> fullObject = nullptr;
if (err == GF_NOERR) {
fullObject = reply.getValue();
}
return fullObject;
}
}
bool CqEventImpl::getError() { return m_error; }
std::string CqEventImpl::toString() {
char buffer[1024];
std::snprintf(
buffer, 1024,
"CqEvent CqName=%s; base operation=%d; cq operation= %d;key=%s;value=%s",
m_cQuery->getName().c_str(), static_cast<int>(m_baseOp),
static_cast<int>(m_queryOp), m_key->toString().c_str(),
m_newValue->toString().c_str());
return buffer;
}
std::shared_ptr<CacheableBytes> CqEventImpl::getDeltaValue() const {
return m_deltaValue;
}
} // namespace client
} // namespace geode
} // namespace apache