Vorlage „Cloud Storage Text für Firestore“

Die Vorlage "Cloud Storage Text für Firestore" ist eine Batchpipeline, die aus JSON-Dokumenten, die in Cloud Storage gespeichert sind, Inhalte in Firestore importiert.

Pipelineanforderungen

Cloud Firestore muss im Zielprojekt aktiviert sein.

Eingabeformat

Jede Eingabedatei muss durch Zeilenumbrüche getrennte JSON-Daten enthalten, wobei jede Zeile eine JSON-Darstellung eines Datastore-Datentyps vom Typ Entity enthält.

Das folgende JSON-Objekt stellt beispielsweise ein Dokument in einer Sammlung namens Users dar. Das Beispiel ist für die Lesbarkeit formatiert, aber jedes Dokument muss als einzelne Eingabezeile erscheinen.

{
  "key": {
    "partitionId": {
      "projectId": "my-project"
    },
    "path": [
      {
        "kind": "users",
        "name": "alovelace"
      }
    ]
  },
  "properties": {
    "first": {
      "stringValue": "Ada"
    },
    "last": {
      "stringValue": "Lovelace"
    },
    "born": {
      "integerValue": "1815",
      "excludeFromIndexes": true
    }
  }
}

Weitere Informationen zum Dokumentmodell finden Sie unter Entitäten, Properties und Schlüssel.

Vorlagenparameter

Erforderliche Parameter

  • textReadPattern: Ein Cloud Storage-Pfadmuster, das den Speicherort Ihrer Textdatendateien angibt. Beispiel: gs://mybucket/somepath/*.json.
  • firestoreWriteProjectId: Die ID des Google Cloud-Projekts, in das die Firestore-Entitäten geschrieben werden sollen.
  • errorWritePath: Die Ausgabedatei des Fehler-Logs für Schreibfehler, die während der Verarbeitung auftreten. Beispiel: gs://your-bucket/errors/.

Optionale Parameter

  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • firestoreHintNumWorkers: Hinweis für die erwartete Anzahl von Workern im Schritt zur Drosselung der Erhöhung in Firestore. Der Standardwert ist 500.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: eine Textzeile aus einer Cloud Storage-Eingabedatei
  • Ausgabe: Ein Entity, serialisiert als JSON-String.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to Firestore templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Firestore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
firestoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PATH_TO_INPUT_TEXT_FILES: Das Muster der Eingabedateien in Cloud Storage
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Der gewünschte Pfad zur Fehlerdatei in Cloud Storage

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Firestore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "firestoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • PATH_TO_INPUT_TEXT_FILES: Das Muster der Eingabedateien in Cloud Storage
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Der gewünschte Pfad zur Fehlerdatei in Cloud Storage
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToDatastore.TextToDatastoreOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Dataflow template which reads from a Text Source and writes JSON encoded Entities into Datastore.
 * The JSON is expected to be in the format of: <a
 * href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Datastore.md">README
 * Datastore</a> or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Firestore.md">README
 * Firestore</a> for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_Text_to_Datastore",
      category = TemplateCategory.LEGACY,
      displayName = "Text Files on Cloud Storage to Datastore [Deprecated]",
      description =
          "The Cloud Storage Text to Datastore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Datastore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "firestoreWriteProjectId",
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace",
        "firestoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-datastore",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {"Datastore must be enabled in the destination project."}),
  @Template(
      name = "GCS_Text_to_Firestore",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to Firestore (Datastore mode)",
      description =
          "The Cloud Storage Text to Firestore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Firestore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "datastoreWriteProjectId",
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace",
        "datastoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-firestore",
      contactInformation = "https://cloud.google.com/support",
      requirements = {"Firestore must be enabled in the destination project."})
})
public class TextToDatastore {

  public static <T> ValueProvider<T> selectProvidedInput(
      ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
    return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
  }

  /** TextToDatastore Pipeline Options. */
  public interface TextToDatastoreOptions
      extends PipelineOptions,
          FilesystemReadOptions,
          JavascriptTextTransformerOptions,
          DatastoreWriteOptions,
          ErrorWriteOptions {}

  /**
   * Runs a pipeline which reads from a Text Source, passes the Text to a Javascript UDF, writes the
   * JSON encoded Entities to a TextIO sink.
   *
   * <p>If your Text Source does not contain JSON encoded Entities, then you'll need to supply a
   * Javascript UDF which transforms your data to be JSON encoded Entities.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    TextToDatastoreOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TextToDatastoreOptions.class);

    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from(options.getTextReadPattern()))
        .apply(
            TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
        .apply(
            WriteJsonEntities.newBuilder()
                .setProjectId(
                    selectProvidedInput(
                        options.getDatastoreWriteProjectId(), options.getFirestoreWriteProjectId()))
                .setHintNumWorkers(
                    selectProvidedInput(
                        options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
                .setErrorTag(errorTag)
                .build())
        .apply(
            LogErrors.newBuilder()
                .setErrorWritePath(options.getErrorWritePath())
                .setErrorTag(errorTag)
                .build());

    pipeline.run();
  }
}

Nächste Schritte