Template Teks Cloud Storage ke Firestore

Template Teks Cloud Storage ke Firestore adalah pipeline batch yang mengimpor dari dokumen JSON yang disimpan di Cloud Storage ke Firestore.

Persyaratan pipeline

Firestore harus diaktifkan di project tujuan.

Format input

Setiap file input harus berisi JSON yang dibatasi baris baru, dengan setiap baris berisi representasi JSON dari jenis data Entity Datastore.

Misalnya, JSON berikut mewakili dokumen dalam koleksi bernama Users. Contoh ini diformat agar mudah dibaca, tetapi setiap dokumen harus muncul sebagai satu baris input.

{
  "key": {
    "partitionId": {
      "projectId": "my-project"
    },
    "path": [
      {
        "kind": "users",
        "name": "alovelace"
      }
    ]
  },
  "properties": {
    "first": {
      "stringValue": "Ada"
    },
    "last": {
      "stringValue": "Lovelace"
    },
    "born": {
      "integerValue": "1815",
      "excludeFromIndexes": true
    }
  }
}

Untuk mengetahui informasi selengkapnya tentang model dokumen, lihat Entity, Properti, dan Kunci.

Parameter template

Parameter yang diperlukan

  • textReadPattern: Pola jalur Cloud Storage yang menentukan lokasi file data teks Anda. Contoh, gs://mybucket/somepath/*.json.
  • firestoreWriteProjectId: ID project Google Cloud tempat menulis entity Firestore.
  • errorWritePath: File output log error yang akan digunakan untuk kegagalan operasi tulis yang terjadi selama pemrosesan. Contoh, gs://your-bucket/errors/.

Parameter opsional

  • javascriptTextTransformGcsPath: URI Cloud Storage file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • firestoreHintNumWorkers: Petunjuk untuk jumlah pekerja yang diharapkan dalam langkah throttling peningkatan kapasitas Firestore. Nilai defaultnya adalah 500.

Fungsi yang ditentukan pengguna (UDF)

Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: baris teks dari file input Cloud Storage.
  • Output: Entity, yang diserialisasi sebagai string JSON.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Text Files on Cloud Storage to Firestore template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Firestore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
firestoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • PATH_TO_INPUT_TEXT_FILES: pola file input di Cloud Storage
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: jalur yang Anda inginkan ke file error di Cloud Storage

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Firestore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "firestoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • PATH_TO_INPUT_TEXT_FILES: pola file input di Cloud Storage
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: jalur yang Anda inginkan ke file error di Cloud Storage
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToDatastore.TextToDatastoreOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Dataflow template which reads from a Text Source and writes JSON encoded Entities into Datastore.
 * The JSON is expected to be in the format of: <a
 * href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Datastore.md">README
 * Datastore</a> or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Firestore.md">README
 * Firestore</a> for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_Text_to_Datastore",
      category = TemplateCategory.LEGACY,
      displayName = "Text Files on Cloud Storage to Datastore [Deprecated]",
      description =
          "The Cloud Storage Text to Datastore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Datastore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "firestoreWriteProjectId",
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace",
        "firestoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-datastore",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {"Datastore must be enabled in the destination project."}),
  @Template(
      name = "GCS_Text_to_Firestore",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to Firestore (Datastore mode)",
      description =
          "The Cloud Storage Text to Firestore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Firestore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "datastoreWriteProjectId",
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace",
        "datastoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-firestore",
      contactInformation = "https://cloud.google.com/support",
      requirements = {"Firestore must be enabled in the destination project."})
})
public class TextToDatastore {

  public static <T> ValueProvider<T> selectProvidedInput(
      ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
    return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
  }

  /** TextToDatastore Pipeline Options. */
  public interface TextToDatastoreOptions
      extends PipelineOptions,
          FilesystemReadOptions,
          JavascriptTextTransformerOptions,
          DatastoreWriteOptions,
          ErrorWriteOptions {}

  /**
   * Runs a pipeline which reads from a Text Source, passes the Text to a Javascript UDF, writes the
   * JSON encoded Entities to a TextIO sink.
   *
   * <p>If your Text Source does not contain JSON encoded Entities, then you'll need to supply a
   * Javascript UDF which transforms your data to be JSON encoded Entities.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    TextToDatastoreOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TextToDatastoreOptions.class);

    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from(options.getTextReadPattern()))
        .apply(
            TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
        .apply(
            WriteJsonEntities.newBuilder()
                .setProjectId(
                    selectProvidedInput(
                        options.getDatastoreWriteProjectId(), options.getFirestoreWriteProjectId()))
                .setHintNumWorkers(
                    selectProvidedInput(
                        options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
                .setErrorTag(errorTag)
                .build())
        .apply(
            LogErrors.newBuilder()
                .setErrorWritePath(options.getErrorWritePath())
                .setErrorTag(errorTag)
                .build());

    pipeline.run();
  }
}

Langkah berikutnya