Utiliser des modèles Flex

Ce tutoriel explique comment créer et exécuter une tâche de modèle Flex Dataflow avec une image Docker personnalisée à l'aide de l'outil de ligne de commande gcloud. Il présente un exemple de pipeline de traitement par flux qui lit les messages encodés en JSON à partir de Pub/Sub, transforme les données des messages avec Beam SQL et écrit les résultats dans une table BigQuery. Dans ce tutoriel, vous allez stocker votre image de conteneur dans Container Registry, mais Dataflow accepte également de stocker des images de modèle Flex dans Artifact Registry.

Pour en savoir plus sur les modèles Flex, consultez la page Modèles Dataflow.

Objectifs

  • Créer une image de conteneur Docker
  • Créer et exécuter un modèle Flex Dataflow

Coûts

Ce tutoriel utilise des composants facturables de Google Cloud, dont :

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

Utilisez le Simulateur de coût pour générer une estimation des coûts en fonction de votre utilisation prévue.

Avant de commencer

Cette section explique comment activer des API, créer un compte de service et attribuer un rôle de propriétaire à ce compte. Dans un environnement de production, n'accordez pas le rôle Propriétaire. Utilisez plutôt les autorisations et les rôles spécifiques Dataflow appropriés. Pour en savoir plus, consultez la section Comprendre les autorisations liées aux modèles Flex.

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activer les API Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Activer les API

  5. Créez un compte de service :

    1. Dans Cloud Console, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez un projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. Cloud Console remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Créer et continuer.
    5. Cliquez sur le champ Sélectionner un rôle.

      Dans la section Accès rapide, cliquez sur Basique, puis sur Propriétaire.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  6. Créez une clé de compte de service :

    1. Dans Cloud Console, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Clés.
    3. Cliquez sur Add key (Ajouter une clé), puis sur Create new key (Créer une clé).
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  7. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  8. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  9. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  10. Activer les API Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build.

    Activer les API

  11. Créez un compte de service :

    1. Dans Cloud Console, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez un projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. Cloud Console remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Créer et continuer.
    5. Cliquez sur le champ Sélectionner un rôle.

      Dans la section Accès rapide, cliquez sur Basique, puis sur Propriétaire.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  12. Créez une clé de compte de service :

    1. Dans Cloud Console, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Clés.
    3. Cliquez sur Add key (Ajouter une clé), puis sur Create new key (Créer une clé).
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  13. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

Une fois que vous avez terminé ce tutoriel, évitez de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Nettoyer.

Créer l'exemple de source et de récepteur

Cette section explique comment créer les éléments suivants :

  • Une source de données par flux utilisant Pub/Sub
  • Un ensemble de données pour charger les données dans BigQuery

Créer un bucket Cloud Storage

Exécutez la commande gsutil mb :

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Créer un sujet Pub/Sub et un abonnement à ce sujet

Utilisez l'outil de ligne de commande gcloud :

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Créer une tâche Cloud Scheduler

Dans cette étape, nous utilisons l'outil de ligne de commande gcloud pour créer et exécuter une tâche Cloud Scheduler qui publie des "avis positifs" et des "avis négatifs".

  1. Créez une tâche Cloud Scheduler pour ce projet Google Cloud.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. Cela permet de créer et d'exécuter un éditeur pour les "avis positifs" qui publie un message par minute.
  3. Démarrez la tâche Cloud Scheduler.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. Créez et exécutez un autre éditeur similaire pour les "avis négatifs", qui publie un message toutes les deux minutes.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

Créer un ensemble de données BigQuery

Utilisez la commande bq mk :

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

Télécharger l'exemple de code

  1. Téléchargez l'exemple de code.

    Java

    Clonez le dépôt java-docs-samples et accédez à l'exemple de code utilisé pour ce tutoriel.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    Clonez le dépôt python-docs-samples et accédez à l'exemple de code utilisé pour ce tutoriel.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. Exportez le TEMPLATE_IMAGE pour ce tutoriel.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

Configurer votre environnement de développement

