Créer des modèles

Les modèles Cloud Dataflow exploitent des paramètres d'exécution pour accepter des valeurs qui ne sont disponibles qu'au moment de l'exécution du pipeline. Pour personnaliser l'exécution d'un pipeline modélisé, vous pouvez transmettre ces paramètres aux fonctions qui s'exécutent au sein du pipeline (telles que DoFn).

Si vous souhaitez créer un modèle à partir de votre pipeline Apache Beam, vous devez modifier le code de celui-ci pour qu'il soit compatible avec les paramètres d'exécution :

Ensuite, créez et préproduisez votre modèle.

Paramètres d'exécution et interface ValueProvider

L'interface ValueProvider permet aux pipelines d'accepter les paramètres d'exécution. Apache Beam fournit trois types d'objets ValueProvider.

Nom Description
RuntimeValueProvider

RuntimeValueProvider est le type ValueProvider par défaut. RuntimeValueProvider permet à votre pipeline d'accepter une valeur qui n'est disponible qu'au moment de son exécution. Comme la valeur n'est pas disponible lors de la construction du pipeline, vous ne pouvez pas l'exploiter pour modifier le graphique de workflow de votre pipeline.

Vous pouvez vérifier si la valeur d'un objet ValueProvider est disponible à l'aide de isAccessible(). Si vous appelez get() avant l'exécution du pipeline, Apache Beam renvoie l'erreur suivante :
Value only available at runtime, but accessed from a non-runtime context.

Utilisez RuntimeValueProvider lorsque vous ne connaissez pas la valeur à l'avance.

StaticValueProvider

StaticValueProvider vous permet de fournir une valeur statique à votre pipeline. Comme la valeur est disponible lors de la construction du pipeline, vous pouvez l'exploiter pour modifier le graphique de workflow de votre pipeline.

Utilisez StaticValueProvider lorsque vous connaissez la valeur à l'avance. Consultez la section sur StaticValueProvider pour obtenir des exemples.

NestedValueProvider

NestedValueProvider vous permet de calculer une valeur à partir d'un autre objet ValueProvider. NestedValueProvider encapsule l'objet ValueProvider. Le type de l'objet ValueProvider encapsulé détermine si la valeur est accessible ou non lors de la construction du pipeline.

Utilisez NestedValueProvider lorsque vous souhaitez exploiter la valeur pour calculer une autre valeur au moment de l'exécution du pipeline. Consultez la section sur NestedValueProvider pour obtenir des exemples.

Remarque : Le SDK Apache Beam pour Python n'est pas compatible avec NestedValueProvider.

Modifier le code pour utiliser des paramètres d'exécution

Cette section vous explique comment utiliser ValueProvider, StaticValueProvider et NestedValueProvider.

Utiliser ValueProvider dans les options de pipeline

Spécifiez ValueProvider pour toutes les options de pipeline que vous souhaitez définir ou utiliser au moment de l'exécution.

Par exemple, l'extrait de code WordCount suivant n'est pas compatible avec les paramètres d'exécution. Le code ajoute une option de fichier d'entrée, crée un pipeline et lit les lignes du fichier d'entrée :

Java : SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java : SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

Pour ajouter la compatibilité avec les paramètres d'exécution, modifiez l'option du fichier d'entrée afin d'utiliser ValueProvider.

Java : SDK 2.x

Pour le type de l'option de fichier d'entrée, utilisez ValueProvider<String> au lieu de String.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Remplacez add_argument par add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Java : SDK 1.x

Pour le type de l'option de fichier d'entrée, utilisez ValueProvider<String> au lieu de String.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

Utiliser ValueProvider dans les fonctions

Pour utiliser des valeurs de paramètre d'exécution dans vos propres fonctions, mettez à jour ces dernières pour qu'elles puissent exploiter des paramètres ValueProvider.

L'exemple suivant contient une option ValueProvider (nombre entier) et une fonction simple effectuant une addition. La fonction dépend de l'entier ValueProvider. Lors de l'exécution, le pipeline applique MySumFn à chaque nombre entier d'une PCollection contenant [1, 2, 3]. Si la valeur d'exécution est égale à 10, la PCollection obtenue contient [11, 12, 13].

Java : SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Java : SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(MapElements.via(
        new SimpleFunction<Integer, String>() {
          public String apply(Integer i) {
            return i.toString();
          }
        }))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Utiliser StaticValueProvider

Pour fournir une valeur statique à votre pipeline, utilisez StaticValueProvider.

