Spécifier les paramètres d'exécution du pipeline

Une fois que le programme Apache Beam a construit un pipeline, vous devez l'exécuter. L'exécution du pipeline est indépendante de l'exécution du programme Apache Beam. Le programme Apache Beam construit le pipeline et le code que vous avez écrit génère une série d'étapes, qui devront être exécutées par un exécuteur de pipeline. Il peut s'agir du service géré Cloud Dataflow sur Google Cloud Platform (GCP), d'un service d'exécution tiers ou d'un exécuteur de pipeline local permettant d'effectuer directement les étapes au sein de l'environnement local.

Vous pouvez spécifier l'exécuteur de pipeline et d'autres options d'exécution à l'aide de la classe PipelineOptions du SDK Apache Beam. La classe PipelineOptions permet de configurer l'emplacement et les options d'exécution du pipeline, ainsi que les ressources qu'il utilise.

Nous recommandons généralement d'exécuter le pipeline sur des ressources GCP gérées à l'aide du service d'exécution Cloud Dataflow. L'exécution du pipeline avec le service Cloud Dataflow crée une tâche Cloud Dataflow, qui utilise les ressources Compute Engine et Cloud Storage du projet GCP.

Vous pouvez également exécuter le pipeline localement. Dans ce cas, les transformations de pipeline sont exécutées sur la même machine que le programme Cloud Dataflow. L'exécution locale est utile à des fins de test et de débogage, en particulier si le pipeline peut utiliser des ensembles de données en mémoire plus petits.

Définir l'objet PipelineOptions

Vous transmettez les options de pipeline PipelineOptions lorsque vous créez l'objet Pipeline dans le programme Cloud Dataflow. Lorsque le service Cloud Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions à chaque instance de nœud de calcul.

Java : SDK 2.x

Remarque : Vous pouvez accéder à PipelineOptions dans une instance DoFn de l'opération ParDo en utilisant la méthode ProcessContext.getPipelineOptions.

Python

Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.

Java : SDK 1.x

Remarque : Vous pouvez accéder à PipelineOptions dans une instance DoFn de l'opération ParDo en utilisant la méthode ProcessContext.getPipelineOptions.

Définir l'objet PipelineOptions à partir d'arguments de ligne de commande

Vous pouvez configurer le pipeline en créant un objet PipelineOptions et en définissant les champs directement. Cependant, les SDK Apache Beam incluent un analyseur de ligne de commande que vous pouvez utiliser pour définir des champs dans l'objet à l'aide d'arguments de ligne de commande.

Java : SDK 2.x

Pour lire les options à partir de la ligne de commande, construisez l'objet PipelineOptions à l'aide de la méthode PipelineOptionsFactory.fromArgs, comme dans l'exemple de code suivant :

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Remarque : L'ajout de la méthode .withValidation permet à Cloud Dataflow de vérifier la présence des arguments de ligne de commande requis et de valider les valeurs des arguments.

L'utilisation de PipelineOptionsFactory.fromArgs permet d'interpréter les arguments de ligne de commande au format suivant :

--<option>=<value>

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options de n'importe quelle sous-interface de org.apache.beam.sdk.options.PipelineOptions en tant qu'argument de ligne de commande.

Python

Pour lire les options à partir de la ligne de commande, créez l'objet PipelineOptions, comme dans l'exemple de code suivant :

options = PipelineOptions(flags=argv)

L'argument (flags=argv) de PipelineOptions permet d'interpréter les arguments de ligne de commande au format suivant :

--<option>=<value>

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options en définissant des sous-classes de la classe .

Java : SDK 1.x

Pour lire les options à partir de la ligne de commande, construisez l'objet PipelineOptions à l'aide de la méthode PipelineOptionsFactory.fromArgs, comme dans l'exemple de code suivant :

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Remarque : L'ajout de la méthode .withValidation permet à Cloud Dataflow de vérifier la présence des arguments de ligne de commande requis et de valider les valeurs des arguments.

L'utilisation de PipelineOptionsFactory.fromArgs permet d'interpréter les arguments de ligne de commande au format suivant :

--<option>=<value>

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options de n'importe quelle sous-interface de com.google.cloud.dataflow.sdk.options.PipelineOptions en tant qu'argument de ligne de commande.

Créer des options personnalisées

Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions standards. L'analyseur de ligne de commande de Cloud Dataflow peut également définir vos options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.

