Le modèle proto Pub/Sub vers BigQuery est un pipeline de streaming qui ingère les données proto d'un abonnement Pub/Sub dans une table BigQuery.
Les erreurs qui se produisent lors de l'écriture dans la table BigQuery sont insérées en flux continu dans un sujet Pub/Sub non traité.
Une fonction définie par l'utilisateur (UDF) JavaScript peut être fournie pour transformer les données. Les erreurs lors de l'exécution de l'UDF peuvent être envoyées à un sujet Pub/Sub distinct ou au même sujet non traité que les erreurs BigQuery.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements proto doit exister dans Cloud Storage.
- Le sujet Pub/Sub de sortie doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
- Si la table BigQuery existe, elle doit posséder un schéma correspondant aux données proto, quelle que soit la valeur de
createDisposition
.
Paramètres de modèle
Paramètre | Description |
---|---|
protoSchemaPath |
Emplacement Cloud Storage du fichier de schéma proto autonome. Par exemple, gs://path/to/my/file.pb .
Ce fichier peut être généré avec l'option --descriptor_set_out de la commande protoc .
L'option --include_imports garantit que le fichier est autonome. |
fullMessageName |
Nom complet du message proto. Par exemple, package.name.MessageName , où package.name est la valeur fournie pour l'instruction package , et non pour l'instruction java_package . |
inputSubscription |
Abonnement en entrée Pub/Sub à lire. Par exemple, projects/<project>/subscriptions/<subscription> . |
outputTopic |
Sujet Pub/Sub à utiliser pour les enregistrements non traités. Par exemple, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Emplacement de la table de sortie BigQuery. Par exemple, my-project:my_dataset.my_table .
Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du fichier de schéma d'entrée. |
preserveProtoFieldNames |
Facultatif : true pour conserver le nom du champ Proto d'origine au format JSON. false pour utiliser des noms JSON plus standards.
Par exemple, false remplace field_name par fieldName . (Par défaut : false ) |
bigQueryTableSchemaPath |
Facultatif : chemin d'accès Cloud Storage vers le chemin d'accès du schéma BigQuery. Par exemple, gs://path/to/my/schema.json . S'il n'est pas fourni, le schéma est obtenu à partir du schéma Proto. |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur (UDF) que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
javascriptTextTransformReloadIntervalMinutes |
Facultatif : spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0. |
udfOutputTopic |
Facultatif : sujet Pub/Sub stockant les erreurs UDF. Par exemple : projects/<project-id>/topics/<topic-name> Si cet élément n'est pas fourni, les erreurs UDF sont envoyées au même sujet que outputTopic . |
writeDisposition |
Facultatif : disposition WriteDisposition de BigQuery.
Par exemple, WRITE_APPEND , WRITE_EMPTY ou WRITE_TRUNCATE . Valeur par défaut : WRITE_APPEND . |
createDisposition |
Facultatif : disposition CreateDisposition de BigQuery.
Par exemple, CREATE_IF_NEEDED et CREATE_NEVER . Valeur par défaut : CREATE_IF_NEEDED . |
useStorageWriteApi |
Facultatif : si la valeur est true , le pipeline utilise l'API BigQuery Storage Write. La valeur par défaut est false . Pour en savoir plus, consultez la page Utiliser l'API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Facultatif : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois", définissez ce paramètre sur true . Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false . Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true . La valeur par défaut est false .
|
numStorageWriteApiStreams |
Facultatif : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false , vous devez définir ce paramètre.
|
storageWriteApiTriggeringFrequencySec |
Facultatif : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false , vous devez définir ce paramètre.
|
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Proto to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Étape suivante
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.