Modelo de arquivos CSV do Cloud Storage para BigQuery

O pipeline de arquivos CSV do Cloud Storage para BigQuery é um pipeline em lote que permite ler dados de arquivos CSV armazenados no Cloud Storage e anexar o resultado a uma tabela do BigQuery. Os arquivos CSV podem ser descompactados ou compactados nos formatos listados na página do SDK de tipo enumerado Compression.

Requisitos de pipeline

Para usar esse modelo, o pipeline precisa atender aos requisitos a seguir.

Arquivo JSON de esquema do BigQuery

Crie um arquivo JSON que descreva seu esquema do BigQuery. Verifique se o esquema tem uma matriz JSON de nível superior intitulada BigQuery Schema e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

O modelo de lote do Cloud Storage Text para BigQuery não é compatível com a importação de dados para os campos STRUCT (Registro) na tabela de destino do BigQuery.

Veja no JSON a seguir um exemplo de esquema do BigQuery:

{
  "BigQuery Schema": [
    {
      "name": "location",
      "type": "STRING"
    },
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "age",
      "type": "STRING"
    },
    {
      "name": "color",
      "type": "STRING"
    },
    {
      "name": "coffee",
      "type": "STRING"
    }
  ]
}

Esquema da tabela de erro

A tabela do BigQuery que armazena os registros rejeitados de arquivos CSV precisa corresponder ao esquema de tabela definido aqui.

{
  "BigQuery Schema": [
    {
      "name": "RawContent",
      "type": "STRING"
    },
    {
      "name": "ErrorMsg",
      "type": "STRING"
    }
  ]
}

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o caminho do Cloud Storage para o arquivo CSV que contém o texto a ser processado. Por exemplo, gs://your-bucket/path/*.csv.
  • schemaJSONPath: o caminho do Cloud Storage para o arquivo JSON que define o esquema do BigQuery.
  • outputTable: o nome da tabela do BigQuery que armazena os dados processados. Se você reutilizar uma tabela atual do BigQuery, os dados serão anexados à tabela de destino.
  • bigQueryLoadingTemporaryDirectory: o diretório temporário a ser usado durante o processo de carregamento do BigQuery. Por exemplo, gs://your-bucket/your-files/temp_dir.
  • badRecordsOutputTable: o nome da tabela do BigQuery a ser usada para armazenar os dados rejeitados ao processar os arquivos CSV. Se você reutilizar uma tabela atual do BigQuery, os dados serão anexados à tabela de destino. O esquema desta tabela precisa corresponder ao esquema da tabela de erros (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema ).
  • delimitador: o delimitador de coluna usado pelo arquivo CSV. Por exemplo, ,.
  • csvFormat: o formato CSV de acordo com o formato Apache Commons CSV. O padrão é Default.

Parâmetros opcionais

  • containsHeaders: indica se os cabeçalhos estão incluídos no arquivo CSV. O padrão é false.
  • csvFileEncoding: o formato de codificação de caracteres do arquivo CSV. Os valores permitidos são US-ASCII, ISO-8859-1, UTF-8 e UTF-16. O padrão é UTF-8.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the CSV files on Cloud Storage to BigQuery (Batch) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_CSV_to_BigQuery \
    --region REGION_NAME \
    --parameters \
inputFilePattern=PATH_TO_CSV_DATA,\
schemaJSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
outputTable=BIGQUERY_DESTINATION_TABLE,\
badRecordsOutputTable=BIGQUERY_BAD_RECORDS_TABLE,\
csvFormat=CSV_FORMAT,\
delimiter=DELIMITER,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
containsHeaders=CONTAINS_HEADERS,\
csvFileEncoding=CSV_FILE_ENCODING

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • PATH_TO_CSV_DATA: o caminho do Cloud Storage para os arquivos CSV.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • BIGQUERY_DESTINATION_TABLE: o nome da tabela de destino do BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: o nome da tabela de registros inválidos do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • DELIMITER: delimitador de arquivo CSV
  • CSV_FORMAT: especificação de formato CSV para análise de registros
  • CONTAINS_HEADERS: se os arquivos CSV contêm cabeçalhos
  • CSV_FILE_ENCODING: codificação nos arquivos CSV.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_CSV_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern":"PATH_TO_CSV_DATA",
       "schemaJSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "outputTable":"BIGQUERY_DESTINATION_TABLE",
       "badRecordsOutputTable":"BIGQUERY_BAD_RECORDS_TABLE",
       "csvFormat":"CSV_FORMAT",
       "delimiter":"DELIMITER",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "containsHeaders": "CONTAINS_HEADERS",
       "csvFileEncoding": "CSV_FILE_ENCODING"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • PATH_TO_CSV_DATA: o caminho do Cloud Storage para os arquivos CSV.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • BIGQUERY_DESTINATION_TABLE: o nome da tabela de destino do BigQuery
  • BIGQUERY_BAD_RECORDS_TABLE: o nome da tabela de registros inválidos do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário
  • DELIMITER: delimitador de arquivo CSV
  • CSV_FORMAT: especificação de formato CSV para análise de registros
  • CONTAINS_HEADERS: se os arquivos CSV contêm cabeçalhos
  • CSV_FILE_ENCODING: codificação nos arquivos CSV.

A seguir