Modelo de texto do Cloud Storage para o BigQuery com UDF do Python

O pipeline de texto do Cloud Storage para o BigQuery com UDF Python é um pipeline em lote que lê ficheiros de texto armazenados no Cloud Storage, transforma-os através de uma função definida pelo utilizador (UDF) Python e anexa o resultado a uma tabela do BigQuery.

Requisitos do pipeline

  • Crie um ficheiro JSON que descreva o seu esquema do BigQuery.

    Certifique-se de que existe uma matriz JSON de nível superior com o título BigQuery Schema e que o respetivo conteúdo segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    O modelo de lote de texto do Cloud Storage para o BigQuery não suporta a importação de dados para campos de STRUCT (registo) na tabela do BigQuery de destino.

    O JSON seguinte descreve um exemplo de esquema do BigQuery:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • Crie um ficheiro Python (.py) com a função UDF que fornece a lógica para transformar as linhas de texto. A sua função tem de devolver uma string JSON.

    Por exemplo, esta função divide cada linha de um ficheiro CSV e devolve uma string JSON após transformar os valores.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parâmetros de modelos

Parâmetro Descrição
JSONPath O caminho gs:// para o ficheiro JSON que define o seu esquema do BigQuery, armazenado no Cloud Storage. Por exemplo, gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName O nome da função definida pelo utilizador (UDF) do Python que quer usar.
inputFilePattern O caminho gs:// para o texto no Cloud Storage que quer processar. Por exemplo, gs://path/to/my/text/data.txt.
outputTable O nome da tabela do BigQuery que quer criar para armazenar os seus dados processados. Se reutilizar uma tabela do BigQuery existente, os dados são anexados à tabela de destino. Por exemplo, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory O diretório temporário para o processo de carregamento do BigQuery. Por exemplo, gs://my-bucket/my-files/temp_dir.
useStorageWriteApi Opcional: se true, o pipeline usa a API BigQuery Storage Write. O valor predefinido é false. Para mais informações, consulte o artigo Usar a API Storage Write.
useStorageWriteApiAtLeastOnce Opcional: quando usar a API Storage Write, especifica a semântica de escrita. Para usar a semântica de, pelo menos, uma vez, defina este parâmetro como true. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro aplica-se apenas quando useStorageWriteApi é true. O valor predefinido é false.

Função definida pelo utilizador

Opcionalmente, pode estender este modelo escrevendo uma função definida pelo utilizador (FDU). O modelo chama a FDU para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: uma linha de texto de um ficheiro de entrada do Cloud Storage.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • PYTHON_FUNCTION: O nome da função definida pelo utilizador (FDU) do Python que quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o seu conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • PYTHON_FUNCTION: O nome da função definida pelo utilizador (FDU) do Python que quer usar.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • PATH_TO_PYTHON_UDF_FILE: O URI do Cloud Storage do ficheiro de código Python que define a função definida pelo utilizador (FDU) que quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o seu conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

O que se segue?