Spécifier les paramètres d'exécution du pipeline

Une fois que le programme Apache Beam a construit un pipeline, vous devez l'exécuter. L'exécution du pipeline est indépendante de l'exécution du programme Apache Beam. Le programme Apache Beam construit le pipeline et le code que vous avez écrit génère une série d'étapes, qui devront être exécutées par un exécuteur de pipeline. Celui-ci peut correspondre au service géré Dataflow sur Google Cloud, à un service d'exécution tiers ou un exécuteur de pipeline local qui lance les tâches directement dans votre environnement local.

Vous pouvez spécifier l'exécuteur de pipeline et d'autres options d'exécution à l'aide de la classe PipelineOptions du SDK Apache Beam. La classe PipelineOptions permet de configurer l'emplacement et les options d'exécution du pipeline, ainsi que les ressources qu'il utilise.

Le plus souvent, vous aurez besoin que votre pipeline s'exécute sur les ressources gérées de Google Cloud à l'aide du service d'exécution Dataflow. L'exécution de votre pipeline avec le service Dataflow crée une tâche Dataflow. Celle-ci utilise les ressources Compute Engine et Cloud Storage de votre projet Google Cloud.

Vous pouvez également exécuter le pipeline localement. Dans ce cas, les transformations de pipeline sont exécutées sur la même machine que le programme Dataflow. L'exécution locale est utile à des fins de test et de débogage, en particulier si le pipeline peut utiliser des ensembles de données en mémoire plus petits.

Définir l'objet PipelineOptions

Vous transmettez les options de pipeline PipelineOptions lorsque vous créez l'objet Pipeline dans le programme Dataflow. Lorsque le service Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions à chaque instance de nœud de calcul.

Java : SDK 2.x

Remarque : Vous pouvez accéder à PipelineOptions dans une instance DoFn de l'opération ParDo en utilisant la méthode ProcessContext.getPipelineOptions.

Python

Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.

Java : SDK 1.x

Définir l'objet PipelineOptions à partir d'arguments de ligne de commande

Vous pouvez configurer le pipeline en créant un objet PipelineOptions et en définissant les champs directement. Cependant, les SDK Apache Beam incluent un analyseur de ligne de commande que vous pouvez utiliser pour définir des champs dans l'objet PipelineOptions à l'aide d'arguments de ligne de commande.

Java : SDK 2.x

Pour lire les options à partir de la ligne de commande, construisez l'objet PipelineOptions à l'aide de la méthode PipelineOptionsFactory.fromArgs, comme dans l'exemple de code suivant :

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

Remarque : L'ajout de la méthode .withValidation permet à Dataflow de vérifier la présence des arguments de ligne de commande requis et de valider les valeurs des arguments.

L'utilisation de PipelineOptionsFactory.fromArgs permet d'interpréter les arguments de ligne de commande au format suivant :

--<option>=<value>

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options de n'importe quelle sous-interface de org.apache.beam.sdk.options.PipelineOptions en tant qu'argument de ligne de commande.

Python

Pour lire les options à partir de la ligne de commande, créez l'objet PipelineOptions, comme dans l'exemple de code suivant :

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

L'argument (flags=argv) de PipelineOptions permet d'interpréter les arguments de ligne de commande au format suivant :

--<option>=<value>

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options en définissant des sous-classes de la classe PipelineOptions.

Java : SDK 1.x

Créer des options personnalisées

Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions standards. L'analyseur de ligne de commande de Dataflow peut également définir vos options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.

Java : SDK 2.x

Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

