blob: 012bbf08c259f6c9129ee0a461cc7fae8881402c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Implementation of a proxy service doing hash-based sharding of statements
* for storage and retrieval. The shards are passed as vector of host:port
* pairs to the constructor.
*/
#ifndef MARMOTTA_SHARDING_H
#define MARMOTTA_SHARDING_H
#include <vector>
#include <string>
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
#include <google/protobuf/wrappers.pb.h>
#include "service/sail.pb.h"
#include "service/sail.grpc.pb.h"
#include "model/model.pb.h"
namespace marmotta {
namespace sharding {
namespace svc = marmotta::service::proto;
/**
* Implementation of a proxy service doing hash-based sharding of statements
* for storage and retrieval. The shards are passed as vector of host:port
* pairs to the constructor.
*/
class ShardingService : public svc::SailService::Service {
public:
/**
* Instantiate new sharding service, connecting to the backends provided
* as argument (vector of host:port pairs).
*/
ShardingService(std::vector<std::string> backends);
/**
* Add namespaces. Since namespaces are potentially needed in all backends,
* they will be added to all.
*/
grpc::Status AddNamespaces(grpc::ServerContext* context,
grpc::ServerReader<rdf::proto::Namespace>* reader,
google::protobuf::Int64Value* result) override;
/**
* Get the namespace matching the pattern using a random server.
*/
grpc::Status GetNamespace(grpc::ServerContext* context,
const rdf::proto::Namespace* pattern,
rdf::proto::Namespace* result) override;
/**
* Get all namespaces matching the pattern using a random server.
*/
grpc::Status GetNamespaces(grpc::ServerContext* context,
const google::protobuf::Empty* ignored,
grpc::ServerWriter<rdf::proto::Namespace>* result) override;
/**
* Add a sequence of statements. Computes a hash over the serialized
* proto message modulo the number of backends to determine which backend
* to write to.
*/
grpc::Status AddStatements(grpc::ServerContext* context,
grpc::ServerReader<rdf::proto::Statement>* reader,
google::protobuf::Int64Value* result) override;
/**
* Retrieve statements matching a certain pattern. Queries all backends in
* parallel and multiplexes the results.
*/
grpc::Status GetStatements(grpc::ServerContext* context,
const rdf::proto::Statement* pattern,
grpc::ServerWriter<rdf::proto::Statement>* result) override;
/**
* Remove statements matching a certain pattern. Forwards the request to
* all backends in parallel.
*/
grpc::Status RemoveStatements(grpc::ServerContext* context,
const rdf::proto::Statement* pattern,
google::protobuf::Int64Value* result) override;
/**
* Process a sequence of updates. For statement updates, computes a hash over the
* serialized proto message modulo the number of backends to determine which backend
* to write to. For namespace updates, writes to all backends.
*/
grpc::Status Update(grpc::ServerContext* context,
grpc::ServerReader<service::proto::UpdateRequest>* reader,
service::proto::UpdateResponse* result) override;
/**
* Retrieve contexts from all backends.
*/
grpc::Status GetContexts(grpc::ServerContext* context,
const google::protobuf::Empty* ignored,
grpc::ServerWriter<rdf::proto::Resource>* result) override;
/**
* Clear all statements matching the given context request. Forwards the
* request to all backends in parallel.
*/
grpc::Status Clear(grpc::ServerContext* context,
const svc::ContextRequest* contexts,
google::protobuf::Int64Value* result) override;
/**
* Get the size of the combined repository. Forwards the request to all
* backends in parallel and adds the results.
*/
grpc::Status Size(grpc::ServerContext* context,
const svc::ContextRequest* contexts,
google::protobuf::Int64Value* result) override;
using StubType = std::unique_ptr<svc::SailService::Stub>;
using StubList = std::vector<StubType>;
using ChannelType = std::shared_ptr<grpc::Channel>;
using ChannelList = std::vector<ChannelType>;
template <class T>
using Writer = std::unique_ptr<grpc::ClientWriter<T>>;
template <class T>
using WriterList = std::vector<Writer<T>>;
private:
// Vector holding the RPC stubs to the backends.
std::vector<std::string> backends;
// Keep a list of channels open, initialised on construction.
ChannelList channels;
// Hash function, computed over binary representation of statement message,
// modulo the number of backends.
std::hash<std::string> hash_fn;
// Make a stub for the backend with the given index.
StubType makeStub(int backend);
};
} // namespace sharding
} // namespace marmotta
#endif //MARMOTTA_SHARDING_H