BigQuery to Elasticsearch 模板

BigQuery to Elasticsearch 模板是一种批处理流水线,用于将 BigQuery 表中的数据作为文档注入到 Elasticsearch 中。该模板可以读取整个表,也可以使用提供的查询读取特定记录。

流水线要求

  • 源 BigQuery 表必须存在。
  • Google Cloud 实例上或 Elastic Cloud 上使用 Elasticsearch 7.0 版或更高版本的 Elasticsearch 主机。必须可从 Dataflow 工作器机器访问。

模板参数

必需参数

  • connectionUrl:Elasticsearch 网址,格式为 https://hostname:[port]。如果使用 Elastic Cloud,请指定 CloudID。例如 https://elasticsearch-host:9200
  • apiKey:用于身份验证的 Base64 编码 API 密钥。
  • index:发出请求的 Elasticsearch 索引。例如 my-index

可选参数

  • inputTableSpec:要读取的 BigQuery 表。如果指定 inputTableSpec,模板将使用 BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage) 直接从 BigQuery 存储读取数据。如需了解 Storage Read API 中的限制,请参阅 https://cloud.google.com/bigquery/docs/reference/storage#limitations。您必须指定 inputTableSpecquery。如果同时设置了这两个参数,则模板会使用 query 参数。例如 <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
  • outputDeadletterTable:未能到达输出表的消息的 BigQuery 表。如果表不存在,系统会在流水线执行期间创建该表。如果未指定,则系统会使用 <outputTableSpec>_error_records。例如 <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
  • query:用于从 BigQuery 读取数据的 SQL 查询。如果 BigQuery 数据集与 Dataflow 作业位于不同的项目中,请在 SQL 查询中指定完整的数据集名称,例如:<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>。默认情况下,除非 useLegacySqltrue,否则 query 参数会使用 GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql)。您必须指定 inputTableSpecquery。如果同时设置了这两个参数,则模板会使用 query 参数。例如 select * from sampledb.sample_table
  • useLegacySql:设置为 true 即可使用旧版 SQL。此参数仅在使用 query 参数时适用。默认值为 false
  • queryLocation:在没有底层表权限的情况下从授权视图读取数据时需要使用。例如 US
  • elasticsearchUsername:用于进行身份验证的 Elasticsearch 用户名。如果指定,则系统会忽略 apiKey 的值。
  • elasticsearchPassword:用于进行身份验证的 Elasticsearch 密码。如果指定,则系统会忽略 apiKey 的值。
  • batchSize:按文档数量的批次大小。默认值为 1000
  • batchSizeBytes:批次大小(以字节数为单位)。默认值为 5242880 (5mb)。
  • maxRetryAttempts:重试次数上限。必须大于零。默认值为 no retries
  • maxRetryDuration:重试时长上限(以毫秒为单位)。必须大于零。默认值为 no retries
  • propertyAsIndex:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的 _index 元数据。优先于 _index UDF。默认值为 none
  • javaScriptIndexFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的 _index 元数据。默认值为 none
  • javaScriptIndexFnName:UDF JavaScript 函数的名称,该函数指定要将 _index 元数据包含在批量请求的文档中。默认值为 none
  • propertyAsId:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的 _id 元数据。优先于 _id UDF。默认值为 none
  • javaScriptIdFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的 _id 元数据。默认值为 none
  • javaScriptIdFnName:UDF JavaScript 函数的名称,该函数指定批量请求要包含在文档中的 _id 元数据。默认值为 none
  • javaScriptTypeFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定要将 _type 元数据包含在批量请求的文档中。默认值为 none
  • javaScriptTypeFnName:UDF JavaScript 函数的名称,该函数指定批量请求要包含在文档中的 _type 元数据。默认值为 none
  • javaScriptIsDeleteFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数确定是否应删除文档,而不是插入或更新文档。该函数会返回字符串值 truefalse。默认值为 none
  • javaScriptIsDeleteFnName:UDF JavaScript 函数的名称,该函数确定是否应删除文档,而不是插入或更新文档。该函数会返回字符串值 truefalse。默认值为 none
  • usePartialUpdate:是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或编入索引,允许部分文档)。默认值为 false
  • bulkInsertMethod:在 Elasticsearch 批量请求中使用 INDEX(编入索引,允许 upsert)还是 CREATE(创建,会对重复 _id 报错)。默认值为 CREATE
  • trustSelfSignedCerts:是否信任自签名证书。已安装的 Elasticsearch 实例可能具有自签名证书,将此参数设置为 true 可绕过对 SSL 证书的验证。(默认值:false)。
  • disableCertificateValidation:如果为 true,则信任自签名 SSL 证书。Elasticsearch 实例可能具有自签名证书。如需绕过对证书的验证,请将此参数设置为 true。默认值为 false
  • apiKeyKMSEncryptionKey:用于解密 API 密钥的 Cloud KMS 密钥。如果 apiKeySource 设置为 KMS,则此参数是必需的。如果提供了此参数,请传入经过加密的 apiKey 字符串。使用 KMS API 加密端点对参数进行加密。对于密钥,请使用 projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME> 格式。请参阅:https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt。例如,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
  • apiKeySecretId:apiKey 的 Secret Manager Secret ID。如果 apiKeySource 设置为 SECRET_MANAGER,请提供此参数。使用 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version` 格式。
  • apiKeySource:API 密钥的来源。允许的值为 PLAINTEXTKMSSECRET_MANAGER。如果您使用的是 Secret Manager 或 KMS,则必须提供此参数。如果 apiKeySource 设置为 KMS,则必须提供 apiKeyKMSEncryptionKey 和加密的 apiKey。如果 apiKeySource 设置为 SECRET_MANAGER,则必须提供 apiKeySecretId。如果 apiKeySource 设置为 PLAINTEXT,则必须提供 apiKey。默认值为 PLAINTEXT。
  • socketTimeout:如果设置,则会覆盖 Elastic RestClient 中的默认重试超时上限和默认套接字超时 (30000 毫秒)。
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。

用户定义的函数

此模板支持流水线中多个位置的用户定义的函数 (UDF),如下所述。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

索引函数

返回文档所属的索引。

模板参数:

  • javaScriptIndexFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIndexFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _index 元数据字段的值。

文档 ID 函数

返回文档 ID。

模板参数:

  • javaScriptIdFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIdFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _id 元数据字段的值。

文档删除函数

指定是否删除文档。如需使用此函数,请将批量插入模式设置为 INDEX 并提供文档 ID 函数

模板参数:

  • javaScriptIsDeleteFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIsDeleteFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:返回字符串 "true" 可删除文档,返回 "false" 可更新/插入文档。

映射类型函数

返回文档的映射类型。

模板参数:

  • javaScriptTypeFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptTypeFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _type 元数据字段的值。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。

后续步骤