Modelo de texto do Cloud Storage para o BigQuery (stream)

O pipeline de texto do Cloud Storage para o BigQuery é um pipeline de streaming que faz stream de ficheiros de texto armazenados no Cloud Storage, transforma-os através de uma função definida pelo utilizador (UDF) em JavaScript que fornece e anexa o resultado ao BigQuery.

O pipeline é executado indefinidamente e tem de ser terminado manualmente através de um cancelamento e não de uma drenagem, devido à sua utilização da transformação Watch, que é uma DoFn divisível que não suporta drenagem.

Requisitos do pipeline

  • Crie um ficheiro JSON que descreva o esquema da tabela de resultados no BigQuery.

    Certifique-se de que existe uma matriz JSON de nível superior com o título fields e que o respetivo conteúdo segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por exemplo:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Crie um ficheiro JavaScript (.js) com a função FDU que fornece a lógica para transformar as linhas de texto. A sua função tem de devolver uma string JSON.

    O exemplo seguinte divide cada linha de um ficheiro CSV, cria um objeto JSON com os valores e devolve uma string JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

Parâmetros de modelos

Parâmetros obrigatórios

  • inputFilePattern: o caminho gs:// para o texto no Cloud Storage que quer processar. Por exemplo, gs://your-bucket/your-file.txt.
  • JSONPath: o caminho gs:// para o ficheiro JSON que define o seu esquema do BigQuery, armazenado no Cloud Storage. Por exemplo, gs://your-bucket/your-schema.json.
  • outputTable: a localização da tabela do BigQuery a usar para armazenar os dados processados. Se reutilizar uma tabela existente, esta é substituída. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript que quer usar. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform_udf1.
  • bigQueryLoadingTemporaryDirectory: diretório temporário para o processo de carregamento do BigQuery. Por exemplo, gs://your-bucket/your-files/temp-dir.

Parâmetros opcionais

  • outputDeadletterTable: tabela para mensagens que não conseguiram alcançar a tabela de resultados. Se uma tabela não existir, é criada durante a execução do pipeline. Se não for especificado, é usado <outputTableSpec>_error_records. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: este parâmetro só tem efeito se Use BigQuery Storage Write API estiver ativado. Se estiver ativada, a semântica de pelo menos uma vez é usada para a API Storage Write. Caso contrário, é usada a semântica de exatamente uma vez. A predefinição é: false.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.
  • pythonExternalTextTransformGcsPath: o padrão do caminho do Cloud Storage para o código Python que contém as suas funções definidas pelo utilizador. Por exemplo, gs://your-bucket/your-function.py.
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro UDF no Cloud Storage e recarrega a UDF se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for 0, o recarregamento das FDU está desativado. O valor predefinido é 0.

Função definida pelo utilizador

Este modelo requer uma FDU que analise os ficheiros de entrada, conforme descrito nos requisitos do pipeline. O modelo chama a UDF para cada linha de texto em cada ficheiro de entrada. Para mais informações sobre a criação de UDFs, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Especificação da função

A FDU tem a seguinte especificação:

  • Entrada: uma única linha de texto de um ficheiro de entrada.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Storage Text to BigQuery (Stream) template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar

    Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o seu conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da sua tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar

    Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o seu conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da sua tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

O que se segue?