blob: 6e0f3c2ccbb7a980ea6670d145a6b0bf449052e4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file references
// https://github.com/quickwit-oss/tantivy/blob/main/src/directory/ram_directory.rs
use teaclave_proto::teaclave_storage_service::{
DeleteRequest, GetRequest, PutRequest, TeaclaveStorageClient,
};
use teaclave_rpc::transport::Channel;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};
use std::{fmt, result};
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use tantivy::directory::{
AntiCallToken, Directory, FileHandle, FileSlice, TerminatingWrite, WatchCallback,
WatchCallbackList, WatchHandle, WritePtr,
};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::Mutex;
/// Path of the index metadata file. An atomic write to this path makes the
/// directory notify its registered watch callbacks.
pub static META_FILEPATH: LazyLock<&'static Path> = LazyLock::new(|| {
    let meta: &'static Path = Path::new("meta.json");
    meta
});

/// Prefix prepended to every file path to build its key in the storage service.
pub static DB_PREFIX: LazyLock<String> = LazyLock::new(|| "tantivy/".to_owned());

/// Path of the index-writer lock file, which is deleted whenever a new
/// directory is constructed.
static INDEX_WRITER_LOCK: LazyLock<&'static Path> = LazyLock::new(|| {
    let lock: &'static Path = Path::new(".tantivy-writer.lock");
    lock
});
/// In-memory write buffer for a single file of a `DbDirectory`.
///
/// Written bytes accumulate in `data`; flushing sends the whole buffer to the
/// storage service through `shared_directory`.
struct Cache {
/// Path of the file this buffer belongs to.
path: PathBuf,
/// Directory used to persist the buffer on flush.
shared_directory: DbDirectory,
/// Bytes written so far, together with the current write position.
data: Cursor<Vec<u8>>,
/// `false` once something was written after the last successful flush.
is_flushed: bool,
}
impl Cache {
fn new(path_buf: PathBuf, shared_directory: DbDirectory) -> Self {
Cache {
path: path_buf,
data: Cursor::new(Vec::new()),
shared_directory,
is_flushed: true,
}
}
}
impl Drop for Cache {
    /// Makes a last attempt to persist bytes that were written but never flushed.
    fn drop(&mut self) {
        if self.is_flushed {
            return;
        }
        // `drop` cannot report failures, so the result of this final flush is
        // deliberately discarded.
        let _ = self.flush();
    }
}
impl Seek for Cache {
    /// Moves the write position inside the in-memory buffer; nothing is sent
    /// to the storage service.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        Seek::seek(&mut self.data, pos)
    }
}
impl Write for Cache {
    /// Appends `buf` at the current position of the in-memory buffer and marks
    /// the cache as having unflushed data. Always consumes the whole of `buf`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.is_flushed = false;
        self.data.write_all(buf).map(|()| buf.len())
    }

    /// Stores the complete buffer (not only the bytes added since the previous
    /// flush) under this file's path, then marks the cache as flushed.
    fn flush(&mut self) -> io::Result<()> {
        let buffered: &[u8] = self.data.get_ref();
        self.shared_directory.write(&self.path, buffered)?;
        self.is_flushed = true;
        Ok(())
    }
}
impl TerminatingWrite for Cache {
    /// Terminating the writer is the same as flushing it: the buffered bytes
    /// are stored through the shared directory.
    fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
        Write::flush(self)
    }
}
/// A Directory storing everything in the storage service.
///
/// Every field is reference-counted, so clones share the same storage client,
/// watch-callback list and runtime.
#[derive(Clone)]
pub struct DbDirectory {
/// Storage service client, guarded by an async mutex.
db: Arc<Mutex<TeaclaveStorageClient<Channel>>>,
/// Callbacks to notify when the meta file is written atomically.
watch_router: Arc<WatchCallbackList>,
/// Runtime used to drive the client's async requests to completion.
rt: Arc<Runtime>,
}
impl fmt::Debug for DbDirectory {
    /// Prints only the type name; none of the fields are included in the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("DbDirectory")
    }
}
impl DbDirectory {
    /// Builds a directory on top of the given storage client.
    ///
    /// A dedicated current-thread runtime is created to run the client's async
    /// calls, and any index-writer lock file left in storage is removed.
    ///
    /// # Panics
    /// Panics if the runtime cannot be built.
    pub fn new(db: Arc<Mutex<TeaclaveStorageClient<Channel>>>) -> Self {
        let runtime = Builder::new_current_thread().enable_all().build().unwrap();
        let directory = Self {
            db,
            watch_router: Arc::default(),
            rt: Arc::new(runtime),
        };
        // Clear a leftover writer lock; a failed delete is ignored on purpose
        // because the lock file usually does not exist.
        let _ = directory.delete(&INDEX_WRITER_LOCK);
        directory
    }

    /// Stores `data` under the key derived from `path` (`DB_PREFIX` + path).
    ///
    /// # Errors
    /// A failed put request is returned as an `io::Error` of kind `Other`
    /// carrying the request error's message.
    fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let key = format!("{}{}", DB_PREFIX.as_str(), path.to_string_lossy());
        let request = PutRequest::new(key.as_bytes(), data);
        match self.rt.block_on(self.db.blocking_lock().put(request)) {
            Ok(_) => Ok(()),
            Err(e) => Err(io::Error::other(e.to_string())),
        }
    }
}
impl Directory for DbDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Arc::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
let key = DB_PREFIX.clone() + &path.to_string_lossy();
let request = GetRequest::new(key.as_bytes());
self.rt
.block_on(self.db.blocking_lock().get(request))
.map_err(|_| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(|r| FileSlice::from(r.into_inner().value))
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
let key = DB_PREFIX.clone() + &path.to_string_lossy();
let request = DeleteRequest::new(key.as_bytes());
self.rt
.block_on(self.db.blocking_lock().delete(request))
.map_err(|_| DeleteError::FileDoesNotExist(PathBuf::from(path)))?;
Ok(())
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
let key = DB_PREFIX.clone() + &path.to_string_lossy();
let request = GetRequest::new(key.as_bytes());
let get = self.rt.block_on(self.db.blocking_lock().get(request));
Ok(get.is_ok())
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let cache = Cache::new(path.to_owned(), self.clone());
let exists = self.exists(path).unwrap();
// force the creation of the file to mimic the MMap directory.
if exists {
self.write(path, &[])
.map_err(|io_error| OpenWriteError::IoError {
io_error: Arc::new(io_error),
filepath: PathBuf::from(path),
})?;
Err(OpenWriteError::FileAlreadyExists(PathBuf::from(path)))
} else {
Ok(BufWriter::new(Box::new(cache)))
}
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let bytes =
self.open_read(path)?
.read_bytes()
.map_err(|io_error| OpenReadError::IoError {
io_error: Arc::new(io_error),
filepath: PathBuf::from(path),
})?;
Ok(bytes.as_slice().to_owned())
}
fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
self.write(path, data)?;
if path == *META_FILEPATH {
drop(self.watch_router.broadcast());
}
Ok(())
}
fn watch(&self, watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> {
Ok(self.watch_router.subscribe(watch_callback))
}
fn sync_directory(&self) -> io::Result<()> {
Ok(())
}
}