Java : SDK 2.x

Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Pour ajouter vos propres options, utilisez la méthode add_argument() (qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java : SDK 1.x

Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.

Java : SDK 2.x

Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory, puis de la transmettre lors de la création de l'objet PipelineOptions. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory, la commande --help vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help. La fonction PipelineOptionsFactory vérifie également que les options personnalisées sont compatibles avec toutes les autres options enregistrées.

L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory :

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Le pipeline peut maintenant accepter --myCustomOption=value en tant qu'argument de ligne de commande.

Python

Vous définissez la description et la valeur par défaut comme suit :

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

Java : SDK 1.x

Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory, puis de la transmettre lors de la création de l'objet PipelineOptions. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory, la commande --help vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help. La fonction PipelineOptionsFactory vérifie également que les options personnalisées sont compatibles avec toutes les autres options enregistrées.

L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory :

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Le pipeline peut maintenant accepter --myCustomOption=value en tant qu'argument de ligne de commande.

Configurer PipelineOptions pour exécution sur le service Cloud Dataflow

Pour exécuter votre pipeline à l'aide du service géré Cloud Dataflow, vous devez définir les champs suivants dans PipelineOptions :

Java : SDK 2.x

  • project : ID du projet GCP.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • gcpTempLocation : chemin d'accès à Cloud Storage, dans lequel Cloud Dataflow préproduit tous les fichiers temporaires. Vous devez créer ce bucket avant d'exécuter le pipeline. Si vous ne spécifiez pas le chemin gcpTempLocation, vous pouvez spécifier l'option de pipeline tempLocation ; le chemin gcpTempLocation sera ensuite défini sur la valeur tempLocation. Si aucune option n'est spécifiée, un chemin gcpTempLocation par défaut est créé.
  • stagingLocation : bucket Cloud Storage dans lequel Cloud Dataflow préproduit les fichiers binaires. Si vous ne définissez pas cette option, la valeur spécifiée pour tempLocation est également utilisée pour l'emplacement de préproduction.
  • Un champ gcpTempLocation est créé si ni ce champ, ni le champ tempLocation ne sont spécifiés. Si le champ tempLocation est spécifié, mais que gcpTempLocation ne l'est pas, tempLocation doit être un chemin Cloud Storage qui sera utilisé comme valeur par défaut pour gcpTempLocation. Si le champ tempLocation n'est pas spécifié, mais que gcpTempLocation l'est, tempLocation n'est pas renseigné.

Python

  • job_name : nom de la tâche Cloud Dataflow en cours d'exécution.
  • project : ID du projet GCP.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • staging_location : chemin d'accès à Cloud Storage dans lequel Cloud Dataflow préproduit les packages de code requis par les nœuds de calcul exécutant la tâche.
  • temp_location : chemin d'accès à Cloud Storage dans lequel Cloud Dataflow préproduit les fichiers de tâches temporaires créés lors de l'exécution du pipeline.

Java : SDK 1.x

  • project : ID du projet GCP.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowPipelineRunner ou BlockingDataflowPipelineRunner.
  • stagingLocation : bucket Cloud Storage dans lequel Cloud Dataflow préproduit les fichiers binaires et les fichiers temporaires. Vous devez créer ce bucket avant d'exécuter le pipeline.

Vous pouvez définir ces options de façon automatisée ou les spécifier à l'aide de la ligne de commande. L'exemple de code suivant montre comment construire un pipeline en définissant, de façon automatisée, l'exécuteur et les autres options nécessaires pour exécuter le pipeline à l'aide du service géré Cloud Dataflow.

Java : SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Java : SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

L'exemple de code suivant montre comment définir les options requises pour l'exécution du service Cloud Dataflow à l'aide de la ligne de commande :

Java : SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

Java : SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

Java : SDK 2.x

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner, --gcpTempLocation et, éventuellement, --stagingLocation.

Python

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner et --stagingLocation.

Java : SDK 1.x

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner et --stagingLocation.

Exécution asynchrone

Java : SDK 2.x

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Cloud Dataflow ou l'interface de ligne de commande de Cloud Dataflow.

Python

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Cloud Dataflow ou l'interface de ligne de commande de Cloud Dataflow.

Java : SDK 1.x

L'utilisation de DataflowPipelineRunner entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Cloud Dataflow ou l'interface de ligne de commande de Cloud Dataflow.

Bloquer l'exécution

Java : SDK 2.x

Spécifiez DataflowRunner en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish().

Lorsque vous utilisez DataflowRunner et appelez waitUntilFinish() sur l'objet PipelineResult renvoyé par pipeline.run(), le pipeline s'exécute sur le cloud de la même manière que . Cependant, il attend la fin de la tâche lancée et renvoie l'objet DataflowPipelineJob final. Pendant l'exécution de la tâche, le service Cloud Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.

Si vous utilisez le SDK 1.x de Java, vous pouvez saisir --runner BlockingDataflowPipelineRunner dans la ligne de commande pour indiquer au programme principal de se bloquer jusqu'à la fin du pipeline. Mais avec Java 2.x, votre programme principal doit appeler explicitement waitUntilFinish().

Python

Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish() de l'objet PipelineResult renvoyé par la méthode run() de l'exécuteur.

Java : SDK 1.x

Spécifiez BlockingDataflowPipelineRunner en tant qu'exécuteur de pipeline.

Lorsque vous utilisez BlockingDataflowPipelineRunner, le pipeline s'exécute sur le cloud de la même manière que DataflowPipelineRunner, mais attend la fin de la tâche lancée. Pendant l'exécution de la tâche, le service Cloud Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console. L'utilisation de BlockingDataflowPipelineRunner renvoie l'objet DataflowPipelineJob final.

Remarque : Si vous tapez Ctrl+C à partir de la ligne de commande, la tâche n'est pas annulée. Le service Cloud Dataflow exécute toujours la tâche sur GCP. Pour annuler la tâche, vous devez utiliser l'interface de surveillance de Cloud Dataflow ou l'interface de ligne de commande de Cloud Dataflow.

Exécution en mode de traitement par flux

Java : SDK 2.x

Si le pipeline lit à partir d'une source de données illimitées (par exemple, Cloud Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Python

Si le pipeline utilise une source de données ou un récepteur illimités (par exemple, Cloud Pub/Sub), vous devez définir l'option streaming sur "true".

Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2 au minimum. Vous ne devez pas remplacer cette valeur, car n1-standard-2 correspond au type de machine minimal requis pour l'exécution de tâches de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Java : SDK 1.x

Si le pipeline utilise une source de données ou un récepteur illimités (par exemple, Cloud Pub/Sub), vous devez définir l'option streaming dans PipelineOptions sur true. Vous pouvez définir l'option streaming de façon automatisée, comme indiqué dans l'exemple suivant :

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Définir d'autres options de pipeline Cloud Dataflow

Pour exécuter le pipeline sur le cloud, vous pouvez définir de façon automatisée les champs suivants dans l'objet PipelineOptions :

Java : SDK 2.x

Champ Type Description Valeur par défaut
runner Class (NameOfRunner) Le PipelineRunner à utiliser. Ce champ vous permet de déterminer l'exécuteur PipelineRunner au moment de l'exécution. DirectRunner (mode local)
streaming boolean Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. Si le pipeline lit à partir d'une source illimitée, la valeur par défaut est définie sur true. Dans le cas contraire, cette valeur est définie sur false.
project String ID du projet GCP. Ce champ est obligatoire si vous souhaitez exécuter le pipeline à l'aide du service géré Cloud Dataflow. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur le projet actuellement configuré dans le SDK Cloud.
gcpTempLocation String Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://.
stagingLocation String Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur celle que vous avez spécifiée pour tempLocation.
autoscalingAlgorithm String Le mode d'autoscaling pour votre tâche Cloud Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Cloud Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Cloud Dataflow par lot qui utilisent la version 1.6.0 ou ultérieure du SDK Cloud Dataflow pour Java. La valeur par défaut est définie sur NONE pour les tâches de traitement par flux ou par lot utilisant des versions antérieures du SDK Cloud Dataflow pour Java.
numWorkers int Nombre initial d'instances Google Compute Engine à utiliser lors de l'exécution du pipeline. Cette option détermine le nombre de nœuds de calcul démarrés par le service Dataflow au début de la tâche. Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
maxNumWorkers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par numWorkers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
region String La spécification d'un point de terminaison régional vous permet de définir une région pour le déploiement des tâches Cloud Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
zone String Zone Compute Engine pour le lancement d'instances de nœuds de calcul visant à exécuter le pipeline. Si vous avez spécifié le paramètre region, le paramètre zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflowKmsKey String Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre gcpTempLocation pour utiliser cette fonctionnalité. Dans le cas contraire, Cloud Dataflow utilise le chiffrement GCP par défaut à la place du chiffrement CMEK.
flexRSGoal String Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres numWorkers, autoscalingAlgorithm, zone, region et workerMachineType. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Cloud Dataflow de choisir les ressources avec remise disponibles.
filesToStage List<String> Liste de fichiers locaux, de répertoires de fichiers ou d'archives (.jar, .zip) à mettre à la disposition de chaque nœud de calcul. Si vous définissez cette option, seuls les fichiers spécifiés sont importés (le paramètre Java "classpath" est ignoré). Vous devez spécifier toutes les ressources dans l'ordre "classpath" correct. Les ressources ne se limitent pas au code, mais peuvent également inclure des fichiers de configuration et d'autres ressources à mettre à la disposition de tous les nœuds de calcul. Le code peut accéder aux ressources répertoriées à l'aide des méthodes de recherche de ressources standards de Java. Attention : La spécification d'un chemin de répertoire n'est pas recommandée. En effet, Cloud Dataflow compresse les fichiers avant l'importation, ce qui implique un coût lié au temps de démarrage plus élevé. De même, n'utilisez pas cette option pour transférer des données sur les nœuds de calcul devant être traités par le pipeline. En effet, ce processus est beaucoup plus lent que l'utilisation d'API Cloud Storage/BigQuery natives associées à la source de données Cloud Dataflow appropriée. Si le champ filesToStage est vide, Cloud Dataflow déduit les fichiers à préparer selon le paramètre Java "classpath". Les considérations et précautions mentionnées dans la colonne de gauche s'appliquent également ici (types de fichiers à répertorier et modalités d'accès à ces fichiers depuis votre code).
network String Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, GCP suppose que vous allez utiliser un réseau nommé default.
subnetwork String Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Cloud Dataflow détermine la valeur par défaut.
usePublicIps boolean Indique si les nœuds de calcul Cloud Dataflow utilisent des adresses IP publiques. Si la valeur est définie sur false, les nœuds de calcul Cloud Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur true et les nœuds de calcul Cloud Dataflow utilisent des adresses IP publiques.
enableStreamingEngine boolean Indique si Streaming Engine de Cloud Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Cloud Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de traitement par flux sont exécutées entièrement sur des VM de nœuds de calcul.
diskSizeGb int Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. La taille de disque minimale est de 30 Go pour pouvoir héberger l'image de démarrage du nœud de calcul et les journaux locaux. Si le pipeline brasse des données de manière aléatoire, vous devez allouer une taille plus élevée.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet Cloud Platform.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

