Prompt: How can I use Apache Beam to run inference on Large Language Models (LLMs)?
Response: The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the RunInference API provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls.
You can use the RunInference transform with large models as long as they fit on your hardware. In addition, RunInference has several mechanisms for reducing memory usage. For example, RunInference by default will load at most a single copy of each model per process (rather than one per thread).
Many Beam runners run multiple Beam processes per machine at once. Loading large models like LLMs multiple times might cause problems because of the negative impact on memory footprint. For memory-intensive models, RunInference provides a mechanism for more intelligently sharing memory across multiple processes to reduce the overall memory footprint. To enable this mode, set the parameter large_model to True in model configuration and Beam will take care of the memory management.
The typical workflow for conducting inference on Large Language Models (LLMs) within an Apache Beam pipeline involves the following steps:
RunInference transform to obtain predictions from the model.Here is an example demonstrating how to leverage the RunInference API with LLMs in an Apache Beam pipeline:
import apache_beam as beam from apache_beam.ml.inference.base import RunInference with beam.Pipeline(options=pipeline_options) as pipeline: _ = ( pipeline | "CreateInputs" >> beam.Create(task_sentences) | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer)) | "RunInference" >> RunInference(model_handler=model_handler) | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer)) )
In this example, Preprocess and Postprocess are DoFn classes responsible for encoding and decoding the input and output data using a tokenizer. The model_handler is a ModelHandler object that wraps the underlying model and allows you to configure environment variables required to load the model.
Here is an example of how to configure a PyTorch model handler for use in your pipeline:
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor model_handler = PytorchModelHandlerTensor( state_dict_path=known_args.model_state_dict_path, model_class=T5ForConditionalGeneration, model_params={ "config": AutoConfig.from_pretrained(known_args.model_name) }, device="cpu", inference_fn=gen_fn, large_model=True)
Each specific model handler has its own configuration parameters. For example, the PytorchModelHandlerTensor requires the following parameters:
state_dict_path: the path to the saved model state dictionary.model_class: the class of the PyTorch model that defines model structure.model_params: the dictionary of model parameters.device: the device to run the model on (e.g. “cpu” or “gpu”).inference_fn: the function to run the inference during RunInference.large_model: whether to use the memory minimization mode for large models.For information on supported popular frameworks and models, refer to the reference documentation for the apache_beam.ml.inference package.
You can find the comprehensive example of using the RunInference API with LLMs in the Apache Beam GitHub repository.