blob: 46cfab35a0a1cb6279df9ae3170c4ec6abb8e6dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use super::*;
use teaclave_proto::teaclave_storage_service::TeaclaveStorageClient;
use teaclave_rpc::transport::Channel;
use teaclave_types::{Entry, EntryBuilder};
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, Result};
use tantivy::{
collector::TopDocs, query::QueryParser, schema::*, DateTime, Index, IndexReader, IndexSettings,
IndexSortByField, IndexWriter, Order, ReloadPolicy,
};
/// Audit-log store built on a tantivy index.
///
/// Every field is wrapped in `Arc<Mutex<_>>`, so the derived `Clone` is cheap
/// and all clones share the same index, reader and writer.
#[derive(Clone)]
pub struct Auditor {
    /// Shared handle to the tantivy index (used when building query parsers).
    index: Arc<Mutex<Index>>,
    /// Reader from which searchers are obtained for queries.
    reader: Arc<Mutex<IndexReader>>,
    /// Writer used to add and commit audit-log documents.
    writer: Arc<Mutex<IndexWriter>>,
}
impl Auditor {
pub fn try_new(
storage: Arc<tokio::sync::Mutex<TeaclaveStorageClient<Channel>>>,
) -> Result<Self> {
let directory = db_directory::DbDirectory::new(storage);
let schema = Self::log_schema();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "date".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.open_or_create(directory)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit)
.try_into()?;
// 8 is the max thread number of tantivy writer
let writer = index.writer(8 * 3_000_000)?;
let index = Arc::new(Mutex::new(index));
let reader = Arc::new(Mutex::new(reader));
let writer = Arc::new(Mutex::new(writer));
Ok(Self {
index,
reader,
writer,
})
}
pub fn add_logs(&self, logs: Vec<Entry>) -> Result<()> {
let mut writer = self.writer.lock().unwrap();
for log in logs {
let document = Self::convert_to_doc(log);
writer.add_document(document)?;
}
writer.commit()?;
Ok(())
}
/// query: the query for tantivy
/// limit: maximum number of the returned logs
pub fn query_logs(&self, query: &str, limit: usize) -> Result<Vec<Entry>> {
let reader = self.reader.lock().unwrap();
let searcher = reader.searcher();
drop(reader);
let index = self.index.lock().unwrap();
let schema = Self::log_schema();
let message = schema.get_field("message").unwrap();
let date = schema.get_field("date").unwrap();
let query_parser = QueryParser::for_index(&index, vec![message]);
let query = query_parser.parse_query(query)?;
let top_docs = searcher.search(
&query,
&TopDocs::with_limit(limit).order_by_fast_field::<DateTime>(date),
)?;
let mut entries = Vec::new();
for (_, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
let entry = Self::try_convert_to_entry(retrieved_doc)?;
entries.push(entry);
}
Ok(entries)
}
pub(crate) fn try_convert_to_entry(doc: Document) -> Result<Entry> {
let schema = Self::log_schema();
let date = schema.get_field("date").unwrap();
let ip = schema.get_field("ip").unwrap();
let user = schema.get_field("user").unwrap();
let message = schema.get_field("message").unwrap();
let result = schema.get_field("result").unwrap();
let date = doc
.get_first(date)
.and_then(|d| d.as_date())
.ok_or_else(|| anyhow!("failed to get date"))?;
let ip = doc
.get_first(ip)
.and_then(|i| i.as_ip_addr())
.ok_or_else(|| anyhow!("failed to get ip"))?;
let user = doc
.get_first(user)
.and_then(|u| u.as_text())
.ok_or_else(|| anyhow!("failed to get user"))?;
let message = doc
.get_first(message)
.and_then(|m| m.as_text())
.ok_or_else(|| anyhow!("failed to get message"))?;
let result = doc
.get_first(result)
.and_then(|r| r.as_bool())
.ok_or_else(|| anyhow!("failed to get result"))?;
let microsecond = date.into_timestamp_micros();
let entry = EntryBuilder::new()
.microsecond(microsecond)
.ip(ip)
.user(user.to_owned())
.message(message.to_owned())
.result(result)
.build();
Ok(entry)
}
pub(crate) fn convert_to_doc(entry: Entry) -> Document {
let schema = Self::log_schema();
let date = schema.get_field("date").unwrap();
let ip = schema.get_field("ip").unwrap();
let user = schema.get_field("user").unwrap();
let message = schema.get_field("message").unwrap();
let result = schema.get_field("result").unwrap();
let date_v = DateTime::from_timestamp_micros(entry.datetime().timestamp_micros());
let mut doc = Document::default();
doc.add_date(date, date_v);
doc.add_ip_addr(ip, entry.ip());
doc.add_text(user, &entry.user());
doc.add_text(message, &entry.message());
doc.add_bool(result, entry.result());
doc
}
pub(crate) fn log_schema() -> Schema {
let mut builder = Schema::builder();
builder.add_date_field("date", INDEXED | FAST | STORED);
builder.add_ip_addr_field("ip", INDEXED | STORED);
builder.add_text_field("user", TEXT | STORED);
builder.add_text_field("message", TEXT | STORED);
builder.add_bool_field("result", INDEXED | STORED);
builder.build()
}
}