blob: f2637bd9de7e2f759e4417e539c547be31eed410 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "leveldb_service.h"
#include "leveldb_sparql.h"
#include <unordered_set>
#include <model/rdf_operators.h>
#include <util/unique.h>
#include <util/time_logger.h>
#include <glog/logging.h>
using grpc::Status;
using grpc::StatusCode;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerWriter;
using google::protobuf::Int64Value;
using google::protobuf::Message;
using google::protobuf::Empty;
using marmotta::rdf::proto::Statement;
using marmotta::rdf::proto::Namespace;
using marmotta::rdf::proto::Resource;
using marmotta::service::proto::ContextRequest;
using marmotta::persistence::sparql::LevelDBTripleSource;
using marmotta::sparql::SparqlService;
using marmotta::sparql::TripleSource;
namespace marmotta {
namespace service {
// Iterator adapter around a gRPC client stream reader.
//
// The iterator always holds one message read ahead of the caller: the
// constructor reads the first message, and every call to next() hands out the
// prefetched message and then tries to read the following one. hasNext()
// therefore reports whether a prefetched message is available without
// touching the stream.
//
// The reader pointer is borrowed; this class never deletes it.
template <class Proto>
class ReaderIterator : public util::CloseableIterator<Proto> {
 public:
    // Wraps the given reader and immediately reads the first message.
    // Declared explicit so that a raw reader pointer is never silently
    // converted into an iterator.
    explicit ReaderIterator(grpc::ServerReader<Proto>* r) : reader(r) {
        // Immediately move to first element.
        finished = !reader->Read(&next_);
    }

    // Makes the prefetched message the current one, prefetches the following
    // message unless the stream already reported its end, and returns a
    // reference to the (new) current message.
    const Proto& next() override {
        current_.Swap(&next_);
        if (!finished) {
            finished = !reader->Read(&next_);
        }
        return current_;
    }

    // Returns the message most recently handed out by next().
    const Proto& current() const override {
        return current_;
    }

    // True as long as the last read from the stream succeeded.
    bool hasNext() override {
        return !finished;
    }

