Datastore to Cloud Storage Text 模板[已弃用]

此模板已弃用,将于 2023 年第三季度移除。请迁移到 Firestore to Cloud Storage Text 模板。

Datastore to Cloud Storage Text 模板是一种批处理流水线,可读取 Datastore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。

流水线要求

在运行此流水线之前,必须先在项目中设置 Datastore。

模板参数

必需参数

  • datastoreReadGqlQuery:指定要获取的实体的 GQL (https://cloud.google.com/datastore/docs/reference/gql_reference) 查询。例如 SELECT * FROM MyKind
  • datastoreReadProjectId:您要从中读取数据的 Datastore 实例所属的 Google Cloud 项目的 ID。
  • textWritePrefix:指定数据写入位置的 Cloud Storage 路径前缀。例如 gs://mybucket/somefolder/

可选参数

  • datastoreReadNamespace:所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Datastore to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Datastore_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="SELECT * FROM DATASTORE_KIND",\
datastoreReadProjectId=DATASTORE_PROJECT_ID,\
datastoreReadNamespace=DATASTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • DATASTORE_PROJECT_ID:Datastore 实例所在的 Google Cloud 项目 ID
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Datastore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "SELECT * FROM DATASTORE_KIND"
       "datastoreReadProjectId": "DATASTORE_PROJECT_ID",
       "datastoreReadNamespace": "DATASTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • DATASTORE_PROJECT_ID:Datastore 实例所在的 Google Cloud 项目 ID
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.DatastoreToText.DatastoreToTextOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

/**
 * Dataflow template which copies Datastore Entities to a Text sink. Text is encoded using JSON
 * encoded entity in the v1/Entity rest format: <a
 * href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Datastore_to_GCS_Text.md">README_Datastore_to_GCS_Text</a>
 * or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Firestore_to_GCS_Text.md">README_Firestore_to_GCS_Text</a>
 * for instructions on how to use or change this template.
 */
@Template(
    name = "Datastore_to_GCS_Text",
    category = TemplateCategory.LEGACY,
    displayName = "Datastore to Text Files on Cloud Storage [Deprecated]",
    description =
        "The Datastore to Cloud Storage Text template is a batch pipeline that reads Datastore entities and writes them "
            + "to Cloud Storage as text files. You can provide a function to process each entity as a JSON string. "
            + "If you don't provide such a function, every line in the output file will be a JSON-serialized entity.",
    optionsClass = DatastoreToTextOptions.class,
    skipOptions = {
      "firestoreReadNamespace",
      "firestoreReadGqlQuery",
      "firestoreReadProjectId",
      "javascriptTextTransformReloadIntervalMinutes"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastore-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"Datastore must be set up in the project before running the pipeline."})
@Template(
    name = "Firestore_to_GCS_Text",
    category = TemplateCategory.BATCH,
    displayName = "Firestore (Datastore mode) to Text Files on Cloud Storage",
    description =
        "The Firestore to Cloud Storage Text template is a batch pipeline that reads Firestore entities and writes them "
            + "to Cloud Storage as text files. You can provide a function to process each entity as a JSON string. "
            + "If you don't provide such a function, every line in the output file will be a JSON-serialized entity.",
    optionsClass = DatastoreToTextOptions.class,
    skipOptions = {
      "datastoreReadNamespace",
      "datastoreReadGqlQuery",
      "datastoreReadProjectId",
      "javascriptTextTransformReloadIntervalMinutes"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/firestore-to-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"Firestore must be set up in the project before running the pipeline."})
public class DatastoreToText {

  public static ValueProvider<String> selectProvidedInput(
      ValueProvider<String> datastoreInput, ValueProvider<String> firestoreInput) {
    return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
  }

  /** Custom PipelineOptions. */
  public interface DatastoreToTextOptions
      extends PipelineOptions,
          DatastoreReadOptions,
          JavascriptTextTransformerOptions,
          FilesystemWriteOptions {}

  /**
   * Runs a pipeline which reads in Entities from Datastore, passes in the JSON encoded Entities to
   * a Javascript UDF, and writes the JSON to TextIO sink.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    DatastoreToTextOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DatastoreToTextOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            ReadJsonEntities.newBuilder()
                .setGqlQuery(
                    selectProvidedInput(
                        options.getDatastoreReadGqlQuery(), options.getFirestoreReadGqlQuery()))
                .setProjectId(
                    selectProvidedInput(
                        options.getDatastoreReadProjectId(), options.getFirestoreReadProjectId()))
                .setNamespace(
                    selectProvidedInput(
                        options.getDatastoreReadNamespace(), options.getFirestoreReadNamespace()))
                .build())
        .apply(
            TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
        .apply(TextIO.write().to(options.getTextWritePrefix()).withSuffix(".json"));

    pipeline.run();
  }
}

后续步骤