Modelo do BigQuery para Elasticsearch

O modelo do BigQuery para Elasticsearch é um pipeline em lote que ingere dados de uma tabela do BigQuery no Elasticsearch como documentos. O modelo pode ler toda a tabela ou ler registros específicos usando uma consulta fornecida.

Requisitos de pipeline

  • A tabela de origem do BigQuery precisa existir.
  • Um host do Elasticsearch em uma instância do Google Cloud ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Precisa ser acessível nas máquinas de worker do Dataflow.

Parâmetros do modelo

Parâmetro Descrição
connectionUrl URL do Elasticsearch no formato https://hostname:[port] ou especifique o CloudID se estiver usando o Elastic Cloud.
apiKey Chave da API codificada em Base64 usada para autenticação.
index O índice do Elasticsearch para onde as solicitações serão emitidas, por exemplo, my-index.
inputTableSpec (Opcional) Tabela do BigQuery de onde ler para inserir no Elasticsearch. Forneça a tabela ou a consulta. Por exemplo, projectId:datasetId.tablename
query (Opcional) Consulta SQL para extrair dados do BigQuery. Forneça a tabela ou a consulta.
useLegacySql (Opcional) Defina como "true" para usar o SQL legado (somente aplicável se a consulta for fornecida). Padrão: false.
batchSize (Opcional) Tamanho do lote em número de documentos. Padrão: 1000.
batchSizeBytes (Opcional) Tamanho do lote em número de bytes. Padrão: 5242880 (5 mb).
maxRetryAttempts (Opcional) Máximo de tentativas de repetição. Precisa ser > 0. Padrão: no retries.
maxRetryDuration (Opcional) A duração máxima da nova tentativa em milissegundos precisa ser maior que 0. Padrão: no retries.
propertyAsIndex (Opcional) Uma propriedade no documento que está sendo indexado com o valor que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa (tem precedência sobre uma UDF _index). Padrão: none.
propertyAsId (Opcional) Uma propriedade no documento que está sendo indexado com o valor que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa (tem precedência sobre uma UDF _id). Padrão: none.
javaScriptIndexFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIndexFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _index a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIdFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIdFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _id a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptTypeFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que especificará os metadados de _type a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptTypeFnName (Opcional) Nome da função UDF em JavaScript para a função que especificará os metadados de _type a serem incluídos com o documento na solicitação em massa. Padrão: none.
javaScriptIsDeleteFnGcsPath (Opcional) O caminho do Cloud Storage para a origem UDF em JavaScript de uma função que determina se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor de string "true" ou "false". Padrão: none.
javaScriptIsDeleteFnName (Opcional) Nome da função UDF em JavaScript de uma função que vai determinar se o documento deve ser excluído em vez de inserido ou atualizado. A função precisa retornar o valor de string "true" ou "false". Padrão: none.
usePartialUpdate (Opcional) Indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. Padrão: false.
bulkInsertMethod (Opcional) Indica se é necessário usar INDEX (índice, permite ajustes) ou CREATE (criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. Padrão: CREATE.

Funções definidas pelo usuário

Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Função de índice

Retorna o índice ao qual o documento pertence.

Parâmetros do modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _index do documento.

Função ID do documento

Retorna o ID do documento.

Parâmetros do modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _id do documento.

Função de exclusão de documentos

Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros do modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: retorna a string "true" para excluir o documento ou "false" para manter o documento.

Função do tipo de mapeamento

Retorna o tipo de mapeamento do documento.

Parâmetros do modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _type do documento.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the BigQuery to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.

A seguir