Modelo do Cloud Storage Text para BigQuery com UDF em Python

O pipeline do Cloud Storage Text para BigQuery com UDF para Python é um pipeline em lote que lê arquivos de texto armazenados no Cloud Storage, os transforma usando uma função definida pelo usuário (UDF) do Python e anexa o resultado a uma tabela do BigQuery.

Requisitos de pipeline

  • Crie um arquivo JSON que descreva seu esquema do BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada BigQuery Schema e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    O modelo de lote do Cloud Storage Text para BigQuery não é compatível com a importação de dados para os campos STRUCT (Registro) na tabela de destino do BigQuery.

    Veja no JSON a seguir um exemplo de esquema do BigQuery:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • Crie um arquivo Python (.py) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parâmetros do modelo

Parâmetro Descrição
JSONPath O caminho gs:// para o arquivo JSON que define o esquema do BigQuery, armazenado no Cloud Storage. Por exemplo, gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath O URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName O nome da função definida pelo usuário (UDF) do Python que você quer usar.
inputFilePattern O caminho gs:// do texto no Cloud Storage que você quer processar. Por exemplo, gs://path/to/my/text/data.txt.
outputTable O nome da tabela do BigQuery que você quer criar para armazenar seus dados processados. Se você reutilizar uma tabela atual do BigQuery, os dados serão anexados à tabela de destino. Por exemplo, my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory O diretório temporário do processo de carregamento do BigQuery. Por exemplo, gs://my-bucket/my-files/temp_dir
useStorageWriteApi Opcional: se true, o pipeline usa a API BigQuery Storage Write. O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: ao usar a API Storage Write, especifica a semântica de gravação. Para usar semântica pelo menos uma vez, defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.

Função definida pelo usuário

Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: uma linha de texto de um arquivo de entrada do Cloud Storage.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • PYTHON_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: o URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • PYTHON_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: o URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

A seguir