Cloud Storage Text to Firestore テンプレート

Cloud Storage Text to Firestore テンプレートは、Cloud Storage に保存されている JSON ドキュメントから Firestore にインポートするバッチ パイプラインです。

パイプラインの要件

Firestore が宛先プロジェクトで有効にされている必要があります。

入力形式

各入力ファイルには改行区切りの JSON が含まれている必要があります。各行には Datastore の Entity データ型の JSON 表現が含まれます。

たとえば、次の JSON は Users という名前のコレクション内のドキュメントを表します。この例では、読みやすいように書式設定されていますが、各ドキュメントは 1 行の入力として表す必要があります。

{
  "key": {
    "partitionId": {
      "projectId": "my-project"
    },
    "path": [
      {
        "kind": "users",
        "name": "alovelace"
      }
    ]
  },
  "properties": {
    "first": {
      "stringValue": "Ada"
    },
    "last": {
      "stringValue": "Lovelace"
    },
    "born": {
      "integerValue": "1815",
      "excludeFromIndexes": true
    }
  }
}

ドキュメント モデルの詳細については、エンティティ、プロパティ、キーをご覧ください。

テンプレートのパラメータ

必須パラメータ

  • textReadPattern: テキスト データファイルの場所を指定する Cloud Storage のパスパターン。例: gs://mybucket/somepath/*.json
  • firestoreWriteProjectId: Firestore エンティティの書き込み先となる Google Cloud プロジェクトの ID。
  • errorWritePath: 処理中に発生したエラーを書き込むために使用するエラーログ出力ファイル(例: gs://your-bucket/errors/)。

オプション パラメータ

  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI。たとえば、gs://my-bucket/my-udfs/my_file.js のようにします。
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。
  • firestoreHintNumWorkers: Firestore のランプアップ スロットリング ステップで予想されるワーカー数のヒント。デフォルトは 500 です。

ユーザー定義関数

必要であれば、ユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: Cloud Storage 入力ファイルのテキスト行。
  • 出力: JSON 文字列としてシリアル化された Entity

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Text Files on Cloud Storage to Firestore template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Firestore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
firestoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Firestore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "firestoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • PATH_TO_INPUT_TEXT_FILES: Cloud Storage 上の入力ファイル パターン
  • JAVASCRIPT_FUNCTION: 使用する JavaScript ユーザー定義関数(UDF)の名前

    たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例をご覧ください。

  • PATH_TO_JAVASCRIPT_UDF_FILE: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Cloud Storage 上のエラーファイルの目的のパス
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToDatastore.TextToDatastoreOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Dataflow template which reads from a Text Source and writes JSON encoded Entities into Datastore.
 * The JSON is expected to be in the format of: <a
 * href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Datastore.md">README
 * Datastore</a> or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Firestore.md">README
 * Firestore</a> for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_Text_to_Datastore",
      category = TemplateCategory.LEGACY,
      displayName = "Text Files on Cloud Storage to Datastore [Deprecated]",
      description =
          "The Cloud Storage Text to Datastore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Datastore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "firestoreWriteProjectId",
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace",
        "firestoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-datastore",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {"Datastore must be enabled in the destination project."}),
  @Template(
      name = "GCS_Text_to_Firestore",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to Firestore (Datastore mode)",
      description =
          "The Cloud Storage Text to Firestore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Firestore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "datastoreWriteProjectId",
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace",
        "datastoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-firestore",
      contactInformation = "https://cloud.google.com/support",
      requirements = {"Firestore must be enabled in the destination project."})
})
public class TextToDatastore {

  public static <T> ValueProvider<T> selectProvidedInput(
      ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
    return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
  }

  /** TextToDatastore Pipeline Options. */
  public interface TextToDatastoreOptions
      extends PipelineOptions,
          FilesystemReadOptions,
          JavascriptTextTransformerOptions,
          DatastoreWriteOptions,
          ErrorWriteOptions {}

  /**
   * Runs a pipeline which reads from a Text Source, passes the Text to a Javascript UDF, writes the
   * JSON encoded Entities to a TextIO sink.
   *
   * <p>If your Text Source does not contain JSON encoded Entities, then you'll need to supply a
   * Javascript UDF which transforms your data to be JSON encoded Entities.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    TextToDatastoreOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TextToDatastoreOptions.class);

    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from(options.getTextReadPattern()))
        .apply(
            TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
        .apply(
            WriteJsonEntities.newBuilder()
                .setProjectId(
                    selectProvidedInput(
                        options.getDatastoreWriteProjectId(), options.getFirestoreWriteProjectId()))
                .setHintNumWorkers(
                    selectProvidedInput(
                        options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
                .setErrorTag(errorTag)
                .build())
        .apply(
            LogErrors.newBuilder()
                .setErrorWritePath(options.getErrorWritePath())
                .setErrorTag(errorTag)
                .build());

    pipeline.run();
  }
}

次のステップ