[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-09-04。"],[[["\u003cp\u003eGemma is a family of open-weight, lightweight models derived from the technology behind Google's Gemini models, and is available for use in Apache Beam inference pipelines.\u003c/p\u003e\n"],["\u003cp\u003eGemma models can be leveraged for various tasks, such as sentiment analysis, by processing data in real-time as it arrives, and is compatible with Dataflow for seamless workflows.\u003c/p\u003e\n"],["\u003cp\u003eUtilizing Gemma models requires specific prerequisites, including downloading the model in \u003ccode\u003e.keras\u003c/code\u003e format, accessing them via Kaggle, completing a consent form, and creating a custom container image for Dataflow job execution.\u003c/p\u003e\n"],["\u003cp\u003eTo use a Gemma model in an Apache Beam pipeline, you must provide the path to your saved model, define a custom inference function (like \u003ccode\u003egemma_inference_function\u003c/code\u003e), and then run your pipeline, specifying the model handler and inference arguments.\u003c/p\u003e\n"],["\u003cp\u003eGemma models support batch and streaming pipelines with specific requirements, such as Apache Beam Python SDK versions 2.46.0 or later, Dataflow Runner v2, and the use of GPU types like T4 and L4.\u003c/p\u003e\n"]]],[],null,["# Use Gemma open models with Dataflow\n\nGemma is a family of lightweight, state-of-the art open models built\nfrom research and technology used to create the Gemini models.\nYou can use Gemma models in your Apache Beam inference pipelines.\nThe term *open weight* means that a model's pretrained parameters, or weights, are\nreleased. Details such as the original dataset, model architecture, and training\ncode aren't provided.\n\n- For a list of available models and the details about each model, see the\n [Gemma models overview](https://ai.google.dev/gemma/docs/).\n\n- To learn how to download and use models, see\n [Get started with Gemma using KerasNLP](https://ai.google.dev/gemma/docs/get_started).\n\n- To download a model, see [Gemma models](https://www.kaggle.com/models/keras/gemma).\n\nUse cases\n---------\n\nYou can use Gemma models with Dataflow for\n[sentiment analysis](https://en.wikipedia.org/wiki/Sentiment_analysis).\nWith Dataflow and the Gemma models, you can process events, such\nas customer reviews, as they arrive. Run the reviews through the model to\nanalyze them, and then generate recommendations. By combining Gemma with\nApache Beam, you can seamlessly complete this workflow.\n\nSupport and limitations\n-----------------------\n\nGemma open models are supported with Apache Beam and Dataflow\nwith the following requirements:\n\n- Available for batch and streaming pipelines that use the Apache Beam Python SDK versions 2.46.0 and later.\n- Dataflow jobs must use [Runner v2](/dataflow/docs/runner-v2).\n- Dataflow jobs must use [GPUs](/dataflow/docs/gpu/gpu-support). For a list of GPU types supported with Dataflow, see [Availability](/dataflow/docs/gpu/gpu-support#availability). 
Prerequisites
-------------

- Access Gemma models through [Kaggle](https://www.kaggle.com/models/keras/gemma).
- Complete the [consent form](https://www.kaggle.com/models/google/gemma/license/consent) and accept the terms and conditions.
- Download the Gemma model. Save it in the `.keras` file format in a location that your Dataflow job can access, such as a Cloud Storage bucket. When you specify a value for the model path variable, use the path to this storage location.
- To run your job on Dataflow, create a custom container image. This step makes it possible to run the pipeline with GPUs on the Dataflow service.
  - To see a complete workflow that includes creating a Docker image, see [RunInference on Dataflow streaming with Gemma](https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/gemma) in GitHub.
  - For more information about building the Docker image, see [Build a custom container image](/dataflow/docs/gpu/use-gpus#custom-container) in "Run a pipeline with GPUs."
  - To push the container to Artifact Registry by using Docker, see the [Build and push the image](/dataflow/docs/guides/build-container-image#build_and_push_the_image) section in "Build custom container images for Dataflow."

Use Gemma in your pipeline
--------------------------

To use a Gemma model in your Apache Beam pipeline, follow these steps. A sketch of the imports that these snippets assume appears after the steps.

1. In your Apache Beam code, after you import your pipeline dependencies,
   include a path to your saved model:

        model_path = "MODEL_PATH"

   Replace `MODEL_PATH` with the path where you saved the downloaded
   model. For example, if you save your model to a Cloud Storage bucket,
   the path has the format `gs://STORAGE_PATH/FILENAME.keras`.

2. The Keras implementation of the Gemma models has a `generate()` method
   that generates text based on a prompt. To pass elements to the
   `generate()` method, use a custom inference function.

        def gemma_inference_function(model, batch, inference_args, model_id):
            vectorized_batch = np.stack(batch, axis=0)
            # The only inference_arg expected here is a max_length parameter,
            # which determines how many tokens are included in the output.
            predictions = model.generate(vectorized_batch, **inference_args)
            return utils._convert_to_result(batch, predictions, model_id)

3. Run your pipeline, specifying the path to the trained model. This
   example uses a TensorFlow model handler.

        class FormatOutput(beam.DoFn):
            def process(self, element, *args, **kwargs):
                yield "Input: {input}, Output: {output}".format(input=element.example, output=element.inference)

        # Instantiate a NumPy array of string prompts for the model.
        examples = np.array(["Tell me the sentiment of the phrase 'I like pizza': "])
        # Specify the model handler, providing a path and the custom inference function.
        model_handler = TFModelHandlerNumpy(model_path, inference_fn=gemma_inference_function)
        with beam.Pipeline() as p:
            _ = (p | beam.Create(examples)  # Create a PCollection of the prompts.
                   | RunInference(model_handler, inference_args={'max_length': 32})  # Send the prompts to the model and get responses.
                   | beam.ParDo(FormatOutput())  # Format the output.
                   | beam.Map(print)  # Print the formatted output.
                )
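The snippets above rely on imports that the steps leave to you. A minimal set that matches the names used here, assuming Apache Beam's Python SDK with TensorFlow support installed, is:

    import numpy as np
    import apache_beam as beam
    from apache_beam.ml.inference import utils  # provides _convert_to_result
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

Note that `_convert_to_result` is a private helper of the Beam SDK, so its module path could change between SDK versions. With the example prompt above, each printed line has the form `Input: <prompt>, Output: <generated text>`, as produced by `FormatOutput`.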
What's next
-----------

- [Create a Dataflow streaming pipeline that uses RunInference and Gemma](https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/gemma).
- [Run inference with a Gemma open model in Google Colab](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_gemma.ipynb) (requires Colab Enterprise).
- [Run a pipeline with GPUs](/dataflow/docs/gpu/use-gpus).
- [Tune your model](https://ai.google.dev/gemma/docs/lora_tuning).