Spécifier les paramètres d'exécution du pipeline

Une fois que le programme Apache Beam a construit un pipeline, vous devez l'exécuter. L'exécution du pipeline est indépendante de l'exécution du programme Apache Beam. Le programme Apache Beam construit le pipeline et le code que vous avez écrit génère une série d'étapes, qui devront être exécutées par un exécuteur de pipeline. Celui-ci peut correspondre au service géré Dataflow sur Google Cloud, à un service d'exécution tiers ou un exécuteur de pipeline local qui lance les tâches directement dans votre environnement local.

Vous pouvez spécifier l'exécuteur de pipeline et d'autres options d'exécution à l'aide de la classe PipelineOptions du SDK Apache Beam. La classe PipelineOptions permet de configurer l'emplacement et les options d'exécution du pipeline, ainsi que les ressources qu'il utilise.

Le plus souvent, vous aurez besoin que votre pipeline s'exécute sur les ressources gérées de Google Cloud à l'aide du service d'exécution Dataflow. L'exécution de votre pipeline avec le service Dataflow crée une tâche Dataflow. Celle-ci utilise les ressources Compute Engine et Cloud Storage de votre projet Google Cloud.

Vous pouvez également exécuter le pipeline localement. Dans ce cas, les transformations de pipeline sont exécutées sur la même machine que le programme Dataflow. L'exécution locale est utile à des fins de test et de débogage, en particulier si le pipeline peut utiliser des ensembles de données en mémoire plus petits.

Définir l'objet PipelineOptions

Vous transmettez les options de pipeline PipelineOptions lorsque vous créez l'objet Pipeline dans le programme Dataflow. Lorsque le service Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions à chaque instance de nœud de calcul.

Java : SDK 2.x

Remarque : Vous pouvez accéder à PipelineOptions dans une instance DoFn de l'opération ParDo en utilisant la méthode ProcessContext.getPipelineOptions.

Python

Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.

Java : SDK 1.x

Définir l'objet PipelineOptions à partir d'arguments de ligne de commande

Vous pouvez configurer le pipeline en créant un objet PipelineOptions et en définissant les champs directement. Cependant, les SDK Apache Beam incluent un analyseur de ligne de commande que vous pouvez utiliser pour définir des champs dans l'objet PipelineOptions à l'aide d'arguments de ligne de commande.

Java : SDK 2.x

Pour lire les options à partir de la ligne de commande, construisez l'objet PipelineOptions à l'aide de la méthode PipelineOptionsFactory.fromArgs, comme dans l'exemple de code suivant :

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    

Remarque : L'ajout de la méthode .withValidation permet à Dataflow de vérifier la présence des arguments de ligne de commande requis et de valider les valeurs des arguments.

L'utilisation de PipelineOptionsFactory.fromArgs permet d'interpréter les arguments de ligne de commande au format suivant :

    --<option>=<value>
    

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options de n'importe quelle sous-interface de org.apache.beam.sdk.options.PipelineOptions en tant qu'argument de ligne de commande.

Python

Pour lire les options à partir de la ligne de commande, créez l'objet PipelineOptions, comme dans l'exemple de code suivant :

    options = PipelineOptions(flags=argv)

    

L'argument (flags=argv) de PipelineOptions permet d'interpréter les arguments de ligne de commande au format suivant :

    --<option>=<value>
    

En construisant l'objet PipelineOptions de la sorte, vous pouvez spécifier les options en définissant des sous-classes de la classe PipelineOptions.

Java : SDK 1.x

Créer des options personnalisées

Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions standards. L'analyseur de ligne de commande de Dataflow peut également définir vos options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.

Java : SDK 2.x

Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :

      public interface MyOptions extends PipelineOptions {
        String getMyCustomOption();
        void setMyCustomOption(String myCustomOption);
      }
    

Python

