Fichiers CSV Cloud Storage vers BigQuery

Le pipeline de fichiers CSV Cloud Storage vers BigQuery est un pipeline par lots qui vous permet de lire des données à partir de fichiers CSV stockés dans Cloud Storage et d'ajouter le résultat à une table BigQuery. Les fichiers CSV peuvent être non compressés ou compressés dans les formats répertoriés sur la page du SDK Enum Compression.

Conditions requises pour ce pipeline

Pour utiliser ce modèle, votre pipeline doit répondre aux exigences suivantes.

Fichier JSON de schéma BigQuery

Créez un fichier JSON décrivant votre schéma BigQuery. Assurez-vous que le schéma comporte un tableau JSON de niveau supérieur intitulé BigQuery Schema et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Le modèle de traitement par lots des fichiers CSV Cloud Storage vers BigQuery n'est pas compatible avec l'importation de données dans les champs STRUCT (Enregistrement) de la table BigQuery cible.

Le code JSON suivant décrit un exemple de schéma BigQuery :

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Schéma des tables d'erreurs

La table BigQuery qui stocke les enregistrements refusés à partir des fichiers CSV doit correspondre au schéma de table défini ici.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Paramètres de modèle

Paramètres obligatoires

  • inputFilePattern : chemin d'accès Cloud Storage au fichier CSV contenant le texte à traiter. (Exemple: gs://your-bucket/path/*.csv).
  • schemaJSONPath : chemin d'accès Cloud Storage au fichier JSON qui définit votre schéma BigQuery.
  • outputTable : nom de la table BigQuery qui stocke vos données traitées. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination.
  • bigQueryLoadingTemporaryDirectory : répertoire temporaire à utiliser lors du processus de chargement de BigQuery. (par exemple, gs://your-bucket/your-files/temp_dir).
  • badRecordsOutputTable : nom de la table BigQuery à utiliser pour stocker les données refusées lors du traitement des fichiers CSV. Si vous réutilisez une table BigQuery existante, les données sont ajoutées à la table de destination. Le schéma de cette table doit correspondre au schéma de la table d'erreurs (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimiter : délimiteur de colonne utilisé par le fichier CSV. (Exemple: ,).
  • csvFormat : format CSV conforme au format CSV Apache Commons. La valeur par défaut est Default.

Paramètres facultatifs

  • containsHeaders : indique si les en-têtes sont inclus dans le fichier CSV. La valeur par défaut est "false".
  • csvFileEncoding : format d'encodage des caractères du fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF8.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • PATH_TO_CSV_DATA : chemin d'accès Cloud Storage vers vos fichiers CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • BIGQUERY_DESTINATION_TABLE : nom de la table de destination BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE : nom de la table des enregistrements BigQuery incorrects
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
  • DELIMITER : délimiteur de fichier CSV
  • CSV_FORMAT : spécification du format CSV pour l'analyse des enregistrements.
  • CONTAINS_HEADERS : indique si les fichiers CSV contiennent des en-têtes.
  • CSV_FILE_ENCODING : encodage dans les fichiers CSV

Étapes suivantes