Modèle Pub/Sub Avro vers BigQuery

Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub d'entrée doit exister.
  • Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
  • Le sujet Pub/Sub non traité doit exister.
  • L'ensemble de données BigQuery de sortie doit exister.

Paramètres de modèle

Paramètres Description
schemaPath Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc.
inputSubscription Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription>.
outputTopic Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name>.
outputTableSpec Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table>. Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur.
writeDisposition Facultatif : La propriété WriteDisposition de BigQuery. Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. Par défaut, WRITE_APPEND.
createDisposition Facultatif : La propriété CreateDisposition de BigQuery. Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. Par défaut, CREATE_IF_NEEDED.
useStorageWriteApi Facultatif : si la valeur est true, le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false. Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
useStorageWriteApiAtLeastOnce Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
numStorageWriteApiStreams Facultatif : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
storageWriteApiTriggeringFrequencySec Facultatif : spécifie la fréquence de déclenchement, en secondes, Lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • SCHEMA_PATH : chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME : nom de l'abonnement d'entrée Pub/Sub
  • BIGQUERY_TABLE : nom de la table de sortie BigQuery
  • DEADLETTER_TOPIC : sujet Pub/Sub à utiliser pour la file d'attente non traitée

Étapes suivantes