Modelo do BigQuery para Elasticsearch

O modelo do BigQuery para Elasticsearch é um pipeline em lote que ingere dados de uma tabela do BigQuery no Elasticsearch como documentos. O modelo pode ler toda a tabela ou ler registros específicos usando uma consulta fornecida.

Requisitos de pipeline

  • A tabela de origem do BigQuery precisa existir.
  • Um host do Elasticsearch em uma instância do Google Cloud ou no Elastic Cloud com o Elasticsearch versão 7.0 ou mais recente. Precisa ser acessível nas máquinas de worker do Dataflow.

Parâmetros do modelo

Parâmetros obrigatórios

  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver usando o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave de API codificada em Base64 que será usada para autenticação.
  • index: o índice do Elasticsearch para o qual as solicitações são emitidas. Por exemplo, my-index.

Parâmetros opcionais

  • inputTableSpec: a tabela do BigQuery a ser lida. Se você especificar inputTableSpec, o modelo vai ler os dados diretamente do armazenamento do BigQuery usando a API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Para informações sobre limitações na API Storage Read, consulte https://cloud.google.com/bigquery/docs/reference/storage#limitations. É preciso especificar inputTableSpec ou query. Se você definir os dois parâmetros, o modelo vai usar o parâmetro query. Exemplo: <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>.
  • outputDeadletterTable: a tabela do BigQuery para mensagens que não alcançaram a tabela de saída. Se uma tabela não existir, ela será criada durante a execução do pipeline. Se não for especificado, <outputTableSpec>_error_records será usado. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>.
  • consulta: a consulta SQL a ser usada para extrair dados do BigQuery. Se o conjunto de dados do BigQuery estiver em um projeto diferente do job do Dataflow, especifique o nome completo do conjunto de dados na consulta SQL. Por exemplo: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Por padrão, o parâmetro query usa o GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), a menos que useLegacySql seja true. É preciso especificar inputTableSpec ou query. Se você definir os dois parâmetros, o modelo vai usar o query. Por exemplo, select * from sampledb.sample_table.
  • useLegacySql: defina como true para usar o SQL legado. Esse parâmetro só se aplica ao usar o parâmetro query. O padrão é false.
  • queryLocation: necessário ao ler uma visualização autorizada sem a permissão da tabela subjacente. Por exemplo, US.
  • queryTempDataset: com essa opção, é possível definir um conjunto de dados para criar a tabela temporária que armazena os resultados da consulta. Por exemplo, temp_dataset.
  • KMSEncryptionKey: se a leitura for feita do BigQuery usando a origem da consulta, use essa chave do Cloud KMS para criptografar todas as tabelas temporárias criadas. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • elasticsearchUsername: o nome de usuário do Elasticsearch usado para autenticação. Se especificado, o valor de apiKey será ignorado.
  • elasticsearchPassword: a senha do Elasticsearch para autenticação. Se especificado, o valor de apiKey será ignorado.
  • batchSize: o tamanho do lote em número de documentos. O padrão é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. O padrão é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de novas tentativas. Precisa ser maior que zero. O padrão é no retries.
  • maxRetryDuration: a duração máxima da nova tentativa em milissegundos. Precisa ser maior que zero. O padrão é no retries.
  • propertyAsIndex: uma propriedade no documento que está sendo indexada com um valor que especifica os metadados _index a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _index. O padrão é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIndexFnName: o nome da função JavaScript da UDF que especifica os metadados _index a serem incluídos no documento em solicitações em massa. O padrão é none.
  • propertyAsId: uma propriedade no documento que está sendo indexada com um valor que especifica metadados _id a serem incluídos com o documento em solicitações em massa. Tem precedência sobre uma UDF _id. O padrão é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que especifica os metadados _id a serem incluídos no documento em solicitações em massa. O padrão é none.
  • javaScriptIdFnName: o nome da função JavaScript da UDF que especifica os metadados _id a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript de uma função que especifica os metadados _type a serem incluídos com documentos em solicitações em massa. O padrão é none.
  • javaScriptTypeFnName: o nome da função JavaScript da UDF que especifica os metadados _type a serem incluídos com o documento em solicitações em massa. O padrão é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da UDF em JavaScript da função que determina se o documento precisa ser excluído em vez de inserido ou atualizado. A função retorna um valor de string de true ou false. O padrão é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript da UDF que determina se é necessário excluir o documento em vez de inserir ou atualizar. A função retorna um valor de string de true ou false. O padrão é none.
  • usePartialUpdate: indica se as atualizações parciais vão ser usadas (atualizar em vez de criar ou indexar, permitindo documentos parciais) com solicitações Elasticsearch. O padrão é false.
  • bulkInsertMethod: indica se é necessário usar INDEX (índice, permite ajustes) ou CREATE (criar, erros em _id duplicados) com solicitações em massa do Elasticsearch. O padrão é CREATE.
  • trustSelfSignedCerts: se é possível ou não confiar em um certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative essa opção como "True" para ignorar a validação no certificado SSL. O padrão é false.
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina esse parâmetro como true. O padrão é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para descriptografar a chave de API. Este parâmetro será obrigatório se apiKeySource for definido como KMS. Se esse parâmetro for fornecido, transmita uma string apiKey criptografada. Criptografe parâmetros usando o endpoint de criptografia da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do secret do Secret Manager para a chave de API. Se apiKeySource estiver definido como SECRET_MANAGER, forneça esse parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave de API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Esse parâmetro é obrigatório quando você usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, apiKeyKMSEncryptionKey e a chave apiKey criptografada precisar ser fornecida. Se apiKeySource estiver definido como SECRET_MANAGER, apiKeySecretId precisará ser fornecido. Se apiKeySource estiver definido como PLAINTEXT, apiKey precisará ser fornecido. O padrão é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de novas tentativas e o tempo limite de soquete padrão (30000ms) no RestClient do Elastic.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo .js que define a função JavaScript definida pelo usuário (UDF) a ser usada. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript a ser usada. Por exemplo, se o código de função do JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função será myTransform. Para ver exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funções definidas pelo usuário

Esse modelo é compatível com funções definidas pelo usuário (UDFs) em vários pontos do pipeline, descritas abaixo. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.

Função de índice

Retorna o índice ao qual o documento pertence.

Parâmetros do modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _index do documento.

Função ID do documento

Retorna o ID do documento.

Parâmetros do modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _id do documento.

Função de exclusão de documentos

Especifica se um documento deve ser excluído. Para usar essa função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros do modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: retorna a string "true" para excluir o documento ou "false" para manter o documento.

Função do tipo de mapeamento

Retorna o tipo de mapeamento do documento.

Parâmetros do modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do arquivo JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados _type do documento.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the BigQuery to Elasticsearch template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery
  • CONNECTION_URL: seu URL do Elasticsearch
  • APIKEY: sua chave de API codificada em base64 para autenticação.
  • INDEX: seu índice do Elasticsearch.

A seguir