O modelo do BigQuery para o Elasticsearch é um pipeline em lote que introduz dados de uma tabela do BigQuery no Elasticsearch como documentos. O modelo pode ler toda a tabela ou ler registos específicos através de uma consulta fornecida.
Requisitos do pipeline
- A tabela de origem do BigQuery tem de existir.
- Um anfitrião do Elasticsearch numa instância da Google Cloud Platform ou no Elastic Cloud com a versão 7.0 ou posterior do Elasticsearch. Têm de ser acessíveis a partir das máquinas de trabalho do Dataflow.
Parâmetros de modelos
Parâmetros obrigatórios
- connectionUrl: o URL do Elasticsearch no formato
https://hostname:[port]
. Se estiver a usar o Elastic Cloud, especifique o CloudID. Por exemplo,https://elasticsearch-host:9200
. - apiKey: a chave da API codificada em Base64 a usar para autenticação.
- index: o índice do Elasticsearch para o qual os pedidos são emitidos. Por exemplo,
my-index
.
Parâmetros opcionais
- inputTableSpec: a tabela do BigQuery a partir da qual ler. Se especificar
inputTableSpec
, o modelo lê os dados diretamente do armazenamento do BigQuery através da API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Para obter informações sobre as limitações na API Storage Read, consulte https://cloud.google.com/bigquery/docs/reference/storage#limitations. Tem de especificarinputTableSpec
ouquery
. Se definir ambos os parâmetros, o modelo usa o parâmetroquery
. Por exemplo,<BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
. - outputDeadletterTable: a tabela do BigQuery para mensagens que não conseguiram alcançar a tabela de saída. Se uma tabela não existir, é criada durante a execução do pipeline. Se não for especificado, é usado
<outputTableSpec>_error_records
. Por exemplo,<PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
. - query: a consulta SQL a usar para ler dados do BigQuery. Se o conjunto de dados do BigQuery estiver num projeto diferente do trabalho do Dataflow, especifique o nome completo do conjunto de dados na consulta SQL, por exemplo: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Por predefinição, o parâmetro
query
usa o GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), a menos queuseLegacySql
sejatrue
. Tem de especificarinputTableSpec
ouquery
. Se definir ambos os parâmetros, o modelo usa o parâmetroquery
. Por exemplo,select * from sampledb.sample_table
. - useLegacySql: definido como
true
para usar o SQL antigo. Este parâmetro só se aplica quando usa o parâmetroquery
. A predefinição éfalse
. - queryLocation: necessário quando lê a partir de uma vista autorizada sem a autorização da tabela subjacente. Por exemplo,
US
. - queryTempDataset: com esta opção, pode definir um conjunto de dados existente para criar a tabela temporária que armazena os resultados da consulta. Por exemplo,
temp_dataset
. - KMSEncryptionKey: se estiver a ler a partir do BigQuery através da origem de consulta, use esta chave do Cloud KMS para encriptar todas as tabelas temporárias criadas. Por exemplo,
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
. - elasticsearchUsername: o nome de utilizador do Elasticsearch para autenticação. Se for especificado, o valor de
apiKey
é ignorado. - elasticsearchPassword: a palavra-passe do Elasticsearch para autenticação. Se for especificado, o valor de
apiKey
é ignorado. - batchSize: o tamanho do lote em número de documentos. A predefinição é
1000
. - batchSizeBytes: o tamanho do lote em número de bytes. A predefinição é
5242880
(5 MB). - maxRetryAttempts: o número máximo de tentativas. Tem de ser superior a zero. A predefinição é
no retries
. - maxRetryDuration: a duração máxima da repetição em milissegundos. Tem de ser superior a zero. A predefinição é
no retries
. - propertyAsIndex: a propriedade no documento a ser indexado cujo valor especifica os metadados
_index
a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU_index
. A predefinição énone
. - javaScriptIndexFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_index
a incluir com o documento em pedidos em massa. A predefinição énone
. - javaScriptIndexFnName: o nome da função JavaScript da FDU que especifica os metadados
_index
a incluir no documento em pedidos em massa. A predefinição énone
. - propertyAsId: uma propriedade no documento a ser indexado cujo valor especifica os metadados
_id
a incluir com o documento em pedidos em massa. Tem precedência sobre uma FDU_id
. A predefinição énone
. - javaScriptIdFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript da função que especifica os metadados
_id
a incluir com o documento em pedidos em massa. A predefinição énone
. - javaScriptIdFnName: o nome da função JavaScript de UDF que especifica os metadados
_id
a incluir no documento em pedidos em massa. A predefinição énone
. - javaScriptTypeFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript de uma função que especifica metadados
_type
a incluir com documentos em pedidos em massa. A predefinição énone
. - javaScriptTypeFnName: o nome da função JavaScript da FDU que especifica os metadados
_type
a incluir no documento em pedidos em massa. A predefinição énone
. - javaScriptIsDeleteFnGcsPath: o caminho do Cloud Storage para a origem da FDU JavaScript para a função que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
true
oufalse
. A predefinição énone
. - javaScriptIsDeleteFnName: o nome da função JavaScript da FDU que determina se o documento deve ser eliminado em vez de inserido ou atualizado. A função devolve um valor de string de
true
oufalse
. A predefinição énone
. - usePartialUpdate: se deve usar atualizações parciais (atualizar em vez de criar ou indexar, permitindo documentos parciais) com pedidos do Elasticsearch. A predefinição é
false
. - bulkInsertMethod: se deve usar
INDEX
(index, permite inserções/atualizações) ouCREATE
(create, erros em _id duplicados) com pedidos em massa do Elasticsearch. A predefinição éCREATE
. - trustSelfSignedCerts: se deve ou não confiar no certificado autoassinado. Uma instância do Elasticsearch instalada pode ter um certificado autoassinado. Ative esta opção como verdadeira para ignorar a validação do certificado SSL. (Predefinição:
false
). - disableCertificateValidation: se for
true
, confie no certificado SSL autoassinado. Uma instância do Elasticsearch pode ter um certificado autoassinado. Para ignorar a validação do certificado, defina este parâmetro comotrue
. A predefinição éfalse
. - apiKeyKMSEncryptionKey: a chave do Cloud KMS para desencriptar a chave da API. Este parâmetro é obrigatório se o parâmetro
apiKeySource
estiver definido comoKMS
. Se este parâmetro for fornecido, transmita uma stringapiKey
encriptada. Encriptar parâmetros através do ponto final de encriptação da API KMS. Para a chave, use o formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Consulte: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Por exemplo,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: o ID do segredo do Secret Manager para a apiKey. Se o parâmetro
apiKeySource
estiver definido comoSECRET_MANAGER
, forneça este parâmetro. Use o formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: a origem da chave da API. Os valores permitidos são
PLAINTEXT
,KMS
eSECRET_MANAGER
. Este parâmetro é obrigatório quando usa o Secret Manager ou o KMS. SeapiKeySource
estiver definido comoKMS
, tem de fornecerapiKeyKMSEncryptionKey
e a apiKey encriptada. SeapiKeySource
estiver definido comoSECRET_MANAGER
, tem de fornecerapiKeySecretId
. SeapiKeySource
estiver definido comoPLAINTEXT
, tem de fornecerapiKey
. A predefinição é: PLAINTEXT. - socketTimeout: se definido, substitui o tempo limite máximo de repetição predefinido e o tempo limite de socket predefinido (30 000 ms) no Elastic RestClient.
- javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro .js que define a função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript a usar. Por exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função émyTransform
. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Funções definidas pelo utilizador
Este modelo suporta funções definidas pelo utilizador (UDFs) em vários pontos do pipeline, descritos abaixo. Para mais informações, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Função ÍNDICE
Devolve o índice ao qual o documento pertence.
Parâmetros de modelo:
javaScriptIndexFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIndexFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_index
Função ID do documento
Devolve o ID do documento.
Parâmetros de modelo:
javaScriptIdFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIdFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_id
Função de eliminação de documentos
Especifica se um documento deve ser eliminado. Para usar esta função, defina o modo de inserção em massa como INDEX
e forneça uma função de ID do documento.
Parâmetros de modelo:
javaScriptIsDeleteFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptIsDeleteFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: devolve a string
"true"
para eliminar o documento ou"false"
para inserir/atualizar o documento.
Função de tipo de mapeamento
Devolve o tipo de mapeamento do documento.
Parâmetros de modelo:
javaScriptTypeFnGcsPath
: o URI do Cloud Storage do ficheiro JavaScript.javaScriptTypeFnName
: o nome da função JavaScript.
Especificação da função:
- Entrada: o documento do Elasticsearch, serializado como uma string JSON.
- Saída: o valor do campo de metadados do documento.
_type
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the BigQuery to Elasticsearch template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: o nome da tabela do BigQuery.CONNECTION_URL
: o URL do Elasticsearch.APIKEY
: a sua chave da API codificada em base64 para autenticação.INDEX
: o seu índice do Elasticsearch.
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: o nome da tabela do BigQuery.CONNECTION_URL
: o URL do Elasticsearch.APIKEY
: a sua chave da API codificada em base64 para autenticação.INDEX
: o seu índice do Elasticsearch.
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.