Pour ajouter vos propres options, utilisez la méthode add_argument() (qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--input')
        parser.add_argument('--output')

    

Java : SDK 1.x

Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.

Java : SDK 2.x

Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :

      public interface MyOptions extends PipelineOptions {
        @Description("My custom command line argument.")
        @Default.String("DEFAULT")
        String getMyCustomOption();
        void setMyCustomOption(String myCustomOption);
      }
    

Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory, puis de la transmettre lors de la création de l'objet PipelineOptions. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory, la commande --help vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help. La fonction PipelineOptionsFactory vérifie également que les options personnalisées sont compatibles avec toutes les autres options enregistrées.

L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory :

      PipelineOptionsFactory.register(MyOptions.class);
      MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                                .withValidation()
                                                .as(MyOptions.class);
    

Le pipeline peut maintenant accepter --myCustomOption=value en tant qu'argument de ligne de commande.

Python

Vous définissez la description et la valeur par défaut comme suit :

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            help='Input for the pipeline',
            default='gs://my-bucket/input')
        parser.add_argument(
            '--output',
            help='Output for the pipeline',
            default='gs://my-bucket/output')

    

Java : SDK 1.x

Configurer PipelineOptions pour exécution sur le service Cloud Dataflow

Pour exécuter votre pipeline à l'aide du service géré Dataflow, vous devez définir les champs suivants dans PipelineOptions :

Java : SDK 2.x

  • project est l'ID du projet Google Cloud.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • gcpTempLocation : chemin d'accès à Cloud Storage, dans lequel Dataflow prépare tous les fichiers temporaires. Vous devez créer ce bucket avant d'exécuter le pipeline. Si vous ne spécifiez pas le chemin gcpTempLocation, vous pouvez spécifier l'option de pipeline tempLocation ; le chemin gcpTempLocation sera ensuite défini sur la valeur tempLocation. Si aucune option n'est spécifiée, un chemin gcpTempLocation par défaut est créé.
  • stagingLocation : bucket Cloud Storage dans lequel Dataflow prépare les fichiers binaires. Si vous ne définissez pas cette option, la valeur spécifiée pour tempLocation est également utilisée pour l'emplacement de préproduction.
  • Un champ gcpTempLocation est créé si ni ce champ, ni le champ tempLocation ne sont spécifiés. Si le champ tempLocation est spécifié, mais que gcpTempLocation ne l'est pas, tempLocation doit être un chemin Cloud Storage qui sera utilisé comme valeur par défaut pour gcpTempLocation. Si le champ tempLocation n'est pas spécifié, mais que gcpTempLocation l'est, tempLocation n'est pas renseigné.

Remarque : Si vous utilisez le SDK Apache Beam pour Java en version 2.15.0 ou ultérieure, vous devez également spécifier region.

Python

  • job_name : nom de la tâche Dataflow en cours d'exécution.
  • project est l'ID du projet Google Cloud.
  • runner : exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliser DataflowRunner.
  • staging_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les packages de code requis par les nœuds de calcul exécutant la tâche.
  • temp_location : chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.

Remarque : Si vous utilisez le SDK Apache Beam pour Python en version 2.15.0 ou ultérieure, vous devez également spécifier region.

Java : SDK 1.x

Vous pouvez définir ces options de façon automatisée ou les spécifier à l'aide de la ligne de commande. L'exemple de code suivant montre comment construire un pipeline en programmant les paramètres de l'exécuteur et les autres options nécessaires pour exécuter le pipeline à l'aide du service géré Dataflow.

Java : SDK 2.x

      // Create and set your PipelineOptions.
      DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

      // For Cloud execution, set the Cloud Platform project, staging location,
      // and specify DataflowRunner.
      options.setProject("my-project-id");
      options.setStagingLocation("gs://my-bucket/binaries");
      options.setRunner(DataflowRunner.class);

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Python

    # Create and set your PipelineOptions.
    options = PipelineOptions(flags=argv)

    # For Cloud execution, set the Cloud Platform project, job_name,
    # staging location, temp_location and specify DataflowRunner.
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'my-project-id'
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = 'gs://my-bucket/binaries'
    google_cloud_options.temp_location = 'gs://my-bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    # Create the Pipeline with the specified options.
    p = Pipeline(options=options)
    

Java : SDK 1.x

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

L'exemple suivant montre comment définir les options nécessaires pour exécuter le service Dataflow à l'aide de la ligne de commande :

