Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
- Le sujet Pub/Sub non traité doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
Paramètres de modèle
Paramètres | Description |
---|---|
schemaPath |
Emplacement Cloud Storage du fichier de schéma Avro. Par exemple, gs://path/to/my/schema.avsc . |
inputSubscription |
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription> . |
outputTopic |
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery. Par exemple, <my-project>:<my-dataset>.<my-table> .
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur. |
writeDisposition |
Facultatif : La propriété WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND , WRITE_EMPTY ou WRITE_TRUNCATE . Par défaut, WRITE_APPEND . |
createDisposition |
Facultatif : La propriété CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER . Par défaut, CREATE_IF_NEEDED . |
useStorageWriteApi |
Facultatif : si la valeur est true , le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false . Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true . Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false . Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true . La valeur par défaut est false .
|
numStorageWriteApiStreams |
Facultatif : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false , vous devez définir ce paramètre.
|
storageWriteApiTriggeringFrequencySec |
Facultatif : spécifie la fréquence de déclenchement, en secondes, Lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false , vous devez définir ce paramètre.
|
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.