blob: 70bdb28a460891b0c7b6a6f699e3ce9958386159 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Builder to build full-text search."""
from abc import ABC, abstractmethod
from typing import Optional
from pypaimon.globalindex.global_index_result import GlobalIndexResult
from pypaimon.table.source.full_text_read import FullTextRead, FullTextReadImpl
from pypaimon.table.source.full_text_scan import FullTextScan, FullTextScanImpl
class FullTextSearchBuilder(ABC):
    """Abstract builder interface for a full-text search.

    Declares setters for the result limit, the text column and the query
    text, factories for the scan and read stages, and a helper that runs
    both stages locally.
    """

    @abstractmethod
    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
        """Set the number of top results (top k) to return."""

    @abstractmethod
    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
        """Set the text column to search, identified by name."""

    @abstractmethod
    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
        """Set the query text to search for."""

    @abstractmethod
    def new_full_text_scan(self) -> FullTextScan:
        """Create the full-text scan used to scan index files."""

    @abstractmethod
    def new_full_text_read(self) -> FullTextRead:
        """Create the full-text read used to read index files."""

    def execute_local(self) -> GlobalIndexResult:
        """Run the full-text index search locally.

        The read object is created first, then the scan object is created
        and scanned, and finally the scan output is passed to ``read_plan``.

        Returns:
            Whatever ``read_plan`` of the created read object returns.
        """
        # Keep this order: the read factory must run before the scan factory.
        reader = self.new_full_text_read()
        plan = self.new_full_text_scan().scan()
        return reader.read_plan(plan)
class FullTextSearchBuilderImpl(FullTextSearchBuilder):
    """FullTextSearchBuilder that stores its settings and builds scan/read objects for a table."""

    def __init__(self, table: 'FileStoreTable'):
        """Create an unconfigured builder bound to ``table``."""
        # Defaults (limit 0, no column, no query) are rejected later by
        # new_full_text_scan() / new_full_text_read() until they are set.
        self._query_text: Optional[str] = None
        self._text_column: Optional['DataField'] = None
        self._limit: int = 0
        self._table = table

    def with_limit(self, limit: int) -> 'FullTextSearchBuilder':
        """Store the result limit; it is validated in new_full_text_read()."""
        self._limit = limit
        return self

    def with_text_column(self, name: str) -> 'FullTextSearchBuilder':
        """Look up ``name`` among the table's fields and store the matching field.

        Raises:
            ValueError: If no field of the table has the given name.
        """
        fields_by_name = {}
        for field in self._table.fields:
            fields_by_name[field.name] = field
        if name in fields_by_name:
            self._text_column = fields_by_name[name]
            return self
        raise ValueError(f"Text column '{name}' not found in table schema")

    def with_query_text(self, query_text: str) -> 'FullTextSearchBuilder':
        """Store the query text to search for."""
        self._query_text = query_text
        return self

    def _require_text_column(self):
        """Return the configured text column, raising ValueError if it is unset."""
        column = self._text_column
        if column is None:
            raise ValueError("Text column must be set via with_text_column()")
        return column

    def new_full_text_scan(self) -> FullTextScan:
        """Build a FullTextScanImpl for the table and the configured text column.

        Raises:
            ValueError: If the text column has not been set.
        """
        column = self._require_text_column()
        return FullTextScanImpl(self._table, column)

    def new_full_text_read(self) -> FullTextRead:
        """Build a FullTextReadImpl from the table, limit, text column and query text.

        Settings are checked in this order: limit, text column, query text.

        Raises:
            ValueError: If the limit is not positive, or the text column or
                the query text has not been set.
        """
        if self._limit <= 0:
            raise ValueError("Limit must be positive, set via with_limit()")
        column = self._require_text_column()
        query = self._query_text
        if query is None:
            raise ValueError("Query text must be set via with_query_text()")
        return FullTextReadImpl(self._table, self._limit, column, query)