serviceAccount String Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
workerDiskType String Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Cloud Dataflow détermine la valeur par défaut.
workerMachineType String Type de machine Compute Engine que Cloud Dataflow utilise lors de l'exécution des VM de nœuds de calcul. Cloud Dataflow est compatible avec les types de machines de la série n1 et les types de machines personnalisées. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Cloud Dataflow. Si cette option n'est pas définie, le service Cloud Dataflow choisit le type de machine en fonction de la tâche.

Consultez la documentation de référence de l'API pour Java concernant l'interface PipelineOptions (et ses sous-interfaces) pour obtenir la liste complète des options de configuration du pipeline.

Python

Champ Type Description Valeur par défaut
runner str Le PipelineRunner à utiliser. Ce champ peut être défini sur DirectRunner ou DataflowRunner. DirectRunner (mode local)
streaming bool Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. false
project str ID du projet GCP. Ce champ est obligatoire si vous souhaitez exécuter le pipeline à l'aide du service géré Cloud Dataflow. Si ce champ n'est pas défini, une erreur se produit.
temp_location str Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur staging_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
staging_location str Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur un répertoire de préproduction dans temp_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
autoscaling_algorithm str Le mode d'autoscaling pour votre tâche Cloud Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Cloud Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Cloud Dataflow par lot et sur NONE pour toutes les tâches Cloud Dataflow de traitement par flux.
num_workers int Nombre d'instances Compute Engine à utiliser lors de l'exécution du pipeline. Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
max_num_workers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par num_workers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
region str La spécification d'un point de terminaison régional vous permet de définir une région pour le déploiement des tâches Cloud Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
zone str Zone Compute Engine pour le lancement d'instances de nœuds de calcul visant à exécuter le pipeline. Si vous avez spécifié le paramètre region, le paramètre zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflow_kms_key str Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre temp_location pour utiliser cette fonctionnalité. Dans le cas contraire, Cloud Dataflow utilise le chiffrement GCP par défaut à la place du chiffrement CMEK.
flexrs_goal str Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres num_workers, autoscaling_algorithm, zone, region et machine_type. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Cloud Dataflow de choisir les ressources avec remise disponibles.
network str Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, GCP suppose que vous allez utiliser un réseau nommé default.
subnetwork str Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Cloud Dataflow détermine la valeur par défaut.
no_use_public_ips bool Indique que les nœuds de calcul Cloud Dataflow ne doivent pas utiliser d'adresses IP publiques. De ce fait, les nœuds de calcul Cloud Dataflow utilisent des adresses IP privées pour toutes les communications. Si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Le paramètre des adresses IP publiques nécessite le SDK Beam pour Python. Ce paramètre n'est pas disponible avec le SDK Cloud Dataflow pour Python. Si ce champ n'est pas défini, les nœuds de calcul Cloud Dataflow utilisent des adresses IP publiques.
enable_streaming_engine boolean Indique si Streaming Engine de Cloud Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Cloud Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de traitement par flux sont exécutées entièrement sur des VM de nœuds de calcul.
disk_size_gb int Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. La taille de disque minimale est de 30 Go pour pouvoir héberger l'image de démarrage du nœud de calcul et les journaux locaux. Si le pipeline brasse des données de manière aléatoire, vous devez allouer une taille plus élevée. Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet GCP.
service_account_email str Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
worker_disk_type str Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Cloud Dataflow détermine la valeur par défaut.
machine_type str Type de machine Compute Engine que Cloud Dataflow utilise lors de l'exécution des VM de nœuds de calcul. Notez que worker_machine_type est un indicateur de type alias. Si cette option n'est pas définie, le service Cloud Dataflow choisit le type de machine en fonction de la tâche.