Cet exemple exploite MySumFn, qui est une fonction DoFn nécessitant un objet ValueProvider<Integer>. Si vous connaissez la valeur du paramètre à l'avance, vous pouvez spécifier votre valeur statique en tant que ValueProvider à l'aide de StaticValueProvider.

Java : SDK 2.x

Le code ci-dessous récupère la valeur au moment de l'exécution du pipeline :

  .apply(ParDo.of(new MySumFn(options.getInt())))

Une autre option consiste à utiliser StaticValueProvider avec une valeur statique :

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Le code ci-dessous récupère la valeur au moment de l'exécution du pipeline :

  beam.ParDo(MySumFn(user_options.templated_int))

Une autre option consiste à utiliser StaticValueProvider avec une valeur statique :

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Java : SDK 1.x

Le code ci-dessous récupère la valeur au moment de l'exécution du pipeline :

  .apply(ParDo.of(new MySumFn(options.getInt())))

Une autre option consiste à utiliser StaticValueProvider avec une valeur statique :

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Vous pouvez également utiliser StaticValueProvider lorsque vous mettez en œuvre un module d'E/S compatible avec les paramètres standards et les paramètres d'exécution. StaticValueProvider réduit la duplication de code liée à la mise en œuvre de deux méthodes semblables.

Java : SDK 2.x

Le code source utilisé dans cet exemple provient du fichier d'Apache Beam TextIO.java sur GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

Dans cet exemple, un constructeur unique accepte un argument string ou ValueProvider. Si l'argument est une chaîne string, il est converti en StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Java : SDK 1.x

Le code source utilisé dans cet exemple provient du fichier d'Apache Beam TextIO.java sur GitHub.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Utiliser NestedValueProvider

Remarque : Le SDK Apache Beam pour Python n'est pas compatible avec NestedValueProvider.

Pour calculer une valeur à partir d'un autre objet ValueProvider, utilisez NestedValueProvider.

NestedValueProvider exploite un objet ValueProvider et un traducteur SerializableFunction en tant qu'entrée. Lorsque vous appelez .get() sur un objet NestedValueProvider, le traducteur crée une valeur basée sur celle de ValueProvider. Cette traduction vous permet de créer la valeur finale souhaitée à l'aide d'une valeur ValueProvider :

  • Exemple 1 : l'utilisateur fournit un nom de fichier file.txt. La transformation ajoute le chemin gs://directory_name/ devant le nom du fichier. L'appel .get() renvoie gs://directory_name/file.txt.
  • Exemple 2 : l'utilisateur fournit une sous-chaîne (telle qu'une date spécifique) pour une requête BigQuery. La transformation crée la requête complète à l'aide de la sous-chaîne. L'appel .get() renvoie la requête complète.

Remarque : NestedValueProvider n'accepte qu'une seule valeur d'entrée. Vous ne pouvez pas combiner deux valeurs différentes à l'aide de NestedValueProvider.

Le code ci-dessous met en œuvre le premier exemple à l'aide de NestedValueProvider. L'utilisateur fournit un nom de fichier, puis la transformation ajoute le chemin devant le nom de fichier.

Java : SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

Le SDK Apache Beam pour Python n'est pas compatible avec NestedValueProvider.

Java : SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Métadonnées

Vous pouvez étendre vos modèles en ajoutant des métadonnées afin de valider des paramètres personnalisés lors de l'exécution du modèle. Si vous souhaitez créer des métadonnées pour votre modèle, procédez comme suit :

  1. Créez un fichier au format JSON appelé <template-name>_metadata à l'aide des paramètres décrits dans le tableau ci-dessous.

    Remarque : N'appelez pas le fichier que vous créez <template-name>_metadata.json. Même si le fichier contient du JSON, il ne doit pas se terminer par l'extension .json.

  2. Stockez le fichier JSON dans le même dossier Cloud Storage que le modèle.

    Remarque : Le modèle doit être stocké dans <template-name> et les métadonnées dans <template-name>_metadata.

Paramètres de métadonnées

Clé du paramètre Obligatoire Description de la valeur
name Oui Nom du modèle.
description Non Court paragraphe de texte décrivant les modèles.
parameters Non. Par défaut, un tableau vide. Tableau de paramètres supplémentaires qui seront utilisés par le modèle.
name Oui Nom du paramètre utilisé dans le modèle.
label Oui Libellé lisible qui permettra de classer le paramètre dans l'interface utilisateur.
help_text Oui Court paragraphe de texte décrivant le paramètre.
is_optional Non. La valeur par défaut est "false". true si le paramètre est requis et false s'il est facultatif.
regexes Non. Par défaut, un tableau vide. Tableau d'expressions régulières POSIX-egrep sous forme de chaînes qui permettront de valider la valeur du paramètre. Par exemple, ["^[a-zA-Z][a-zA-Z0-9]+"] est une expression régulière qui valide le fait que la valeur commence par une lettre suivie d'un ou plusieurs caractères.

