Modèle de fichiers texte sur Cloud Storage vers Pub/Sub (Flux)

Le modèle crée un pipeline de streaming qui interroge en permanence les nouveaux fichiers texte chargés dans Cloud Storage, lit chaque fichier ligne par ligne et publie des chaînes dans un sujet Pub/Sub. Le modèle publie les enregistrements dans un fichier délimité par une nouvelle ligne contenant des enregistrements JSON ou un fichier CSV dans un sujet Pub/Sub pour un traitement en temps réel. Vous pouvez utiliser ce modèle pour relire les données dans Pub/Sub.

Le pipeline s'exécute indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation "Watch", qui est une fonction "SplittableDoFn" qui n'est pas compatible avec le drainage.

Actuellement, l'intervalle d'interrogation est fixé à 10 secondes. Ce modèle ne définit aucun horodatage sur les enregistrements individuels. Par conséquent, l'heure de l'événement est égale à l'heure de publication pendant l'exécution. Si le traitement de votre pipeline dépend d'une heure d'événement précise, nous vous conseillons de ne pas utiliser ce pipeline.

Conditions requises pour ce pipeline

  • Les fichiers d'entrée doivent être au format CSV ou JSON délimité par une nouvelle ligne. Les enregistrements couvrant plusieurs lignes dans les fichiers sources peuvent entraîner des problèmes en aval, car chaque ligne dans les fichiers est publiée sous forme de message à Pub/Sub.
  • Le sujet Pub/Sub doit exister avant l'exécution.
  • Le pipeline fonctionne indéfiniment et doit être terminé manuellement.

Paramètres de modèle

Paramètres obligatoires

  • inputFilePattern: modèle de fichier d'entrée à lire. Exemple :gs://bucket-name/files/*.json
  • outputTopic: sujet d'entrée Pub/Sub dans lequel écrire. Le nom doit être au format projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Par exemple, projects/your-project-id/topics/your-topic-name.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • FILE_PATTERN : modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple, path/*.csv)

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • TOPIC_NAME : nom de votre sujet Pub/Sub
  • BUCKET_NAME : nom de votre bucket Cloud Storage
  • FILE_PATTERN : modèle de fichier glob à lire dans le bucket Cloud Storage (par exemple, path/*.csv)
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.joda.time.Duration;

/**
 * The {@code TextToPubsubStream} is a streaming version of {@code TextToPubsub} pipeline that
 * publishes records to Cloud Pub/Sub from a set of files. The pipeline continuously polls for new
 * files, reads them row-by-row and publishes each record as a string message. The polling interval
 * is fixed and equals to 10 seconds. At the moment, publishing messages with attributes is
 * unsupported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Stream_GCS_Text_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Stream_GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Text Files on Cloud Storage to Pub/Sub",
    description = {
      "This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. "
          + "The template publishes records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "The pipeline runs indefinitely and needs to be terminated manually via a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the <code>Watch</code> transform, which is a <code>SplittableDoFn</code> that does not support draining.\n",
      "Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you should not use this pipeline."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-pubsub-stream",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist prior to execution.",
      "The pipeline runs indefinitely and needs to be terminated manually.",
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class TextToPubsubStream {
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /**
   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   *
   * @param args Arguments passed in from the command-line.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Executes the pipeline with the provided execution parameters.
   *
   * @param options The execution parameters.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
     */
    pipeline
        .apply(
            "Read Text Data",
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

Étape suivante