Diffuser des messages Pub/Sub Lite à l'aide de Dataflow

Au lieu d'écrire et d'exécuter vos propres programmes de traitement de données, vous pouvez utiliser Dataflow avec le connecteur d'E/S Pub/Sub Lite pour Apache Beam. Cloud Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en streaming (en temps réel) et par lot avec un niveau identique de fiabilité et d'expressivité. Il exécute de manière fiable des programmes développés à l'aide du SDK Apache Beam, qui offre un ensemble extensible d'abstractions de traitement avec état puissantes et de connecteurs d'E/S à d'autres systèmes de traitement par flux et par lot.

Ce guide de démarrage rapide explique comment écrire un pipeline Apache Beam pour :

  • Lire les messages de Pub/Sub Lite
  • Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
  • Écrire les messages dans Cloud Storage

Il vous indique également comment :

  • Envoyer votre pipeline pour exécution sur Dataflow
  • Créer un modèle Flex Dataflow à partir de votre pipeline

Ce tutoriel nécessite Maven, mais il est également possible de convertir l'exemple de projet Maven vers Gradle. Pour en savoir plus, consultez Facultatif: convertir de Maven à Gradle.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer les API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging :

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Configurez l'authentification :

    1. Créez le compte de service :

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Remplacez SERVICE_ACCOUNT_NAME par le nom que vous souhaitez donner au compte de service.

    2. Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • ROLE : rôle à accorder
    3. Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • USER_EMAIL: adresse e-mail de votre compte Google
  8. Installez Google Cloud CLI.
  9. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  10. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  11. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  12. Activer les API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging :

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Configurez l'authentification :

    1. Créez le compte de service :

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Remplacez SERVICE_ACCOUNT_NAME par le nom que vous souhaitez donner au compte de service.

    2. Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin :

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • ROLE : rôle à accorder
    3. Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Remplacez les éléments suivants :

      • SERVICE_ACCOUNT_NAME : nom du compte de service.
      • PROJECT_ID : ID du projet dans lequel vous avez créé le compte de service.
      • USER_EMAIL: adresse e-mail de votre compte Google
  14. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login

Configurer votre projet Pub/Sub Lite

  1. Créez des variables pour votre bucket, votre projet et votre région Dataflow dans Cloud Storage. Les noms des buckets Cloud Storage doivent être uniques. La région Dataflow doit être une région valide dans laquelle vous pouvez exécuter votre job. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Créez un bucket Cloud Storage appartenant à ce projet :

       gsutil mb gs://$BUCKET
    

Créer un sujet Lite et un abonnement Pub/Sub Lite zonaux

Créez un sujet Lite Pub/Sub Lite et un abonnement Lite zonaux.

Pour l'emplacement Lite, choisissez un emplacement Pub/Sub Lite compatible. Vous devez également spécifier une zone pour la région. Par exemple, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Diffuser des messages vers Dataflow

Télécharger l'exemple de code du guide de démarrage rapide

Clonez le dépôt de démarrage rapide et accédez au répertoire de l'exemple de code.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Exemple de code

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages d'un abonnement Pub/Sub Lite en tant que source illimitée.
  • Regroupez les messages en fonction de leur horodatage de publication, à l'aide de périodes fixes et du déclencheur par défaut.
  • Écrire les messages groupés dans des fichiers Cloud Storage.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Démarrer le pipeline Dataflow

Pour démarrer le pipeline dans Dataflow, exécutez la commande suivante :

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

La commande précédente lance une tâche Dataflow. Suivez le lien dans la sortie de la console pour accéder à la tâche dans la console de surveillance Dataflow.

Observer la progression de la tâche

Observez la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Graphique de la tâche
  • Détails de l'exécution
  • Métriques de tâche

Publiez des messages sur votre sujet Lite.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Vous devrez peut-être patienter quelques minutes avant que les messages ne s'affichent dans vos journaux de nœud de calcul.

Utilisez la commande ci-dessous pour vérifier quels fichiers ont été écrits dans Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

Le résultat doit se présenter sous la forme suivante :

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Utilisez la commande ci-dessous pour examiner le contenu d'un fichier :

gsutil cat "gs://$BUCKET/samples/your-filename"

Créer un modèle Dataflow (facultatif)

Vous pouvez éventuellement créer un modèle Flex Dataflow personnalisé basé sur votre pipeline. Les modèles Dataflow vous permettent d'exécuter des tâches avec différents paramètres d'entrée à partir de la console Google Cloud ou de la ligne de commande, sans avoir à configurer un environnement de développement Java complet.

  1. Créez un fichier Fat JAR qui inclut toutes les dépendances de votre pipeline. Une fois la commande exécutée, le fichier target/pubsublite-streaming-bundled-1.0.jar devrait s'afficher.

    mvn clean package -DskipTests=true
    
  2. Indiquez les noms et les emplacements de votre fichier de modèle et de votre image de conteneur de modèle.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Créer un modèle Flex personnalisé Un fichier metadata.json obligatoire, qui contient les spécifications nécessaires à l'exécution de la tâche, a été fourni avec l'exemple.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Exécutez une tâche à l'aide du modèle Flex personnalisé.

Console

  1. Créez une tâche à partir d'un modèle.

  2. Saisissez le nom de la tâche.

  3. Saisissez votre région Dataflow.

  4. Choisissez votre modèle personnalisé.

  5. Saisissez le chemin d'accès du modèle.

  6. Saisissez les paramètres requis.

  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.

  1. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline au lieu de le drainer.

  2. Supprimez le sujet et l'abonnement.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Supprimez les fichiers créés par le pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Supprimez l'image du modèle et le fichier de modèle, s'ils existent.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Supprimez le bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

  6. Supprimez le compte de service :
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.

    gcloud auth application-default revoke
  8. Facultatif : Révoquez les identifiants de la CLI gcloud.

    gcloud auth revoke

Étapes suivantes