File CSV di Cloud Storage nel modello BigQuery

La pipeline dei file CSV di Cloud Storage in BigQuery è una pipeline batch che consente di leggere i dati dai file CSV archiviati in Cloud Storage e di accodare il risultato a una tabella BigQuery. I file CSV possono essere non compressi o compressi nei formati elencati nella pagina SDK Enum Compression.

Requisiti della pipeline

Per utilizzare questo modello, la pipeline deve soddisfare i seguenti requisiti.

File JSON dello schema BigQuery

Crea un file JSON che descriva lo schema BigQuery. Assicurati che lo schema contenga un array JSON di primo livello denominato BigQuery Schema e che i suoi contenuti rispettino il pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

Il modello batch per i file CSV di Cloud Storage in BigQuery non supporta l'importazione dei dati nei campi STRUCT (Record) della tabella BigQuery di destinazione.

Il seguente JSON descrive uno schema BigQuery di esempio:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Schema della tabella degli errori

La tabella BigQuery che memorizza i record rifiutati dai file CSV deve corrispondere allo schema della tabella definito qui.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Parametri del modello

Parametri obbligatori

  • inputFilePattern: il percorso Cloud Storage del file CSV contenente il testo da elaborare. Ad esempio, gs://your-bucket/path/*.csv.
  • schemaJSONPath: il percorso in Cloud Storage del file JSON che definisce lo schema BigQuery.
  • outputTable: il nome della tabella BigQuery che memorizza i dati elaborati. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione.
  • bigQueryLoadingTemporaryDirectory: la directory temporanea da utilizzare durante il processo di caricamento di BigQuery. Ad esempio, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: il nome della tabella BigQuery da utilizzare per archiviare i dati rifiutati durante l'elaborazione dei file CSV. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione. Lo schema di questa tabella deve corrispondere a quello della tabella degli errori (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimiter: il delimitatore di colonna utilizzato dal file CSV. Ad esempio, ,.
  • csvFormat: il formato CSV secondo il formato CSV di Apache Commons. Il valore predefinito è Default.

Parametri facoltativi

  • containsHeaders: indica se le intestazioni sono incluse nel file CSV. Il valore predefinito è false.
  • csvFileEncoding: il formato di codifica dei caratteri del file CSV. I valori consentiti sono US-ASCII, ISO-8859-1, UTF-8 e UTF-16. Il valore predefinito è UTF-8.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella dei record errati di BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea
  • DELIMITER: delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PATH_TO_CSV_DATA: il percorso Cloud Storage dei file CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • BIGQUERY_DESTINATION_TABLE: il nome della tabella di destinazione BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: il nome della tabella dei record errati di BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea
  • DELIMITER: delimitatore del file CSV
  • CSV_FORMAT: specifica del formato CSV per l'analisi dei record
  • CONTAINS_HEADERS: indica se i file CSV contengono intestazioni
  • CSV_FILE_ENCODING: codifica nei file CSV

Passaggi successivi