Pour ajouter vos propres options, utilisez la méthode add_argument() (qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java : SDK 1.x

Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.

Java : SDK 2.x

Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory, puis de la transmettre lors de la création de l'objet PipelineOptions. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory, la commande --help vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help. La fonction PipelineOptionsFactory vérifie également que les options personnalisées sont compatibles avec toutes les autres options enregistrées.

L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory :

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

Le pipeline peut maintenant accepter --myCustomOption=value en tant qu'argument de ligne de commande.

Python

Vous définissez la description et la valeur par défaut comme suit :

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java : SDK 1.x

Configurer PipelineOptions pour exécution sur le service Cloud Dataflow

Pour exécuter votre pipeline à l'aide du service géré Dataflow, vous devez définir les champs suivants dans PipelineOptions :

Java : SDK 2.x

  • project est l'ID du projet Google Cloud.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • gcpTempLocation : chemin d'accès à Cloud Storage, dans lequel Dataflow prépare tous les fichiers temporaires. Vous devez créer ce bucket avant d'exécuter le pipeline. Si vous ne spécifiez pas le chemin gcpTempLocation, vous pouvez spécifier l'option de pipeline tempLocation ; le chemin gcpTempLocation sera ensuite défini sur la valeur tempLocation. Si aucune option n'est spécifiée, un chemin gcpTempLocation par défaut est créé.
  • stagingLocation : bucket Cloud Storage dans lequel Dataflow prépare les fichiers binaires. Si vous ne définissez pas cette option, la valeur spécifiée pour tempLocation est également utilisée pour l'emplacement de préproduction.
  • Un champ gcpTempLocation est créé si ni ce champ, ni le champ tempLocation ne sont spécifiés. Si le champ tempLocation est spécifié, mais que gcpTempLocation ne l'est pas, tempLocation doit être un chemin Cloud Storage qui sera utilisé comme valeur par défaut pour gcpTempLocation. Si le champ tempLocation n'est pas spécifié, mais que gcpTempLocation l'est, tempLocation n'est pas renseigné.

Remarque : Si vous utilisez le SDK Apache Beam pour Java en version 2.15.0 ou ultérieure, vous devez également spécifier region.

Python

  • project est l'ID du projet Google Cloud.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • staging_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les packages de code requis par les nœuds de calcul exécutant la tâche.
  • temp_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.

Remarque : Si vous utilisez le SDK Apache Beam pour Python en version 2.15.0 ou ultérieure, vous devez également spécifier region.

Java : SDK 1.x

Vous pouvez définir ces options de façon automatisée ou les spécifier à l'aide de la ligne de commande. L'exemple de code suivant montre comment construire un pipeline en programmant les paramètres de l'exécuteur et les autres options nécessaires pour exécuter le pipeline à l'aide du service géré Dataflow.

Java : SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java : SDK 1.x

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

L'exemple suivant montre comment définir les options nécessaires pour exécuter le service Dataflow à l'aide de la ligne de commande :

Java : SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java : SDK 1.x

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

Java : SDK 2.x

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner, --gcpTempLocation et, éventuellement, --stagingLocation.

Python

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner et --staging_location.

Java : SDK 1.x

Exécution asynchrone

Java : SDK 2.x

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Python

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Java : SDK 1.x

Exécution synchrone

Java : SDK 2.x

Spécifiez DataflowRunner en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish().

Lorsque vous utilisez DataflowRunner et appelez waitUntilFinish() sur l'objet PipelineResult renvoyé par pipeline.run(), le pipeline s'exécute sur le cloud, mais le code local attend la fin de la tâche cloud et renvoie la valeur finale DataflowPipelineJob. Pendant l'exécution de la tâche, le service Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.

Si vous utilisez le SDK 1.x de Java, vous pouvez saisir --runner BlockingDataflowPipelineRunner dans la ligne de commande pour indiquer au programme principal de se bloquer jusqu'à la fin du pipeline. Mais avec Java 2.x, votre programme principal doit appeler explicitement waitUntilFinish().

Python

Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish() de l'objet PipelineResult renvoyé par la méthode run() de l'exécuteur.

Java : SDK 1.x

Remarque : Si vous tapez Ctrl+C à partir de la ligne de commande, la tâche n'est pas annulée. Le service Dataflow exécute toujours la tâche sur Google Cloud. Pour annuler la tâche, vous devez utiliser l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Exécution en mode de traitement par flux

Java : SDK 2.x

Si le pipeline lit à partir d'une source de données illimitées (par exemple, Cloud Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Python

Si le pipeline utilise une source de données ou un récepteur illimités (par exemple, Pub/Sub), vous devez définir l'option streaming sur "true".

Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2 au minimum. Vous ne devez pas remplacer cette valeur, car n1-standard-2 correspond au type de machine minimal requis pour l'exécution de tâches de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Java : SDK 1.x

Définir d'autres options de pipeline Cloud Dataflow

Pour exécuter le pipeline sur le cloud, vous pouvez définir de façon automatisée les champs suivants dans l'objet PipelineOptions :

Java : SDK 2.x

Champ Type Description Valeur par défaut
runner Class (NameOfRunner) Le PipelineRunner à utiliser. Ce champ vous permet de déterminer l'exécuteur PipelineRunner au moment de l'exécution. DirectRunner (mode local)
streaming boolean Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. Si le pipeline lit à partir d'une source illimitée, la valeur par défaut est définie sur true. Dans le cas contraire, cette valeur est définie sur false.
project String ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur le projet actuellement configuré dans le SDK Cloud.
jobName String Nom de la tâche Dataflow en cours d'exécution, tel qu'il apparaît dans la liste des tâches Dataflow et les détails de la tâche. Utilisé également lors de la mise à jour d'un pipeline existant. Dataflow génère automatiquement un nom unique.
gcpTempLocation String Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://.
stagingLocation String Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur celle que vous avez spécifiée pour tempLocation.
autoscalingAlgorithm String Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lots et pour les tâches de streaming utilisant Streaming Engine. La valeur par défaut est définie sur NONE pour les tâches de streaming qui n'utilisent pas Streaming Engine.
numWorkers int Nombre initial d'instances Google Compute Engine à utiliser lors de l'exécution du pipeline. Cette option détermine le nombre de nœuds de calcul démarrés par le service Dataflow au début de la tâche. Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
maxNumWorkers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par numWorkers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
numberOfWorkerHarnessThreads int Nombre de threads par faisceau de nœud de calcul. Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de threads par nœud de calcul.
region String Spécifie un point de terminaison régional pour le déploiement de vos tâches Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
workerRegion String

Spécifie une région Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par region pour déployer, gérer et surveiller les tâches. La zone de workerRegion est automatiquement attribuée.

Remarque : Cette option ne peut pas être combinée avec workerZone ou zone.

Si ce champ n'est pas défini, la valeur par défaut est region.
workerZone String

Spécifie une zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par region pour déployer, gérer et surveiller les tâches.

Remarque : Cette option ne peut pas être combinée avec workerRegion ou zone.

Si vous spécifiez region ou workerRegion, workerZone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
zone String (Obsolète) Pour le SDK Apache Beam 2.17.0 ou version antérieure, spécifie la zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter votre pipeline. Si vous spécifiez region, zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflowKmsKey String Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre gcpTempLocation pour utiliser cette fonctionnalité. Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK.
flexRSGoal String Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres numWorkers, autoscalingAlgorithm, zone, region et workerMachineType. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles.
filesToStage List<String> Une liste non vide de fichiers locaux, de répertoires de fichiers ou d'archives (telles que des fichiers JAR ou ZIP) à mettre à la disposition de chaque nœud de calcul. Si vous définissez cette option, seuls les fichiers spécifiés sont importés (le paramètre Java "classpath" est ignoré). Vous devez spécifier toutes les ressources dans l'ordre "classpath" correct. Les ressources ne se limitent pas au code, mais peuvent également inclure des fichiers de configuration et d'autres ressources à mettre à la disposition de tous les nœuds de calcul. Le code peut accéder aux ressources répertoriées à l'aide des méthodes de recherche de ressources standards de Java. Attention : La spécification d'un chemin de répertoire n'est pas recommandée. En effet, Dataflow compresse les fichiers avant l'importation, ce qui implique un coût lié au temps de démarrage plus élevé. De plus, évitez d'utiliser cette option pour transférer des données à des nœuds de calcul censés être traités par le pipeline. Utiliser l'API native Cloud Storage/BigQuery en association avec une source de données Dataflow adaptée est beaucoup plus rapide. Si le champ filesToStage est omis, Dataflow déduit les fichiers à préparer selon le paramètre Java "classpath". Les considérations et précautions mentionnées dans la colonne de gauche s'appliquent également ici (types de fichiers à répertorier et modalités d'accès à ces fichiers depuis votre code).
network String Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default.
subnetwork String Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Dataflow détermine la valeur par défaut.
usePublicIps boolean Indique si les nœuds de calcul Dataflow utilisent des adresses IP publiques. Si la valeur est définie sur false, les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur true et les nœuds de calcul Cloud Dataflow utilisent des adresses IP publiques.
enableStreamingEngine boolean Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de streaming sont exécutées entièrement sur des VM de nœuds de calcul.
createFromSnapshot String Spécifie l'ID d'instantané à utiliser lors de la création d'une tâche de streaming. Les instantanés enregistrent l'état d'un pipeline de traitement par flux et vous permettent de démarrer une nouvelle version de votre tâche à partir de cet état. Pour en savoir plus sur les instantanés, consultez la page Utiliser des instantanés. Si ce champ n'est pas défini, aucun instantané n'est utilisé pour créer une tâche.
hotKeyLoggingEnabled boolean Indique qu'une clé d'hôte est détectée dans le pipeline. Elle est affichée dans le projet Cloud Logging de l'utilisateur. Si ce champ n'est pas défini, seule la présence d'une clé d'hôte est enregistrée.
diskSizeGb int

Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si ce champ est défini, spécifiez au moins 30 Go pour tenir compte de l'image de démarrage du nœud de calcul et des journaux locaux.

Pour les tâches par lots utilisant Dataflow Shuffle, cette option définit la taille du disque de démarrage d'une VM de calcul. Pour les tâches par lots qui n'utilisent pas Dataflow Shuffle, cette option définit la taille des disques utilisés pour stocker les données brassées. La taille du disque de démarrage n'est pas affectée.

Pour les tâches traitées par flux à l'aide de Streaming Engine, cette option définit la taille des disques de démarrage. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté. Vous pouvez définir la taille du disque de démarrage avec l'option streaming_boot_disk_size_gb (en cours de test) pour les tâches de streaming qui n'utilisent pas Streaming Engine. Par exemple, spécifiez --experiments=streaming_boot_disk_size_gb=80 pour créer des disques de démarrage de 80 Go.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet Cloud Platform.

La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go.

La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

serviceAccount String Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
workerDiskType String Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Dataflow détermine la valeur par défaut.
workerMachineType String

Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés.

Pour des résultats optimaux, utilisez des types de machine n1. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Dataflow.

Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine.

Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche.

Consultez la documentation de référence de l'API pour Java concernant l'interface PipelineOptions (et ses sous-interfaces) pour obtenir la liste complète des options de configuration du pipeline.

Python

Champ Type Description Valeur par défaut
runner str Le PipelineRunner à utiliser. Ce champ peut être défini sur DirectRunner ou DataflowRunner. DirectRunner (mode local)
streaming bool Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. false
project str ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. Si ce champ n'est pas défini, une erreur se produit.
job_name String Nom de la tâche Dataflow en cours d'exécution, tel qu'il apparaît dans la liste des tâches Dataflow et les détails de la tâche. Dataflow génère automatiquement un nom unique.
temp_location str Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur staging_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
staging_location str Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur un répertoire de préproduction dans temp_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
autoscaling_algorithm str Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lots et pour les tâches de streaming utilisant Streaming Engine. La valeur par défaut est définie sur NONE pour les tâches de streaming qui n'utilisent pas Streaming Engine.
num_workers int Nombre d'instances Compute Engine à utiliser lors de l'exécution du pipeline. Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
max_num_workers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par num_workers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
number_of_worker_harness_threads int Nombre de threads par faisceau de nœud de calcul. Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de threads par nœud de calcul. Pour utiliser ce paramètre, vous devez également utiliser l'option --experiments=use_runner_v2
region str Spécifie un point de terminaison régional pour le déploiement de vos tâches Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
worker_region String

Spécifie une région Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par region pour déployer, gérer et surveiller les tâches. La zone de worker_region est automatiquement attribuée.

Remarque : Cette option ne peut pas être combinée avec worker_zone ou zone.

Si ce champ n'est pas défini, la valeur par défaut est region.
worker_zone String

Spécifie une zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par region pour déployer, gérer et surveiller les tâches.

Remarque : Cette option ne peut pas être combinée avec worker_region ou zone.

Si vous spécifiez region ou worker_region, worker_zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
zone str (Obsolète) Pour le SDK Apache Beam 2.17.0 ou version antérieure, spécifie la zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter votre pipeline. Si vous spécifiez region, zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflow_kms_key str Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre temp_location pour utiliser cette fonctionnalité. Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK.
flexrs_goal str Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres num_workers, autoscaling_algorithm, zone, region et machine_type. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles.
network str Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default.
subnetwork str Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Dataflow détermine la valeur par défaut.
use_public_ips bool Indique que les nœuds de calcul Dataflow doivent utiliser des adresses IP publiques. Si la valeur est définie sur false, les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Cette option nécessite le SDK Beam pour Python. Le SDK Dataflow pour Python obsolète n'est plus compatible. Si ce champ n'est pas défini, les nœuds de calcul Dataflow utilisent des adresses IP publiques.
enable_streaming_engine bool Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de streaming sont exécutées entièrement sur des VM de nœuds de calcul.
disk_size_gb int

Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si vous définissez ce paramètre, spécifiez au moins 30 Go pour prendre en compte l'image de démarrage du nœud de calcul et les journaux locaux.

Pour les tâches par lots utilisant Dataflow Shuffle, cette option définit la taille du disque de démarrage d'une VM de calcul. Pour les tâches par lots qui n'utilisent pas Dataflow Shuffle, cette option définit la taille des disques utilisés pour stocker les données brassées. La taille du disque de démarrage n'est pas affectée.

Pour les tâches traitées par flux à l'aide de Streaming Engine, cette option définit la taille des disques de démarrage. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté. Vous pouvez définir la taille du disque de démarrage avec l'option streaming_boot_disk_size_gb (en cours de test) pour les tâches de streaming qui n'utilisent pas Streaming Engine. Par exemple, spécifiez --experiments=streaming_boot_disk_size_gb=80 pour créer des disques de démarrage de 80 Go.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet Cloud Platform.

La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go.

La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

service_account_email str Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
worker_disk_type str Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Dataflow détermine la valeur par défaut.
machine_type str

Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés.

Pour des résultats optimaux, utilisez des types de machine n1. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Dataflow.

Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine.

Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche.

Java : SDK 1.x

Configurer PipelineOptions pour une exécution locale

Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Dataflow distant et le projet Google Cloud associé.

Si vous optez pour une exécution locale, il est vivement recommandé d'exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create, ou utiliser une transformation Read pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue un moyen simple et rapide d'effectuer des tests et des opérations de débogage, avec moins de dépendances externes. Cependant, une exécution locale est limitée par la mémoire disponible dans l'environnement local.

L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.

Java : SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut. Cependant, vous devez inclure explicitement DirectRunner en tant que dépendance, ou l'ajouter au paramètre "classpath".

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut.

Java : SDK 1.x

Une fois le pipeline construit, exécutez-le.

Définir d'autres options de pipeline locales

Lors de l'exécution locale du pipeline, les valeurs par défaut des propriétés de PipelineOptions sont généralement suffisantes.

Java : SDK 2.x

Vous pouvez trouver les valeurs par défaut de la classe Java PipelineOptions dans la documentation de référence de l'API Java. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si votre pipeline utilise des services Google cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser GcpOptions.setProject pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GcpOptions pour plus d'informations.

Python

Vous pouvez trouver les valeurs par défaut de la classe Python PipelineOptions dans la documentation de référence de l'API Python. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si votre pipeline utilise des services Google Cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser options.view_as(GoogleCloudOptions).project pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GoogleCloudOptions pour plus d'informations.

Java : SDK 1.x