Java : SDK 1.x

Champ Type Description Valeur par défaut
runner Class (NameOfRunner) Le PipelineRunner à utiliser. Ce champ vous permet de déterminer l'exécuteur PipelineRunner au moment de l'exécution. DirectPipelineRunner (mode local)
streaming boolean Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. false
project String ID du projet GCP. Ce champ est obligatoire si vous souhaitez exécuter le pipeline à l'aide du service géré Cloud Dataflow. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur le projet actuellement configuré dans le SDK Cloud.
tempLocation String Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur stagingLocation. Vous devez spécifier au moins un des éléments tempLocation ou stagingLocation pour exécuter le pipeline sur Google Cloud.
stagingLocation String Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur un répertoire de préproduction dans tempLocation. Vous devez spécifier au moins un des éléments tempLocation ou stagingLocation pour exécuter le pipeline sur Google Cloud.
autoscalingAlgorithm String Mode d'autoscaling à utiliser pour la tâche Cloud Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Cloud Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Cloud Dataflow par lot qui utilisent la version 1.6.0 ou ultérieure du SDK Cloud Dataflow pour Java. La valeur par défaut est définie sur NONE pour les tâches de traitement par flux ou par lot utilisant des versions antérieures du SDK Cloud Dataflow pour Java.
numWorkers int Nombre initial d'instances Google Compute Engine à utiliser lors de l'exécution du pipeline. Cette option détermine le nombre de nœuds de calcul démarrés par le service Cloud Dataflow au début de la tâche. Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
maxNumWorkers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par numWorkers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de nœuds de calcul.
zone String Zone Compute Engine pour le lancement d'instances de nœuds de calcul visant à exécuter le pipeline. Si ce champ n'est pas défini, la valeur par défaut est us-central1-f. Vous pouvez spécifier une zone différente pour vous assurer, par exemple, que les nœuds de calcul sont lancés dans l'UE.
filesToStage List<String> Liste de fichiers locaux, de répertoires de fichiers ou d'archives (.jar, .zip) à mettre à la disposition de chaque nœud de calcul. Si vous définissez cette option, seuls les fichiers spécifiés sont importés (le paramètre Java "classpath" est ignoré). Vous devez spécifier toutes les ressources dans l'ordre "classpath" correct. Les ressources ne se limitent pas au code, mais peuvent également inclure des fichiers de configuration et d'autres ressources à mettre à la disposition de tous les nœuds de calcul. Le code peut accéder aux ressources répertoriées à l'aide des méthodes de recherche de ressources standards de Java. Attention : La spécification d'un chemin de répertoire n'est pas recommandée. En effet, Cloud Dataflow compresse les fichiers avant l'importation, ce qui implique un coût lié au temps de démarrage plus élevé. De même, n'utilisez pas cette option pour transférer des données sur les nœuds de calcul devant être traités par le pipeline. En effet, ce processus est beaucoup plus lent que l'utilisation d'API Cloud Storage/BigQuery natives associées à la source de données Cloud Dataflow appropriée. Si le champ filesToStage est vide, Cloud Dataflow déduit les fichiers à préparer selon le paramètre Java "classpath". Les considérations et précautions mentionnées dans la colonne de gauche s'appliquent également ici (types de fichiers à répertorier et modalités d'accès à ces fichiers depuis votre code).
network String Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, GCP suppose que vous allez utiliser un réseau nommé default.
subnetwork String Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Cloud Dataflow détermine la valeur par défaut.
diskSizeGb int Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. La taille de disque minimale est de 30 Go pour pouvoir héberger l'image de démarrage du nœud de calcul et les journaux locaux. Si le pipeline brasse des données de manière aléatoire, vous devez allouer une taille plus élevée.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet GCP.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

