Définir les options de pipeline Dataflow

Cette page explique comment définir des options de pipeline pour vos tâches Dataflow. Ces options de pipeline permettent de configurer l'emplacement et les paramètres d'exécution du pipeline, ainsi que les ressources qu'il utilise.

L'exécution du pipeline est distincte de l'exécution de votre programme Apache Beam. Le programme Apache Beam que vous avez écrit construit un pipeline pour une exécution différée. Cela signifie que le programme génère une série d'étapes que tout exécuteur Apache Beam compatible peut exécuter. Les exécuteurs compatibles incluent l'exécuteur Dataflow sur Google Cloud et l'exécuteur direct qui exécute le pipeline directement dans un environnement local.

Vous pouvez transmettre des paramètres dans un job Dataflow au moment de l'exécution. Pour en savoir plus sur la définition des options de pipeline lors de l'exécution, consultez la page Configurer les options de pipeline.

Utiliser des options de pipeline avec les SDK Apache Beam

Vous pouvez utiliser les SDK suivants pour définir les options de pipeline des tâches Dataflow :

  • SDK Apache Beam pour Python
  • SDK Apache Beam pour Java
  • SDK Apache Beam pour Go

Pour utiliser les SDK, vous devez définir l'exécuteur de pipeline et d'autres paramètres d'exécution à l'aide de la classe PipelineOptions du SDK Apache Beam.

Il existe deux méthodes pour spécifier les options de pipeline :

  • Définissez les options de pipeline de manière automatisée en fournissant une liste d'options de pipeline.
  • Définissez les options de pipeline directement sur la ligne de commande lorsque vous exécutez le code de votre pipeline.

Définir les options de pipeline de manière automatisée

Vous pouvez définir les options de pipeline de manière automatisée en créant et en modifiant un objet PipelineOptions.

Java

Créez un objet PipelineOptions à l'aide de la méthode PipelineOptionsFactory.fromArgs.

Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.

Python

Créez un objet PipelineOptions.

Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.

Go

La définition automatisée des options de pipeline par PipelineOptions n'est pas disponible dans le SDK Apache Beam pour Go. Utilisez des arguments de ligne de commande Go.

Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.

Définir les options de pipeline sur la ligne de commande

Vous pouvez définir les options de pipeline à l'aide d'arguments de ligne de commande.

Java

L'exemple de syntaxe suivant provient du pipeline WordCount du guide de démarrage rapide pour Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet Google Cloud
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • REGION : région Dataflow, us-central1

Python

L'exemple de syntaxe suivant provient du pipeline WordCount du guide de démarrage rapide pour Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Remplacez les éléments suivants :

  • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow (par exemple, europe-west1)

    L'option --region remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.

  • STORAGE_BUCKET : nom du bucket Cloud Storage

  • PROJECT_ID : ID de projet Google Cloud

Go

L'exemple de syntaxe suivant provient du pipeline WordCount du guide de démarrage rapide pour Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Remplacez les éléments suivants :

  • BUCKET_NAME : nom du bucket Cloud Storage

  • PROJECT_ID : ID de projet Google Cloud

  • DATAFLOW_REGION : région dans laquelle vous souhaitez déployer le job Dataflow. Exemple :europe-west1 L'option --region remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.

Définir les options de pipeline expérimentales

Dans les SDK Java, Python et Go, l'option de pipeline experiments active les fonctionnalités expérimentales ou pré-DG de Dataflow.

Définir de manière automatisée

Pour définir l'option experiments de manière automatisée, utilisez la syntaxe suivante.

Java

Dans votre objet PipelineOptions, incluez l'option experiments à l'aide de la syntaxe suivante. Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.

options.setExperiments("streaming_boot_disk_size_gb=80")

Pour obtenir un exemple illustrant la création de l'objet PipelineOptions, consultez la section Lancer sur Cloud Dataflow de cette page.

Python

