Diffuser des messages Pub/Sub Lite à l'aide de Dataflow

Au lieu d'écrire et d'exécuter vos propres programmes de traitement de données, vous pouvez utiliser Dataflow avec le connecteur d'E/S Pub/Sub Lite pour Apache Beam. Cloud Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en streaming (en temps réel) et par lot avec un niveau identique de fiabilité et d'expressivité. Il exécute de manière fiable des programmes développés à l'aide du SDK Apache Beam, qui offre un ensemble extensible d'abstractions de traitement avec état puissantes et de connecteurs d'E/S à d'autres systèmes de traitement par flux et par lot.

Ce guide de démarrage rapide explique comment écrire un pipeline Apache Beam pour :

  • Lire les messages de Pub/Sub Lite
  • Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
  • Écrire les messages dans Cloud Storage

Il vous indique également comment :

  • Envoyer votre pipeline pour exécution sur Dataflow
  • Créer un modèle Flex Dataflow à partir de votre pipeline

Ce tutoriel nécessite Maven, mais il est également possible de convertir l'exemple de projet de Maven vers Gradle. Pour en savoir plus, consultez Facultatif: passer de Maven à Gradle.

Avant de commencer

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login

Configurer votre projet Pub/Sub Lite

  1. Créer des variables pour votre bucket Cloud Storage, projet et Région Dataflow. Les noms des buckets Cloud Storage doivent être uniques. La région Dataflow doit être une région valide dans laquelle vous pouvez exécuter votre tâche. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
  2. Créez un bucket Cloud Storage appartenant à ce projet :

       gcloud storage buckets create gs://$BUCKET

Créer un sujet et un abonnement zonaux Pub/Sub Lite

Créez un sujet Pub/Sub Lite zonal et un abonnement Lite.

Pour l'emplacement Lite, choisissez un emplacement Pub/Sub Lite compatible. Vous devez également spécifier une zone pour la région. Exemple :us-central1-a

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Diffuser des messages vers Dataflow

Télécharger l'exemple de code du guide de démarrage rapide

Clonez le dépôt du guide de démarrage rapide et accédez au répertoire de l'exemple de code.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Exemple de code

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages d'un abonnement Pub/Sub Lite en tant que source illimitée.
  • Regroupez les messages en fonction de leur code temporel de publication, à l'aide de fenêtres temporelles fixes et du déclencheur par défaut.
  • Écrire les messages groupés dans des fichiers Cloud Storage.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Démarrer le pipeline Dataflow

Pour démarrer le pipeline dans Dataflow, exécutez la commande suivante :

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

La commande précédente lance une tâche Dataflow. Suivez le lien dans la sortie de la console pour accéder à la tâche dans la console de surveillance Dataflow.

Observer la progression de la tâche

Observez la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Graphique de la tâche
  • Détails de l'exécution
  • Métriques de tâche

Publiez des messages sur votre sujet Lite.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Vous devrez peut-être patienter quelques minutes avant que les messages ne s'affichent dans vos journaux de nœud de calcul.

Utilisez la commande ci-dessous pour vérifier quels fichiers ont été écrits dans Cloud Storage.

gcloud storage ls "gs://$BUCKET/samples/"

Le résultat doit se présenter sous la forme suivante :

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Utilisez la commande ci-dessous pour examiner le contenu d'un fichier :

gcloud storage cat "gs://$BUCKET/samples/your-filename"

Facultatif: Créer un modèle Dataflow

Vous pouvez éventuellement créer un modèle Flex Dataflow personnalisé basé sur votre pipeline. Les modèles Dataflow vous permettent d'exécuter des tâches avec différents paramètres d'entrée à partir de la console Google Cloud ou de la ligne de commande sans avoir à configurer un environnement de développement Java complet.

  1. Créez un fichier Fat JAR qui inclut toutes les dépendances de votre pipeline. Une fois la commande exécutée, le fichier target/pubsublite-streaming-bundled-1.0.jar devrait s'afficher.

    mvn clean package -DskipTests=true
  2. Indiquez les noms et les emplacements de votre fichier de modèle et de votre image de conteneur de modèle.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. Créer un modèle Flex personnalisé Un fichier metadata.json obligatoire, qui contient les spécifications nécessaires à l'exécution de la tâche, a été fourni avec l'exemple.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. Exécutez une tâche à l'aide du modèle Flex personnalisé.

Console

  1. Créez une tâche à partir d'un modèle.

  2. Saisissez le nom de la tâche.

  3. Saisissez votre région Dataflow.

  4. Choisissez votre modèle personnalisé.

  5. Saisissez le chemin d'accès du modèle.

  6. Saisissez les paramètres requis.

  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \ 
     --serviceAccount=$SERVICE_ACCOUNT

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.

  1. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline au lieu de le drainer.

  2. Supprimez le sujet et l'abonnement.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
  3. Supprimez les fichiers créés par le pipeline.

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. Supprimez l'image du modèle et le fichier de modèle, s'ils existent.

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. Supprimez le bucket Cloud Storage.

    gcloud storage rm gs://$BUCKET --recursive

  6. Supprimez le compte de service :
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Étape suivante