Java : SDK 2.x

      // Create and set your PipelineOptions.
      MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Python

    # Use Python argparse module to parse custom arguments
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--input')
    parser.add_argument('--output')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Create the Pipeline with remaining arguments.
    with beam.Pipeline(argv=pipeline_args) as p:
      lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
      lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
    

Java : SDK 1.x

Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.

Java : SDK 2.x

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner, --gcpTempLocation et, éventuellement, --stagingLocation.

Python

Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project, --runner et --stagingLocation.

Java : SDK 1.x

Exécution asynchrone

Java : SDK 2.x

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur Google Cloud. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Python

L'utilisation de DataflowRunner entraîne l'exécution du pipeline de manière asynchrone sur Google Cloud. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Java : SDK 1.x

Bloquer l'exécution

Java : SDK 2.x

Spécifiez DataflowRunner en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish().

Lorsque vous utilisez DataflowRunner et appelez waitUntilFinish() sur l'objet PipelineResult renvoyé par pipeline.run(), le pipeline s'exécute sur le cloud de la même manière que DataflowRunner. Cependant, il attend la fin de la tâche lancée et renvoie l'objet DataflowPipelineJob final. Pendant l'exécution de la tâche, le service Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.

Si vous utilisez le SDK 1.x de Java, vous pouvez saisir --runner BlockingDataflowPipelineRunner dans la ligne de commande pour indiquer au programme principal de se bloquer jusqu'à la fin du pipeline. Mais avec Java 2.x, votre programme principal doit appeler explicitement waitUntilFinish().

Python

Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish() de l'objet PipelineResult renvoyé par la méthode run() de l'exécuteur.

Java : SDK 1.x

Remarque : Si vous tapez Ctrl+C à partir de la ligne de commande, la tâche n'est pas annulée. Le service Dataflow exécute toujours la tâche sur Google Cloud. Pour annuler la tâche, vous devez utiliser l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.

Exécution en mode de traitement par flux

Java : SDK 2.x

Si le pipeline lit à partir d'une source de données illimitées (par exemple, Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollection illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Python

Si le pipeline utilise une source de données ou un récepteur illimités (par exemple, Pub/Sub), vous devez définir l'option streaming sur "true".

Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2 au minimum. Vous ne devez pas remplacer cette valeur, car n1-standard-2 correspond au type de machine minimal requis pour l'exécution de tâches de traitement par flux.

Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour vos classes PCollection illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey.

Java : SDK 1.x

Définir d'autres options de pipeline Cloud Dataflow

Pour exécuter le pipeline sur le cloud, vous pouvez définir de façon automatisée les champs suivants dans l'objet PipelineOptions :

Java : SDK 2.x

