Modelo do BigQuery para o Elasticsearch

O modelo do BigQuery para o Elasticsearch é um pipeline em lote que introduz dados de uma tabela do BigQuery no Elasticsearch como documentos. O modelo pode ler toda a tabela ou ler registos específicos através de uma consulta fornecida.

Requisitos do pipeline

  • A tabela de origem do BigQuery tem de existir.
  • Um anfitrião do Elasticsearch numa instância da Google Cloud Platform ou no Elastic Cloud com a versão 7.0 ou posterior do Elasticsearch. Têm de ser acessíveis a partir das máquinas de trabalho do Dataflow.

Parâmetros de modelos

Parâmetros obrigatórios

  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave da API codificada em Base64 a usar para autenticação.
  • index: o índice do Elasticsearch para o qual os pedidos são emitidos. Por exemplo, my-index.

Parâmetros opcionais

  • inputTableSpec: a tabela do BigQuery a partir da qual ler. Se especificar inputTableSpec, o modelo lê os dados diretamente do armazenamento do BigQuery através da API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Para obter informações sobre as limitações na API Storage Read, consulte https://cloud.google.com/bigquery/docs/reference/storage#limitations. Tem de especificar inputTableSpec ou query. Se definir ambos os parâmetros, o modelo usa o parâmetro query. Por exemplo, <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>.
  • outputDeadletterTable: a tabela do BigQuery para mensagens que não conseguiram alcançar a tabela de saída. Se uma tabela não existir, é criada durante a execução do pipeline. Se não for especificado, é usado <outputTableSpec>_error_records. Por exemplo, <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>.
  • query: a consulta SQL a usar para ler dados do BigQuery. Se o conjunto de dados do BigQuery estiver num projeto diferente do trabalho do Dataflow, especifique o nome completo do conjunto de dados na consulta SQL, por exemplo: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Por predefinição, o parâmetro query usa o GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), a menos que useLegacySql seja true. Tem de especificar inputTableSpec ou query. Se definir ambos os parâmetros, o modelo usa o parâmetro query. Por exemplo, select * from sampledb.sample_table.
  • useLegacySql: definido como true para usar o SQL antigo. Este parâmetro só se aplica quando usa o parâmetro query. A predefinição é false.
  • queryLocation: necessário quando lê a partir de uma vista autorizada sem a autorização da tabela subjacente. Por exemplo, US.
  • queryTempDataset: com esta opção, pode definir um conjunto de dados existente para criar a tabela temporária que armazena os resultados da consulta. Por exemplo, temp_dataset.
  • KMSEncryptionKey: se estiver a ler a partir do BigQuery através da origem de consulta, use esta chave do Cloud KMS para encriptar todas as tabelas temporárias criadas. Por exemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • batchSize: o tamanho do lote em número de documentos. A predefinição é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é no retries.
  • maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é no retries.
  • propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados _index a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU _index. A predefinição é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _index a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados _index a incluir no documento em pedidos em massa. A predefinição é none.
  • propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados _id a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU _id. A predefinição é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados _id a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIdFnName: o nome da função JavaScript de UDF que especifica os metadados _id a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _type a incluir com documentos em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnName: o nome da função JavaScript da FDU que especifica os metadados _type a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript da FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é false.
  • bulkInsertMethod: se deve usar INDEX (index, permite inserções/atualizações) ou CREATE (create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição é CREATE.
  • trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação do certificado SSL. (Predefinição: false).
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro como true. A predefinição é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro apiKeySource estiver definido como KMS. Se este parâmetro for fornecido, transmita uma string apiKey encriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro apiKeySource estiver definido como SECRET_MANAGER, forneça este parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave da API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, tem de fornecer apiKeyKMSEncryptionKey e a apiKey encriptada. Se apiKeySource estiver definido como SECRET_MANAGER, tem de fornecer apiKeySecretId. Se apiKeySource estiver definido como PLAINTEXT, tem de fornecer apiKey. A predefinição é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funções definidas pelo utilizador

Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Função ÍNDICE

Devolve o índice ao qual o documento pertence.

Parâmetros de modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._index

Função ID do documento

Devolve o ID do documento.

Parâmetros de modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._id

Função de eliminação de documentos

Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros de modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: devolve a string "true" para eliminar o documento ou "false" para inserir/atualizar o documento.

Função de tipo de mapeamento

Devolve o tipo de mapeamento do documento.

Parâmetros de modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._type

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the BigQuery to Elasticsearch template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery.
  • CONNECTION_URL: o URL do Elasticsearch.
  • APIKEY: a sua chave da API codificada em base64 para autenticação.
  • INDEX: o seu índice do Elasticsearch.

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • INPUT_TABLE_SPEC: o nome da tabela do BigQuery.
  • CONNECTION_URL: o URL do Elasticsearch.
  • APIKEY: a sua chave da API codificada em base64 para autenticação.
  • INDEX: o seu índice do Elasticsearch.

O que se segue?