| # SPDX-License-Identifier: Apache-2.0 |
| |
| import json |
| |
| from langchain.text_splitter import Language |
| from nifiapi.documentation import ProcessorConfiguration, multi_processor_use_case, use_case |
| from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult |
| from nifiapi.properties import ExpressionLanguageScope, PropertyDependency, PropertyDescriptor, StandardValidators |
| |
| SPLIT_BY_CHARACTER = "Split by Character" |
| SPLIT_CODE = "Split Code" |
| RECURSIVELY_SPLIT_BY_CHARACTER = "Recursively Split by Character" |
| |
| TEXT_KEY = "text" |
| METADATA_KEY = "metadata" |
| |
| |
| @use_case( |
| description="Create chunks of text from a single larger chunk.", |
| notes="The input for this use case is expected to be a FlowFile whose content is a JSON Lines document, with each line having a 'text' and a 'metadata' element.", |
| keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"], |
| configuration=""" |
| Set "Input Format" to "Plain Text" |
| Set "Element Strategy" to "Single Document" |
| """, |
| ) |
| @multi_processor_use_case( |
| description=""" |
| Chunk Plaintext data in order to prepare it for storage in a vector store. The output is in "json-lines" format, |
| containing the chunked data as text, as well as metadata pertaining to the chunk.""", |
| notes="The input for this use case is expected to be a FlowFile whose content is a plaintext document.", |
| keywords=["embedding", "vector", "text", "rag", "retrieval augmented generation"], |
| configurations=[ |
| ProcessorConfiguration( |
| processor_type="ParseDocument", |
| configuration=""" |
| Set "Input Format" to "Plain Text" |
| Set "Element Strategy" to "Single Document" |
| |
| Connect the 'success' Relationship to ChunkDocument. |
| """, |
| ), |
| ProcessorConfiguration( |
| processor_type="ChunkDocument", |
| configuration=""" |
| Set the following properties: |
| "Chunking Strategy" = "Recursively Split by Character" |
| "Separator" = "\\n\\n,\\n, ," |
| "Separator Format" = "Plain Text" |
| "Chunk Size" = "4000" |
| "Chunk Overlap" = "200" |
| "Keep Separator" = "false" |
| |
| Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store. |
| """, |
| ), |
| ], |
| ) |
| @multi_processor_use_case( |
| description=""" |
| Parse and chunk the textual contents of a PDF document in order to prepare it for storage in a vector store. The output is in "json-lines" format, |
| containing the chunked data as text, as well as metadata pertaining to the chunk.""", |
| notes="The input for this use case is expected to be a FlowFile whose content is a PDF document.", |
| keywords=["pdf", "embedding", "vector", "text", "rag", "retrieval augmented generation"], |
| configurations=[ |
| ProcessorConfiguration( |
| processor_type="ParseDocument", |
| configuration=""" |
| Set "Input Format" to "PDF" |
| Set "Element Strategy" to "Single Document" |
| Set "Include Extracted Metadata" to "false" |
| |
| Connect the 'success' Relationship to ChunkDocument. |
| """, |
| ), |
| ProcessorConfiguration( |
| processor_type="ChunkDocument", |
| configuration=""" |
| Set the following properties: |
| "Chunking Strategy" = "Recursively Split by Character" |
| "Separator" = "\\n\\n,\\n, ," |
| "Separator Format" = "Plain Text" |
| "Chunk Size" = "4000" |
| "Chunk Overlap" = "200" |
| "Keep Separator" = "false" |
| |
| Connect the 'success' Relationship to the appropriate destination to store data in the desired vector store. |
| """, |
| ), |
| ], |
| ) |
| class ChunkDocument(FlowFileTransform): |
| class Java: |
| implements = ["org.apache.nifi.python.processor.FlowFileTransform"] |
| |
| class ProcessorDetails: |
| version = "2.0.0.dev0" |
| description = """Chunks incoming documents that are formatted as JSON Lines into chunks that are appropriately sized for creating Text Embeddings. |
| The input is expected to be in "json-lines" format, with each line having a 'text' and a 'metadata' element. |
| Each line will then be split into one or more lines in the output.""" |
| tags = [ |
| "text", |
| "split", |
| "chunk", |
| "langchain", |
| "embeddings", |
| "vector", |
| "machine learning", |
| "ML", |
| "artificial intelligence", |
| "ai", |
| "document", |
| ] |
| dependencies = ["langchain"] |
| |
| CHUNK_STRATEGY = PropertyDescriptor( |
| name="Chunking Strategy", |
| description="Specifies which splitter should be used to split the text", |
| allowable_values=[RECURSIVELY_SPLIT_BY_CHARACTER, SPLIT_BY_CHARACTER, SPLIT_CODE], |
| required=True, |
| default_value=RECURSIVELY_SPLIT_BY_CHARACTER, |
| ) |
| SEPARATOR = PropertyDescriptor( |
| name="Separator", |
| description="""Specifies the character sequence to use for splitting apart the text. If using a Chunking Strategy of Recursively Split by Character, |
| it is a comma-separated list of character sequences. Meta-characters \\n, \\r and \\t are automatically un-escaped.""", |
| required=True, |
| default_value="\\n\\n,\\n, ,", |
| validators=[StandardValidators.NON_EMPTY_VALIDATOR], |
| dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)], |
| expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, |
| ) |
| SEPARATOR_FORMAT = PropertyDescriptor( |
| name="Separator Format", |
| description="Specifies how to interpret the value of the <Separator> property", |
| required=True, |
| default_value="Plain Text", |
| allowable_values=["Plain Text", "Regular Expression"], |
| dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)], |
| ) |
| CHUNK_SIZE = PropertyDescriptor( |
| name="Chunk Size", |
| description="The maximum size of a chunk that should be returned", |
| required=True, |
| default_value="4000", |
| validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR], |
| ) |
| CHUNK_OVERLAP = PropertyDescriptor( |
| name="Chunk Overlap", |
| description="The number of characters that should be overlapped between each chunk of text", |
| required=True, |
| default_value="200", |
| validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR], |
| ) |
| KEEP_SEPARATOR = PropertyDescriptor( |
| name="Keep Separator", |
| description="Whether or not to keep the text separator in each chunk of data", |
| required=True, |
| default_value="false", |
| allowable_values=["true", "false"], |
| dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)], |
| ) |
| STRIP_WHITESPACE = PropertyDescriptor( |
| name="Strip Whitespace", |
| description="Whether or not to strip the whitespace at the beginning and end of each chunk", |
| required=True, |
| default_value="true", |
| allowable_values=["true", "false"], |
| dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_BY_CHARACTER, RECURSIVELY_SPLIT_BY_CHARACTER)], |
| ) |
| LANGUAGE = PropertyDescriptor( |
| name="Language", |
| description="The language to use for the Code's syntax", |
| required=True, |
| default_value="python", |
| allowable_values=[e.value for e in Language], |
| dependencies=[PropertyDependency(CHUNK_STRATEGY, SPLIT_CODE)], |
| ) |
| |
| property_descriptors = [ |
| CHUNK_STRATEGY, |
| SEPARATOR, |
| SEPARATOR_FORMAT, |
| CHUNK_SIZE, |
| CHUNK_OVERLAP, |
| KEEP_SEPARATOR, |
| STRIP_WHITESPACE, |
| LANGUAGE, |
| ] |
| |
| def __init__(self, **kwargs): |
| pass |
| |
| def getPropertyDescriptors(self): |
| return self.property_descriptors |
| |
| def split_docs(self, context, flowfile, documents): |
| from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter |
| |
| strategy = context.getProperty(self.CHUNK_STRATEGY).getValue() |
| if strategy == SPLIT_BY_CHARACTER: |
| text_splitter = CharacterTextSplitter( |
| separator=context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue(), |
| keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(), |
| is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == "Regular Expression", |
| chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(), |
| chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(), |
| length_function=len, |
| strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(), |
| ) |
| elif strategy == SPLIT_CODE: |
| text_splitter = RecursiveCharacterTextSplitter.from_language( |
| language=context.getProperty(self.LANGUAGE).getValue(), |
| chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(), |
| chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(), |
| ) |
| else: |
| separator_text = context.getProperty(self.SEPARATOR).evaluateAttributeExpressions(flowfile).getValue() |
| splits = separator_text.split(",") |
| unescaped = [] |
| for split in splits: |
| unescaped.append(split.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")) |
| text_splitter = RecursiveCharacterTextSplitter( |
| separators=unescaped, |
| keep_separator=context.getProperty(self.KEEP_SEPARATOR).asBoolean(), |
| is_separator_regex=context.getProperty(self.SEPARATOR_FORMAT).getValue() == "Regular Expression", |
| chunk_size=context.getProperty(self.CHUNK_SIZE).asInteger(), |
| chunk_overlap=context.getProperty(self.CHUNK_OVERLAP).asInteger(), |
| length_function=len, |
| strip_whitespace=context.getProperty(self.STRIP_WHITESPACE).asBoolean(), |
| ) |
| |
| return text_splitter.split_documents(documents) |
| |
| def to_json(self, docs) -> str: |
| json_docs = [] |
| |
| for i, doc in enumerate(docs): |
| doc.metadata["chunk_index"] = i |
| doc.metadata["chunk_count"] = len(docs) |
| |
| json_doc = json.dumps({TEXT_KEY: doc.page_content, METADATA_KEY: doc.metadata}) |
| json_docs.append(json_doc) |
| |
| return "\n".join(json_docs) |
| |
| def load_docs(self, flowfile): |
| from langchain.schema import Document |
| |
| flowfile_contents = flowfile.getContentsAsBytes().decode() |
| docs = [] |
| for line in flowfile_contents.split("\n"): |
| stripped = line.strip() |
| if stripped == "": |
| continue |
| |
| json_element = json.loads(stripped) |
| page_content = json_element.get(TEXT_KEY) |
| if page_content is None: |
| continue |
| |
| metadata = json_element.get(METADATA_KEY) |
| if metadata is None: |
| metadata = {} |
| |
| doc = Document(page_content=page_content, metadata=metadata) |
| docs.append(doc) |
| |
| return docs |
| |
| def transform(self, context, flowfile): |
| documents = self.load_docs(flowfile) |
| split_docs = self.split_docs(context, flowfile, documents) |
| |
| output_json = self.to_json(split_docs) |
| attributes = {"document.count": str(len(split_docs))} |
| return FlowFileTransformResult("success", contents=output_json, attributes=attributes) |