Modèle Abonnement Pub/Sub vers BigQuery

Le modèle Abonnement Pub/Sub vers BigQuery est un pipeline de streaming qui lit les messages au format JSON d'un abonnement Pub/Sub et les écrit dans une table BigQuery. Vous pouvez utiliser ce modèle comme solution rapide pour déplacer des données Pub/Sub vers BigQuery. Le modèle lit les messages au format JSON de Pub/Sub et les convertit en éléments BigQuery.

Conditions requises pour ce pipeline

  • Le champ data des messages Pub/Sub doit utiliser le format JSON, décrit dans ce guide JSON. Par exemple, vous pouvez insérer des messages contenant les valeurs du champ data au format {"k1":"v1", "k2":"v2"} dans une table BigQuery comportant deux colonnes nommées k1 et k2, en utilisant un type de données de chaîne ("string").
  • La table de sortie doit exister avant l'exécution du pipeline. Le schéma de la table doit correspondre aux objets JSON d'entrée.

Paramètres de modèle

Paramètres obligatoires

  • outputTableSpec : emplacement de la table de sortie BigQuery, au format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription : abonnement Pub/Sub en entrée à lire, au format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Paramètres facultatifs

  • outputDeadletterTable : table BigQuery à utiliser pour les messages qui n'atteignent pas la table de sortie, au format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Si la table n'existe pas, elle est créée lors de l'exécution du pipeline. Si aucune valeur n'est spécifiée, OUTPUT_TABLE_SPEC_error_records est utilisé.
  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes : définissez l'intervalle que les nœuds de calcul peuvent vérifier pour les modifications des UDF JavaScript pour actualiser les fichiers. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.
  • Exécuter le modèle

    Console

    1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
    2. Accéder à la page Créer un job à partir d'un modèle
    3. Dans le champ Nom du job, saisissez un nom de job unique.
    4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

      Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

    5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Subscription to BigQuery template.
    6. Dans les champs fournis, saisissez vos valeurs de paramètres.
    7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement par flux de type "au moins une fois", sélectionnez Au moins une fois.
    8. Cliquez sur Run Job (Exécuter la tâche).

    gcloud

    Dans le shell ou le terminal, exécutez le modèle :

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Remplacez les éléments suivants :

    • JOB_NAME : nom de job unique de votre choix
    • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.

    API

    Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • JOB_NAME : nom de job unique de votre choix
    • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
    • VERSION : version du modèle que vous souhaitez utiliser

      Vous pouvez utiliser les valeurs suivantes :

    • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
    • DATASET : votre ensemble de données BigQuery.
    • TABLE_NAME : nom de votre table BigQuery.

    Étapes suivantes