// blob: 4097b4efa8785af11e5b316ccc585c6f801773c8 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "skyhook/protocol/rados_protocol.h"

#include <sys/stat.h>

#include <cerrno>
#include <cstdint>
#include <cstring>
#include <locale>
#include <memory>
#include <sstream>
#include <string>
#include <utility>

#include "arrow/compute/expression.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
// Error codes and human-readable messages describing failures while scanning
// a file fragment.
// NOTE(review): presumably reported by the storage-side object class (cls)
// scan method and interpreted by the client -- confirm against the users of
// these macros, which are not visible in this header.

// Generic message for a scan failure without a more specific cause. No error
// code is paired with it in this header.
#define SCAN_UNKNOWN_ERR_MSG "something went wrong while scanning file fragment"
// Scanning the file fragment failed.
#define SCAN_ERR_CODE 25
#define SCAN_ERR_MSG "failed to scan file fragment"
// The incoming scan request could not be deserialized.
#define SCAN_REQ_DESER_ERR_CODE 26
#define SCAN_REQ_DESER_ERR_MSG "failed to deserialize scan request"
// The result table could not be serialized.
#define SCAN_RES_SER_ERR_CODE 27
#define SCAN_RES_SER_ERR_MSG "failed to serialize result table"
namespace skyhook {
/// Namespace-like wrapper around the enumeration of file formats that
/// Skyhook supports. The enumerators are referred to as
/// `SkyhookFileType::PARQUET` / `SkyhookFileType::IPC`, and variables are
/// declared with the nested type `SkyhookFileType::type`.
struct SkyhookFileType {
  enum type {
    PARQUET = 0,  ///< First enumerator; numeric value 0.
    IPC = 1       ///< Second enumerator; numeric value 1.
  };
};
/// A struct encapsulating all the parameters
/// required to be serialized in the form of flatbuffers for
/// sending to the cls.
///
/// The scalar members carry default member initializers so that a
/// default-constructed request (`ScanRequest req;`) never holds indeterminate
/// values that could later be read or serialized by accident.
struct ScanRequest {
  /// Filter expression of the scan.
  arrow::compute::Expression filter_expression;
  /// Partition expression of the scan.
  arrow::compute::Expression partition_expression;
  /// Schema describing the projection.
  std::shared_ptr<arrow::Schema> projection_schema;
  /// Schema of the dataset.
  std::shared_ptr<arrow::Schema> dataset_schema;
  /// Size of the file (presumably in bytes -- confirm against the callers).
  /// Defaults to 0.
  int64_t file_size = 0;
  /// Format of the file. Defaults to PARQUET, the first enumerator.
  SkyhookFileType::type file_format = SkyhookFileType::PARQUET;
};
/// Utility functions to serialize and deserialize scan requests and result Arrow tables.
/// Each function reports success or failure through the returned arrow::Status.
/// The implementations are not part of this header.

/// Serialize the scan request `req` into the bufferlist pointed to by `bl`.
/// NOTE(review): `req` is taken by non-const reference; whether it is modified
/// cannot be told from this declaration.
arrow::Status SerializeScanRequest(ScanRequest& req, ceph::bufferlist* bl);
/// Deserialize the contents of `bl` into the scan request pointed to by `req`.
arrow::Status DeserializeScanRequest(ceph::bufferlist& bl, ScanRequest* req);
/// Serialize the Arrow table `table` into the bufferlist pointed to by `bl`.
arrow::Status SerializeTable(const std::shared_ptr<arrow::Table>& table,
ceph::bufferlist* bl);
/// Deserialize the contents of `bl` into record batches stored in `*batches`.
/// NOTE(review): `use_threads` presumably allows multi-threaded decoding --
/// confirm against the implementation.
arrow::Status DeserializeTable(ceph::bufferlist& bl, bool use_threads,
arrow::RecordBatchVector* batches);
/// Utility function to invoke a RADOS object class function on a RADOS object.
///
/// \param[in] connection the RADOS connection to use for the call.
/// \param[in] oid the ID of the object to invoke the function on.
/// \param[in] fn the name of the object class function.
/// \param[in] in bufferlist passed as input to the call.
/// \param[out] out bufferlist associated with the output of the call.
/// \return the arrow::Status describing the outcome of the call.
arrow::Status ExecuteObjectClassFn(const std::shared_ptr<rados::RadosConn>& connection,
const std::string& oid, const std::string& fn,
ceph::bufferlist& in, ceph::bufferlist& out);
/// An interface for translating the name of a file in CephFS to its
/// corresponding object ID in RADOS assuming 1:1 mapping between a file
/// and its underlying object.
class SkyhookDirectObjectAccess {
 public:
  /// Create an accessor that issues its object class calls over `connection`.
  explicit SkyhookDirectObjectAccess(std::shared_ptr<rados::RadosConn> connection)
      : connection_(std::move(connection)) {}

  ~SkyhookDirectObjectAccess() = default;

  /// Execute a POSIX stat on a file.
  ///
  /// \param[in] path path of the file to stat.
  /// \param[out] st receives the stat result; left untouched on failure.
  /// \return Status::OK on success, otherwise Status::Invalid carrying the
  /// path and the description of the errno set by stat().
  arrow::Status Stat(const std::string& path, struct stat& st) const {
    struct stat file_st;
    if (stat(path.c_str(), &file_st) != 0) {
      // Save errno right away: building the message may overwrite it.
      const int err = errno;
      return arrow::Status::Invalid("stat failed for '", path,
                                    "': ", std::strerror(err));
    }
    st = file_st;
    return arrow::Status::OK();
  }

  /// Convert a file inode to RADOS object ID.
  ///
  /// \param[in] inode the inode number of the file.
  /// \return the inode in lowercase hexadecimal followed by ".00000000".
  std::string ConvertInodeToOID(uint64_t inode) const {
    std::stringstream ss;
    // Format in the classic "C" locale: a global locale with digit grouping
    // would otherwise insert separators into the hexadecimal inode and
    // produce an object ID that does not exist.
    ss.imbue(std::locale::classic());
    /// In Ceph, the objects that make up a file are named in the format
    /// [hex(inode)].[object index as 8 zero-padded hexadecimal digits].
    /// Since in Skyhook, we ensure a single object per file, we can assume
    /// the index to be always 0 and hence hardcode its 8-digit form.
    ss << std::hex << inode << ".00000000";
    return ss.str();
  }

  /// Execute an object class method. It uses the `librados::exec` api to
  /// perform object class method calls on the storage node and
  /// stores the result in an output bufferlist.
  ///
  /// \param[in] inode inode of the file whose object the method runs on.
  /// \param[in] fn the name of the object class method.
  /// \param[in] in bufferlist passed as input to the method.
  /// \param[out] out bufferlist that receives the result.
  /// \return the status returned by ExecuteObjectClassFn.
  arrow::Status Exec(uint64_t inode, const std::string& fn, ceph::bufferlist& in,
                     ceph::bufferlist& out) const {
    const std::string oid = ConvertInodeToOID(inode);
    return ExecuteObjectClassFn(connection_, oid, fn, in, out);
  }

 private:
  /// Connection used for every object class call made by this accessor.
  std::shared_ptr<rados::RadosConn> connection_;
};
} // namespace skyhook