Dans ce document, vous allez apprendre à créer un modèle classique personnalisé à partir du code de votre pipeline Dataflow. Les modèles classiques regroupent divers pipelines Dataflow existants pour créer des modèles réutilisables que vous pouvez personnaliser pour chaque tâche en modifiant des paramètres de pipeline spécifiques. Plutôt que d'écrire le modèle, vous allez utiliser une commande pour générer le modèle à partir d'un pipeline existant.
Voici un bref aperçu du processus. Les détails de ce processus sont fournis dans les sections suivantes.
- Dans le code de votre pipeline, utilisez l'interface
ValueProvider
pour toutes les options de pipeline que vous souhaitez définir ou utiliser au moment de l'exécution. Utilisez des objetsDoFn
acceptant des paramètres d'exécution. - Étendez votre modèle avec des métadonnées supplémentaires afin que les paramètres personnalisés soient validés lors de l'exécution du modèle classique. Exemples de métadonnées : nom de votre modèle classique personnalisé et paramètres facultatifs.
- Vérifiez si les connecteurs d'E/S du pipeline acceptent les objets
ValueProvider
et apportez les modifications nécessaires. - Créez et mettez en préproduction le modèle classique personnalisé.
- Exécutez le modèle classique personnalisé.
Pour découvrir les différents types de modèles Dataflow, leurs avantages et savoir quand choisir un modèle classique, consultez la page Modèles Dataflow.
Autorisations requises pour exécuter un modèle classique
Les autorisations dont vous avez besoin pour exécuter le modèle classique Dataflow dépendent de l'emplacement où vous exécutez le modèle, et l'emplacement, dans un autre projet ou non, de la source et du récepteur du pipeline.
Pour en savoir plus sur l'exécution de pipelines Dataflow en local ou à l'aide de Google Cloud, consultez la page Sécurité et autorisations pour Dataflow.
Pour obtenir la liste des rôles et autorisations Dataflow, consultez la page Contrôle des accès Dataflow.
Limites
- L'option de pipeline suivante n'est pas compatible avec les modèles classiques. Si vous devez contrôler le nombre de threads par faisceau de nœud de calcul, utilisez des modèles Flex.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- L'exécuteur Dataflow n'est pas compatible avec les options
ValueProvider
pour les sujets Pub/Sub et les paramètres d'abonnement. Si vous avez besoin des options Pub/Sub dans vos paramètres d'exécution, utilisez des modèles Flex.
À propos des paramètres d'exécution et de l'interface ValueProvider
L'interface ValueProvider
permet aux pipelines d'accepter les paramètres d'exécution. Apache Beam fournit trois types d'objets ValueProvider
.
Nom | Description |
---|---|
RuntimeValueProvider |
Vous pouvez vérifier si la valeur d'un objet Utilisez |
StaticValueProvider |
Utilisez |
NestedValueProvider |
Utilisez |
Utiliser des paramètres d'exécution dans le code de votre pipeline
Cette section vous explique comment utiliser ValueProvider
, StaticValueProvider
et NestedValueProvider
.
Utiliser ValueProvider
dans vos options de pipeline
Spécifiez ValueProvider
pour toutes les options de pipeline que vous souhaitez définir ou utiliser au moment de l'exécution.
Par exemple, l'extrait de code WordCount
suivant n'est pas compatible avec les paramètres d'exécution. Le code ajoute une option de fichier d'entrée, crée un pipeline et lit les lignes du fichier d'entrée :
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Pour ajouter la compatibilité avec les paramètres d'exécution, modifiez l'option du fichier d'entrée afin d'utiliser ValueProvider
.
Java
Utilisez ValueProvider<String>
au lieu de String
pour le type de l'option de fichier d'entrée.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Remplacez add_argument
par add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Utiliser ValueProvider
dans vos fonctions
Pour utiliser des valeurs de paramètre d'exécution dans vos propres fonctions, mettez à jour les fonctions pour qu'elles puissent exploiter les paramètres ValueProvider
.
L'exemple suivant contient une option ValueProvider
(nombre entier) et une fonction simple effectuant une addition. La fonction dépend de l'entier ValueProvider
. Lors de l'exécution, le pipeline applique MySumFn
à chaque nombre entier d'une PCollection
contenant [1, 2, 3]
. Si la valeur d'exécution est égale à 10, la PCollection
obtenue contient [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Utiliser StaticValueProvider
Pour fournir une valeur statique à votre pipeline, utilisez StaticValueProvider
.
Cet exemple exploite MySumFn
, qui est une fonction DoFn
nécessitant un objet ValueProvider<Integer>
. Si vous connaissez la valeur du paramètre à l'avance, vous pouvez spécifier votre valeur statique en tant que StaticValueProvider
à l'aide de ValueProvider
.
Java
Le code ci-dessous récupère la valeur au moment de l'exécution du pipeline :
.apply(ParDo.of(new MySumFn(options.getInt())))
Une autre option consiste à utiliser StaticValueProvider
avec une valeur statique :
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Le code ci-dessous récupère la valeur au moment de l'exécution du pipeline :
beam.ParDo(MySumFn(user_options.templated_int))
Une autre option consiste à utiliser StaticValueProvider
avec une valeur statique :
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Vous pouvez également utiliser StaticValueProvider
lorsque vous mettez en œuvre un module d'E/S compatible avec les paramètres standards et les paramètres d'exécution.
StaticValueProvider
réduit la duplication de code liée à la mise en œuvre de deux méthodes semblables.
Java
Le code source utilisé dans cet exemple provient du fichier d'Apache Beam TextIO.java sur GitHub.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
Dans cet exemple, un constructeur unique accepte un argument string
ou ValueProvider
. Si l'argument est une chaîne string
, il est converti en StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Utiliser NestedStaticValueProvider
Pour calculer une valeur à partir d'un autre objet ValueProvider
, utilisez NestedValueProvider
.
NestedValueProvider
exploite un objet ValueProvider
et un traducteur SerializableFunction
en tant qu'entrée. Lorsque vous appelez .get()
sur un objet NestedValueProvider
, le traducteur crée une valeur basée sur celle de ValueProvider
. Cette traduction vous permet de créer la valeur finale souhaitée à l'aide d'une valeur ValueProvider
:
Dans l'exemple suivant, l'utilisateur fournit le nom de fichier file.txt
. La transformation ajoute le chemin gs://directory_name/
devant le nom du fichier. L'appel .get()
renvoie gs://directory_name/file.txt
.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Utiliser des métadonnées dans le code de votre pipeline
Vous pouvez étendre votre modèle en ajoutant des métadonnées afin de valider des paramètres personnalisés lors de l'exécution du modèle. Si vous souhaitez créer des métadonnées pour votre modèle, procédez comme suit :
- Créez un fichier au format JSON appelé
TEMPLATE_NAME_metadata
en utilisant les paramètres de la page Paramètres de métadonnées et le format indiqué dans Exemple de fichier de métadonnées. RemplacezTEMPLATE_NAME
par le nom de votre modèle.Vérifiez que le fichier de métadonnées ne comporte pas d'extension de nom de fichier. Par exemple, si le nom de votre modèle est
myTemplate
, son fichier de métadonnées doit êtremyTemplate_metadata
. - Stockez le fichier de métadonnées dans le même dossier Cloud Storage que le modèle.
Paramètres de métadonnées
Clé du paramètre | Obligatoire | Description de la valeur | |
---|---|---|---|
name |
Oui | Nom du modèle. | |
description |
Non | Court paragraphe de texte décrivant le modèle. | |
streaming |
Non | Si la valeur est true , ce modèle est compatible avec le streaming. La valeur par défaut est false . |
|
supportsAtLeastOnce |
Non | Si la valeur est true , ce modèle accepte le traitement de type "au moins une fois". La valeur par défaut est false . Définissez ce paramètre sur true si le modèle est conçu pour fonctionner avec le mode de traitement par flux "au moins une fois".
|
|
supportsExactlyOnce |
Non | Si la valeur est true , ce modèle est compatible avec le traitement de type "exactement une fois". La valeur par défaut est true . |
|
defaultStreamingMode |
Non | Mode de traitement par flux par défaut, pour les modèles compatibles avec les modes "au moins une fois" et "exactement une fois". Utilisez l'une des valeurs suivantes : "AT_LEAST_ONCE" ,
"EXACTLY_ONCE" . Si cette option n'est pas spécifiée, le mode de diffusion par défaut est "exactement une fois".
|
|
parameters |
Non | Tableau de paramètres supplémentaires utilisés par le modèle. Un tableau vide est utilisé par défaut. | |
name |
Oui | Nom du paramètre utilisé dans le modèle. | |
label |
Oui | Chaîne au format lisible utilisée dans la console Google Cloud pour étiqueter le paramètre. | |
helpText |
Oui | Court paragraphe de texte décrivant le paramètre. | |
isOptional |
Non | false si le paramètre est requis et true s'il est facultatif. À moins qu'une valeur ne soit définie, isOptional est défini par défaut sur false .
Si vous n'incluez pas cette clé de paramètre pour vos métadonnées, les métadonnées deviennent un paramètre obligatoire. |
|
regexes |
Non | Tableau d'expressions régulières POSIX-egrep sous forme de chaînes qui permettent de valider la valeur du paramètre. Par exemple : ["^[a-zA-Z][a-zA-Z0-9]+"] est une expression régulière unique qui vérifie que la valeur commence par une lettre, puis comporte un ou plusieurs caractères. Un tableau vide est utilisé par défaut. |
Exemple de fichier de métadonnées
Java
Le service Dataflow utilise les métadonnées suivantes pour valider les paramètres personnalisés du modèle WordCount :
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Le service Dataflow utilise les métadonnées suivantes pour valider les paramètres personnalisés du modèle WordCount :
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Vous pouvez télécharger des fichiers de métadonnées pour les modèles fournis par Google à partir du répertoire de modèles de Dataflow.
Connecteurs d'E/S de pipeline compatibles et ValueProvider
Java
Certains connecteurs d'E/S contiennent des méthodes qui acceptent les objets ValueProvider
. Pour déterminer la compatibilité d'un connecteur et d'une méthode spécifiques, consultez la documentation de référence de l'API pour le connecteur d'E/S. Les méthodes compatibles possèdent une surcharge avec ValueProvider
. Si une méthode ne possède pas de surcharge, elle n'est pas compatible avec les paramètres d'exécution. Les connecteurs d'E/S suivants sont au moins partiellement compatibles avec ValueProvider
:
- E/S orientées fichier :
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(nécessite la version 2.3.0 ou ultérieure du SDK)PubSubIO
SpannerIO
Python
Certains connecteurs d'E/S contiennent des méthodes qui acceptent les objets ValueProvider
. Pour déterminer la compatibilité de connecteurs d'E/S et de leurs méthodes, consultez la documentation de référence de l'API pour le connecteur. Les connecteurs d'E/S suivants acceptent les paramètres d'exécution :
- E/S orientées fichier :
textio
,avroio
,tfrecordio
Créer et préparer un modèle classique
Une fois votre pipeline conçu, vous devez créer et préproduire votre fichier de modèle. Une fois le modèle créé et préproduit, l'emplacement de préproduction contient des fichiers supplémentaires nécessaires à l'exécution du modèle. Si vous supprimez cet emplacement, l'exécution du modèle échoue. La tâche Dataflow ne s'exécute pas immédiatement après la préproduction du modèle. Pour exécuter une tâche Dataflow personnalisée basée sur un modèle, vous pouvez utiliser la console Google Cloud, l'API REST Dataflow ou gcloud CLI.
L'exemple suivant montre comment préparer un fichier de modèle :
Java
Cette commande Maven crée et prépare en préproduction un modèle à l'emplacement Cloud Storage spécifié avec --templateLocation
.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Vérifiez que le chemin d'accès à templateLocation
est correct. Remplacez l'élément suivant :
com.example.myclass
: votre classe JavaPROJECT_ID
: ID de votre projetBUCKET_NAME
: nom du bucket Cloud StorageTEMPLATE_NAME
: nom du modèle.REGION
: région dans laquelle déployer votre job Dataflow
Python
Cette commande Python crée et prépare en préproduction un modèle à l'emplacement Cloud Storage spécifié avec --template_location
.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Vérifiez que le chemin d'accès à template_location
est correct. Remplacez l'élément suivant :
examples.mymodule
: votre module Python.PROJECT_ID
: ID de votre projetBUCKET_NAME
: nom du bucket Cloud StorageTEMPLATE_NAME
: nom du modèle.REGION
: région dans laquelle déployer votre job Dataflow
Une fois votre modèle créé et préproduit, la prochaine étape consiste à l'exécuter.