Modèle de fichiers CSV Cloud Storage vers BigQuery

Le pipeline de fichiers CSV Cloud Storage vers BigQuery est un pipeline par lots qui vous permet de lire des données à partir de fichiers CSV stockés dans Cloud Storage et d'ajouter le résultat à une table BigQuery. Les fichiers CSV peuvent être non compressés ou compressés dans les formats répertoriés sur la page du SDK Enum Compression.

Conditions requises pour ce pipeline

Pour utiliser ce modèle, votre pipeline doit répondre aux exigences suivantes.

Fichier JSON de schéma BigQuery

Créez un fichier JSON décrivant votre schéma BigQuery. Assurez-vous que le schéma comporte un tableau JSON de niveau supérieur intitulé BigQuery Schema et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Le modèle de traitement par lots des fichiers CSV Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs STRUCT (Enregistrement) de la table BigQuery cible.

Le code JSON suivant décrit un exemple de schéma BigQuery :

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Schéma des tables d'erreurs

La table BigQuery qui stocke les enregistrements refusés des fichiers CSV doit correspondre au schéma de table défini ici.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Paramètres de modèle

Paramètres Description
inputFilePattern Chemin d'accès Cloud Storage au fichier CSV contenant le texte à traiter. Par exemple, gs://path/to/my/text/data.csv.
schemaJSONPath Chemin d'accès Cloud Storage au fichier JSON qui définit votre schéma BigQuery. Par exemple, gs://path/to/my/schema.json.
outputTable Nom de la table BigQuery qui stocke vos données traitées. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination. Par exemple, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory Répertoire temporaire à utiliser lors du processus de chargement BigQuery. Par exemple, gs://my-bucket/my-files/temp_dir.
badRecordsOutputTable Nom de la table BigQuery à utiliser pour stocker les données refusées lors du traitement des fichiers CSV. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination. Par exemple, my-project-name:my-dataset.my-bad-records-table. Le schéma de cette table doit correspondre au schéma de la table d'erreur.
delimiter Séparateur de colonne des fichiers CSV d'entrée. Par exemple, ",".
csvFormat Spécification du format CSV à utiliser pour l'analyse des enregistrements. Par exemple, Default. Cette valeur doit correspondre exactement aux noms de format de l'énumération CSVFormat.Predefined.
containsHeaders Indique si les fichiers CSV d'entrée contiennent un enregistrement d'en-tête. La valeur par défaut est false.
csvFileEncoding Format d'encodage des caractères de fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF-8.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom unique de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom unique de la tâche de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV

Étapes suivantes