Dans votre objet PipelineOptions, incluez l'option experiments à l'aide de la syntaxe suivante. Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Pour obtenir un exemple illustrant la création de l'objet PipelineOptions, consultez la section Lancer sur Cloud Dataflow de cette page.

Go

La définition automatisée des options de pipeline par PipelineOptions n'est pas disponible dans le SDK Apache Beam pour Go. Utilisez des arguments de ligne de commande Go.

Définir sur la ligne de commande

Pour définir l'option experiments sur la ligne de commande, utilisez la syntaxe suivante.

Java

Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.

--experiments=streaming_boot_disk_size_gb=80

Python

Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.

--experiments=streaming_boot_disk_size_gb=80

Go

Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.

--experiments=streaming_boot_disk_size_gb=80

Définir dans un modèle

Pour activer une fonctionnalité expérimentale lors de l'exécution d'un modèle Dataflow, utilisez l'option --additional-experiments.

Modèle classique

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

modèle Flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Accéder à l'objet d'options de pipeline

Lorsque vous créez l'objet Pipeline dans votre programme Apache Beam, transmettez PipelineOptions. Lorsque le service Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions à chaque nœud de calcul.

Java

Accédez à PipelineOptions dans une instance DoFn de n'importe quelle transformation ParDo à l'aide de la méthode ProcessContext.getPipelineOptions.

Python

Cette fonctionnalité n'est pas disponible dans le SDK Apache Beam pour Python.

Go

Accédez aux options de pipeline à l'aide de beam.PipelineOptions.

Lancer sur Dataflow

Exécutez votre tâche sur des ressources Google Cloud gérées à l'aide du service d'exécution Dataflow. L'exécution de votre pipeline avec Dataflow crée une tâche Dataflow, qui utilise des ressources Compute Engine et Cloud Storage dans votre projet Google Cloud. Pour en savoir plus sur les autorisations Dataflow, consultez la page Sécurité et autorisations Dataflow.

Définir les options requises

Pour exécuter votre pipeline à l'aide de Dataflow, définissez les options de pipeline suivantes :

Java

  • project : ID de votre projet Google Cloud.
  • runner : exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliser DataflowRunner.
  • gcpTempLocation : chemin d'accès Cloud Storage permettant à Dataflow de préparer la plupart des fichiers temporaires. Si vous souhaitez spécifier un bucket, vous devez le créer à l'avance. Si vous ne définissez pas gcpTempLocation, vous pouvez définir l'option de pipeline tempLocation. gcpTempLocation est alors défini sur la valeur de tempLocation. Si aucun paramètre n'est spécifié, un gcpTempLocation par défaut est créé.
  • stagingLocation : chemin d'accès Cloud Storage dans lequel Dataflow prépare les fichiers binaires. Si vous utilisez le SDK Apache Beam 2.28 ou version ultérieure, ne définissez pas cette option. Pour le SDK Apache Beam 2.28 ou version antérieure, si vous ne définissez pas cette option, la valeur spécifiée pour tempLocation est utilisée en tant qu'emplacement de préproduction.

    Une valeur gcpTempLocation par défaut est créée si ni cette valeur, ni tempLocation ne sont spécifiés. Si la valeur tempLocation est spécifiée, mais que gcpTempLocation ne l'est pas, tempLocation doit être un chemin Cloud Storage qui est utilisé comme valeur par défaut pour gcpTempLocation. Si la valeur tempLocation n'est pas spécifiée, mais que gcpTempLocation l'est, tempLocation n'est pas renseigné.

Python

  • project : ID de votre projet Google Cloud.
  • region : région de votre job Dataflow.
  • runner : exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliser DataflowRunner.
  • temp_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.

Go

  • project : ID de votre projet Google Cloud.
  • region : région de votre job Dataflow.
  • runner : exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliser dataflow.
  • staging_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.

Définir les options de pipeline de manière automatisée

