Utiliser Pub/Sub Lite avec Dataflow

Au lieu d'écrire et d'exécuter vos propres programmes de traitement de données, vous pouvez utiliser Dataflow avec le connecteur d'E/S Pub/Sub Lite pour Apache Beam. s'affiche en haut de l'écran. Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il exécute de manière fiable des programmes développés à l'aide du SDK Apache Beam, qui offre un ensemble extensible de puissantes abstractions de traitement avec état et des connecteurs d'E/S vers d'autres systèmes de traitement par flux et par lot.

Ce guide de démarrage rapide explique comment écrire un pipeline Apache Beam qui:

  • Lire des messages depuis Pub/Sub Lite
  • Fenêtrage (ou groupe) des messages par horodatage de publication
  • Écrire les messages dans Cloud Storage

Il vous montre également comment:

  • Envoyer votre pipeline pour l'exécuter sur Dataflow
  • Créer un modèle Dataflow Flex à partir de votre pipeline

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Pub/Sub Lite, Dataflow, Cloud Storage, Logging .

    Activer les API

  5. Créez un compte de service :

    1. Dans Cloud Console, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez un projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. Cloud Console remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Create (Créer).
    5. Cliquez sur le champ Sélectionner un rôle.

      Dans la section Accès rapide, cliquez sur Basique, puis sur Propriétaire.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  6. Créez une clé de compte de service :

    1. Dans Cloud Console, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Clés.
    3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  7. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  8. Installez et initialisez le SDK Cloud.

Configurer

  1. Créez des variables pour votre projet et votre bucket. Les noms des buckets Cloud Storage doivent être uniques. Créez un bucket Cloud Storage.

    export PROJECT_ID=$(gcloud config get-value project)
    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  2. Sélectionnez une zone Pub/Sub Lite dans laquelle vous prévoyez d'exécuter votre tâche Dataflow. Créer un sujet et un abonnement Pub/Sub Lite

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export LITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --zone=$LITE_LOCATION \
        --partitions=1 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --zone=$LITE_LOCATION \
        --topic=$TOPIC \
        --starting-offset=beginning
    
  3. Sélectionnez une région Dataflow dans laquelle vous prévoyez d'exécuter votre tâche Dataflow.

    export DATAFLOW_REGION=your-dataflow-region
    

Démarrer le streaming

Clonez le dépôt de démarrage rapide et accédez au répertoire de l'exemple de code:

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :

  • Lecture des messages d'un abonnement Pub/Sub Lite en tant que source illimitée
  • Regroupez les messages dans des intervalles de taille fixe selon leur horodatage de publication.
  • Écrire les messages regroupés dans des fichiers stockés sur Cloud Storage.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.beam.PubsubLiteIO;
import com.google.cloud.pubsublite.beam.SubscriberOptions;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOpitons =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // TODO: Replace the I/O connector with the one released with Apache Beam when it's stable.
        // https://issues.apache.org/jira/browse/BEAM-10114
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOpitons))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp. The element timestamp is the publish time associated with a message.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
                // Fire a trigger every 30 seconds after receiving the first element.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30))))
                // Ignore late elements.
                .withAllowedLateness(Duration.ZERO)
                // Accumulate elements in fired panes. This will make sure that elements collected
                // in an earlier pane by an earlier trigger will not be overwritten by those
                // arriving later due to a later trigger fired in the same window.
                .accumulatingFiredPanes())
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Pour démarrer le pipeline dans Dataflow, exécutez la commande suivante:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner"

La commande précédente lance une tâche Dataflow. Cliquez sur le lien dans le résultat de la console pour accéder à la tâche dans la console de surveillance Dataflow.

Observer la progression de la tâche

Observez la progression de la tâche dans la console Dataflow.

Accéder à la console Dataflow

Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :

  • Graphique de tâche
  • Détails de l'exécution
  • Métriques de tâche

Publiez des messages sur votre sujet Lite.

gcloud beta pubsub lite-topics publish $TOPIC \
    --zone=$LITE_LOCATION \
    --message="Hello World!"

Vous devrez peut-être attendre quelques minutes pour que les messages s'affichent dans vos journaux de travail.

Utilisez la commande ci-dessous pour vérifier les fichiers qui ont été écrits dans Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

Le résultat doit se présenter sous la forme suivante :

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Utilisez la commande ci-dessous pour examiner le contenu d'un fichier:

gsutil cat "gs://$BUCKET/samples/your-filename"

Créer un modèle

Vous pouvez éventuellement créer un modèle Dataflow Flex personnalisé en fonction de votre pipeline. Les modèles Dataflow vous permettent d'exécuter des tâches avec des paramètres d'entrée différents depuis Cloud Console ou la ligne de commande, sans avoir à configurer un environnement de développement Java complet.

  1. Créez un fichier JAR Fun qui inclut toutes les dépendances de votre pipeline. target/pubsublite-streaming-bundled-1.0.jar devrait s'afficher après l'exécution de la commande.

    mvn clean package -DskipTests=true
    
  2. Indiquez les noms et les emplacements de votre fichier de modèle et de votre image de conteneur.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Créez un modèle Flex personnalisé. Un fichier metadata.json obligatoire, qui contient la spécification nécessaire à l'exécution de la tâche, a été fourni dans l'exemple.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Exécuter une tâche à l'aide du modèle flexible personnalisé.

Console

  1. Créer une tâche à partir d'un modèle.

  2. Saisissez un Job name (Nom de tâche).

  3. Saisissez votre région Dataflow.

  4. Choisissez votre Modèle personnalisé.

  5. Saisissez le chemin d'accès au modèle.

  6. Saisissez les paramètres obligatoires.

  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION

Nettoyer

Pour éviter que les ressources utilisées dans ce guide démarrage rapide soient facturées sur votre compte Google Cloud :

  1. Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline au lieu de le drainer.

  2. Supprimez le sujet et l'abonnement.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Supprimez les fichiers créés par le pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Supprimez l'image du modèle et le fichier modèle s'il existe.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Supprimez le bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

Étape suivante