Le pipeline de fichiers CSV Cloud Storage vers BigQuery est un pipeline par lots qui vous permet de lire des données à partir de fichiers CSV stockés dans Cloud Storage et d'ajouter le résultat à une table BigQuery.
Les fichiers CSV peuvent être non compressés ou compressés dans les formats répertoriés sur la page du SDK Enum Compression
.
Conditions requises pour ce pipeline
Pour utiliser ce modèle, votre pipeline doit répondre aux exigences suivantes.
Fichier JSON de schéma BigQuery
Créez un fichier JSON décrivant votre schéma BigQuery.
Assurez-vous que le schéma comporte un tableau JSON de niveau supérieur intitulé BigQuery Schema
et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}
.
Le modèle de traitement par lots des fichiers CSV Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs STRUCT
(Enregistrement) de la table BigQuery cible.
Le code JSON suivant décrit un exemple de schéma BigQuery :
{ "BigQuery Schema": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING" }, { "name": "coffee", "type": "STRING" } ] }
Schéma des tables d'erreurs
La table BigQuery qui stocke les enregistrements refusés à partir des fichiers CSV doit correspondre au schéma de table défini ici.
{ "BigQuery Schema": [ { "name": "RawContent", "type": "STRING" }, { "name": "ErrorMsg", "type": "STRING" } ] }
Paramètres de modèle
Paramètres obligatoires
- inputFilePattern : chemin d'accès Cloud Storage au fichier CSV contenant le texte à traiter. (Exemple: gs://your-bucket/path/*.csv).
- schemaJSONPath : chemin d'accès Cloud Storage au fichier JSON qui définit votre schéma BigQuery.
- outputTable : nom de la table BigQuery qui stocke vos données traitées. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination.
- bigQueryLoadingTemporaryDirectory : répertoire temporaire à utiliser lors du processus de chargement de BigQuery. (par exemple, gs://your-bucket/your-files/temp_dir).
- badRecordsOutputTable : nom de la table BigQuery à utiliser pour stocker les données refusées lors du traitement des fichiers CSV. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination. Le schéma de cette table doit correspondre au schéma de la table d'erreurs (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
- delimiter : délimiteur de colonne utilisé par le fichier CSV. (Exemple: ,).
- csvFormat : format CSV conforme au format CSV Apache Commons. La valeur par défaut est Default.
Paramètres facultatifs
- containsHeaders : indique si les en-têtes sont inclus dans le fichier CSV. La valeur par défaut est "false".
- csvFileEncoding : format d'encodage des caractères du fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF8.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the CSV files on Cloud Storage to BigQuery (Batch) template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \ --region REGION_NAME \ --parameters \ inputFilePattern=PATH_TO_CSV_DATA,\ schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ outputTable=BIGQUERY_DESTINATION_TABLE,\ badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\ csvFormat=CSV_FORMAT,\ delimiter=DELIMITER,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\ containsHeaders=CONTAINS_HEADERS,\ csvFileEncoding=CSV_FILE_ENCODING
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
PATH_TO_CSV_DATA
: chemin d'accès Cloud Storage vers vos fichiers CSVPATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaBIGQUERY_DESTINATION_TABLE
: nom de la table de destination BigQueryBIGQUERY_BAD_RECORDS_TABLE
: nom de la table des enregistrements BigQuery incorrectsPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaireDELIMITER
: délimiteur de fichier CSVCSV_FORMAT
: spécification du format CSV pour l'analyse des enregistrements.CONTAINS_HEADERS
: indique si les fichiers CSV contiennent des en-têtes.CSV_FILE_ENCODING
: encodage dans les fichiers CSV
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputFilePattern":"PATH_TO_CSV_DATA", "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "outputTable":"BIGQUERY_DESTINATION_TABLE", "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE", "csvFormat":"CSV_FORMAT", "delimiter":"DELIMITER", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS", "containsHeaders": "CONTAINS_HEADERS", "csvFileEncoding": "CSV_FILE_ENCODING" }, "environment": { "zone": "us-central1-f" } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
PATH_TO_CSV_DATA
: chemin d'accès Cloud Storage vers vos fichiers CSVPATH_TO_BIGQUERY_SCHEMA_JSON
: chemin d'accès Cloud Storage au fichier JSON contenant la définition du schémaBIGQUERY_DESTINATION_TABLE
: nom de la table de destination BigQueryBIGQUERY_BAD_RECORDS_TABLE
: nom de la table des enregistrements BigQuery incorrectsPATH_TO_TEMP_DIR_ON_GCS
: chemin d'accès Cloud Storage au répertoire temporaireDELIMITER
: délimiteur de fichier CSVCSV_FORMAT
: spécification du format CSV pour l'analyse des enregistrements.CONTAINS_HEADERS
: indique si les fichiers CSV contiennent des en-têtes.CSV_FILE_ENCODING
: encodage dans les fichiers CSV
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.