Le modèle proto Pub/Sub vers BigQuery est un pipeline de streaming qui ingère les données proto d'un abonnement Pub/Sub dans une table BigQuery.
Les erreurs qui se produisent lors de l'écriture dans la table BigQuery sont insérées en flux continu dans un sujet Pub/Sub non traité.
Une fonction définie par l'utilisateur (UDF) JavaScript peut être fournie pour transformer les données. Les erreurs lors de l'exécution de l'UDF peuvent être envoyées à un sujet Pub/Sub distinct ou au même sujet non traité que les erreurs BigQuery.
Avant d'exécuter un pipeline Dataflow pour ce scénario, déterminez si un abonnement Pub/Sub BigQuery avec une UDF répond à vos besoins.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub d'entrée doit exister.
- Le fichier de schéma des enregistrements proto doit exister dans Cloud Storage.
- Le sujet Pub/Sub de sortie doit exister.
- L'ensemble de données BigQuery de sortie doit exister.
- Si la table BigQuery existe, elle doit posséder un schéma correspondant aux données proto, quelle que soit la valeur de
createDisposition
.
Paramètres de modèle
Paramètres obligatoires
- protoSchemaPath (chemin d'accès Cloud Storage au fichier de schéma Proto) : chemin d'accès Cloud Storage à un fichier de jeu de descripteurs autonome. Exemple : gs://MyBucket/schema.pb.
schema.pb
peut être généré en ajoutant--descriptor_set_out=schema.pb
à la commandeprotoc
qui compile les protos. L'option--include_imports
permet de garantir que le fichier est autonome. - fullMessageName (nom complet du message Proto) : nom complet du message (exemple : package.name.MessageName). Si le message est imbriqué dans un autre message, incluez tous les messages avec le délimiteur "." (par exemple, package.name.OuterMessage.InnerMessage). "package.name" doit provenir de l'instruction
package
, et non de l'instructionjava_package
. - inputSubscription (abonnement d'entrée Pub/Sub) : abonnement Pub/Sub à partir duquel lire l'entrée, au format "projects/your-project-id/subscriptions/your-subscription-name" (exemple : projects/your-project-id/subscriptions/your-subscription-name).
- outputTableSpec (table de sortie BigQuery) : emplacement de la table BigQuery dans laquelle écrire la sortie. Le nom doit être au format
<project>:<dataset>.<table_name>
. Le schéma de la table doit correspondre aux objets d'entrée. - outputTopic (sujet Pub/Sub de sortie) : nom du sujet dans lequel les données doivent être publiées, au format "projects/your-project-id/topics/your-topic-name" (exemple : projects/your-project-id/topics/your-topic-name).
Paramètres facultatifs
- preserveProtoFieldNames (Conserver les noms de champ Proto) : indicateur permettant de déterminer si les noms de champ Proto doivent être conservés ou convertis en lowerCamelCase. Si la table existe déjà, cette valeur doit être basée sur ce qui correspond au schéma de la table. Sinon, il déterminera les noms de colonnes de la table créée. La valeur est "True" pour conserver le snake_case proto. "False" convertit les champs en lowerCamelCase. Valeur par défaut : "false".
- bigQueryTableSchemaPath (chemin d'accès au schéma de table BigQuery) : chemin d'accès Cloud Storage au fichier JSON du schéma BigQuery. S'il n'est pas défini, le schéma est obtenu à partir du schéma Proto. (Exemple : gs://MyBucket/bq_schema.json).
- udfOutputTopic (sujet de sortie Pub/Sub pour les échecs d'UDF) : sujet de sortie facultatif auquel envoyer les échecs d'UDF. Si cette option n'est pas définie, les échecs seront écrits dans le même sujet que les échecs BigQuery. (par exemple, projects/votre-id-projet/topics/nom-de-votre-sujet).
- writeDisposition (disposition d'écriture à utiliser pour BigQuery) : BigQuery WriteDisposition. Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
- createDisposition (propriété CreateDisposition à utiliser pour BigQuery) : BigQuery CreateDisposition. Par exemple, CREATE_IF_NEEDED, CREATE_NEVER. La valeur par défaut est CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (chemin d'accès Cloud Storage à la source UDF JavaScript) : modèle de chemin d'accès Cloud Storage pour le code JavaScript contenant vos fonctions définies par l'utilisateur. (par exemple : gs://votre-bucket/votre-fonction.js).
- javascriptTextTransformFunctionName (nom de la fonction JavaScript définie par l'utilisateur) : nom de la fonction à appeler à partir de votre fichier JavaScript. N'utilisez que des lettres, des chiffres et des traits de soulignement. (exemple : "transform" ou "transform_udf1").
- javascriptTextTransformReloadIntervalMinutes (Intervalle d'actualisation automatique des UDF JavaScript (minutes)) : définissez l'intervalle que les nœuds de calcul peuvent vérifier pour les modifications des UDF JavaScript afin d'actualiser les fichiers. La valeur par défaut est 0.
- useStorageWriteApi (Utiliser l'API BigQuery Storage Write) : si la valeur est définie sur "true", le pipeline utilise l'API Storage Write lors de l'écriture des données dans BigQuery (voir https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). La valeur par défaut est "false". Lorsque vous utilisez l'API Storage Write en mode "exactement une fois", vous devez définir les paramètres suivants : "Nombre de flux pour l'API BigQuery Storage Write" et "Fréquence de déclenchement en secondes pour l'API BigQuery Storage Write". Si vous activez le mode "au moins une fois" de Dataflow ou si vous définissez le paramètre useStorageWriteApiAtLeastOnce sur "true", vous n'avez pas besoin de définir le nombre de flux ni la fréquence de déclenchement.
- useStorageWriteApiAtLeastOnce (Utiliser la sémantique "au moins une fois" dans l'API BigQuery Storage Write) : ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
- numStorageWriteApiStreams (Nombre de flux pour l'API BigQuery Storage Write) : le nombre de flux définit le parallélisme de la transformation d'écriture de BigQueryIO et correspond à peu près au nombre de flux de l'API Storage Write qui seront utilisés par le pipeline. Consultez la page https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api pour connaître les valeurs recommandées. La valeur par défaut est 0.
- storageWriteApiTriggeringFrequencySec (fréquence de déclenchement en secondes pour l'API BigQuery Storage Write) : la fréquence de déclenchement détermine la date à laquelle les données seront visibles pour les requêtes dans BigQuery. Consultez la page https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api pour connaître les valeurs recommandées.
Fonction définie par l'utilisateur
Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Spécification de la fonction
La spécification de l'UDF se présente comme suit :
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub Proto to BigQuery template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Remplacez les éléments suivants :
JOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: chemin d'accès Cloud Storage au fichier de schéma Proto (par exemple,gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: nom du message Proto (par exemple,package.name.MessageName
)SUBSCRIPTION_NAME
: nom de l'abonnement d'entrée Pub/SubBIGQUERY_TABLE
: nom de la table de sortie BigQueryUNPROCESSED_TOPIC
: sujet Pub/Sub à utiliser pour la file d'attente non traitée
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.