// blob: f0810fb054e3fb6aaa37000d9107822694c35302 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TransactionalOperation.hpp"

#include <utility>

#include <geode/Cache.hpp>
#include <geode/CacheableString.hpp>
#include <geode/FunctionService.hpp>

#include "CacheImpl.hpp"
#include "RegionInternal.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
/**
 * Captures everything replay() needs to re-run one operation later.
 *
 * @param op         operation code that replay() switches on.
 * @param regionName name handed to Cache::getRegion() by replay(); replay()
 *                   treats a null name as "execute the function on the
 *                   server" for GF_EXECUTE_FUNCTION.
 *                   NOTE(review): the pointer itself is stored (replay()
 *                   compares the member with nullptr), so the caller
 *                   presumably must keep the string alive for the lifetime
 *                   of this object - confirm against the header.
 * @param key        key forwarded to the key-based region calls.
 * @param arguments  positional extra arguments; their meaning depends on
 *                   @p op (see replay()).
 */
TransactionalOperation::TransactionalOperation(
    ServerRegionOperation op, const char* regionName,
    std::shared_ptr<CacheableKey> key,
    std::shared_ptr<std::vector<std::shared_ptr<Cacheable>>> arguments)
    : m_operation(op),
      m_regionName(regionName),
      // The shared_ptr parameters are taken by value, so move them into the
      // members instead of paying for a second reference-count round trip.
      m_key(std::move(key)),
      m_arguments(std::move(arguments)) {}
/** Nothing to release by hand; the members clean up after themselves. */
TransactionalOperation::~TransactionalOperation() = default;
/**
 * Re-executes this recorded operation against the given cache.
 *
 * The target region is looked up by m_regionName through
 * cacheImpl->getCache() in every case that needs it; the recorded key and
 * the positional entries of m_arguments are forwarded to the matching call.
 *
 * @param cacheImpl cache implementation whose Cache performs the region
 *                  lookup; it is dereferenced without a null check.
 * @return the value produced by GF_CONTAINS_KEY, GF_CONTAINS_VALUE_FOR_KEY,
 *         GF_EXECUTE_FUNCTION and GF_GET; nullptr for every other operation
 *         that completes.
 * @throws UnsupportedOperationException when m_operation is none of the
 *         handled codes. GF_CONTAINS_VALUE and GF_GET_ENTRY are reported
 *         through GfErrTypeThrowException with GF_NOTSUP. Positional
 *         arguments are read with vector::at(), so a missing argument
 *         surfaces as std::out_of_range.
 */
std::shared_ptr<Cacheable> TransactionalOperation::replay(
    CacheImpl* cacheImpl) {
  // Lazy lookup: only the cases that call this touch the region, exactly as
  // when the lookup chain was spelled out inside each case.
  const auto targetRegion = [this, cacheImpl]() {
    return cacheImpl->getCache()->getRegion(m_regionName);
  };

  switch (m_operation) {
    case GF_CONTAINS_KEY:
      return CacheableBoolean::create(
          targetRegion()->containsKeyOnServer(m_key));

    case GF_CONTAINS_VALUE:
      GfErrTypeThrowException("Contains value not supported in transaction",
                              GF_NOTSUP);
      // Earlier note left in place of an implementation:
      // result = Boolean.valueOf(containsValue(args[0], true));
      break;

    case GF_CONTAINS_VALUE_FOR_KEY:
      return CacheableBoolean::create(
          targetRegion()->containsValueForKey(m_key));

    case GF_DESTROY:  // includes REMOVE(k,v)
      targetRegion()->destroy(m_key, m_arguments->at(0));
      break;

    case GF_EXECUTE_FUNCTION: {
      // A null region name selects server-wide execution; otherwise the
      // function is executed on the named region.
      Execution execution;
      if (m_regionName != nullptr) {
        execution = FunctionService::onRegion(targetRegion());
      } else {
        execution = FunctionService::onServer(*cacheImpl->getCache());
      }

      // Arguments: [0] function args, [1] filter, [2] result collector,
      // [3] function name (via toString()), [4] timeout in milliseconds.
      auto collector =
          execution.withArgs(m_arguments->at(0))
              .withFilter(std::dynamic_pointer_cast<CacheableVector>(
                  m_arguments->at(1)))
              .withCollector(std::dynamic_pointer_cast<ResultCollector>(
                  m_arguments->at(2)))
              .execute(m_arguments->at(3)->toString().c_str(),
                       std::chrono::milliseconds(
                           std::dynamic_pointer_cast<CacheableInt32>(
                               m_arguments->at(4))
                               ->value()));
      return std::dynamic_pointer_cast<Cacheable>(collector);
    }

    case GF_GET:
      return targetRegion()->get(m_key, m_arguments->at(0));

    case GF_GET_ENTRY:
      GfErrTypeThrowException("getEntry not supported in transaction",
                              GF_NOTSUP);
      break;

    case GF_GET_ALL: {
      // Arguments: [0] keys to fetch, [1] map that receives the fetched
      // entries, [3] boolean passed as the last getAll_internal argument.
      const auto fetched =
          std::static_pointer_cast<RegionInternal>(targetRegion())
              ->getAll_internal(
                  *std::dynamic_pointer_cast<
                      std::vector<std::shared_ptr<CacheableKey>>>(
                      m_arguments->at(0)),
                  nullptr,
                  std::dynamic_pointer_cast<CacheableBoolean>(
                      m_arguments->at(3))
                      ->value());
      auto destination =
          std::dynamic_pointer_cast<HashMapOfCacheable>(m_arguments->at(1));
      destination->insert(fetched.begin(), fetched.end());
      break;
    }

    case GF_INVALIDATE:
      targetRegion()->invalidate(m_key, m_arguments->at(0));
      break;

    case GF_REMOVE:
      targetRegion()->remove(m_key, m_arguments->at(0), m_arguments->at(1));
      break;

    case GF_KEY_SET: {
      // Append the server's keys to the caller-supplied vector in [0].
      auto keysFromServer = targetRegion()->serverKeys();
      auto destination = std::dynamic_pointer_cast<
          std::vector<std::shared_ptr<CacheableKey>>>(m_arguments->at(0));
      destination->insert(destination->end(), keysFromServer.begin(),
                          keysFromServer.end());
      break;
    }

    case GF_CREATE:  // includes PUT_IF_ABSENT
      targetRegion()->create(m_key, m_arguments->at(0), m_arguments->at(1));
      break;

    case GF_PUT:  // includes PUT_IF_ABSENT
      targetRegion()->put(m_key, m_arguments->at(0), m_arguments->at(1));
      break;

    case GF_PUT_ALL:
      // Arguments: [0] entries to store, [1] timeout in milliseconds.
      targetRegion()->putAll(
          *std::dynamic_pointer_cast<HashMapOfCacheable>(m_arguments->at(0)),
          std::chrono::milliseconds(
              std::dynamic_pointer_cast<CacheableInt32>(m_arguments->at(1))
                  ->value()));
      break;

    default:
      throw UnsupportedOperationException(
          "unknown operation encountered during transaction replay");
  }

  // Operations that produce no value fall through to here.
  return nullptr;
}
} // namespace client
} // namespace geode
} // namespace apache