 private:
    grpc::ServerReader<Proto>* reader;  // borrowed stream reader
    Proto current_;                     // message returned by the last next()
    Proto next_;                        // message read ahead of the caller
    bool finished;                      // set once a Read() call returned false
};
// Convenience aliases for the message types streamed by clients.
using StatementIterator = ReaderIterator<rdf::proto::Statement>;
using NamespaceIterator = ReaderIterator<rdf::proto::Namespace>;
using UpdateIterator = ReaderIterator<service::proto::UpdateRequest>;
// Passes the namespaces streamed by the client to the persistence layer's
// AddNamespaces() and reports the value it returns in `result`.
Status LevelDBService::AddNamespaces(
        ServerContext* context, ServerReader<Namespace>* reader, Int64Value* result) {
    NamespaceIterator namespaces(reader);

    const int64_t added = persistence->AddNamespaces(namespaces);
    result->set_value(added);

    return Status::OK;
}
// Looks up namespaces matching `pattern`. Every match reported by the
// persistence layer is copied into `result`; if there was no match at all the
// call fails with NOT_FOUND.
grpc::Status LevelDBService::GetNamespace(
        ServerContext *context, const rdf::proto::Namespace *pattern, Namespace *result) {
    bool found = false;

    persistence->GetNamespaces(*pattern, [result, &found](const Namespace &match) -> bool {
        *result = match;
        found = true;
        return true;
    });

    if (!found) {
        return Status(StatusCode::NOT_FOUND, "Namespace not found");
    }
    return Status::OK;
}
// Streams the namespaces reported by the persistence layer for an empty
// pattern to the client. The callback returns the outcome of each write.
grpc::Status LevelDBService::GetNamespaces(
        ServerContext *context, const Empty *ignored, ServerWriter<Namespace> *result) {
    // A default-constructed message serves as the (empty) query pattern.
    Namespace any_namespace;

    persistence->GetNamespaces(any_namespace, [result](const Namespace &ns) -> bool {
        const bool written = result->Write(ns);
        return written;
    });

    return Status::OK;
}
// Passes the statements streamed by the client to the persistence layer's
// AddStatements() and reports the value it returns in `result`.
Status LevelDBService::AddStatements(
        ServerContext* context, ServerReader<Statement>* reader, Int64Value* result) {
    util::TimeLogger timeLogger("Adding statements");

    StatementIterator statements(reader);

    const int64_t added = persistence->AddStatements(statements);
    result->set_value(added);

    return Status::OK;
}
// Streams every statement the persistence layer reports for `pattern` to the
// client. The callback returns the outcome of each write.
Status LevelDBService::GetStatements(
        ServerContext* context, const Statement* pattern, ServerWriter<Statement>* result) {
    util::TimeLogger timeLogger("Retrieving statements");

    persistence->GetStatements(*pattern, [result](const Statement& match) -> bool {
        const bool written = result->Write(match);
        return written;
    });

    return Status::OK;
}
// Asks the persistence layer to remove the statements matching `pattern` and
// reports the value it returns in `result`.
Status LevelDBService::RemoveStatements(
        ServerContext* context, const Statement* pattern, Int64Value* result) {
    util::TimeLogger timeLogger("Removing statements");

    const int64_t removed = persistence->RemoveStatements(*pattern);
    result->set_value(removed);

    return Status::OK;
}
// Removes statements per requested context, or with an empty pattern when the
// request lists no contexts. `result` receives the sum of the values returned
// by the individual RemoveStatements() calls.
Status LevelDBService::Clear(
        ServerContext* context, const ContextRequest* contexts, Int64Value* result) {
    util::TimeLogger timeLogger("Clearing contexts");

    Statement pattern;
    int64_t removed = 0;

    if (contexts->context_size() <= 0) {
        // No context given: issue a single removal with the empty pattern.
        removed += persistence->RemoveStatements(pattern);
    } else {
        // One removal per requested context; only the context field is set.
        for (const Resource &ctx : contexts->context()) {
            pattern.mutable_context()->CopyFrom(ctx);
            removed += persistence->RemoveStatements(pattern);
        }
    }

    result->set_value(removed);
    return Status::OK;
}
// Reports a statement count in `result`. Without contexts in the request the
// persistence layer's Size() is used; otherwise the statements reported for
// each requested context are counted one by one and summed.
Status LevelDBService::Size(
        ServerContext* context, const ContextRequest* contexts, Int64Value* result) {
    util::TimeLogger timeLogger("Computing context size");

    int64_t total = 0;

    if (contexts->context_size() <= 0) {
        total = persistence->Size();
    } else {
        Statement pattern;
        for (const Resource &ctx : contexts->context()) {
            pattern.mutable_context()->CopyFrom(ctx);
            // Count every statement reported for this context.
            persistence->GetStatements(pattern, [&total](const Statement& match) -> bool {
                ++total;
                return true;
            });
        }
    }

    result->set_value(total);
    return Status::OK;
}
// Streams the distinct contexts of all statements to the client.
//
// All statements reported by the persistence layer for an empty pattern are
// visited and the context of each statement that has one is collected in a
// set, so every context is written at most once. Writing stops as soon as a
// write to the client stream fails.
grpc::Status LevelDBService::GetContexts(
        ServerContext *context, const Empty *ignored, ServerWriter<Resource> *result) {
    util::TimeLogger timeLogger("Retrieving contexts");

    // Currently we need to iterate over all statements and collect the results.
    Statement pattern;
    std::unordered_set<Resource> contexts;
    persistence->GetStatements(pattern, [&contexts](const Statement& stmt) -> bool {
        if (stmt.has_context()) {
            contexts.insert(stmt.context());
        }
        return true;
    });

    // Iterate by const reference to avoid copying every Resource message.
    for (const Resource& c : contexts) {
        if (!result->Write(c)) {
            // The stream no longer accepts messages; further writes are pointless.
            break;
        }
    }
    return Status::OK;
}
// Passes the update requests streamed by the client to the persistence
// layer's Update() and copies the returned statistics into `result`.
grpc::Status LevelDBService::Update(grpc::ServerContext *context,
                                    grpc::ServerReader<service::proto::UpdateRequest> *reader,
                                    service::proto::UpdateResponse *result) {
    util::TimeLogger timeLogger("Updating database");

    UpdateIterator updates(reader);
    persistence::UpdateStatistics stats = persistence->Update(updates);

    // Namespace counters.
    result->set_added_namespaces(stats.added_ns);
    result->set_removed_namespaces(stats.removed_ns);

    // Statement counters.
    result->set_added_statements(stats.added_stmts);
    result->set_removed_statements(stats.removed_stmts);

    return Status::OK;
}
// Runs a SPARQL tuple query and streams the result rows to the client.
//
// The query string and base URI are taken from `query`. Each result row is
// converted into one SparqlResponse holding a binding (variable name and
// value message) per row entry; the row callback returns the outcome of the
// write. Returns OK on success. If the SPARQL service throws a
// SparqlException, the error is logged and INVALID_ARGUMENT is returned with
// the exception message.
grpc::Status LevelDBSparqlService::TupleQuery(
        grpc::ServerContext* context, const spq::SparqlRequest* query,
        grpc::ServerWriter<spq::SparqlResponse>* result) {
    SparqlService svc(util::make_unique<LevelDBTripleSource>(persistence));

    rdf::URI base_uri = query->base_uri();

    try {
        svc.TupleQuery(query->query(), base_uri,
                       [&result](const SparqlService::RowType& row) {
            spq::SparqlResponse response;
            for (auto it = row.cbegin(); it != row.cend(); ++it) {
                auto b = response.add_binding();
                b->set_variable(it->first);
                *b->mutable_value() = it->second.getMessage();
            }
            return result->Write(response);
        });

        return Status::OK;
    } catch (const sparql::SparqlException& e) {
        // Caught by const reference to avoid copying the exception object.
        LOG(ERROR) << "SPARQL execution failed: " << e.what();
        return Status(StatusCode::INVALID_ARGUMENT, e.what());
    }
}
// Runs a SPARQL graph query and streams the resulting statements to the
// client.
//
// The query string and base URI are taken from `query`. The message of every
// statement produced by the query is written to `result`; the callback
// returns the outcome of the write. Returns OK on success. If the SPARQL
// service throws a SparqlException, the error is logged and INVALID_ARGUMENT
// is returned with the exception message.
grpc::Status LevelDBSparqlService::GraphQuery(grpc::ServerContext* context,
                                              const spq::SparqlRequest* query,
                                              grpc::ServerWriter<rdf::proto::Statement>* result) {
    SparqlService svc(util::make_unique<LevelDBTripleSource>(persistence));

    rdf::URI base_uri = query->base_uri();

    try {
        svc.GraphQuery(query->query(), base_uri,
                       [&result](const rdf::Statement& triple) {
            return result->Write(triple.getMessage());
        });

        return Status::OK;
    } catch (const sparql::SparqlException& e) {
        // Caught by const reference to avoid copying the exception object.
        LOG(ERROR) << "SPARQL execution failed: " << e.what();
        return Status(StatusCode::INVALID_ARGUMENT, e.what());
    }
}
// Runs a SPARQL ask query and stores its boolean outcome in `result`.
//
// The query string and base URI are taken from `query`. Returns OK on
// success. If the SPARQL service throws a SparqlException, the error is
// logged and INVALID_ARGUMENT is returned with the exception message.
grpc::Status LevelDBSparqlService::AskQuery(grpc::ServerContext* context,
                                            const spq::SparqlRequest* query,
                                            google::protobuf::BoolValue* result) {
    SparqlService svc(util::make_unique<LevelDBTripleSource>(persistence));

    rdf::URI base_uri = query->base_uri();

    try {
        result->set_value(svc.AskQuery(query->query(), base_uri));

        return Status::OK;
    } catch (const sparql::SparqlException& e) {
        // Caught by const reference to avoid copying the exception object.
        LOG(ERROR) << "SPARQL execution failed: " << e.what();
        return Status(StatusCode::INVALID_ARGUMENT, e.what());
    }
}
} // namespace service
} // namespace marmotta