Modèle Pub/Sub vers BigQuery avec un UDF Python

Le modèle Pub/Sub vers BigQuery avec Python UDF est un pipeline de streaming qui lit les messages au format JSON de Pub/Sub et les écrit dans une table BigQuery. Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) écrite en Python pour traiter les messages entrants.

Conditions requises pour ce pipeline

  • La table BigQuery doit exister et posséder un schéma.
  • Les données de messages Pub/Sub doivent utiliser le format JSON, ou vous devez fournir une fonction définie par l'utilisateur qui convertit les données du message au format JSON. Les données JSON doivent correspondre au schéma de la table BigQuery. Par exemple, si les charges utiles JSON sont au format {"k1":"v1", "k2":"v2"}, la table BigQuery doit comporter deux colonnes de chaîne nommées k1 et k2.
  • Spécifiez le paramètre inputSubscription ou inputTopic, mais pas les deux.

Paramètres de modèle

Paramètre Description
outputTableSpec Table BigQuery dans laquelle écrire, au format "PROJECT_ID:DATASET_NAME.TABLE_NAME".
inputSubscription Facultatif : Abonnement Pub/Sub à lire, au format "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME".
inputTopic Facultatif : Sujet Pub/Sub à lire, au format "projects/PROJECT_ID/topics/TOPIC_NAME".
outputDeadletterTable Table BigQuery des messages qui n'ont pas pu atteindre la table de sortie, au format "PROJECT_ID:DATASET_NAME.TABLE_NAME". Si la table n'existe pas, elle est créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, la valeur "OUTPUT_TABLE_SPEC_error_records" est utilisée à la place.
pythonExternalTextTransformGcsPath Facultatif : URI Cloud Storage du fichier de code Python qui définit la fonction définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName (Facultatif) : Nom de la fonction Python définie par l'utilisateur (UDF) que vous souhaitez utiliser.
useStorageWriteApi Facultatif : si la valeur est true, le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false. Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
useStorageWriteApiAtLeastOnce Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
numStorageWriteApiStreams Facultatif : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
storageWriteApiTriggeringFrequencySec Facultatif : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
  • Exécuter le modèle

    Console

    1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
    2. Accéder à la page Créer un job à partir d'un modèle
    3. Dans le champ Nom du job, saisissez un nom de job unique.
    4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

      Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

    5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to BigQuery with Python UDF template.
    6. Dans les champs fournis, saisissez vos valeurs de paramètres.
    7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
    8. Cliquez sur Run Job (Exécuter la tâche).

    gcloud

    Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Remplacez les éléments suivants :

    • JOB_NAME : nom de job unique de votre choix
    • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • TOPIC_NAME : nom de votre sujet Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.

    API

    Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • TOPIC_NAME : nom de votre sujet Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.

    Étape suivante