workerDiskType String Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Cloud Dataflow détermine la valeur par défaut.
workerMachineType String Type de machine Compute Engine que Cloud Dataflow utilise lors de l'exécution des VM de nœuds de calcul. Cloud Dataflow est compatible avec les types de machines de la série n1 et les types de machines personnalisées. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Cloud Dataflow. Si cette option n'est pas définie, le service Cloud Dataflow choisit le type de machine en fonction de la tâche.

Consultez la documentation de référence de l'API pour Java concernant l'interface PipelineOptions (et ses sous-interfaces) pour obtenir la liste complète des options de configuration du pipeline.

Configurer PipelineOptions pour une exécution locale

Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Cloud Dataflow distant et le projet GCP associé.

Si vous optez pour une exécution locale, il est vivement recommandé d'exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create, ou utiliser une transformation Read pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue un moyen simple et rapide d'effectuer des tests et des opérations de débogage, avec moins de dépendances externes. Cependant, une exécution locale est limitée par la mémoire disponible dans l'environnement local.

L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.

Java : SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut. Cependant, vous devez inclure explicitement DirectRunner en tant que dépendance, ou l'ajouter au paramètre "classpath".

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut.

Java : SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectPipelineRunner est déjà définie par défaut.

Une fois le pipeline construit, exécutez-le.

