Utiliser des modèles Flex

Ce tutoriel explique comment créer et exécuter une tâche de modèle Flex Dataflow avec une image Docker personnalisée à l'aide de l'outil de ligne de commande gcloud. Il présente un exemple de pipeline de traitement par flux qui lit les messages encodés en JSON à partir de Pub/Sub, transforme les données des messages avec Beam SQL et écrit les résultats dans une table BigQuery.

Objectifs

  • Créer une image de conteneur Docker
  • Créer et exécuter un modèle Flex Dataflow

Coûts

Ce tutoriel utilise des composants facturables de Google Cloud, dont :

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder à la page de sélection du projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Activer les API

  5. Configurer l'authentification :
    1. Dans Cloud Console, accédez à la page Créer une clé de compte de service.

      Accéder à la page "Créer une clé de compte de service"
    2. Dans la liste Compte de service, sélectionnez Nouveau compte de service.
    3. Dans le champ Nom du compte de service, saisissez un nom.
    4. Dans la liste Rôle, sélectionnez Projet > Propriétaire

    5. Cliquez sur Créer. Un fichier JSON contenant votre clé est téléchargé sur votre ordinateur.
  6. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Nettoyer.

Créer l'exemple de source et de récepteur

Cette section explique comment créer les éléments suivants :

  • Une source de données par flux utilisant Pub/Sub
  • Un ensemble de données pour charger les données dans BigQuery

Créer un bucket Cloud Storage

Exécutez la commande gsutil mb :

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Créer un sujet Pub/Sub et un abonnement à ce sujet

Utilisez l'outil de ligne de commande gcloud :

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Créer une tâche Cloud Scheduler

Dans cette étape, nous utilisons l'outil de ligne de commande gcloud pour créer et exécuter une tâche Cloud Scheduler qui publie des "avis positifs" et des "avis négatifs".

  1. Créez une tâche Cloud Scheduler pour ce projet Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Cela permet de créer et d'exécuter un éditeur pour les "avis positifs" qui publie un message par minute.
  3. Démarrez la tâche Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Créez et exécutez un autre éditeur similaire pour les "avis négatifs", qui publie un message toutes les deux minutes.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Créer un ensemble de données BigQuery

Utilisez la commande bq mk :

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Télécharger l'exemple de code

  1. Téléchargez l'exemple de code.

    Java

    Clonez le dépôt java-docs-samples et accédez à l'exemple de code utilisé pour ce tutoriel.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clonez le dépôt python-docs-samples et accédez à l'exemple de code utilisé pour ce tutoriel.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exportez le TEMPLATE_IMAGE pour ce tutoriel.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Configurer votre environnement de développement

Java

  1. Téléchargez et installez le kit de développement Java (JDK) version 11. Vérifiez que la variable d'environnement JAVA_HOME est définie et qu'elle pointe vers votre installation JDK.
  2. Téléchargez et installez Apache Maven en suivant les instructions du guide d'installation de Maven spécifique à votre système d'exploitation.
  3. (Facultatif) Exécutez le pipeline Apache Beam en local pour le développement.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Créez le projet Java dans un fichier Uber JAR.
      mvn clean package
  5. (Facultatif) Notez la taille du fichier Uber JAR par rapport au fichier d'origine.
      ls -lh target/*.jar
    Toutes les dépendances sont intégrées dans ce fichier Uber JAR. Vous pouvez exécuter ce fichier en tant qu'application autonome sans dépendances externes sur d'autres bibliothèques.

Python

Utilisez le SDK Apache Beam pour Python avec pip et Python version 2.7, 3.5, 3.6 ou 3.7. Vérifiez que votre installation de Python et pip fonctionnent correctement en exécutant la commande suivante :

    python --version
    python -m pip --version

Si vous ne disposez pas de Python, recherchez la procédure d'installation pour votre système d'exploitation sur la page Installer Python.

Python uniquement : créer et compiler une image de conteneur

Cette section contient les étapes destinées aux utilisateurs de Python. Si vous utilisez Java, ignorez les étapes ci-dessous.

  1. (Facultatif) Activez l'utilisation du cache Kaniko par défaut.
    gcloud config set builds/use_kaniko True
    
    Comme Kaniko met en cache les artefacts de compilation des conteneurs, l'utilisation de cette option accélère les compilations suivantes.
  2. (Facultatif) Créez le fichier Dockerfile. Vous pouvez le personnaliser à l'aide de ce tutoriel. Le fichier de démarrage se présente comme suit :

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    Ce fichier Dockerfile contient les commandes FROM, ENV et COPY, qui sont présentées dans la documentation de référence sur Dockerfile.

    Les images commençant par gcr.io/PROJECT/ sont enregistrées dans le registre Container Registry de votre projet, où l'image est accessible aux autres produits Google Cloud.
  3. Créez l'image Docker à l'aide d'un fichier Dockerfile avec Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Créer un modèle Flex

Pour exécuter un modèle, vous devez créer un fichier de spécification de modèle dans Cloud Storage contenant toutes les informations nécessaires à l'exécution de la tâche, telles que les informations et les métadonnées du SDK.

Le fichier metadata.json de cet exemple contient des informations supplémentaires pour le modèle, telles que les champs name, description et parameters d'entrée.

  1. Créez un fichier de spécification de modèle contenant toutes les informations nécessaires à l'exécution de la tâche, telles que les informations et les métadonnées du SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Créez le modèle Flex.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Il est désormais disponible via le fichier de modèle à l'emplacement Cloud Storage que vous avez spécifié.

Exécuter un pipeline de modèle Flex

Vous pouvez maintenant exécuter le pipeline Apache Beam dans Dataflow. Pour ce faire, faites référence au fichier de modèle et transmettez les paramètres de modèle requis par le pipeline.

  1. Exécutez le modèle.

    Java

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Vous pouvez également exécuter le modèle avec une requête API REST.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. Une fois que vous avez exécuté la commande pour exécuter le modèle Flex, Dataflow renvoie un ID de tâche avec l'état de tâche En file d'attente. Plusieurs minutes peuvent s'écouler avant que l'état de la tâche ne devienne En cours d'exécution. Vous pouvez alors accéder au graphique de la tâche.
  3. Vérifiez les résultats dans BigQuery en exécutant la requête suivante :
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Pendant l'exécution de ce pipeline, de nouvelles lignes sont ajoutées chaque minute à la table BigQuery.

Nettoyer

Une fois que vous avez terminé le tutoriel, vous pouvez nettoyer les ressources que vous avez créées sur Google Cloud pour qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Nettoyer les ressources du modèle Flex

  1. Arrêtez le pipeline Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Supprimez le fichier de spécification de modèle de Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Supprimez l'image de conteneur du modèle Flex de Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Nettoyer les ressources du projet Google Cloud

  1. Supprimez les tâches Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Supprimez l'abonnement et le sujet Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Supprimez la table BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Supprimez l'ensemble de données BigQuery, ce qui ne génère aucuns frais.

    La commande suivante supprime également toutes les tables de l'ensemble de données. Les tables et les données ne peuvent pas être récupérées.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Supprimez le bucket Cloud Storage, ce qui ne génère aucuns frais.

    La commande suivante supprime également tous les objets du bucket. Ces objets ne peuvent pas être récupérés.

    gsutil rm -r gs://$BUCKET
    

Limites

Les limites suivantes s'appliquent aux tâches de modèles Flex :

  • Vous devez utiliser une image de base fournie par Google pour empaqueter vos conteneurs à l'aide de Docker.
  • waitUntilFinish (Java) et wait_until_finish (Python) ne sont pas acceptés.

Étape suivante