Modelo de ficheiros CSV do Cloud Storage para o BigQuery

O pipeline de ficheiros CSV do Cloud Storage para o BigQuery é um pipeline em lote que lhe permite ler dados de ficheiros CSV armazenados no Cloud Storage e anexar o resultado a uma tabela do BigQuery. Os ficheiros CSV podem ser não comprimidos ou comprimidos nos formatos indicados na página do SDK de enumeração.Compression

Requisitos do pipeline

Para usar este modelo, o seu pipeline tem de cumprir os seguintes requisitos.

Ficheiro JSON do esquema do BigQuery

Crie um ficheiro JSON que descreva o seu esquema do BigQuery. Certifique-se de que o esquema tem uma matriz JSON de nível superior com o título BigQuery Schema e que o respetivo conteúdo segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

O modelo de lote de ficheiros CSV do Cloud Storage para o BigQuery não suporta a importação de dados para campos STRUCT (registo) na tabela do BigQuery de destino.

O JSON seguinte descreve um exemplo de esquema do BigQuery:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Esquema da tabela de erros

A tabela do BigQuery que armazena os registos rejeitados de ficheiros CSV tem de corresponder ao esquema da tabela definido aqui.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Parâmetros de modelos

Parâmetros obrigatórios

  • inputFilePattern: o caminho do Cloud Storage para o ficheiro CSV que contém o texto a processar. Por exemplo, gs://your-bucket/path/*.csv.
  • schemaJSONPath: o caminho do Cloud Storage para o ficheiro JSON que define o seu esquema do BigQuery.
  • outputTable: o nome da tabela do BigQuery que armazena os seus dados processados. Se reutilizar uma tabela do BigQuery existente, os dados são anexados à tabela de destino.
  • bigQueryLoadingTemporaryDirectory: o diretório temporário a usar durante o processo de carregamento do BigQuery. Por exemplo, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: o nome da tabela do BigQuery a usar para armazenar os dados rejeitados durante o processamento dos ficheiros CSV. Se reutilizar uma tabela do BigQuery existente, os dados são anexados à tabela de destino. O esquema desta tabela tem de corresponder ao esquema da tabela de erros (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
  • delimitador: o delimitador de colunas que o ficheiro CSV usa. Por exemplo, ,.
  • csvFormat: o formato CSV de acordo com o formato Apache Commons CSV. Predefinição: Default.

Parâmetros opcionais

  • containsHeaders: indica se os cabeçalhos estão incluídos no ficheiro CSV. Predefinição: false.
  • csvFileEncoding: o formato de codificação de carateres do ficheiro CSV. Os valores permitidos são US-ASCII, ISO-8859-1, UTF-8 e UTF-16. A predefinição é: UTF-8.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • PATH_TO_CSV_DATA: o caminho do Cloud Storage para os seus ficheiros CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • BIGQUERY_DESTINATION_TABLE: o nome da tabela de destino do BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: o nome da tabela de registos inválidos do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • DELIMITER: delimitador do ficheiro CSV
  • CSV_FORMAT: especificação do formato CSV para analisar registos
  • CONTAINS_HEADERS: se os ficheiros CSV contêm cabeçalhos
  • CSV_FILE_ENCODING: codificação nos ficheiros CSV

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • PATH_TO_CSV_DATA: o caminho do Cloud Storage para os seus ficheiros CSV
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • BIGQUERY_DESTINATION_TABLE: o nome da tabela de destino do BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: o nome da tabela de registos inválidos do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • DELIMITER: delimitador do ficheiro CSV
  • CSV_FORMAT: especificação do formato CSV para analisar registos
  • CONTAINS_HEADERS: se os ficheiros CSV contêm cabeçalhos
  • CSV_FILE_ENCODING: codificação nos ficheiros CSV

O que se segue?