Le modèle Pub/Sub Avro vers BigQuery est un pipeline de streaming qui ingère les données Avro d'un abonnement Pub/Sub dans une table BigQuery. Toute erreur survenant lors de l'écriture dans la table BigQuery est traitée dans un sujet Pub/Sub non traité.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements Avro doit exister dans Cloud Storage.
- Le sujet Pub/Sub non traité doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
Paramètres de modèle
Paramètres obligatoires
- schemaPath : emplacement Cloud Storage du fichier de schéma Avro. Exemple :
gs://path/to/my/schema.avsc
- inputSubscription : abonnement d'entrée Pub/Sub à lire. (Exemple : projects/<ID_PROJET>/subscription/<ID_ABONNEMENT>).
- outputTableSpec : emplacement de la table de sortie BigQuery dans lequel écrire la sortie. Par exemple,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. Selon la propriétécreateDisposition
spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur. - outputTopic : sujet Pub/Sub à utiliser pour les enregistrements non traités. (Exemple : projects/<ID_PROJET>/topics/<NOM_SUJET>).
Paramètres facultatifs
- useStorageWriteApiAtLeastOnce : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur "true". Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur
false
. Ce paramètre ne s'applique que lorsque la valeur deuseStorageWriteApi
est définie surtrue
. La valeur par défaut estfalse
. - writeDisposition : valeur WriteDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple,
WRITE_APPEND
,WRITE_EMPTY
ouWRITE_TRUNCATE
. La valeur par défaut estWRITE_APPEND
. - createDisposition : valeur CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple,
CREATE_IF_NEEDED
etCREATE_NEVER
. La valeur par défaut estCREATE_IF_NEEDED
. - useStorageWriteApi : si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est
false
. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre. La valeur par défaut est 0. - storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si
useStorageWriteApi
est défini surtrue
etuseStorageWriteApiAtLeastOnce
surfalse
, vous devez définir ce paramètre.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Avro to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Avro (par exemple,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryDEADLETTER_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.