# blob: 42e7d7cfe375dcd038ed2632674caca7bd5afe2d [file] [log] [blame]
# SPDX-License-Identifier: Apache-2.0
import json
import QdrantUtils
from EmbeddingUtils import (
create_embedding_service,
)
from langchain.vectorstores.qdrant import Qdrant
from nifiapi.documentation import use_case
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import (
ExpressionLanguageScope,
PropertyDescriptor,
StandardValidators,
)
from qdrant_client.models import Distance
@use_case(
    description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
    notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
    configuration="""
Configure 'Collection Name' to the name of the Qdrant collection to use.
Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
Configure 'Similarity Metric' to the similarity metric to use when querying Qdrant.
If the documents to send to Qdrant contain a unique identifier(UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename.
""",
)
class PutQdrant(FlowFileTransform):
    """NiFi processor that embeds JSON-lines documents and stores them in a Qdrant collection.

    The vector store is built once in ``onScheduled`` and reused by every
    ``transform`` call, which parses the FlowFile content line by line and
    uploads the resulting texts, metadata and ids in a single ``add_texts`` call.
    """

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0.dev0"
        description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
        tags = [
            "qdrant",
            "vector",
            "vectordb",
            "vectorstore",
            "embeddings",
            "ai",
            "artificial intelligence",
            "ml",
            "machine learning",
            "text",
            "LLM",
        ]

    # Optional name of the key inside each document's 'metadata' that carries the document id.
    DOC_ID_FIELD_NAME = PropertyDescriptor(
        name="Document ID Field Name",
        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    # Passed as `force_recreate` when the vector store is constructed in onScheduled().
    FORCE_RECREATE_COLLECTION = PropertyDescriptor(
        name="Force Recreate Collection",
        description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
        required=True,
        default_value="False",
        allowable_values=["True", "False"],
        validators=[StandardValidators.BOOLEAN_VALIDATOR],
    )

    # Passed as `distance_func` when the vector store is constructed in onScheduled().
    SIMILARITY_METRIC = PropertyDescriptor(
        name="Similarity Metric",
        description="Specifies the similarity metric when creating the collection.",
        required=True,
        default_value=Distance.COSINE,
        allowable_values=[
            Distance.COSINE,
            Distance.EUCLID,
            Distance.DOT,
            Distance.MANHATTAN,
        ],
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    )

    # Shared Qdrant/embedding properties followed by the ones specific to this processor.
    properties = (
        QdrantUtils.QDRANT_PROPERTIES
        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
        + [
            FORCE_RECREATE_COLLECTION,
            SIMILARITY_METRIC,
            DOC_ID_FIELD_NAME,
        ]
    )

    def __init__(self, **kwargs):
        """Accept (and ignore) the keyword arguments supplied at construction time."""
        # Populated by onScheduled(); declared here so the attribute always exists.
        self.vector_store = None

    def getPropertyDescriptors(self):
        """Return the list of property descriptors supported by this processor."""
        return self.properties

    def onScheduled(self, context):
        """Build the Qdrant vector store from the configured processor properties.

        The resulting store is kept on ``self.vector_store`` for use by ``transform``.
        """
        # The Qdrant#construct_instance() internally checks if the collection exists
        # and creates it if it doesn't with the appropriate dimensions and configurations.
        self.vector_store = Qdrant.construct_instance(
            texts=["Some text to obtain the embeddings dimension when creating the collection"],
            embedding=create_embedding_service(context),
            collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
            force_recreate=context.getProperty(self.FORCE_RECREATE_COLLECTION).asBoolean(),
            distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
        )

    def transform(self, context, flowfile):
        """Parse the FlowFile as JSON-lines and add every document to the vector store.

        Each non-blank line must be a JSON object; its 'text' and 'metadata'
        values are forwarded to the vector store. The document id is read from
        ``metadata[<Document ID Field Name>]`` when that property is set and the
        value is present; otherwise it is derived with ``QdrantUtils.convert_id``
        from the FlowFile's filename and the 1-based line number.

        Blank lines (for example the one produced by a trailing newline) are
        skipped. If the content holds no documents at all, nothing is uploaded.

        Returns:
            A ``FlowFileTransformResult`` routed to the "success" relationship.

        Raises:
            ValueError: if a non-blank line is not valid JSON or is not a JSON object.
        """
        id_field_name = (
            context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
        )
        filename = flowfile.getAttribute("filename")

        # Read the FlowFile content as "json-lines".
        json_lines = flowfile.getContentsAsBytes().decode("utf-8")

        texts, metadatas, ids = [], [], []
        # Split on "\n" only: str.splitlines() would also break on characters
        # such as U+2028 that may legally appear unescaped inside a JSON string.
        for line_number, line in enumerate(json_lines.split("\n"), start=1):
            if not line.strip():
                # Tolerate blank lines, most commonly the trailing newline of the file.
                continue

            try:
                doc = json.loads(line)
            except json.JSONDecodeError as e:
                message = f"Could not parse line {line_number} as JSON"
                raise ValueError(message) from e
            if not isinstance(doc, dict):
                message = f"Line {line_number} is not a JSON object"
                raise ValueError(message)

            metadata = doc.get("metadata")
            texts.append(doc.get("text"))
            metadatas.append(metadata)

            doc_id = None
            # Only look up the id when the document actually carries a metadata map.
            if id_field_name is not None and isinstance(metadata, dict):
                doc_id = metadata.get(id_field_name)
            if doc_id is None:
                doc_id = QdrantUtils.convert_id(f"{filename}-{line_number}")
            ids.append(doc_id)

        if texts:
            self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
        return FlowFileTransformResult(relationship="success")