Java

  1. Téléchargez et installez le kit de développement Java (JDK) version 11. Vérifiez que la variable d'environnement JAVA_HOME est définie et qu'elle pointe vers votre installation JDK.
  2. Téléchargez et installez Apache Maven en suivant les instructions du guide d'installation de Maven spécifique à votre système d'exploitation.
  3. (Facultatif) Exécutez le pipeline Apache Beam en local pour le développement.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. Créez le projet Java dans un fichier Uber JAR.
      mvn clean package
  5. (Facultatif) Notez la taille du fichier Uber JAR par rapport au fichier d'origine.
      ls -lh target/*.jar
    Toutes les dépendances sont intégrées dans ce fichier Uber JAR. Vous pouvez exécuter ce fichier en tant qu'application autonome sans dépendances externes sur d'autres bibliothèques.

Python

Utilisez le SDK Apache Beam pour Python.

Python uniquement : créer et compiler une image de conteneur

Cette section contient les étapes destinées aux utilisateurs de Python. Si vous utilisez Java, ignorez les étapes ci-dessous.

Si l'exécution de votre tâche échoue et que le message d'erreur A Timeout in polling error message s'affiche, consultez la procédure de dépannage.

  1. (Facultatif) Activez l'utilisation du cache Kaniko par défaut.
    gcloud config set builds/use_kaniko True
    
    Comme Kaniko met en cache les artefacts de compilation des conteneurs, l'utilisation de cette option accélère les compilations suivantes.
  2. (Facultatif) Créez le fichier Dockerfile. Vous pouvez le personnaliser à l'aide de ce tutoriel. Le fichier de démarrage se présente comme suit :

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    Ce fichier Dockerfile contient les commandes FROM, ENV et COPY, qui sont présentées dans la documentation de référence sur Dockerfile.

    Les images commençant par gcr.io/PROJECT/ sont enregistrées dans le registre Container Registry de votre projet, où l'image est accessible aux autres produits Google Cloud. Bien que ce ne soit pas illustré dans ce tutoriel, vous pouvez également stocker les images dans Artifact Registry.
  3. Créez l'image Docker à l'aide d'un fichier Dockerfile avec Cloud Build.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Métadonnées

Vous pouvez étendre votre modèle en ajoutant des métadonnées afin de valider des paramètres personnalisés lors de l'exécution du modèle. Si vous souhaitez créer des métadonnées pour votre modèle, procédez comme suit :

  1. Créez un fichier metadata.json à l'aide des paramètres de Paramètres de métadonnées.

    Pour afficher un exemple, consultez la section Exemple de fichier de métadonnées.

  2. Stockez le fichier de métadonnées dans le même dossier Cloud Storage que le modèle.

Paramètres de métadonnées

Clé du paramètre Requis Description de la valeur
name Oui Nom du modèle.
description Non Court paragraphe de texte décrivant le modèle.
parameters Non Tableau de paramètres supplémentaires utilisés par le modèle. Un tableau vide est utilisé par défaut.
name Oui Nom du paramètre utilisé dans le modèle.
label Oui Chaîne au format lisible utilisée dans Cloud Console pour étiqueter le paramètre.
helpText Oui Court paragraphe de texte décrivant le paramètre.
isOptional Non false si le paramètre est requis et true s'il est facultatif. À moins qu'une valeur ne soit définie, isOptional est défini par défaut sur false. Si vous n'incluez pas cette clé de paramètre pour vos métadonnées, les métadonnées deviennent un paramètre obligatoire.
regexes Non Tableau d'expressions régulières POSIX-egrep sous forme de chaînes qui permettent de valider la valeur du paramètre. Par exemple : ["^[a-zA-Z][a-zA-Z0-9]+"] est une expression régulière unique qui vérifie que la valeur commence par une lettre, puis comporte un ou plusieurs caractères. Un tableau vide est utilisé par défaut.

Exemple de fichier de métadonnées

Java

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Vous pouvez télécharger des fichiers de métadonnées pour les modèles fournis par Google à partir du répertoire de modèles de Dataflow.

Créer un modèle Flex

Pour exécuter un modèle, vous devez créer un fichier de spécification de modèle dans Cloud Storage contenant toutes les informations nécessaires à l'exécution de la tâche, telles que les informations et les métadonnées du SDK.

Ce tutoriel utilise l'exemple de fichier de métadonnées, qui contient des informations supplémentaires pour le modèle, telles que les champs name, description et le champ d'entrée parameters.

  1. Créez un fichier de spécification de modèle contenant toutes les informations nécessaires à l'exécution de la tâche, telles que les informations et les métadonnées du SDK.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Créez le modèle Flex.

    Java

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

Il est désormais disponible via le fichier de modèle à l'emplacement Cloud Storage que vous avez spécifié.

Exécuter un pipeline de modèle Flex

Vous pouvez maintenant exécuter le pipeline Apache Beam dans Dataflow. Pour ce faire, faites référence au fichier de modèle et transmettez les paramètres de modèle requis par le pipeline.

  1. Dans l'interface système ou le terminal, exécutez le modèle :

    Java

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    Vous pouvez également exécuter le modèle avec une requête API REST :

    Java

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. Une fois que vous avez exécuté la commande pour exécuter le modèle Flex, Dataflow renvoie un ID de tâche avec l'état de tâche En file d'attente. Plusieurs minutes peuvent s'écouler avant que l'état de la tâche ne devienne En cours d'exécution. Vous pouvez alors accéder au graphique de la tâche.
  3. Vérifiez les résultats dans BigQuery en exécutant la requête suivante :
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    Pendant l'exécution de ce pipeline, de nouvelles lignes sont ajoutées chaque minute à la table BigQuery.

Nettoyer

Une fois que vous avez terminé le tutoriel, vous pouvez nettoyer les ressources que vous avez créées sur Google Cloud pour qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Nettoyer les ressources du modèle Flex

  1. Arrêtez le pipeline Dataflow.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Supprimez le fichier de spécification de modèle de Cloud Storage.
    gsutil rm $TEMPLATE_PATH
    
  3. Supprimez l'image de conteneur du modèle Flex de Container Registry.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Nettoyer les ressources du projet Google Cloud

  1. Supprimez les tâches Cloud Scheduler.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Supprimez l'abonnement et le sujet Pub/Sub.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. Supprimez la table BigQuery.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. Supprimez l'ensemble de données BigQuery, ce qui ne génère aucuns frais.

    La commande suivante supprime également toutes les tables de l'ensemble de données. Les tables et les données ne peuvent pas être récupérées.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Supprimez le bucket Cloud Storage, ce qui ne génère aucuns frais.

    La commande suivante supprime également tous les objets du bucket. Ces objets ne peuvent pas être récupérés.

    gsutil rm -r gs://$BUCKET
    

Limites

Les limites suivantes s'appliquent aux tâches de modèles Flex :

  • Vous devez utiliser une image de base fournie par Google pour empaqueter vos conteneurs à l'aide de Docker. Pour obtenir la liste des images applicables, consultez la section Images de base des modèles Flex.
  • Le programme qui construit le pipeline doit se fermer après l'appel de run afin que le pipeline puisse démarrer.
  • waitUntilFinish (Java) et wait_until_finish (Python) ne sont pas acceptés.

Étape suivante