L'exemple de code suivant montre comment construire un pipeline de manière automatisée en définissant l'exécuteur et les autres options requises pour exécuter le pipeline à l'aide de Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Le SDK Apache Beam pour Go utilise des arguments de ligne de commande Go. Utilisez flag.Set() pour définir les valeurs des options.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

Utiliser les options de pipeline depuis la ligne de commande

L'exemple suivant montre comment utiliser les options de pipeline spécifiées sur la ligne de commande. Cet exemple ne définit pas les options de pipeline de manière automatisée.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Utilisez le module argparse de Python pour analyser les options de ligne de commande.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Utilisez le package Go flag pour analyser les options de ligne de commande. Vous devez analyser les options avant d'appeler beam.Init(). Dans cet exemple, output est une option de ligne de commande.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

Contrôler les modes d'exécution

Lorsqu'un programme Apache Beam exécute un pipeline sur un service tel que Dataflow, il peut soit exécuter le pipeline de manière asynchrone, soit le bloquer jusqu'à ce qu'il soit terminé. Vous pouvez modifier ce comportement en suivant les instructions ci-dessous.

Java

Lorsqu'un programme Java Apache Beam exécute un pipeline sur un service tel que Dataflow, il est généralement exécuté de manière asynchrone. Pour exécuter un pipeline et attendre la fin de la tâche, définissez DataflowRunner en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish().

Lorsque vous utilisez DataflowRunner et appelez waitUntilFinish() sur l'objet PipelineResult renvoyé par pipeline.run(), le pipeline s'exécute sur Google Cloud, mais le code local attend que la tâche cloud se termine et renvoie l'objet DataflowPipelineJob final. Pendant l'exécution de la tâche, le service Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.

Python

Lorsqu'un programme Python Apache Beam exécute un pipeline sur un service tel que Dataflow, il est généralement exécuté de manière asynchrone. Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish() de l'objet PipelineResult renvoyé par la méthode run() de l'exécuteur.

Go

Lorsqu'un programme Go Apache Beam exécute un pipeline sur Dataflow, il est synchrone par défaut et se bloque jusqu'à la fin de l'exécution du pipeline. Si vous ne souhaitez pas bloquer le programme, deux options s'offrent à vous :

  1. Démarrez la tâche dans une routine Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Utilisez l'indicateur de ligne de commande --async, qui se trouve dans le package jobopts.

Pour afficher les détails de l'exécution, surveiller la progression et vérifier l'état d'avancement de la tâche, utilisez l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Utiliser des sources de diffusion en continu

Java

Si le pipeline lit à partir d'une source de données illimitées (par exemple, Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.

Python

Si le pipeline utilise une source de données illimitée (par exemple, Pub/Sub), vous devez définir l'option streaming sur "true".

Go

Si le pipeline lit à partir d'une source de données illimitées (par exemple, Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.

Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2 au minimum.

Lancer en local

Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Dataflow distant et le projet Google Cloud associé.

Si vous optez pour une exécution locale, vous devez exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create, ou utiliser une transformation Read pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue généralement un moyen plus rapide et plus facile d'effectuer des tests et des opérations de débogage avec moins de dépendances externes. Cependant, ce type d'exécution est limité par la mémoire disponible dans l'environnement local.

L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Une fois le pipeline construit, exécutez-le.

Créer des options de pipeline personnalisées

Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions standards. La ligne de commande d'Apache Beam peut également analyser des options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.

Java

Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Pour ajouter vos propres options, utilisez la méthode add_argument() (qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Pour ajouter vos propres options, utilisez le package d'options Go, comme illustré dans l'exemple suivant :

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.

Java

Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory, puis de la transmettre lors de la création de l'objet PipelineOptions. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory, la commande --help vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help. PipelineOptionsFactory vérifie que les options personnalisées sont compatibles avec toutes les autres options enregistrées.

L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory :

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Le pipeline peut maintenant accepter --myCustomOption=value en tant qu'argument de ligne de commande.

Python

Vous définissez la description et la valeur par défaut comme suit :

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Vous définissez la description et la valeur par défaut comme suit :

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)