Champ Type Description Valeur par défaut
runner Class (NameOfRunner) Le PipelineRunner à utiliser. Ce champ vous permet de déterminer l'exécuteur PipelineRunner au moment de l'exécution. DirectRunner (mode local)
streaming boolean Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. Si le pipeline lit à partir d'une source illimitée, la valeur par défaut est définie sur true. Dans le cas contraire, cette valeur est définie sur false.
project String ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur le projet actuellement configuré dans le SDK Cloud.
gcpTempLocation String Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://.
stagingLocation String Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur celle que vous avez spécifiée pour tempLocation.
autoscalingAlgorithm String Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lot qui utilisent la version 1.6.0 ou ultérieure du SDK Dataflow pour Java. La valeur par défaut est définie sur NONE pour les tâches de traitement par flux ou par lot utilisant des versions antérieures du SDK Dataflow pour Java.
numWorkers int Nombre initial d'instances Google Compute Engine à utiliser lors de l'exécution du pipeline. Cette option détermine le nombre de nœuds de calcul démarrés par le service Dataflow au début de la tâche. Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
maxNumWorkers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par numWorkers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
region String La spécification d'un point de terminaison régional vous permet de définir une région pour le déploiement des tâches Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
zone String Zone Compute Engine pour le lancement d'instances de nœuds de calcul visant à exécuter le pipeline. Si vous avez spécifié le paramètre region, le paramètre zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflowKmsKey String Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre gcpTempLocation pour utiliser cette fonctionnalité. Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK.
flexRSGoal String Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres numWorkers, autoscalingAlgorithm, zone, region et workerMachineType. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles.
filesToStage List<String> Liste de fichiers locaux, de répertoires de fichiers ou d'archives (.jar, .zip) à mettre à la disposition de chaque nœud de calcul. Si vous définissez cette option, seuls les fichiers spécifiés sont importés (le paramètre Java classpath est ignoré). Vous devez spécifier toutes les ressources dans l'ordre classpath correct. Les ressources ne se limitent pas au code, mais peuvent également inclure des fichiers de configuration et d'autres ressources à mettre à la disposition de tous les nœuds de calcul. Le code peut accéder aux ressources répertoriées à l'aide des méthodes de recherche de ressources standards de Java. Attention : La spécification d'un chemin de répertoire n'est pas recommandée. En effet, Dataflow compresse les fichiers avant l'importation, ce qui implique un coût lié au temps de démarrage plus élevé. De plus, évitez d'utiliser cette option pour transférer des données à des nœuds de calcul censés être traités par le pipeline. Utiliser l'API native Cloud Storage/BigQuery en association avec une source de données Dataflow adaptée est beaucoup plus rapide. Si le champ filesToStage est vide, Dataflow déduit les fichiers à préparer selon le paramètre Java "classpath". Les considérations et précautions mentionnées dans la colonne de gauche s'appliquent également ici (types de fichiers à répertorier et modalités d'accès à ces fichiers depuis votre code).
network String Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default.
subnetwork String Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Dataflow détermine la valeur par défaut.
usePublicIps boolean Indique si les nœuds de calcul Dataflow utilisent des adresses IP publiques. Si la valeur est définie sur false, les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur true et les nœuds de calcul Dataflow utilisent des adresses IP publiques.
enableStreamingEngine boolean Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de traitement par flux sont exécutées entièrement sur des VM de nœuds de calcul.
diskSizeGb int

Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si vous définissez ce paramètre, spécifiez au moins 30 Go pour prendre en compte l'image de démarrage du nœud de calcul et les journaux locaux.

Pour les tâches par lots, cette option permet de définir la taille du disque de démarrage de la VM du nœud de calcul.

Cette option définit la taille des disques de démarrage pour les tâches de streaming qui utilisent Streaming Engine. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté. Vous pouvez définir la taille du disque de démarrage avec l'option streaming_boot_disk_size_gb (en cours de test) pour les tâches de streaming qui n'utilisent pas Streaming Engine. Par exemple, spécifiez --experiments=streaming_boot_disk_size_gb=80 pour créer des disques de démarrage de 80 Go.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet Cloud Platform.

La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go.

La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

serviceAccount String Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page sur la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
workerDiskType String Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Dataflow détermine la valeur par défaut.
workerMachineType String

Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés.

Pour des résultats optimaux, utilisez des types de machine n1. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Dataflow.

Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine.

Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche.

Consultez la documentation de référence de l'API pour Java concernant l'interface PipelineOptions (et ses sous-interfaces) pour obtenir la liste complète des options de configuration du pipeline.

Python

Champ Type Description Valeur par défaut
runner str Le PipelineRunner à utiliser. Ce champ peut être défini sur DirectRunner ou DataflowRunner. DirectRunner (mode local)
streaming bool Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. false
project str ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. Si ce champ n'est pas défini, une erreur se produit.
temp_location str Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur staging_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
staging_location str Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs://. Si ce champ n'est pas spécifié, la valeur par défaut est définie sur un répertoire de préproduction dans temp_location. Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud.
autoscaling_algorithm str Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lot et sur NONE pour toutes les tâches Dataflow de traitement par flux.
num_workers int Nombre d'instances Compute Engine à utiliser lors de l'exécution du pipeline. Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
max_num_workers int Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par num_workers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul.
region str La spécification d'un point de terminaison régional vous permet de définir une région pour le déploiement des tâches Dataflow. Si ce champ n'est pas défini, la valeur par défaut est us-central1.
zone str Zone Compute Engine pour le lancement d'instances de nœuds de calcul visant à exécuter le pipeline. Si vous avez spécifié le paramètre region, le paramètre zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente.
dataflow_kms_key str Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS. Vous devez également spécifier le paramètre temp_location pour utiliser cette fonctionnalité. Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK.
flexrs_goal str Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres num_workers, autoscaling_algorithm, zone, region et machine_type. Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles.
network str Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default.
subnetwork str Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. Le service Dataflow détermine la valeur par défaut.
no_use_public_ips bool Indique que les nœuds de calcul Dataflow ne doivent pas utiliser d'adresses IP publiques. De ce fait, les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Le paramètre des adresses IP publiques nécessite le SDK Beam pour Python. Ce paramètre n'est pas disponible avec le SDK Dataflow pour Python. Si ce champ n'est pas défini, les nœuds de calcul Dataflow utilisent des adresses IP publiques.
enable_streaming_engine boolean Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de traitement par flux sont exécutées entièrement sur des VM de nœuds de calcul.
disk_size_gb int

Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si vous définissez ce paramètre, spécifiez au moins 30 Go pour prendre en compte l'image de démarrage du nœud de calcul et les journaux locaux.

Pour les tâches par lots, cette option permet de définir la taille du disque de démarrage de la VM du nœud de calcul.

Cette option définit la taille des disques de démarrage pour les tâches de streaming qui utilisent Streaming Engine. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté. Vous pouvez définir la taille du disque de démarrage avec l'option streaming_boot_disk_size_gb (en cours de test) pour les tâches de streaming qui n'utilisent pas Streaming Engine. Par exemple, spécifiez --experiments=streaming_boot_disk_size_gb=80 pour créer des disques de démarrage de 80 Go.

Définissez la valeur sur 0 pour utiliser la taille par défaut définie dans le projet Cloud Platform.

La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go.

La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go.

Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches.

service_account_email str Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com. Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page sur la sécurité et les autorisations dans Cloud Dataflow. Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur.
worker_disk_type str Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. Le service Dataflow détermine la valeur par défaut.
machine_type str

Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés.

Pour des résultats optimaux, utilisez des types de machine n1. Les types de machines à cœur partagé, tels que les nœuds de calcul des séries f1 et g1, ne sont pas disponibles avec le contrat de niveau de service de Dataflow.

Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine.

Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche.

Java : SDK 1.x

Configurer PipelineOptions pour une exécution locale

Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Dataflow distant et le projet Google Cloud associé.

Si vous optez pour une exécution locale, il est vivement recommandé d'exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create, ou utiliser une transformation Read pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue un moyen simple et rapide d'effectuer des tests et des opérations de débogage, avec moins de dépendances externes. Cependant, une exécution locale est limitée par la mémoire disponible dans l'environnement local.

L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.

Java : SDK 2.x

      // Create and set our Pipeline Options.
      PipelineOptions options = PipelineOptionsFactory.create();

      // Create the Pipeline with the specified options.
      Pipeline p = Pipeline.create(options);
    

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut. Cependant, vous devez inclure explicitement DirectRunner en tant que dépendance, ou l'ajouter au paramètre classpath.

Python

    # Create and set your Pipeline Options.
    options = PipelineOptions()
    with Pipeline(options=options) as p:
    

Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner est déjà définie par défaut.

Java : SDK 1.x

Une fois le pipeline construit, exécutez-le.

Définir d'autres options de pipeline locales

Lors de l'exécution locale du pipeline, les valeurs par défaut des propriétés de PipelineOptions sont généralement suffisantes.

Java : SDK 2.x

Vous pouvez trouver les valeurs par défaut de la classe Java PipelineOptions dans la documentation de référence de l'API Java. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si votre pipeline utilise des services Google Cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser GcpOptions.setProject pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GcpOptions pour plus d'informations.

Python

Vous pouvez trouver les valeurs par défaut de la classe Python PipelineOptions dans la documentation de référence de l'API Python. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.

Si votre pipeline utilise des services Google Cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser options.view_as(GoogleCloudOptions).project pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GoogleCloudOptions pour plus d'informations.

Java : SDK 1.x