blob: b064284f1996841447394c9e2b2566264592e505 [file] [log] [blame]
# SPDX-License-Identifier: Apache-2.0
import json
import QdrantUtils
import QueryUtils
from EmbeddingUtils import (
create_embedding_service,
)
from langchain.vectorstores.qdrant import Qdrant
from nifiapi.documentation import use_case
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (
ExpressionLanguageScope,
PropertyDescriptor,
StandardValidators,
)
from qdrant_client import QdrantClient
@use_case(
    description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
    configuration="""
Configure 'Collection Name' to the name of the Qdrant collection to use.
Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
Configure 'Query' to the text of the query to send to Qdrant.
Configure 'Number of Results' to the number of results to return from Qdrant.
Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON Formatted.
Configure 'Include Metadatas' to True if metadata should be included in the output.
Configure 'Include Distances' to True if distances should be included in the output.
""",
)
class QueryQdrant(FlowFileTransform):
    """Processor that runs a similarity search against a Qdrant collection.

    For each FlowFile the configured query text is searched for in the
    configured collection, optionally restricted by a JSON metadata filter,
    and the matching documents (plus, optionally, their metadata and scores)
    are formatted through ``QueryUtils`` and written as the FlowFile content.
    """

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0.dev0"
        description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
        tags = [
            "qdrant",
            "vector",
            "vectordb",
            "vectorstore",
            "embeddings",
            "ai",
            "artificial intelligence",
            "ml",
            "machine learning",
            "text",
            "LLM",
        ]

    # Text that is embedded and used for the similarity search.
    QUERY = PropertyDescriptor(
        name="Query",
        description="The text of the query to send to Qdrant.",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    # Upper bound on the number of documents requested from Qdrant.
    NUMBER_OF_RESULTS = PropertyDescriptor(
        name="Number of Results",
        description="The number of results to return from Qdrant.",
        required=True,
        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
        default_value="10",
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    # Optional JSON document that is parsed and passed to the search as a filter.
    FILTER = PropertyDescriptor(
        name="Metadata Filter",
        description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    # Shared Qdrant / embedding properties first, then the query-specific ones.
    properties = (
        QdrantUtils.QDRANT_PROPERTIES
        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
        + [
            QUERY,
            FILTER,
            NUMBER_OF_RESULTS,
            QueryUtils.OUTPUT_STRATEGY,
            QueryUtils.RESULTS_FIELD,
            QueryUtils.INCLUDE_METADATAS,
            QueryUtils.INCLUDE_DISTANCES,
        ]
    )

    # Populated in onScheduled(); None until then.
    embeddings = None
    query_utils = None
    client = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        """Return the list of property descriptors supported by this processor."""
        return self.properties

    def onScheduled(self, context):
        """Create the Qdrant client, embedding service and output helper.

        All three are built from the property values in ``context`` and stored
        on the instance so that ``transform`` can reuse them.
        """
        self.client = QdrantClient(
            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
        )
        self.embeddings = create_embedding_service(context)
        self.query_utils = QueryUtils.QueryUtils(context)

    def transform(self, context, flowfile):
        """Run the similarity search for one FlowFile and format the results.

        The collection name, query, result count and metadata filter are
        evaluated against the FlowFile's attributes. The filter, when present
        and non-blank, must be valid JSON; ``json.loads`` raises otherwise.

        Returns a ``FlowFileTransformResult`` routed to ``success`` whose
        contents and ``mime.type`` attribute come from
        ``QueryUtils.create_json``.
        """
        collection_name = (
            context.getProperty(QdrantUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
        )
        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
        filter_definition = context.getProperty(self.FILTER).evaluateAttributeExpressions(flowfile).getValue()

        # The filter is optional. An expression that evaluates to an empty or
        # whitespace-only string is treated the same as an unset property
        # instead of being handed to json.loads, which would reject it.
        metadata_filter = None
        if filter_definition is not None and filter_definition.strip():
            metadata_filter = json.loads(filter_definition)

        vector_store = Qdrant(
            client=self.client,
            collection_name=collection_name,
            embeddings=self.embeddings,
        )
        # Each result is a (document, score) pair.
        results = vector_store.similarity_search_with_score(
            query=query,
            k=num_results,
            filter=metadata_filter,
        )

        documents = [document.page_content for document, _ in results]

        metadatas = None
        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
            metadatas = [document.metadata for document, _ in results]

        distances = None
        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
            distances = [score for _, score in results]

        # The two None arguments are slots this processor does not populate
        # (presumably embeddings and ids - confirm against QueryUtils.create_json).
        (output_contents, mime_type) = self.query_utils.create_json(
            flowfile, documents, metadatas, None, distances, None
        )
        attributes = {"mime.type": mime_type}
        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)