Modelo do Cloud Storage Text para o BigQuery (stream)

O pipeline do Cloud Storage Text para o BigQuery é um pipeline de streaming que transmite arquivos de texto armazenados no Cloud Storage, os transforma usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e anexa o resultado ao BigQuery.

O pipeline é executado de modo indefinido e precisa ser encerrado manualmente pelo comando cancelar e não drenar, porque ele usa a transformação Watch, que é um DoFn divisível sem suporte para drenagem.

Requisitos de pipeline

  • Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.

    Verifique se há uma matriz JSON de nível superior intitulada fields e se o conteúdo dela segue o padrão {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Por exemplo:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Crie um arquivo JavaScript (.js) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.

    O exemplo a seguir divide cada linha de um arquivo CSV, cria um objeto JSON com os valores e retorna uma string JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o caminho gs:// do texto no Cloud Storage que você quer processar. Por exemplo, gs://your-bucket/your-file.txt.
  • JSONPath: o caminho gs:// para o arquivo JSON que define o esquema do BigQuery, armazenado no Cloud Storage. Por exemplo, gs://your-bucket/your-schema.json.
  • outputTable: o local da tabela do BigQuery a ser usada para armazenar os dados processados. Se você reutilizar uma tabela, ela será substituída. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para conferir exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo, transform_udf1.
  • bigQueryLoadingTemporaryDirectory: diretório temporário para o processo de carregamento do BigQuery. Por exemplo, gs://your-bucket/your-files/temp-dir.

Parâmetros opcionais

  • outputDeadletterTable: tabela de mensagens que não alcançaram a tabela de saída. Se uma tabela não existir, ela será criada durante a execução do pipeline. Se não for especificado, <outputTableSpec>_error_records será usado. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: esse parâmetro só entra em vigor se Use BigQuery Storage Write API estiver ativado. Se ativada, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, a semântica "exatamente uma" será usada. O padrão é: falso.
  • useStorageWriteApi: se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro.
  • pythonExternalTextTransformGcsPath: o padrão de caminho do Cloud Storage para o código Python que contém as funções definidas pelo usuário. Por exemplo, gs://your-bucket/your-function.py.
  • javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for 0, o recarregamento da UDF será desativado. O valor padrão é 0.

Função definida pelo usuário

Esse modelo requer uma UDF que analisa os arquivos de entrada, conforme descrito em Requisitos do pipeline. O modelo chama a UDF para cada linha de texto em cada arquivo de entrada. Para mais informações sobre como criar UDFs, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Especificação da função

A UDF tem a seguinte especificação:

  • Entrada: uma única linha de texto de um arquivo de entrada.
  • Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Storage Text to BigQuery (Stream) template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar

    Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquema
  • PATH_TO_JAVASCRIPT_UDF_FILE: O URI do Cloud Storage do arquivo .js que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: o caminho do Cloud Storage para o conjunto de dados de texto
  • BIGQUERY_TABLE: o nome da tabela do BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: o nome da tabela do BigQuery para mensagens não processadas
  • PATH_TO_TEMP_DIR_ON_GCS: o caminho do Cloud Storage para o diretório temporário

A seguir