Figura 1. Il flusso di lavoro completo di Dataflow ML.
Requisiti e limitazioni
Dataflow ML supporta le pipeline in batch e in streaming.
L'API RunInference è supportata in Apache Beam 2.40.0 e versioni successive.
L'API MLTransform è supportata in Apache Beam 2.53.0 e versioni successive.
Gli handler dei modelli sono disponibili per PyTorch, scikit-learn, TensorFlow, ONNX e TensorRT.
Per i framework non supportati, puoi utilizzare un gestore del modello personalizzato.
Preparazione dei dati per l'addestramento
Utilizza la funzionalità MLTransform per preparare i dati per l'addestramento dei modelli ML. Per maggiori informazioni, consulta Pre-elaborare i dati con MLTransform.
Dataflow ML combina la potenza di Dataflow con l'API RunInference di Apache Beam.
Con l'API RunInference, definisci le caratteristiche e le proprietà del modello
e passi la configurazione alla trasformazione RunInference. Questa funzionalità consente agli utenti di eseguire il modello all'interno delle pipeline Dataflow senza dover conoscere i dettagli di implementazione del modello. Puoi scegliere il framework più adatto ai tuoi dati, ad esempio TensorFlow e PyTorch.
Eseguire più modelli in una pipeline
Utilizza la trasformazione RunInference per aggiungere più modelli di inferenza alla
pipeline Dataflow. Per ulteriori informazioni, inclusi i dettagli del codice, consulta Pipeline multimodello nella documentazione di Apache Beam.
Creare una pipeline multilingue
Per utilizzare RunInference con una pipeline Java,
crea una trasformazione Python cross-language. La pipeline chiama la trasformazione, che esegue la pre-elaborazione, la post-elaborazione e l'inferenza.
Per le pipeline batch o in streaming che richiedono l'utilizzo di acceleratori, puoi eseguire le pipeline Dataflow su dispositivi GPU NVIDIA. Per ulteriori informazioni, consulta
Eseguire una pipeline Dataflow con GPU.
Risolvere i problemi di Dataflow ML
Questa sezione fornisce strategie e link per la risoluzione dei problemi che potresti trovare utili quando utilizzi Dataflow ML.
La funzione Stack si aspetta che ogni tensore abbia le stesse dimensioni
Se fornisci immagini di dimensioni diverse o embedding di parole di lunghezze diverse
quando utilizzi l'API RunInference, potrebbe verificarsi il seguente errore:
File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']
Questo errore si verifica perché l'API RunInference non può raggruppare elementi tensori di dimensioni diverse. Per le soluzioni alternative, consulta Impossibile raggruppare gli elementi tensore nella documentazione di Apache Beam.
Evitare errori di esaurimento della memoria con modelli di grandi dimensioni
Quando carichi un modello di ML di medie o grandi dimensioni, la tua macchina potrebbe esaurire la memoria.
Dataflow fornisce strumenti per aiutarti a evitare errori di OOM (out-of-memory)
durante il caricamento dei modelli di ML. Utilizza la tabella seguente per determinare l'approccio appropriato per il tuo scenario.
Scenario
Soluzione
I modelli sono abbastanza piccoli da rientrare in memoria.
Utilizza la trasformazione RunInference senza configurazioni aggiuntive. La trasformazione RunInference condivide i modelli tra i thread. Se puoi inserire un modello per core della CPU sulla tua macchina, la pipeline può utilizzare la configurazione predefinita.
Più modelli addestrati in modo diverso eseguono la stessa attività.
Se stai creando un gestore del modello personalizzato, anziché utilizzare il parametro
large_model, sostituisci il parametro
share_model_across_processes.
Devi configurare il numero esatto di modelli caricati sulla tua macchina.
Per controllare esattamente quanti modelli vengono caricati, utilizza il parametro
model_copies.
Se stai creando un gestore del modello personalizzato, sostituisci il parametro
model_copies.
Consulta la documentazione relativa alle
pipeline di AI/ML
di Apache Beam per informazioni dettagliate sull'utilizzo dell'apprendimento automatico con Apache Beam.
[[["Facile da capire","easyToUnderstand","thumb-up"],["Il problema è stato risolto","solvedMyProblem","thumb-up"],["Altra","otherUp","thumb-up"]],[["Difficile da capire","hardToUnderstand","thumb-down"],["Informazioni o codice di esempio errati","incorrectInformationOrSampleCode","thumb-down"],["Mancano le informazioni o gli esempi di cui ho bisogno","missingTheInformationSamplesINeed","thumb-down"],["Problema di traduzione","translationIssue","thumb-down"],["Altra","otherDown","thumb-down"]],["Ultimo aggiornamento 2025-09-10 UTC."],[[["\u003cp\u003eDataflow ML facilitates both prediction and inference pipelines, as well as data preparation for training ML models.\u003c/p\u003e\n"],["\u003cp\u003eDataflow ML supports both batch and streaming data pipelines, utilizing the \u003ccode\u003eRunInference\u003c/code\u003e API (from Apache Beam 2.40.0) and \u003ccode\u003eMLTransform\u003c/code\u003e API (from Apache Beam 2.53.0).\u003c/p\u003e\n"],["\u003cp\u003eThe system is compatible with model handlers for popular frameworks like PyTorch, scikit-learn, TensorFlow, ONNX, and TensorRT, with options for custom handlers for other frameworks.\u003c/p\u003e\n"],["\u003cp\u003eDataflow ML enables the use of multiple inference models within a single pipeline via the \u003ccode\u003eRunInference\u003c/code\u003e transform and supports the use of GPUs for pipelines that need them.\u003c/p\u003e\n"],["\u003cp\u003eDataflow ML also provides troubleshooting guidance for common issues, including tensor size mismatch errors and out-of-memory errors when dealing with large models.\u003c/p\u003e\n"]]],[],null,["You can use Dataflow ML's scale data processing abilities for\n[prediction and inference pipelines](#prediction) and for\n[data preparation for training](#data-prep).\n\n**Figure 1.** The complete Dataflow ML workflow.\n\nRequirements and limitations \n\n- Dataflow ML supports batch and streaming pipelines.\n- The `RunInference` API is supported in Apache Beam 2.40.0 and later versions.\n- The `MLTransform` API is supported in Apache Beam 2.53.0 and later versions.\n- Model handlers are available for PyTorch, scikit-learn, TensorFlow, ONNX, and TensorRT. For unsupported frameworks, you can use a custom model handler.\n\nData preparation for training\n\n- Use the `MLTransform` feature to prepare your data for training ML models. For\n more information, see\n [Preprocess data with `MLTransform`](/dataflow/docs/machine-learning/ml-preprocess-data).\n\n- Use Dataflow with ML-OPS frameworks, such as\n [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/v1/introduction/)\n (KFP) or [TensorFlow Extended](https://www.tensorflow.org/tfx) (TFX).\n To learn more, see [Dataflow ML in ML workflows](/dataflow/docs/machine-learning/ml-data).\n\nPrediction and inference pipelines\n\nDataflow ML combines the power of Dataflow with\nApache Beam's\n[`RunInference` API](https://beam.apache.org/documentation/ml/about-ml/).\nWith the `RunInference` API, you define the model's characteristics and properties\nand pass that configuration to the `RunInference` transform. This feature\nallows users to run the model within their\nDataflow pipelines without needing to know\nthe model's implementation details. You can choose the framework that best\nsuits your data, such as TensorFlow and PyTorch.\n\nRun multiple models in a pipeline\n\nUse the `RunInference` transform to add multiple inference models to\nyour Dataflow pipeline. For more information, including code details,\nsee [Multi-model pipelines](https://beam.apache.org/documentation/ml/about-ml/#multi-model-pipelines)\nin the Apache Beam documentation.\n\nBuild a cross-language pipeline\n\nTo use RunInference with a Java pipeline,\n[create a cross-language Python transform](https://beam.apache.org/documentation/programming-guide/#1312-creating-cross-language-python-transforms). The pipeline calls the\ntransform, which does the preprocessing, postprocessing, and inference.\n\nFor detailed instructions and a sample pipeline, see\n[Using RunInference from the Java SDK](https://beam.apache.org/documentation/ml/multi-language-inference/).\n\nUse GPUs with Dataflow\n\nFor batch or streaming pipelines that require the use of accelerators, you can\nrun Dataflow pipelines on NVIDIA GPU devices. For more information, see\n[Run a Dataflow pipeline with GPUs](/dataflow/docs/gpu/use-gpus).\n\nTroubleshoot Dataflow ML\n\nThis section provides troubleshooting strategies and links that you might find\nhelpful when using Dataflow ML.\n\nStack expects each tensor to be equal size\n\nIf you provide images of different sizes or word embeddings of different lengths\nwhen using the `RunInference` API, the following error might occur: \n\n File \"/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py\", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']\n\nThis error occurs because the `RunInference` API can't batch tensor elements of\ndifferent sizes. For workarounds, see\n[Unable to batch tensor elements](https://beam.apache.org/documentation/ml/about-ml/#unable-to-batch-tensor-elements)\nin the Apache Beam documentation.\n\nAvoid out-of-memory errors with large models\n\nWhen you load a medium or large ML model, your machine might run out of memory.\nDataflow provides tools to help avoid out-of-memory (OOM) errors\nwhen loading ML models. Use the following table to determine the appropriate\napproach for your scenario.\n\n| Scenario | Solution |\n|----------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n| The models are small enough to fit in memory. | Use the `RunInference` transform without any additional configurations. The `RunInference` transform shares the models across threads. If you can fit one model per CPU core on your machine, then your pipeline can use the default configuration. |\n| Multiple differently-trained models are performing the same task. | Use per-model keys. For more information, see [Run ML inference with multiple differently-trained models](/dataflow/docs/notebooks/per_key_models). |\n| One model is loaded into memory, and all processes share this model. | Use the `large_model` parameter. For more information, see [Run ML inference with multiple differently-trained models](/dataflow/docs/notebooks/per_key_models). If you're building a custom model handler, instead of using the `large_model` parameter, override the [`share_model_across_processes`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler.share_model_across_processes) parameter. |\n| You need to configure the exact number of models loaded onto your machine. | To control exactly how many models are loaded, use the [`model_copies`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler.model_copies) parameter. If you're building a custom model handler, override the `model_copies` parameter. |\n\nFor more information about memory management with Dataflow, see\n[Troubleshoot Dataflow out of memory errors](/dataflow/docs/guides/troubleshoot-oom).\n\nWhat's next\n\n- Explore the [Dataflow ML notebooks](https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml) in GitHub.\n- Get in-depth information about using ML with Apache Beam in Apache Beam's [AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) documentation.\n- Learn more about the [`RunInference` API](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html#apache_beam.ml.inference.RunInference).\n- Learn about the [metrics](https://beam.apache.org/documentation/ml/runinference-metrics/) that you can use to monitor your `RunInference` transform."]]