Cette page explique comment définir des options de pipeline pour vos tâches Dataflow. Ces options de pipeline permettent de configurer l'emplacement et les paramètres d'exécution du pipeline, ainsi que les ressources qu'il utilise.
L'exécution du pipeline est distincte de l'exécution de votre programme Apache Beam. Le programme Apache Beam que vous avez écrit construit un pipeline pour une exécution différée. Cela signifie que le programme génère une série d'étapes que tout exécuteur Apache Beam compatible peut exécuter. Les exécuteurs compatibles incluent l'exécuteur Dataflow sur Google Cloud et l'exécuteur direct qui exécute le pipeline directement dans un environnement local.
Vous pouvez transmettre des paramètres dans un job Dataflow au moment de l'exécution. Pour en savoir plus sur la définition des options de pipeline lors de l'exécution, consultez Configurer les options de pipeline.
Utiliser des options de pipeline avec les SDK Apache Beam
Vous pouvez utiliser les SDK suivants pour définir les options de pipeline des tâches Dataflow :
- SDK Apache Beam pour Python
- SDK Apache Beam pour Java
- SDK Apache Beam pour Go
Pour utiliser les SDK, vous devez définir l'exécuteur de pipeline et d'autres paramètres d'exécution à l'aide de la classe PipelineOptions
du SDK Apache Beam.
Il existe deux méthodes pour spécifier les options de pipeline :
- Définissez les options de pipeline par programmation en fournissant une liste d'options de pipeline.
- Définissez les options de pipeline directement sur la ligne de commande lorsque vous exécutez le code de votre pipeline.
Définir les options de pipeline de manière automatisée
Vous pouvez définir les options de pipeline de manière automatisée en créant et en modifiant un objet PipelineOptions
.
Java
Créez un objet PipelineOptions
à l'aide de la méthode PipelineOptionsFactory.fromArgs
.
Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.
Python
Créer un objet PipelineOptions
.
Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.
Go
La définition automatisée des options de pipeline par PipelineOptions
n'est pas disponible dans le SDK Apache Beam pour Go. Utilisez des arguments de ligne de commande Go.
Pour obtenir un exemple, consultez la section Lancer sur Cloud Dataflow de cette page.
Définir les options de pipeline sur la ligne de commande
Vous pouvez définir les options de pipeline à l'aide d'arguments de ligne de commande.
Java
L'exemple de syntaxe suivant provient du pipeline WordCount
du guide de démarrage rapide pour Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet Google CloudBUCKET_NAME
: nom de votre bucket Cloud StorageREGION
: région Dataflow,us-central1
Python
L'exemple de syntaxe suivant provient du pipeline WordCount
du guide de démarrage rapide pour Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Remplacez les éléments suivants :
DATAFLOW_REGION
: région dans laquelle vous souhaitez déployer le job Dataflow (par exemple,europe-west1
)L'option
--region
remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.STORAGE_BUCKET
: nom du bucket Cloud StoragePROJECT_ID
: ID de projet Google Cloud
Go
L'exemple de syntaxe suivant provient du pipeline WordCount
du guide de démarrage rapide pour Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Remplacez les éléments suivants :
BUCKET_NAME
: nom du bucket Cloud StoragePROJECT_ID
: ID de projet Google CloudDATAFLOW_REGION
: région dans laquelle vous souhaitez déployer le job Dataflow. Par exemple,europe-west1
. L'option--region
remplace la région par défaut définie dans le serveur de métadonnées, sur votre client local ou dans les variables d'environnement.
Définir les options de pipeline expérimentales
Dans les SDK Java, Python et Go, l'option de pipeline experiments
active les fonctionnalités expérimentales ou pré-DG de Dataflow.
Définir de manière automatisée
Pour définir l'option experiments
de manière automatisée, utilisez la syntaxe suivante.
Java
Dans votre objet PipelineOptions
, incluez l'option experiments
à l'aide de la syntaxe suivante.
Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.
options.setExperiments("streaming_boot_disk_size_gb=80")
Pour obtenir un exemple illustrant la création de l'objet PipelineOptions
, consultez la section Lancer sur Cloud Dataflow de cette page.
Python
Dans votre objet PipelineOptions
, incluez l'option experiments
à l'aide de la syntaxe suivante.
Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Pour obtenir un exemple illustrant la création de l'objet PipelineOptions
, consultez la section Lancer sur Cloud Dataflow de cette page.
Go
La définition automatisée des options de pipeline par PipelineOptions
n'est pas disponible dans le SDK Apache Beam pour Go. Utilisez des arguments de ligne de commande Go.
Définir sur la ligne de commande
Pour définir l'option experiments
sur la ligne de commande, utilisez la syntaxe suivante.
Java
Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.
--experiments=streaming_boot_disk_size_gb=80
Python
Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.
--experiments=streaming_boot_disk_size_gb=80
Go
Cet exemple définit la taille du disque de démarrage sur 80 Go avec l'option de test.
--experiments=streaming_boot_disk_size_gb=80
Définir dans un modèle
Pour activer une fonctionnalité expérimentale lors de l'exécution d'une application Dataflow, utilisez l'option --additional-experiments
.
Modèle classique
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
modèle Flex
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Accéder à l'objet d'options de pipeline
Lorsque vous créez l'objet Pipeline
dans votre programme Apache Beam, transmettez PipelineOptions
. Lorsque le service Dataflow exécute le pipeline, il envoie une copie des options de pipeline PipelineOptions
à chaque nœud de calcul.
Java
Accédez à PipelineOptions
dans une instance DoFn
de n'importe quelle transformation ParDo
à l'aide de la méthode ProcessContext.getPipelineOptions
.
Python
Cette fonctionnalité n'est pas disponible dans le SDK Apache Beam pour Python.
Go
Accédez aux options de pipeline à l'aide de beam.PipelineOptions
.
Lancer sur Dataflow
Exécutez votre tâche sur des ressources Google Cloud gérées à l'aide du service d'exécution Dataflow. L'exécution de votre pipeline avec Dataflow crée une tâche Dataflow, qui utilise des ressources Compute Engine et Cloud Storage dans votre projet Google Cloud. Pour en savoir plus sur les autorisations Dataflow, consultez la page Sécurité et autorisations Dataflow.
Les jobs Dataflow utilisent Cloud Storage pour stocker des fichiers temporaires lors de l'exécution du pipeline. Pour éviter de payer des frais de stockage inutiles, désactivez la fonctionnalité de suppression temporaire sur les buckets utilisés par vos jobs Dataflow pour le stockage temporaire. Pour en savoir plus, consultez la section Supprimer une règle de suppression réversible d'un bucket.
Définir les options requises
Pour exécuter votre pipeline à l'aide de Dataflow, définissez les options de pipeline suivantes :
Java
project
: ID de votre projet Google Cloud.runner
: exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliserDataflowRunner
.gcpTempLocation
: chemin d'accès Cloud Storage permettant à Dataflow de préparer la plupart des fichiers temporaires. Si vous souhaitez spécifier un bucket, vous devez le créer à l'avance. Si vous ne définissez pasgcpTempLocation
, vous pouvez définir l'option de pipelinetempLocation
.gcpTempLocation
est alors défini sur la valeur detempLocation
. Si aucun paramètre n'est spécifié, ungcpTempLocation
par défaut est créé.stagingLocation
: chemin d'accès Cloud Storage dans lequel Dataflow prépare les fichiers binaires. Si vous utilisez le SDK Apache Beam 2.28 ou version ultérieure, ne configurez pas cette option. Pour le SDK Apache Beam 2.28 ou version antérieure, si vous ne définissez pas cette option, ce que vous avez spécifié pourtempLocation
est utilisé pour l'emplacement de préproduction.Une valeur
gcpTempLocation
par défaut est créée si ni cette valeur, nitempLocation
ne sont spécifiés. Si la valeurtempLocation
est spécifiée, mais quegcpTempLocation
ne l'est pas,tempLocation
doit être un chemin Cloud Storage qui est utilisé comme valeur par défaut pourgcpTempLocation
. Si la valeurtempLocation
n'est pas spécifiée, mais quegcpTempLocation
l'est,tempLocation
n'est pas renseigné.
Python
project
: ID de votre projet Google Cloud.region
: région de votre job Dataflow.runner
: exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliserDataflowRunner
.temp_location
: chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.
Go
project
: ID de votre projet Google Cloud.region
: région de votre job Dataflow.runner
: exécuteur de pipeline qui exécute le pipeline. Pour l'exécution dans Google Cloud, vous devez utiliserdataflow
.staging_location
: chemin d'accès à Cloud Storage dans lequel Dataflow prépare les fichiers de tâches temporaires créés lors de l'exécution du pipeline.
Définir les options de pipeline de manière automatisée
L'exemple de code suivant montre comment construire un pipeline de manière automatisée en définissant l'exécuteur et les autres options requises pour exécuter le pipeline à l'aide de Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
Le SDK Apache Beam pour Go utilise des arguments de ligne de commande Go. Utilisez flag.Set()
pour définir les valeurs des options.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.
Utiliser les options de pipeline depuis la ligne de commande
L'exemple suivant montre comment utiliser les options de pipeline spécifiées sur la ligne de commande. Cet exemple ne définit pas les options de pipeline de manière automatisée.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Utilisez le module argparse de Python pour analyser les options de ligne de commande.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Utilisez le package Go flag
pour analyser les options de ligne de commande. Vous devez analyser les options avant d'appeler beam.Init()
. Dans cet exemple, output
est une option de ligne de commande.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Après avoir construit le pipeline, spécifiez toutes les lectures, transformations et écritures associées, puis exécutez le pipeline.
Contrôler les modes d'exécution
Lorsqu'un programme Apache Beam exécute un pipeline sur un service tel que Dataflow, il peut soit exécuter le pipeline de manière asynchrone, soit le bloquer jusqu'à ce qu'il soit terminé. Vous pouvez modifier ce comportement en suivant les instructions ci-dessous.
Java
Lorsqu'un programme Java Apache Beam exécute un pipeline sur un service tel que Dataflow, il est généralement exécuté de manière asynchrone. Pour exécuter un pipeline et attendre la fin de la tâche, définissez DataflowRunner
en tant qu'exécuteur de pipeline et appelez explicitement pipeline.run().waitUntilFinish()
.
Lorsque vous utilisez DataflowRunner
et appelez waitUntilFinish()
sur l'objet PipelineResult
renvoyé par pipeline.run()
, le pipeline s'exécute sur Google Cloud, mais le code local attend que la tâche cloud se termine et renvoie l'objet DataflowPipelineJob
final. Pendant l'exécution de la tâche, le service Dataflow imprime les mises à jour de l'état de la tâche et les messages de la console.
Python
Lorsqu'un programme Python Apache Beam exécute un pipeline sur un service tel que Dataflow, il est généralement exécuté de manière asynchrone. Pour bloquer l'exécution jusqu'à la fin du pipeline, utilisez la méthode wait_until_finish()
de l'objet PipelineResult
renvoyé par la méthode run()
de l'exécuteur.
Go
Lorsqu'un programme Go Apache Beam exécute un pipeline sur Dataflow, il est synchrone par défaut et se bloque jusqu'à la fin de l'exécution du pipeline. Si vous ne souhaitez pas bloquer le programme, deux options s'offrent à vous :
Démarrez la tâche dans une routine Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Utilisez l'indicateur de ligne de commande
--async
, qui se trouve dans le packagejobopts
.
Pour afficher les détails de l'exécution, surveiller la progression et vérifier l'état d'avancement de la tâche, utilisez l'interface de surveillance de Dataflow ou l'interface de ligne de commande de Dataflow.
Utiliser des sources de diffusion en continu
Java
Si le pipeline lit à partir d'une source de données illimitées (par exemple, Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.
Python
Si le pipeline utilise une source de données illimitée (par exemple, Pub/Sub), vous devez définir l'option streaming
sur "true".
Go
Si le pipeline lit à partir d'une source de données illimitées (par exemple, Pub/Sub), il s'exécute automatiquement en mode de traitement par flux.
Les tâches de traitement par flux utilisent par défaut un type de machine Compute Engine défini sur n1-standard-2
au minimum.
Lancer en local
Au lieu d'exécuter le pipeline sur des ressources cloud gérées, vous pouvez choisir d'exécuter le pipeline localement. L'exécution locale présente certains avantages pour tester, déboguer ou exécuter le pipeline sur de petits ensembles de données. Par exemple, l'exécution locale supprime la dépendance sur le service Dataflow distant et le projet Google Cloud associé.
Si vous optez pour une exécution locale, vous devez exécuter le pipeline avec des ensembles de données suffisamment petits pour tenir dans la mémoire locale. Vous pouvez créer un petit ensemble de données en mémoire à l'aide d'une transformation Create
, ou utiliser une transformation Read
pour travailler sur de petits fichiers locaux ou distants. Une exécution locale constitue généralement un moyen plus rapide et plus facile d'effectuer des tests et des opérations de débogage avec moins de dépendances externes. Cependant, ce type d'exécution est limité par la mémoire disponible dans l'environnement local.
L'exemple de code suivant montre comment construire un pipeline qui s'exécute dans l'environnement local.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Une fois le pipeline construit, exécutez-le.
Créer des options de pipeline personnalisées
Vous pouvez ajouter vos propres options personnalisées en plus des options de pipeline PipelineOptions
standards. La ligne de commande d'Apache Beam peut également analyser des options personnalisées à l'aide d'arguments de ligne de commande spécifiés au même format.
Java
Pour ajouter vos propres options, définissez une interface avec les méthodes "getter" et "setter" pour chaque option, comme dans l'exemple suivant :
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Pour ajouter vos propres options, utilisez la méthode add_argument()
(qui se comporte exactement comme le module argparse standard de Python), comme dans l'exemple suivant :
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
Pour ajouter vos propres options, utilisez le package d'options Go, comme illustré dans l'exemple suivant :
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
Vous pouvez également spécifier une description, qui apparaît lorsqu'un utilisateur transmet --help
en tant qu'argument de ligne de commande, ainsi qu'une valeur par défaut.
Java
Vous définissez la description et la valeur par défaut à l'aide d'annotations, comme suit :
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Nous vous recommandons d'enregistrer votre interface avec PipelineOptionsFactory
, puis de la transmettre lors de la création de l'objet PipelineOptions
. Lorsque vous enregistrez l'interface avec la fonction PipelineOptionsFactory
, la commande --help
vous permet de rechercher l'interface d'options personnalisées et de l'ajouter à la sortie de la commande --help
. PipelineOptionsFactory
vérifie que les options personnalisées sont compatibles avec toutes les autres options enregistrées.
L'exemple de code suivant montre comment enregistrer l'interface d'options personnalisées avec la fonction PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Le pipeline peut maintenant accepter --myCustomOption=value
en tant qu'argument de ligne de commande.
Python
Vous définissez la description et la valeur par défaut comme suit :
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
Vous définissez la description et la valeur par défaut comme suit :
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)