Définir d'autres options de pipeline locales

Lors de l'exécution locale du pipeline, les valeurs par défaut des propriétés de PipelineOptions sont généralement suffisantes.

Java : SDK 2.x

Vous pouvez trouver les valeurs par défaut de la classe Java PipelineOptions dans la documentation de référence de l'API Java. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si le pipeline utilise des produits GCP pour les entrées/sorties, par exemple BigQuery ou Cloud Storage, vous devrez peut-être définir certaines options de projet GCP et certains identifiants. Dans ce cas, vous devez utiliser GcpOptions.setProject pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GcpOptions pour plus d'informations.

Python

Vous pouvez trouver les valeurs par défaut de la classe Python PipelineOptions dans la documentation de référence de l'API Python. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si le pipeline utilise des produits GCP pour les entrées/sorties, par exemple BigQuery ou Cloud Storage, vous devrez peut-être définir certaines options de projet GCP et d'identifiants. Dans ce cas, vous devez utiliser options.view_as(GoogleCloudOptions).project pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GoogleCloudOptions pour plus d'informations.

Java : SDK 1.x

Vous pouvez trouver les valeurs par défaut de la classe Java PipelineOptions dans la documentation de référence de l'API Java. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si le pipeline utilise des produits GCP pour les entrées/sorties, par exemple BigQuery ou Cloud Storage, vous devrez peut-être définir certaines options de projet GCP et d'identifiants. Dans ce cas, vous devez utiliser GcpOptions.setProject pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GcpOptions pour plus d'informations.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.