This notebook shows how to use the Apache Beam RunInference transform for generative AI tasks. It uses a large language model (LLM) from the Hugging Face Model Hub.
This notebook demonstrates the following steps:
- Load and save a model from the Hugging Face Model Hub.
- Use the PyTorch model handler for RunInference.

For more information about using RunInference, see Get started with AI/ML pipelines (https://beam.apache.org/documentation/ml/overview/) in the Apache Beam documentation.

Install the Apache Beam SDK and dependencies

Use the following code to install the Apache Beam Python SDK, PyTorch, and Transformers.

```
pip install apache_beam[gcp]==2.48.0
pip install torch
pip install transformers
```

Use the following code to import dependencies. Important: If an error occurs, restart your runtime.

```python
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256

model_name = "google/flan-t5-small"
state_dict_path = "saved_model"
```

Download and save the model

This notebook uses the auto classes from Hugging Face to load the model into memory. The model's state dict is then saved to the path defined previously.

```python
model = AutoModelForSeq2SeqLM.from_pretrained(
    model_name, torch_dtype=torch.bfloat16
)

directory = os.path.dirname(state_dict_path)
torch.save(model.state_dict(), state_dict_path)
```
Define utility functions

The input and output for the google/flan-t5-small model are token tensors. The following utility functions convert input text to token tensors and decode the model's output tensors back to text.
```python
def to_tensors(input_text: str, tokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.
    Args:
      input_text: Input text for the LLM model.
      tokenizer: Tokenizer for the LLM model.
    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def from_tensors(result: PredictionResult, tokenizer) -> str:
    """Decodes output token tensors into text.
    Args:
      result: Prediction results from the RunInference transform.
      tokenizer: Tokenizer for the LLM model.
    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)
```
```python
# Load the tokenizer.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Create an instance of the PyTorch model handler.
model_handler = PytorchModelHandlerTensor(
    state_dict_path=state_dict_path,
    model_class=AutoModelForSeq2SeqLM.from_config,
    model_params={"config": AutoConfig.from_pretrained(model_name)},
    inference_fn=make_tensor_model_fn("generate"),
)
```
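To make the handler configuration concrete, the following sketch (not part of the original notebook; it assumes the constants, saved weights, tokenizer, and helper functions defined above) performs locally roughly what PytorchModelHandlerTensor does on each worker: rebuild the model from its config, load the saved state dict, and call generate on the token tensors produced by to_tensors.

```python
# Illustrative sketch only: roughly what the model handler does per worker.
# Assumes model_name, state_dict_path, MAX_RESPONSE_TOKENS, the tokenizer,
# and the helper functions defined above are already in memory.
config = AutoConfig.from_pretrained(model_name)
local_model = AutoModelForSeq2SeqLM.from_config(config)
local_model.load_state_dict(torch.load(state_dict_path))
local_model.eval()

tokens = to_tensors("translate English to Spanish: Good morning.", tokenizer)
with torch.no_grad():
    generated = local_model.generate(
        tokens.unsqueeze(0), max_new_tokens=MAX_RESPONSE_TOKENS
    )[0]
print(from_tensors(PredictionResult(example=tokens, inference=generated), tokenizer))
```

The manual PredictionResult construction here only mimics what RunInference emits; in the pipeline, the transform builds this object for you.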
Run the Pipeline
example=["translate English to Spanish: We are in New York City."]pipeline=beam.Pipeline(options=PipelineOptions(save_main_session=True,pickle_library="cloudpickle"))withpipelineasp:_=(p|"Create Examples" >> beam.Create(example)|"To tensors" >> beam.Map(to_tensors,tokenizer)|"RunInference"
>> RunInference(model_handler,inference_args={"max_new_tokens":MAX_RESPONSE_TOKENS},)|"From tensors" >> beam.Map(from_tensors,tokenizer)|"Print" >> beam.Map(print))
Running the pipeline prints the model's response:

```
Estamos en Nueva York City.
```
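Printing is convenient in a notebook, but on a real runner you would typically write the responses to files instead. The variation below is an illustrative sketch, not part of the original notebook; the "llm_responses" output prefix is a placeholder, and the tokenizer, model_handler, and helper functions from earlier cells are reused.

```python
# Illustrative variation: write the model's response to text files instead of
# printing it. The "llm_responses" output prefix is a placeholder path.
from apache_beam.io import WriteToText

example = ["translate English to Spanish: We are in New York City."]

with beam.Pipeline(options=PipelineOptions(
        save_main_session=True, pickle_library="cloudpickle")) as p:
    _ = (
        p
        | "Create Examples" >> beam.Create(example)
        | "To tensors" >> beam.Map(to_tensors, tokenizer)
        | "RunInference" >> RunInference(
            model_handler,
            inference_args={"max_new_tokens": MAX_RESPONSE_TOKENS},
        )
        | "From tensors" >> beam.Map(from_tensors, tokenizer)
        | "Write" >> WriteToText("llm_responses")
    )
```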