Modelo do Cloud Storage para o Elasticsearch

O modelo do Cloud Storage para o Elasticsearch é um pipeline em lote que lê dados de ficheiros CSV armazenados num contentor do Cloud Storage e escreve os dados no Elasticsearch como documentos JSON.

Requisitos do pipeline

  • O contentor do Cloud Storage tem de existir.
  • Tem de existir um anfitrião do Elasticsearch numa instância da Google Cloud Platform ou no Elasticsearch Cloud que seja acessível a partir do Dataflow.
  • Tem de existir uma tabela do BigQuery para a saída de erros.

Esquema CSV

Se os ficheiros CSV contiverem cabeçalhos, defina o parâmetro containsHeaderstemplate como true.

Caso contrário, crie um ficheiro de esquema JSON que descreva os dados. Especifique o URI do Cloud Storage do ficheiro de esquema no parâmetro jsonSchemaPathtemplate. O exemplo seguinte mostra um esquema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Em alternativa, pode fornecer uma função definida pelo utilizador (FDU) que analise o texto CSV e produza documentos do Elasticsearch.

Parâmetros de modelos

Parâmetros obrigatórios

  • deadletterTable: a tabela de mensagens rejeitadas do BigQuery para a qual enviar as inserções com falhas. Por exemplo, your-project:your-dataset.your-table-name.
  • inputFileSpec: o padrão de ficheiro do Cloud Storage para pesquisar ficheiros CSV. Por exemplo, gs://mybucket/test-*.csv.
  • connectionUrl: o URL do Elasticsearch no formato https://hostname:[port]. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo, https://elasticsearch-host:9200.
  • apiKey: a chave da API codificada em Base64 a usar para autenticação.
  • index: o índice do Elasticsearch para o qual os pedidos são emitidos. Por exemplo, my-index.

Parâmetros opcionais

  • inputFormat: o formato do ficheiro de entrada. A predefinição é CSV.
  • containsHeaders: os ficheiros CSV de entrada contêm um registo de cabeçalho (verdadeiro/falso). Só é necessário se estiver a ler ficheiros CSV. A predefinição é: false.
  • delimitador: o delimitador de colunas dos ficheiros de texto de entrada. Predefinição: ,. Por exemplo, ,.
  • csvFormat: especificação do formato CSV a usar para analisar registos. A predefinição é: Default. Consulte https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html para mais detalhes. Tem de corresponder exatamente aos nomes dos formatos encontrados em: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath: o caminho para o esquema JSON. A predefinição é null. Por exemplo, gs://path/to/schema.
  • largeNumFiles: defina como verdadeiro se o número de ficheiros for de dezenas de milhares. A predefinição é false.
  • csvFileEncoding: o formato de codificação de carateres do ficheiro CSV. Os valores permitidos são US-ASCII, ISO-8859-1, UTF-8 e UTF-16. A predefinição é: UTF-8.
  • logDetailedCsvConversionErrors: defina como true para ativar o registo de erros detalhado quando a análise CSV falha. Tenha em atenção que isto pode expor dados confidenciais nos registos (por exemplo, se o ficheiro CSV contiver palavras-passe). Predefinição: false.
  • elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de apiKey é ignorado.
  • batchSize: o tamanho do lote em número de documentos. A predefinição é 1000.
  • batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é 5242880 (5 MB).
  • maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é no retries.
  • maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é no retries.
  • propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados _index a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU _index. A predefinição é none.
  • javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _index a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados _index a incluir no documento em pedidos em massa. A predefinição é none.
  • propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados _id a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU _id. A predefinição é none.
  • javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados _id a incluir com o documento em pedidos em massa. A predefinição é none.
  • javaScriptIdFnName: o nome da função JavaScript de UDF que especifica os metadados _id a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados _type a incluir com documentos em pedidos em massa. A predefinição é none.
  • javaScriptTypeFnName: o nome da função JavaScript da FDU que especifica os metadados _type a incluir no documento em pedidos em massa. A predefinição é none.
  • javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • javaScriptIsDeleteFnName: o nome da função JavaScript da FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de true ou false. A predefinição é none.
  • usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é false.
  • bulkInsertMethod: se deve usar INDEX (index, permite inserções/atualizações) ou CREATE (create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição é CREATE.
  • trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação do certificado SSL. (Predefinição: false).
  • disableCertificateValidation: se for true, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro como true. A predefinição é false.
  • apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro apiKeySource estiver definido como KMS. Se este parâmetro for fornecido, transmita uma string apiKey encriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formato projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro apiKeySource estiver definido como SECRET_MANAGER, forneça este parâmetro. Use o formato projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: a origem da chave da API. Os valores permitidos são PLAINTEXT, KMS e SECRET_MANAGER. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. Se apiKeySource estiver definido como KMS, tem de fornecer apiKeyKMSEncryptionKey e a apiKey encriptada. Se apiKeySource estiver definido como SECRET_MANAGER, tem de fornecer apiKeySecretId. Se apiKeySource estiver definido como PLAINTEXT, tem de fornecer apiKey. A predefinição é: PLAINTEXT.
  • socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.
  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for myTransform(inJson) { /*...do stuff...*/ }, o nome da função é myTransform. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funções definidas pelo utilizador

Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.

Função de transformação de texto

Transforma os dados CSV num documento do Elasticsearch.

Parâmetros de modelo:

  • javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javascriptTextTransformFunctionName: o nome da função JavaScript.

Especificação da função:

  • Entrada: uma única linha de um ficheiro CSV de entrada.
  • Saída: um documento JSON convertido em string para inserir no Elasticsearch.

Função ÍNDICE

Devolve o índice ao qual o documento pertence.

Parâmetros de modelo:

  • javaScriptIndexFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIndexFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._index

Função ID do documento

Devolve o ID do documento.

Parâmetros de modelo:

  • javaScriptIdFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIdFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._id

Função de eliminação de documentos

Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX e forneça uma função de ID do documento.

Parâmetros de modelo:

  • javaScriptIsDeleteFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptIsDeleteFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: devolve a string "true" para eliminar o documento ou "false" para inserir/atualizar o documento.

Função de tipo de mapeamento

Devolve o tipo de mapeamento do documento.

Parâmetros de modelo:

  • javaScriptTypeFnGcsPath: o URI do Cloud Storage do ficheiro JavaScript.
  • javaScriptTypeFnName: o nome da função JavaScript.

Especificação da função:

  • Entrada: o documento do Elasticsearch, serializado como uma string JSON.
  • Saída: o valor do campo de metadados do documento._type

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Storage to Elasticsearch template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • INPUT_FILE_SPEC: o seu padrão de ficheiros do Cloud Storage.
  • CONNECTION_URL: o URL do Elasticsearch.
  • APIKEY: a sua chave da API codificada em base64 para autenticação.
  • INDEX: o seu índice do Elasticsearch.
  • DEADLETTER_TABLE: a sua tabela do BigQuery.

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • INPUT_FILE_SPEC: o seu padrão de ficheiros do Cloud Storage.
  • CONNECTION_URL: o URL do Elasticsearch.
  • APIKEY: a sua chave da API codificada em base64 para autenticação.
  • INDEX: o seu índice do Elasticsearch.
  • DEADLETTER_TABLE: a sua tabela do BigQuery.

O que se segue?