blob: bc5baa799c266bff90a06c58f0d24c0f0d412bb9 [file] [log] [blame]
# SPDX-License-Identifier: Apache-2.0
import json
import langchain.vectorstores
from EmbeddingUtils import (
EMBEDDING_MODEL,
HUGGING_FACE,
HUGGING_FACE_MODEL,
OPENAI,
OPENAI_MODEL,
create_embedding_service,
)
from nifiapi.documentation import use_case
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, PropertyDescriptor, StandardValidators
from pinecone import Pinecone
@use_case(
    description="Create vectors/embeddings that represent text content and send the vectors to Pinecone",
    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
    keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "insert"],
    configuration="""
Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Pinecone Environment' to the name of your Pinecone environment
Set 'Index Name' to the name of your Pinecone Index.
Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
If the documents to send to Pinecone contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
""",
)
@use_case(
    description="Update vectors/embeddings in Pinecone",
    notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in Pinecone provided in the 'text' field.",
    keywords=["pinecone", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
    configuration="""
Configure the 'Pinecone API Key' to the appropriate authentication token for interacting with Pinecone.
Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
Set 'Pinecone Environment' to the name of your Pinecone environment
Set 'Index Name' to the name of your Pinecone Index.
Set 'Namespace' to appropriate namespace, or leave it empty to use the default Namespace.
Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in Pinecone to update.
""",
)
class PutPinecone(FlowFileTransform):
    """FlowFile transform that embeds JSON-lines documents and writes them to a Pinecone index.

    Each non-blank line of the incoming FlowFile is parsed as a JSON object.
    Its 'text' value is collected for embedding and its 'metadata' map (with
    null values removed) is stored alongside it via the LangChain Pinecone
    vector store.
    """

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0.dev0"
        description = """Publishes JSON data to Pinecone. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
        tags = [
            "pinecone",
            "vector",
            "vectordb",
            "vectorstore",
            "embeddings",
            "ai",
            "artificial intelligence",
            "ml",
            "machine learning",
            "text",
            "LLM",
        ]

    PINECONE_API_KEY = PropertyDescriptor(
        name="Pinecone API Key",
        description="The API Key to use in order to authentication with Pinecone",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    )
    HUGGING_FACE_API_KEY = PropertyDescriptor(
        name="HuggingFace API Key",
        description="The API Key for interacting with HuggingFace",
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        required=True,
        sensitive=True,
        dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
    )
    OPENAI_API_KEY = PropertyDescriptor(
        name="OpenAI API Key",
        description="The API Key for OpenAI in order to create embeddings",
        sensitive=True,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
    )
    PINECONE_ENV = PropertyDescriptor(
        name="Pinecone Environment",
        description="The name of the Pinecone Environment. This can be found in the Pinecone console next to the API Key.",
        sensitive=False,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    )
    INDEX_NAME = PropertyDescriptor(
        name="Index Name",
        description="The name of the Pinecone index.",
        sensitive=False,
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    TEXT_KEY = PropertyDescriptor(
        name="Text Key",
        description="The key in the document that contains the text to create embeddings for.",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        default_value="text",
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    NAMESPACE = PropertyDescriptor(
        name="Namespace",
        description="The name of the Pinecone Namespace to put the documents to.",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    DOC_ID_FIELD_NAME = PropertyDescriptor(
        name="Document ID Field Name",
        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    # Order in which the properties are returned by getPropertyDescriptors().
    properties = [
        PINECONE_API_KEY,
        EMBEDDING_MODEL,
        OPENAI_API_KEY,
        OPENAI_MODEL,
        HUGGING_FACE_API_KEY,
        HUGGING_FACE_MODEL,
        PINECONE_ENV,
        INDEX_NAME,
        TEXT_KEY,
        NAMESPACE,
        DOC_ID_FIELD_NAME,
    ]

    # Both are populated in onScheduled() and reused by every transform() call.
    embeddings = None
    pc = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        """Return the list of property descriptors supported by this processor."""
        return self.properties

    def onScheduled(self, context):
        """Create the Pinecone client and the embedding service from the configured properties."""
        # initialize pinecone
        self.pc = Pinecone(
            api_key=context.getProperty(self.PINECONE_API_KEY).getValue(),
            environment=context.getProperty(self.PINECONE_ENV).getValue(),
        )
        # initialize embedding service
        self.embeddings = create_embedding_service(context)

    @staticmethod
    def _parse_documents(json_lines, id_field_name, filename):
        """Parse JSON-lines content into parallel lists of texts, metadata maps and IDs.

        Args:
            json_lines: The decoded FlowFile content, one JSON object per line.
            id_field_name: Name of the metadata field holding the document ID,
                or None to always generate IDs.
            filename: Prefix used for generated IDs (``<filename>-<n>``).

        Returns:
            A ``(texts, metadatas, ids)`` tuple of equally sized lists.

        Raises:
            ValueError: If a non-blank line is not valid JSON or is not a JSON object.
        """
        texts = []
        metadatas = []
        ids = []
        for line_number, line in enumerate(json_lines.split("\n"), start=1):
            # Content ending in a newline (or containing blank lines) yields
            # empty elements; skip them instead of failing the whole FlowFile.
            if not line.strip():
                continue

            try:
                doc = json.loads(line)
            except json.JSONDecodeError as e:
                message = f"Could not parse line {line_number} as JSON"
                raise ValueError(message) from e
            if not isinstance(doc, dict):
                message = f"Line {line_number} is not a JSON object"
                raise ValueError(message)

            texts.append(doc.get("text"))

            # A missing or null 'metadata' element is treated as an empty map.
            metadata = doc.get("metadata")
            if metadata is None:
                metadata = {}

            # Remove any null values, or it will cause the embedding to fail
            metadatas.append({key: value for key, value in metadata.items() if value is not None})

            doc_id = metadata.get(id_field_name) if id_field_name is not None else None
            if doc_id is None:
                # One-up number: position of this document among those parsed so far.
                doc_id = f"{filename}-{len(ids) + 1}"
            ids.append(doc_id)

        return texts, metadatas, ids

    def transform(self, context, flowfile):
        """Embed the FlowFile's JSON-lines documents and add them to the configured Pinecone index.

        Args:
            context: Process context used to resolve the processor's properties.
            flowfile: The incoming FlowFile; its content is read as JSON-lines text.

        Returns:
            A FlowFileTransformResult routed to the "success" relationship.

        Raises:
            ValueError: If a non-blank line is not valid JSON or is not a JSON object.
        """
        # Every one of these properties is scoped to FlowFile attributes, so
        # each is evaluated against the FlowFile being processed.
        index_name = context.getProperty(self.INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
        namespace = context.getProperty(self.NAMESPACE).evaluateAttributeExpressions(flowfile).getValue()
        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
        text_key = context.getProperty(self.TEXT_KEY).evaluateAttributeExpressions(flowfile).getValue()

        # Read the FlowFile content as "json-lines".
        json_lines = flowfile.getContentsAsBytes().decode()
        texts, metadatas, ids = self._parse_documents(json_lines, id_field_name, flowfile.getAttribute("filename"))

        # Nothing to write for a FlowFile that holds no documents.
        if texts:
            index = self.pc.Index(index_name)
            vectorstore = langchain.vectorstores.Pinecone(index, self.embeddings.embed_query, text_key)
            vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=ids, namespace=namespace)

        return FlowFileTransformResult(relationship="success")