Exemple de fichier de métadonnées

Le service Cloud Dataflow utilise les métadonnées suivantes pour valider les paramètres personnalisés du modèle WordCount :

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input GCS file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output GCS file(s)"
    }
  ]
}

Vous pouvez télécharger ce fichier de métadonnées à partir du répertoire de modèles de Cloud Dataflow.

E/S de pipeline et paramètres d'exécution

Java : SDK 2.x

Certains connecteurs d'E/S contiennent des méthodes qui acceptent les objets ValueProvider. Pour déterminer la compatibilité d'un connecteur et d'une méthode spécifiques, consultez la documentation de référence de l'API relative au connecteur d'E/S. Les méthodes compatibles possèdent une surcharge avec ValueProvider. Si une méthode ne possède pas de surcharge, elle n'est pas compatible avec les paramètres d'exécution. Les connecteurs d'E/S suivants sont au moins partiellement compatibles avec ValueProvider :

  • E/S orientées fichier : TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (nécessite la version 2.3.0 ou ultérieure du SDK)
  • PubSubIO
  • SpannerIO

* Remarque : Si vous souhaitez exécuter un pipeline de traitement par lot qui lit des données depuis BigQuery, vous devez utiliser .withTemplateCompatibility() sur toutes les lectures BigQuery.

Python

Certains connecteurs d'E/S contiennent des méthodes qui acceptent les objets ValueProvider. Pour déterminer la compatibilité de connecteurs d'E/S et de leurs méthodes, consultez la documentation de référence de l'API relative au connecteur. Les connecteurs d'E/S suivants acceptent les paramètres d'exécution :

  • E/S orientées fichier : textio, avroio, tfrecordio

Java : SDK 1.x

Le tableau suivant contient la liste complète des méthodes qui acceptent les paramètres d'exécution.

Entrées/Sorties Méthode
BigQuery* BigQueryIO.Read.from()*
BigQueryIO.Read.fromQuery()*
BigQueryIO.Write.to()*
BigQueryIO.Write.withSchema()*
Cloud Pub/Sub PubsubIO.Read.subscription()
PubsubIO.Read.topic()
PubsubIO.Write.topic()
TextIO TextIO.Read.from()
TextIO.Write.to()

* Vous ne pouvez exécuter des modèles de pipeline de traitement par lots BigQuery qu'une seule fois, car l'ID de la tâche BigQuery est défini au moment de la création du modèle.

Créer et préproduire des modèles

Une fois votre pipeline conçu, vous devez créer et préproduire votre fichier de modèle. Utilisez la commande associée à votre version du SDK.

Remarque : Une fois le modèle créé et préproduit, l'emplacement de préproduction contient des fichiers supplémentaires nécessaires à l'exécution du modèle. Si vous supprimez cet emplacement, l'exécution du modèle échoue.

Java : SDK 2.x

La commande Maven ci-dessous crée et préproduit un modèle à l'emplacement Cloud Storage spécifié dans --templateLocation.

  • Remplacez YOUR_PROJECT_ID par votre ID de projet.

  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.

  • Remplacez YOUR_TEMPLATE_NAME par le nom de votre modèle.

  • Remplacez com.example.myclass par votre classe Java.

  • Vérifiez que le chemin d'accès à templateLocation est correct.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

Python

La commande Python ci-dessous crée et préproduit un modèle à l'emplacement Cloud Storage spécifié dans --template_location. Apportez les modifications suivantes à la commande :

  • Remplacez YOUR_PROJECT_ID par votre ID de projet.

  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.

  • Remplacez YOUR_TEMPLATE_NAME par le nom de votre modèle.

  • Remplacez examples.mymodule par votre module Python.

  • Vérifiez que le chemin d'accès à template_location est correct.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

Java : SDK 1.x

La commande Maven ci-dessous crée et préproduit un modèle à l'emplacement Cloud Storage spécifié dans --dataflowJobFile.

  • Remplacez YOUR_PROJECT_ID par votre ID de projet.

  • Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.

  • Remplacez YOUR_TEMPLATE_NAME par le nom de votre modèle.

  • Remplacez com.example.myclass par votre classe Java.

  • Vérifiez que le chemin d'accès à dataflowJobFile est correct.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --dataflowJobFile=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

Une fois votre modèle créé et préproduit, la prochaine étape consiste à l'exécuter.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.