O modelo do Cloud Storage para o Elasticsearch é um pipeline em lote que lê dados de ficheiros CSV armazenados num contentor do Cloud Storage e escreve os dados no Elasticsearch como documentos JSON.
Requisitos do pipeline
- O contentor do Cloud Storage tem de existir.
- Tem de existir um anfitrião do Elasticsearch numa instância da Google Cloud Platform ou no Elasticsearch Cloud que seja acessível a partir do Dataflow.
- Tem de existir uma tabela do BigQuery para a saída de erros.
Esquema CSV
Se os ficheiros CSV contiverem cabeçalhos, defina o parâmetro containsHeaders
template como true
.
Caso contrário, crie um ficheiro de esquema JSON que descreva os dados. Especifique o URI do Cloud Storage do ficheiro de esquema no parâmetro jsonSchemaPath
template. O exemplo seguinte mostra um esquema JSON:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Em alternativa, pode fornecer uma função definida pelo utilizador (FDU) que analise o texto CSV e produza documentos do Elasticsearch.
Parâmetros de modelos
Parâmetros obrigatórios
- deadletterTable: a tabela de mensagens rejeitadas do BigQuery para a qual enviar as inserções com falhas. Por exemplo,
your-project:your-dataset.your-table-name
. - inputFileSpec: o padrão de ficheiro do Cloud Storage para pesquisar ficheiros CSV. Por exemplo,
gs://mybucket/test-*.csv
. - connectionUrl: o URL do Elasticsearch no formato
https://hostname:[port]
. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo,https://elasticsearch-host:9200
. - apiKey: a chave da API codificada em Base64 a usar para autenticação.
- index: o índice do Elasticsearch para o qual os pedidos são emitidos. Por exemplo,
my-index
.
Parâmetros opcionais
- inputFormat: o formato do ficheiro de entrada. A predefinição é
CSV
. - containsHeaders: os ficheiros CSV de entrada contêm um registo de cabeçalho (verdadeiro/falso). Só é necessário se estiver a ler ficheiros CSV. A predefinição é: false.
- delimitador: o delimitador de colunas dos ficheiros de texto de entrada. Predefinição:
,
. Por exemplo,,
. - csvFormat: especificação do formato CSV a usar para analisar registos. A predefinição é:
Default
. Consulte https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html para mais detalhes. Tem de corresponder exatamente aos nomes dos formatos encontrados em: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html. - jsonSchemaPath: o caminho para o esquema JSON. A predefinição é
null
. Por exemplo,gs://path/to/schema
. - largeNumFiles: defina como verdadeiro se o número de ficheiros for de dezenas de milhares. A predefinição é
false
. - csvFileEncoding: o formato de codificação de carateres do ficheiro CSV. Os valores permitidos são
US-ASCII
,ISO-8859-1
,UTF-8
eUTF-16
. A predefinição é: UTF-8. - logDetailedCsvConversionErrors: defina como
true
para ativar o registo de erros detalhado quando a análise CSV falha. Tenha em atenção que isto pode expor dados confidenciais nos registos (por exemplo, se o ficheiro CSV contiver palavras-passe). Predefinição:false
. - elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de
apiKey
é ignorado. - elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de
apiKey
é ignorado. - batchSize: o tamanho do lote em número de documentos. A predefinição é
1000
. - batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é
5242880
(5 MB). - maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é
no retries
. - maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é
no retries
. - propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados
_index
a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU_index
. A predefinição énone
. - javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_index
a incluir com o documento em pedidos em massa. A predefinição énone
. - javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados
_index
a incluir no documento em pedidos em massa. A predefinição énone
. - propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados
_id
a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU_id
. A predefinição énone
. - javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados
_id
a incluir com o documento em pedidos em massa. A predefinição énone
. - javaScriptIdFnName: o nome da função JavaScript de UDF que especifica os metadados
_id
a incluir no documento em pedidos em massa. A predefinição énone
. - javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_type
a incluir com documentos em pedidos em massa. A predefinição énone
. - javaScriptTypeFnName: o nome da função JavaScript da FDU que especifica os metadados
_type
a incluir no documento em pedidos em massa. A predefinição énone
. - javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
true
oufalse
. A predefinição énone
. - javaScriptIsDeleteFnName: o nome da função JavaScript da FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
true
oufalse
. A predefinição énone
. - usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é
false
. - bulkInsertMethod: se deve usar
INDEX
(index, permite inserções/atualizações) ouCREATE
(create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição éCREATE
. - trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação do certificado SSL. (Predefinição:
false
). - disableCertificateValidation: se for
true
, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro comotrue
. A predefinição éfalse
. - apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro
apiKeySource
estiver definido comoKMS
. Se este parâmetro for fornecido, transmita uma stringapiKey
encriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro
apiKeySource
estiver definido comoSECRET_MANAGER
, forneça este parâmetro. Use o formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: a origem da chave da API. Os valores permitidos são
PLAINTEXT
,KMS
eSECRET_MANAGER
. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. SeapiKeySource
estiver definido comoKMS
, tem de fornecerapiKeyKMSEncryptionKey
e a apiKey encriptada. SeapiKeySource
estiver definido comoSECRET_MANAGER
, tem de fornecerapiKeySecretId
. SeapiKeySource
estiver definido comoPLAINTEXT
, tem de fornecerapiKey
. A predefinição é: PLAINTEXT. - socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.
- javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função émyTransform
. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Funções definidas pelo utilizador
Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Função de transformação de texto
Transforma os dados CSV num documento do Elasticsearch.
Parâmetros de modelo:
javascriptTextTransformGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javascriptTextTransformFunctionName
: o nome da função JavaScript.
Especificação da função:
- Entrada: uma única linha de um ficheiro CSV de entrada.
- Saída: um documento JSON convertido em string para inserir no Elasticsearch.
Função ÍNDICE
Devolve o índice ao qual o documento pertence.
Parâmetros de modelo:
javaScriptIndexFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIndexFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_index
Função ID do documento
Devolve o ID do documento.
Parâmetros de modelo:
javaScriptIdFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIdFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_id
Função de eliminação de documentos
Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX
e forneça uma função de ID do documento.
Parâmetros de modelo:
javaScriptIsDeleteFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIsDeleteFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: devolve a string
"true"
para eliminar o documento ou"false"
para inserir/atualizar o documento.
Função de tipo de mapeamento
Devolve o tipo de mapeamento do documento.
Parâmetros de modelo:
javaScriptTypeFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptTypeFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_type
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Cloud Storage to Elasticsearch template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
INPUT_FILE_SPEC
: o seu padrão de ficheiros do Cloud Storage.CONNECTION_URL
: o URL do Elasticsearch.APIKEY
: a sua chave da API codificada em base64 para autenticação.INDEX
: o seu índice do Elasticsearch.DEADLETTER_TABLE
: a sua tabela do BigQuery.
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
INPUT_FILE_SPEC
: o seu padrão de ficheiros do Cloud Storage.CONNECTION_URL
: o URL do Elasticsearch.APIKEY
: a sua chave da API codificada em base64 para autenticação.INDEX
: o seu índice do Elasticsearch.DEADLETTER_TABLE
: a sua tabela do BigQuery.
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.