Une fois que le programme Apache Beam a construit un pipeline, vous devez l'exécuter. L'exécution du pipeline est indépendante de l'exécution du programme Apache Beam. Le programme Apache Beam construit le pipeline et le code que vous avez écrit génère une série d'étapes, qui devront être exécutées par un exécuteur de pipeline. Celui-ci peut correspondre au service géré Dataflow sur Google Cloud, à un service d'exécution tiers ou un exécuteur de pipeline local qui lance les tâches directement dans votre environnement local.
Vous pouvez spécifier l'exécuteur de pipeline et d'autres options d'exécution à l'aide de la classe PipelineOptions
du SDK Apache Beam. La classe PipelineOptions
permet de configurer l'emplacement et les options d'exécution du pipeline, ainsi que les ressources qu'il utilise.
Le plus souvent, vous aurez besoin que votre pipeline s'exécute sur les ressources gérées de Google Cloud à l'aide du service d'exécution Dataflow. L'exécution de votre pipeline avec le service Dataflow crée une tâche Dataflow. Celle-ci utilise les ressources Compute Engine et Cloud Storage de votre projet Google Cloud.
Vous pouvez également exécuter le pipeline localement. Dans ce cas, les transformations de pipeline sont exécutées sur la même machine que le programme Dataflow. L'exécution locale est utile à des fins de test et de débogage, en particulier si le pipeline peut utiliser des ensembles de données en mémoire plus petits.
Définir l'objet PipelineOptions
Vous transmettez les options de pipeline PipelineOptions
lorsque vous créez l'objet Pipeline
dans le programme Dataflow. Lorsque le service Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions
à chaque instance de nœud de calcul.
Java : SDK 2.x
Remarque : Vous pouvez accéder à PipelineOptions
dans une instance DoFn
de l'opération ParDo
en utilisant la méthode ProcessContext.getPipelineOptions
.
Python
Cette fonctionnalité n'est pas encore disponible dans le SDK Apache Beam pour Python.
Java : SDK 1.x
Définir l'objet PipelineOptions à partir d'arguments de ligne de commande
Vous pouvez configurer le pipeline en créant un objet PipelineOptions
et en définissant les champs directement. Cependant, les SDK Apache Beam incluent un analyseur de ligne de commande que vous pouvez utiliser pour définir des champs dans l'objet PipelineOptions
à l'aide d'arguments de ligne de commande.
Java : SDK 2.x
Pour lire les options à partir de la ligne de commande, construisez l'objet PipelineOptions
à l'aide de la méthode PipelineOptionsFactory.fromArgs
, comme dans l'exemple de code suivant :
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Remarque : L'ajout de la méthode .withValidation
permet à Dataflow de vérifier la présence des arguments de ligne de commande requis et de valider les valeurs des arguments.
L'utilisation de PipelineOptionsFactory.fromArgs
permet d'interpréter les arguments de ligne de commande au format suivant :
--<option>=<value>
En construisant l'objet PipelineOptions
de la sorte, vous pouvez spécifier les options de n'importe quelle sous-interface de org.apache.beam.sdk.options.PipelineOptions en tant qu'argument de ligne de commande.
Python
Pour lire les options à partir de la ligne de commande, créez l'objet PipelineOptions
, comme dans l'exemple de code suivant :
from apache_beam.options.pipeline_options import PipelineOptions options = PipelineOptions(flags=argv)
L'argument (flags=argv)
de PipelineOptions
permet d'interpréter les arguments de ligne de commande au format suivant :
--<option>=<value>
En construisant l'objet PipelineOptions
de la sorte, vous pouvez spécifier les options en définissant des sous-classes de la classe PipelineOptions
.
Java : SDK 1.x
Créer des options personnalisées
Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions
standards.
L'analyseur de ligne de commande de Dataflow peut également définir vos options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.
Java : SDK 2.x
Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Python
Pour ajouter vos propres options, utilisez la méthode add_argument()
(qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input') parser.add_argument('--output')
Java : SDK 1.x
Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help
en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.
Java : SDK 2.x
Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory
, puis de la transmettre lors de la création de l'objet PipelineOptions
. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory
, la commande --help
vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help
.
La fonction PipelineOptionsFactory
vérifie également que les options personnalisées sont compatibles avec toutes les autres options enregistrées.
L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
Le pipeline peut maintenant accepter --myCustomOption=value
en tant qu'argument de ligne de commande.
Python
Vous définissez la description et la valeur par défaut comme suit :
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', help='Input for the pipeline', default='gs://my-bucket/input') parser.add_argument( '--output', help='Output for the pipeline', default='gs://my-bucket/output')
Java : SDK 1.x
Configurer PipelineOptions pour exécution sur le service Cloud Dataflow
Pour exécuter votre pipeline à l'aide du service géré Dataflow, vous devez définir les champs suivants dans PipelineOptions
:
Java : SDK 2.x
project
est l'ID du projet Google Cloud.runner
: exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliserDataflowRunner
.gcpTempLocation
: chemin d'accès à Cloud Storage, dans lequel Dataflow prépare tous les fichiers temporaires. Vous devez créer ce bucket avant d'exécuter le pipeline. Si vous ne spécifiez pas le chemingcpTempLocation
, vous pouvez spécifier l'option de pipelinetempLocation
; le chemingcpTempLocation
sera ensuite défini sur la valeurtempLocation
. Si aucune option n'est spécifiée, un chemingcpTempLocation
par défaut est créé.stagingLocation
: bucket Cloud Storage dans lequel Dataflow prépare les fichiers binaires. Si vous ne définissez pas cette option, la valeur spécifiée pourtempLocation
est également utilisée pour l'emplacement de préproduction.
Un champ
gcpTempLocation
est créé si ni ce champ, ni le champ tempLocation
ne sont spécifiés. Si le champ tempLocation
est spécifié, mais que gcpTempLocation
ne l'est pas, tempLocation
doit être un chemin Cloud Storage qui sera utilisé comme valeur par défaut pour gcpTempLocation
. Si le champ tempLocation
n'est pas spécifié, mais que gcpTempLocation
l'est, tempLocation
n'est pas renseigné.
Remarque : Si vous utilisez le SDK Apache Beam pour Java en version 2.15.0 ou ultérieure, vous devez également spécifier region
.
Python
project
est l'ID du projet Google Cloud.runner
: exécuteur de pipeline qui analyse le programme et construit le pipeline. Pour l'exécution dans le cloud, vous devez utiliserDataflowRunner
.staging_location
: chemin d'accès à Cloud Storage dans lequel Dataflow prépare les packages de code requis par les nœuds de calcul exécutant la tâche.temp_location
: chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.
Remarque : Si vous utilisez le SDK Apache Beam pour Python en version 2.15.0 ou ultérieure, vous devez également spécifier region
.
Java : SDK 1.x
Vous pouvez définir ces options de façon automatisée ou les spécifier à l'aide de la ligne de commande. L'exemple de code suivant montre comment construire un pipeline en programmant les paramètres de l'exécuteur et les autres options nécessaires pour exécuter le pipeline à l'aide du service géré Dataflow.
Java : SDK 2.x
// Create and set your PipelineOptions. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); // For Cloud execution, set the Cloud Platform project, staging location, // and specify DataflowRunner. options.setProject("my-project-id"); options.setStagingLocation("gs://my-bucket/binaries"); options.setRunner(DataflowRunner.class); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # Create and set your PipelineOptions. # For Cloud execution, specify DataflowRunner and set the Cloud Platform # project, job name, temporary files location, and region. # For more information about regions, check: # https://cloud.google.com/dataflow/docs/concepts/regional-endpoints options = PipelineOptions( flags=argv, runner='DataflowRunner', project='my-project-id', job_name='unique-job-name', temp_location='gs://my-bucket/temp', region='us-central1') # Create the Pipeline with the specified options. # with beam.Pipeline(options=options) as pipeline: # pass # build your pipeline here.
Java : SDK 1.x
Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.
L'exemple suivant montre comment définir les options nécessaires pour exécuter le service Dataflow à l'aide de la ligne de commande :
Java : SDK 2.x
// Create and set your PipelineOptions. MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
# Use Python argparse module to parse custom arguments import argparse import apache_beam as beam parser = argparse.ArgumentParser() parser.add_argument('--input') parser.add_argument('--output') args, beam_args = parser.parse_known_args(argv) # Create the Pipeline with remaining arguments. with beam.Pipeline(argv=beam_args) as pipeline: lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input) lines | 'Write files' >> beam.io.WriteToText(args.output)
Java : SDK 1.x
Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.
Java : SDK 2.x
Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project
, --runner
, --gcpTempLocation
et, éventuellement, --stagingLocation
.
Python
Lorsque vous transmettez les options requises sur la ligne de commande, utilisez les options --project
, --runner
et --staging_location
.
Java : SDK 1.x
Exécution asynchrone
Java : SDK 2.x
L'utilisation de DataflowRunner
entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.
Python
L'utilisation de DataflowRunner
entraîne l'exécution du pipeline de manière asynchrone sur le cloud de Google. Pendant que le pipeline s'exécute, vous pouvez surveiller la progression de la tâche, afficher les détails de l'exécution et recevoir les mises à jour des résultats du pipeline via l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.
Java : SDK 1.x
Exécution synchrone
Java : SDK 2.x
Spécifiez DataflowRunner
en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish()
.
Lorsque vous utilisez DataflowRunner
et appelez waitUntilFinish()
sur l'objet PipelineResult
renvoyé par pipeline.run()
, le pipeline s'exécute sur le cloud, mais le code local attend la fin de la tâche cloud et renvoie la valeur finale DataflowPipelineJob
.
Pendant l'exécution de la tâche, le service Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.
Si vous utilisez le SDK 1.x de Java, vous pouvez saisir --runner
BlockingDataflowPipelineRunner
dans la ligne de commande pour indiquer au programme principal de se bloquer jusqu'à la fin du pipeline. Mais avec Java 2.x, votre programme principal doit appeler explicitement waitUntilFinish()
.
Python
Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish()
de l'objet PipelineResult
renvoyé par la méthode run()
de l'exécuteur.
Java : SDK 1.x
Remarque : Si vous tapez Ctrl+C
à partir de la ligne de commande, la tâche n'est pas annulée. Le service Dataflow exécute toujours la tâche sur Google Cloud. Pour annuler la tâche, vous devez utiliser l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.
Exécution en mode de traitement par flux
Java : SDK 2.x
Si le pipeline lit à partir d'une source de données illimitées (par exemple, Cloud Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.
Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey
.
Python
Si le pipeline utilise une source de données ou un récepteur illimités (par exemple, Pub/Sub), vous devez définir l'option streaming
sur "true".
Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2
au minimum. Vous ne devez pas remplacer cette valeur, car n1-standard-2
correspond au type de machine minimal requis pour l'exécution de tâches de traitement par flux.
Si le pipeline utilise des sources de données et des récepteurs illimités, vous devez choisir une stratégie de fenêtrage pour les classes PCollections illimitées avant de pouvoir utiliser une agrégation telle que GroupByKey
.
Java : SDK 1.x
Définir d'autres options de pipeline Cloud Dataflow
Pour exécuter le pipeline sur le cloud, vous pouvez définir de façon automatisée les champs suivants dans l'objet PipelineOptions
:
Java : SDK 2.x
Champ | Type | Description | Valeur par défaut |
---|---|---|---|
runner |
Class (NameOfRunner) |
Le PipelineRunner à utiliser. Ce champ vous permet de déterminer l'exécuteur PipelineRunner au moment de l'exécution. |
DirectRunner (mode local) |
streaming |
boolean |
Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. | Si le pipeline lit à partir d'une source illimitée, la valeur par défaut est définie sur true .
Dans le cas contraire, cette valeur est définie sur false . |
project |
String |
ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. | Si ce champ n'est pas spécifié, la valeur par défaut est définie sur le projet actuellement configuré dans le SDK Cloud. |
jobName |
String |
Nom de la tâche Dataflow en cours d'exécution, tel qu'il apparaît dans la liste des tâches Dataflow et les détails de la tâche. Utilisé également lors de la mise à jour d'un pipeline existant. | Dataflow génère automatiquement un nom unique. |
gcpTempLocation |
String |
Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs:// . |
|
stagingLocation |
String |
Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs:// . |
Si ce champ n'est pas spécifié, la valeur par défaut est définie sur celle que vous avez spécifiée pour tempLocation . |
autoscalingAlgorithm |
String
| Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. |
La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lots et pour les tâches de streaming utilisant Streaming Engine.
La valeur par défaut est définie sur NONE pour les tâches de streaming qui n'utilisent pas Streaming Engine. |
numWorkers |
int |
Nombre initial d'instances Google Compute Engine à utiliser lors de l'exécution du pipeline. Cette option détermine le nombre de nœuds de calcul démarrés par le service Dataflow au début de la tâche. | Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul. |
maxNumWorkers |
int |
Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par numWorkers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). |
Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul. |
numberOfWorkerHarnessThreads |
int |
Nombre de threads par faisceau de nœud de calcul. | Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de threads par nœud de calcul. |
region |
String |
Spécifie un point de terminaison régional pour le déploiement de vos tâches Dataflow. | Si ce champ n'est pas défini, la valeur par défaut est us-central1 . |
workerRegion |
String |
Spécifie une région Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par Remarque : Cette option ne peut pas être combinée avec |
Si ce champ n'est pas défini, la valeur par défaut est region . |
workerZone |
String |
Spécifie une zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par Remarque : Cette option ne peut pas être combinée avec |
Si vous spécifiez region ou workerRegion , workerZone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente. |
zone |
String |
(Obsolète) Pour le SDK Apache Beam 2.17.0 ou version antérieure, spécifie la zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter votre pipeline. | Si vous spécifiez region , zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente. |
dataflowKmsKey |
String |
Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS.
Vous devez également spécifier le paramètre gcpTempLocation pour utiliser cette fonctionnalité. |
Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK. |
flexRSGoal |
String |
Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres numWorkers , autoscalingAlgorithm ,
zone , region et workerMachineType . Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. |
Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles. |
filesToStage |
List<String> |
Une liste non vide de fichiers locaux, de répertoires de fichiers ou d'archives (telles que des fichiers JAR ou ZIP) à mettre à la disposition de chaque nœud de calcul. Si vous définissez cette option, seuls les fichiers spécifiés sont importés (le paramètre Java "classpath" est ignoré). Vous devez spécifier toutes les ressources dans l'ordre "classpath" correct. Les ressources ne se limitent pas au code, mais peuvent également inclure des fichiers de configuration et d'autres ressources à mettre à la disposition de tous les nœuds de calcul. Le code peut accéder aux ressources répertoriées à l'aide des méthodes de recherche de ressources standards de Java. Attention : La spécification d'un chemin de répertoire n'est pas recommandée. En effet, Dataflow compresse les fichiers avant l'importation, ce qui implique un coût lié au temps de démarrage plus élevé. De plus, évitez d'utiliser cette option pour transférer des données à des nœuds de calcul censés être traités par le pipeline. Utiliser l'API native Cloud Storage/BigQuery en association avec une source de données Dataflow adaptée est beaucoup plus rapide.
|
Si le champ filesToStage est omis, Dataflow déduit les fichiers à préparer selon le paramètre Java "classpath". Les considérations et précautions mentionnées dans la colonne de gauche s'appliquent également ici (types de fichiers à répertorier et modalités d'accès à ces fichiers depuis votre code).
|
network |
String |
Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. | Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default . |
subnetwork |
String |
Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. | Le service Dataflow détermine la valeur par défaut. |
usePublicIps |
boolean |
Indique si les nœuds de calcul Dataflow utilisent des adresses IP publiques.
Si la valeur est définie sur false , les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. |
Si ce champ n'est pas spécifié, la valeur par défaut est définie sur true et les nœuds de calcul Cloud Dataflow utilisent des adresses IP publiques. |
enableStreamingEngine |
boolean |
Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. | La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de streaming sont exécutées entièrement sur des VM de nœuds de calcul. |
createFromSnapshot |
String |
Spécifie l'ID d'instantané à utiliser lors de la création d'une tâche de streaming. Les instantanés enregistrent l'état d'un pipeline de traitement par flux et vous permettent de démarrer une nouvelle version de votre tâche à partir de cet état. Pour en savoir plus sur les instantanés, consultez la page Utiliser des instantanés. | Si ce champ n'est pas défini, aucun instantané n'est utilisé pour créer une tâche. |
hotKeyLoggingEnabled |
boolean |
Indique qu'une clé d'hôte est détectée dans le pipeline. Elle est affichée dans le projet Cloud Logging de l'utilisateur. | Si ce champ n'est pas défini, seule la présence d'une clé d'hôte est enregistrée. |
diskSizeGb |
int |
Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si ce champ est défini, spécifiez au moins 30 Go pour tenir compte de l'image de démarrage du nœud de calcul et des journaux locaux. Pour les tâches par lots utilisant Dataflow Shuffle, cette option définit la taille du disque de démarrage d'une VM de calcul. Pour les tâches par lots qui n'utilisent pas Dataflow Shuffle, cette option définit la taille des disques utilisés pour stocker les données brassées. La taille du disque de démarrage n'est pas affectée.
Pour les tâches traitées par flux à l'aide de Streaming Engine, cette option définit la taille des disques de démarrage. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté.
Vous pouvez définir la taille du disque de démarrage avec l'option |
Définissez la valeur sur La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go. La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go. Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches. |
serviceAccount |
String |
Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com . Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow.
|
Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur. |
workerDiskType |
String |
Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. |
Le service Dataflow détermine la valeur par défaut. |
workerMachineType |
String |
Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés. Pour des résultats optimaux, utilisez des types de machine Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine. |
Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche. |
Consultez la documentation de référence de l'API pour Java concernant l'interface PipelineOptions (et ses sous-interfaces) pour obtenir la liste complète des options de configuration du pipeline.
Python
Champ | Type | Description | Valeur par défaut |
---|---|---|---|
runner |
str |
Le PipelineRunner à utiliser. Ce champ peut être défini sur DirectRunner ou DataflowRunner . |
DirectRunner (mode local) |
streaming |
bool |
Indique si le mode de streaming est activé (champ défini sur "true") ou désactivé. | false |
project |
str |
ID du projet Google Cloud. Veuillez compléter ce champ si vous souhaitez exécuter votre pipeline à l'aide du service géré Dataflow. | Si ce champ n'est pas défini, une erreur se produit. |
job_name |
String |
Nom de la tâche Dataflow en cours d'exécution, tel qu'il apparaît dans la liste des tâches Dataflow et les détails de la tâche. | Dataflow génère automatiquement un nom unique. |
temp_location |
str |
Chemin d'accès Cloud Storage pour les fichiers temporaires. Doit être une URL Cloud Storage valide commençant par gs:// . |
Si ce champ n'est pas spécifié, la valeur par défaut est définie sur staging_location . Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud. |
staging_location |
str |
Chemin d'accès Cloud Storage pour les fichiers de préproduction locaux. Doit être une URL Cloud Storage valide commençant par gs:// . |
Si ce champ n'est pas spécifié, la valeur par défaut est définie sur un répertoire de préproduction dans temp_location . Vous devez spécifier au moins un des éléments temp_location ou staging_location pour exécuter le pipeline sur Google Cloud. |
autoscaling_algorithm |
str
| Le mode d'autoscaling pour votre tâche Dataflow. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver. Consultez la section Fonctions de réglage automatique pour plus d'informations sur le fonctionnement de l'autoscaling dans le service géré Dataflow. |
La valeur par défaut est définie sur THROUGHPUT_BASED pour toutes les tâches Dataflow par lots et pour les tâches de streaming utilisant Streaming Engine.
La valeur par défaut est définie sur NONE pour les tâches de streaming qui n'utilisent pas Streaming Engine. |
num_workers |
int |
Nombre d'instances Compute Engine à utiliser lors de l'exécution du pipeline. | Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul. |
max_num_workers |
int |
Nombre maximal d'instances Compute Engine à mettre à la disposition du pipeline pendant l'exécution. Sachez que cette valeur peut être supérieure au nombre initial de nœuds de calcul (spécifié par num_workers pour permettre à la tâche d'évoluer, automatiquement ou par un autre moyen). |
Si ce champ n'est pas défini, le service Dataflow détermine un nombre approprié de nœuds de calcul. |
number_of_worker_harness_threads |
int |
Nombre de threads par faisceau de nœud de calcul. | Si ce champ n'est pas défini, le service Cloud Dataflow détermine un nombre approprié de threads par nœud de calcul. Pour utiliser ce paramètre, vous devez également utiliser l'option --experiments=use_runner_v2 |
region |
str |
Spécifie un point de terminaison régional pour le déploiement de vos tâches Dataflow. | Si ce champ n'est pas défini, la valeur par défaut est us-central1 . |
worker_region |
String |
Spécifie une région Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par Remarque : Cette option ne peut pas être combinée avec |
Si ce champ n'est pas défini, la valeur par défaut est region . |
worker_zone |
String |
Spécifie une zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter le pipeline. Cette option permet d'exécuter des nœuds de calcul dans un emplacement différent de celui utilisé par Remarque : Cette option ne peut pas être combinée avec |
Si vous spécifiez region ou worker_region , worker_zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente. |
zone |
str |
(Obsolète) Pour le SDK Apache Beam 2.17.0 ou version antérieure, spécifie la zone Compute Engine permettant de lancer des instances de nœuds de calcul pour exécuter votre pipeline. | Si vous spécifiez region , zone est défini par défaut sur une zone de la région correspondante. Vous pouvez ignorer ce comportement en spécifiant une zone différente. |
dataflow_kms_key |
str |
Spécifie la clé de chiffrement gérée par le client (CMEK, Customer-Managed Encryption Key) qui permet de chiffrer les données au repos. Vous pouvez contrôler la clé de chiffrement via Cloud KMS.
Vous devez également spécifier le paramètre temp_location pour utiliser cette fonctionnalité. |
Dans le cas contraire, Dataflow utilise le chiffrement Google Cloud par défaut à la place du chiffrement CMEK. |
flexrs_goal |
str |
Définit la planification flexible des ressources (FlexRS) pour les tâches par lots avec autoscaling. Affecte les paramètres num_workers , autoscaling_algorithm , zone , region et machine_type . Pour plus d'informations, consultez la section traitant des options de pipeline FlexRS. |
Si ce champ n'est pas défini, la valeur par défaut est SPEED_OPTIMIZED, ce qui revient à omettre cet indicateur. Pour activer FlexRS, vous devez spécifier la valeur COST_OPTIMIZED afin de permettre au service Dataflow de choisir les ressources avec remise disponibles. |
network |
str |
Réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le réseau. | Si ce champ n'est pas défini, Google Cloud considère que vous allez utiliser un réseau nommé default . |
subnetwork |
str |
Sous-réseau Compute Engine permettant de lancer des instances Compute Engine pour exécuter le pipeline. Découvrez comment spécifier le sous-réseau. | Le service Dataflow détermine la valeur par défaut. |
use_public_ips |
bool |
Indique que les nœuds de calcul Dataflow doivent utiliser des adresses IP publiques.
Si la valeur est définie sur false , les nœuds de calcul Dataflow utilisent des adresses IP privées pour toutes les communications. Dans ce cas, si l'option subnetwork est spécifiée, l'option network est ignorée. Assurez-vous que pour l'option network ou subnetwork indiquée, l'accès privé à Google est activé. Cette option nécessite le SDK Beam pour Python.
Le SDK Dataflow pour Python obsolète n'est plus compatible. |
Si ce champ n'est pas défini, les nœuds de calcul Dataflow utilisent des adresses IP publiques. |
enable_streaming_engine |
bool |
Indique si Streaming Engine de Dataflow est activé (valeur "true") ou désactivé. L'activation de Streaming Engine permet d'exécuter les étapes de votre pipeline de streaming dans le backend du service Dataflow, économisant ainsi les ressources de processeur, de mémoire et de stockage sur disque persistant. | La valeur par défaut est false. Cela signifie que les étapes de votre pipeline de streaming sont exécutées entièrement sur des VM de nœuds de calcul. |
disk_size_gb |
int |
Taille du disque, en gigaoctets, à utiliser sur chaque instance de nœud de calcul Compute Engine distante. Si vous définissez ce paramètre, spécifiez au moins 30 Go pour prendre en compte l'image de démarrage du nœud de calcul et les journaux locaux. Pour les tâches par lots utilisant Dataflow Shuffle, cette option définit la taille du disque de démarrage d'une VM de calcul. Pour les tâches par lots qui n'utilisent pas Dataflow Shuffle, cette option définit la taille des disques utilisés pour stocker les données brassées. La taille du disque de démarrage n'est pas affectée.
Pour les tâches traitées par flux à l'aide de Streaming Engine, cette option définit la taille des disques de démarrage. Pour les tâches de streaming qui n'utilisent pas Streaming Engine, cette option définit la taille de chaque disque persistant supplémentaire créé par le service Dataflow. Le disque de démarrage n'est pas affecté.
Vous pouvez définir la taille du disque de démarrage avec l'option |
Définissez la valeur sur La valeur par défaut est définie sur 25 Go pour les tâches par lots qui utilisent Dataflow Shuffle. Sinon, la valeur par défaut est définie sur 250 Go. La valeur par défaut est définie sur 30 Go pour les tâches de streaming qui utilisent Streaming Engine. Sinon, la valeur par défaut est définie sur 400 Go. Avertissement : La réduction de la taille du disque réduit les E/S disponibles pour le brassage de données. Les tâches de brassage de données qui n'utilisent ni Dataflow Shuffle, ni Streaming Engine peuvent entraîner une augmentation du temps d'exécution et du coût des tâches. |
service_account_email |
str |
Spécifie un compte de service de contrôleur géré par l'utilisateur, au format my-service-account-name@<project-id>.iam.gserviceaccount.com . Pour plus d'informations, consultez la section dédiée au compte de service de contrôleur de la page concernant la sécurité et les autorisations dans Cloud Dataflow.
|
Si ce champ n'est pas défini, les nœuds de calcul utilisent le compte de service Compute Engine de votre projet en tant que compte de service de contrôleur. |
worker_disk_type |
str |
Type de disque persistant à utiliser, spécifié par une URL complète de la ressource de type de disque. Par exemple, utilisez compute.googleapis.com/projects//zones//diskTypes/pd-ssd pour spécifier un disque persistant SSD. Pour plus d'informations, consultez la page de référence de l'API Compute Engine pour diskTypes. |
Le service Dataflow détermine la valeur par défaut. |
machine_type |
str |
Type de machine Compute Engine utilisé par Dataflow lors du démarrage des VM de nœud de calcul. Vous pouvez utiliser toutes les familles de types de machines Compute Engine disponibles, ainsi que des types de machines personnalisés. Pour des résultats optimaux, utilisez des types de machine Notez que l'utilisation de Dataflow est facturée en fonction du nombre de processeurs virtuels et de Go de mémoire consommés par les nœuds de calcul. La facturation ne dépend pas de la famille du type de machine. |
Si cette option n'est pas définie, le service Dataflow choisit le type de machine en fonction de la tâche. |
Java : SDK 1.x
Configurer PipelineOptions pour une exécution locale
Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Dataflow distant et le projet Google Cloud associé.
Si vous optez pour une exécution locale, il est vivement recommandé d'exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create
, ou utiliser une transformation Read
pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue un moyen simple et rapide d'effectuer des tests et des opérations de débogage, avec moins de dépendances externes. Cependant, une exécution locale est limitée par la mémoire disponible dans l'environnement local.
L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.
Java : SDK 2.x
// Create and set our Pipeline Options. PipelineOptions options = PipelineOptionsFactory.create(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner
est déjà définie par défaut. Cependant, vous devez inclure explicitement DirectRunner
en tant que dépendance, ou l'ajouter au paramètre "classpath".
Python
# Create and set your Pipeline Options. options = PipelineOptions(flags=argv) my_options = options.view_as(MyOptions) with Pipeline(options=options) as pipeline: pass # build your pipeline here.
Remarque : En mode local, il n'est pas nécessaire de définir l'exécuteur, car l'option DirectRunner
est déjà définie par défaut.
Java : SDK 1.x
Une fois le pipeline construit, exécutez-le.
Définir d'autres options de pipeline locales
Lors de l'exécution locale du pipeline, les valeurs par défaut des propriétés de PipelineOptions
sont généralement suffisantes.
Java : SDK 2.x
Vous pouvez trouver les valeurs par défaut de la classe Java PipelineOptions
dans la documentation de référence de l'API Java. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.
Si votre pipeline utilise des services Google cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser GcpOptions.setProject
pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GcpOptions pour plus d'informations.
Python
Vous pouvez trouver les valeurs par défaut de la classe Python PipelineOptions
dans la documentation de référence de l'API Python. Consultez la liste associée à la classe PipelineOptions pour plus d'informations.
Si votre pipeline utilise des services Google Cloud tels que BigQuery ou Cloud Storage pour l'E/S, vous devrez peut-être définir certaines options de projet Google Cloud et d'identification. Dans ce cas, vous devez utiliser options.view_as(GoogleCloudOptions).project
pour définir l'ID du projet Google Cloud. Vous devrez peut-être également définir explicitement des identifiants. Consultez la documentation concernant la classe GoogleCloudOptions pour plus d'informations.
Java : SDK 1.x