blob: f9289a6f986f0687ebfefe4519de8d4307644d8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <ignite/binary/binary_raw_reader.h>
#include <ignite/thin/cache/cache_peek_mode.h>
#include <ignite/impl/thin/writable.h>
#include <ignite/impl/thin/readable.h>
#include <ignite/impl/thin/cache/continuous/continuous_query_client_holder.h>
#include "impl/response_status.h"
#include "impl/data_channel.h"
#include "impl/message.h"
namespace ignite
{
/**
* Client platform codes.
*/
struct ClientPlatform
{
enum Type
{
UNKNOWN = 0,
JAVA = 1,
CPP = 3
};
};
namespace impl
{
namespace thin
{
CachePartitionsRequest::CachePartitionsRequest(const std::vector<int32_t>& cacheIds) :
cacheIds(cacheIds)
{
// No-op.
}
void ResourceCloseRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteInt64(id);
}
void CachePartitionsRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteInt32(static_cast<int32_t>(cacheIds.size()));
for (size_t i = 0; i < cacheIds.size(); ++i)
writer.WriteInt32(cacheIds[i]);
}
GetOrCreateCacheWithNameRequest::GetOrCreateCacheWithNameRequest(const std::string& name) :
name(name)
{
// No-op.
}
void GetOrCreateCacheWithNameRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteString(name);
}
CreateCacheWithNameRequest::CreateCacheWithNameRequest(const std::string& name) :
name(name)
{
// No-op.
}
void CreateCacheWithNameRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteString(name);
}
Response::Response():
flags(),
status(ResponseStatus::FAILED)
{
// No-op.
}
Response::~Response()
{
// No-op.
}
void Response::Read(binary::BinaryReaderImpl& reader, const ProtocolContext& context)
{
if (context.IsFeatureSupported(VersionFeature::PARTITION_AWARENESS))
{
flags = reader.ReadInt16();
if (IsAffinityTopologyChanged())
topologyVersion.Read(reader);
if (!IsFailure())
{
status = ResponseStatus::SUCCESS;
ReadOnSuccess(reader, context);
return;
}
}
status = reader.ReadInt32();
if (status == ResponseStatus::SUCCESS)
ReadOnSuccess(reader, context);
else
reader.ReadString(error);
}
bool Response::IsAffinityTopologyChanged() const
{
return (flags & Flag::AFFINITY_TOPOLOGY_CHANGED) != 0;
}
bool Response::IsFailure() const
{
return (flags & Flag::FAILURE) != 0;
}
CachePartitionsResponse::CachePartitionsResponse(std::vector<PartitionAwarenessGroup>& groups) :
groups(groups)
{
// No-op.
}
CachePartitionsResponse::~CachePartitionsResponse()
{
// No-op.
}
void CachePartitionsResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
topologyVersion.Read(reader);
int32_t groupsNum = reader.ReadInt32();
groups.clear();
groups.resize(static_cast<size_t>(groupsNum));
for (int32_t i = 0; i < groupsNum; ++i)
groups[i].Read(reader);
}
CacheValueResponse::CacheValueResponse(Readable& value) :
value(value)
{
// No-op.
}
CacheValueResponse::~CacheValueResponse()
{
// No-op.
}
void CacheValueResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
value.Read(reader);
}
void BinaryTypeGetRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteInt32(typeId);
}
void BinaryTypePutRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteInt32(snapshot.GetTypeId());
writer.WriteString(snapshot.GetTypeName());
const std::string& affFieldName = snapshot.GetAffinityFieldName();
if (affFieldName.empty())
writer.WriteNull();
else
writer.WriteString(affFieldName);
const binary::Snap::FieldMap& fields = snapshot.GetFieldMap();
writer.WriteInt32(static_cast<int32_t>(fields.size()));
for (binary::Snap::FieldMap::const_iterator it = fields.begin(); it != fields.end(); ++it)
{
writer.WriteString(it->first);
writer.WriteInt32(it->second.GetTypeId());
writer.WriteInt32(it->second.GetFieldId());
}
// Is enum: always false for now as we do not support enums.
writer.WriteBool(false);
// Schemas. Compact schema is not supported for now.
writer.WriteInt32(0);
}
void BinaryTypeGetResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
int32_t typeId = reader.ReadInt32();
std::string typeName;
reader.ReadString(typeName);
std::string affKeyFieldName;
reader.ReadString(affKeyFieldName);
snapshot = binary::SPSnap(new binary::Snap(typeName, affKeyFieldName, typeId));
int32_t fieldsNum = reader.ReadInt32();
for (int32_t i = 0; i < fieldsNum; ++i)
{
std::string fieldName;
reader.ReadString(fieldName);
int32_t fieldTypeId = reader.ReadInt32();
int32_t fieldId = reader.ReadInt32();
snapshot.Get()->AddField(fieldId, fieldName, fieldTypeId);
}
// Check if the type is enum.
bool isEnum = reader.ReadBool();
if (isEnum)
throw IgniteError(IgniteError::IGNITE_ERR_BINARY, "Enum types is not supported.");
// Ignoring schemas for now.
}
void DestroyCacheRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
writer.WriteInt32(cacheId);
}
void GetCacheNamesResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
int32_t len = reader.ReadInt32();
cacheNames.reserve(static_cast<size_t>(len));
for (int32_t i = 0; i < len; i++)
{
std::string res;
reader.ReadString(res);
cacheNames.push_back(res);
}
}
void BoolResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
value = reader.ReadBool();
}
CacheGetSizeRequest::CacheGetSizeRequest(int32_t cacheId, bool binary, int32_t peekModes) :
CacheRequest<MessageType::CACHE_GET_SIZE>(cacheId, binary),
peekModes(peekModes)
{
// No-op.
}
void CacheGetSizeRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext& context) const
{
CacheRequest<MessageType::CACHE_GET_SIZE>::Write(writer, context);
if (peekModes & ignite::thin::cache::CachePeekMode::ALL)
{
// Size.
writer.WriteInt32(1);
writer.WriteInt8(0);
return;
}
interop::InteropOutputStream* stream = writer.GetStream();
// Reserve size.
int32_t sizePos = stream->Reserve(4);
if (peekModes & ignite::thin::cache::CachePeekMode::NEAR_CACHE)
stream->WriteInt8(1);
if (peekModes & ignite::thin::cache::CachePeekMode::PRIMARY)
stream->WriteInt8(2);
if (peekModes & ignite::thin::cache::CachePeekMode::BACKUP)
stream->WriteInt8(3);
if (peekModes & ignite::thin::cache::CachePeekMode::ONHEAP)
stream->WriteInt8(4);
if (peekModes & ignite::thin::cache::CachePeekMode::OFFHEAP)
stream->WriteInt8(5);
int32_t size = stream->Position() - sizePos - 4;
stream->WriteInt32(sizePos, size);
stream->Synchronize();
}
void Int64Response::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
value = reader.ReadInt64();
}
void Int32Response::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
value = reader.ReadInt32();
}
void ScanQueryResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
ignite::binary::BinaryRawReader rawReader(&reader);
cursorId = rawReader.ReadInt64();
cursorPage.Get()->Read(reader);
}
ScanQueryRequest::ScanQueryRequest(int32_t cacheId, const ignite::thin::cache::query::ScanQuery &qry) :
CacheRequest<MessageType::QUERY_SCAN>(cacheId, false),
qry(qry)
{
// No-op.
}
void ScanQueryRequest::Write(binary::BinaryWriterImpl &writer, const ProtocolContext& context) const
{
CacheRequest::Write(writer, context);
// TODO: IGNITE-16995 Implement a RemoteFilter for ScanQuery
writer.WriteNull();
writer.WriteInt32(qry.GetPageSize());
writer.WriteInt32(qry.GetPartition());
writer.WriteBool(qry.IsLocal());
}
SqlFieldsQueryRequest::SqlFieldsQueryRequest(
int32_t cacheId,
const ignite::thin::cache::query::SqlFieldsQuery &qry
) :
CacheRequest<MessageType::QUERY_SQL_FIELDS>(cacheId, false),
qry(qry)
{
// No-op.
}
void SqlFieldsQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext& context) const
{
CacheRequest<MessageType::QUERY_SQL_FIELDS>::Write(writer, context);
if (qry.schema.empty())
writer.WriteNull();
else
writer.WriteString(qry.schema);
writer.WriteInt32(qry.pageSize);
writer.WriteInt32(qry.maxRows);
writer.WriteString(qry.sql);
writer.WriteInt32(static_cast<int32_t>(qry.args.size()));
{
std::vector<impl::thin::CopyableWritable*>::const_iterator it;
for (it = qry.args.begin(); it != qry.args.end(); ++it)
(*it)->Write(writer);
}
writer.WriteInt8(0); // Statement type - Any
writer.WriteBool(qry.distributedJoins);
writer.WriteBool(qry.loc);
writer.WriteBool(false); // Replicated only
writer.WriteBool(qry.enforceJoinOrder);
writer.WriteBool(qry.collocated);
writer.WriteBool(qry.lazy);
writer.WriteInt64(qry.timeout);
writer.WriteBool(true); // Include field names
if (context.IsFeatureSupported(BitmaskFeature::QRY_PARTITIONS_BATCH_SIZE))
{
if (qry.parts.empty())
writer.WriteInt32(-1);
else
{
writer.WriteInt32(static_cast<int32_t>(qry.parts.size()));
for (std::vector<int32_t>::const_iterator it = qry.parts.begin(); it != qry.parts.end(); ++it)
writer.WriteInt32(*it);
}
writer.WriteInt32(qry.updateBatchSize);
}
}
void SqlFieldsQueryResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
ignite::binary::BinaryRawReader rawReader(&reader);
cursorId = rawReader.ReadInt64();
int32_t columnsCnt = rawReader.ReadInt32();
columns.reserve(static_cast<size_t>(columnsCnt));
for (int32_t i = 0; i < columnsCnt; ++i)
{
columns.push_back(rawReader.ReadString());
}
cursorPage.Get()->Read(reader);
}
void QueryCursorGetPageResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
cursorPage.Get()->Read(reader);
}
void ContinuousQueryRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext& context) const
{
CacheRequest<MessageType::QUERY_CONTINUOUS>::Write(writer, context);
writer.WriteInt32(pageSize);
writer.WriteInt64(timeInterval);
writer.WriteBool(includeExpired);
if (!filter)
writer.WriteNull();
else
{
writer.WriteTopObject(filter);
writer.WriteInt8(ClientPlatform::JAVA);
}
}
void ContinuousQueryResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
queryId = reader.ReadInt64();
}
void ComputeTaskExecuteRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolContext&) const
{
// To be changed when Cluster API is implemented.
int32_t nodesNum = 0;
writer.WriteInt32(nodesNum);
writer.WriteInt8(flags);
writer.WriteInt64(timeout);
writer.WriteString(taskName);
arg.Write(writer);
}
void ComputeTaskExecuteResponse::ReadOnSuccess(binary::BinaryReaderImpl&reader, const ProtocolContext&)
{
taskId = reader.ReadInt64();
}
void ComputeTaskFinishedNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
result.Read(reader);
}
void ClientCacheEntryEventNotification::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolContext&)
{
ignite::binary::BinaryRawReader reader0(&reader);
query.ReadAndProcessEvents(reader0);
}
}
}
}