Cloud Storage Text para o BigQuery (stream) com modelo UDF em Python

O pipeline do Cloud Storage Text para o BigQuery é um pipeline de streaming que transmite arquivos de texto armazenados no Cloud Storage, os transforma usando uma função definida pelo usuário (UDF) do Python fornecida por você e anexa o resultado ao BigQuery.

O pipeline é executado de modo indefinido e precisa ser encerrado manualmente pelo comando cancelar e não drenar, porque ele usa a transformação Watch, que é um DoFn divisível sem suporte para drenagem.

Requisitos de pipeline

  • Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada fields e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemplo:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
    
  • Crie um arquivo Python (.py) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    O exemplo a seguir divide cada linha de um arquivo CSV, cria um objeto JSON com os valores e retorna uma string JSON:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)
    

Parâmetros do modelo

Parâmetro Descrição
pythonExternalTextTransformGcsPath O URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName O nome da função definida pelo usuário (UDF) do Python que você quer usar.
JSONPath Local do Cloud Storage do arquivo de esquema do BigQuery, descrito como um JSON. Por exemplo, gs://path/to/my/schema.json.
outputTable A tabela do BigQuery totalmente qualificada. Exemplo: my-project:dataset.table
inputFilePattern Local do Cloud Storage do texto que você quer processar. Por exemplo, gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Diretório temporário para o processo de carregamento do BigQuery. Exemplo: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabela de mensagens que não alcançaram a tabela de saída. Por exemplo, my-project:dataset.my-unprocessed-table. Se não existir, será criada durante a execução do pipeline. Se não for especificada, será usada <outputTableSpec>_error_records.

Função definida pelo usuário

Esse modelo requer uma UDF que analisa os arquivos de entrada, conforme descrito em Requisitos do pipeline. O modelo chama a UDF para cada linha de texto em cada arquivo de entrada. Para mais informações sobre como criar UDFs, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: uma única linha de texto de um arquivo de entrada.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • PYTHON_FUNCTION: o nome da função definida pelo usuário (UDF) do Python que você quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: o URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • PYTHON_FUNCTION: o nome da função definida pelo usuário (UDF) do Python que você quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: o URI do Cloud Storage do arquivo de código Python que define a função definida pelo usuário (UDF) que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

A seguir