Plantilla de Cloud Storage en archivos de texto a Datastore [obsoleta]

Esta plantilla está obsoleta y se quitará en el tercer trimestre de 2023. Migra a la plantilla de Cloud Storage en archivos de texto a Firestore.

La plantilla texto de Cloud Storage a Datastore es una canalización por lotes que lee desde archivos de texto almacenados en Cloud Storage y escribe entidades codificadas en JSON en Datastore. Cada línea en los archivos de texto de entrada debe estar en el formato JSON especificado.

Requisitos de la canalización

  • Se debe habilitar Datastore en el proyecto de destino.

Parámetros de la plantilla

Parámetros obligatorios

  • textReadPattern: Es un patrón de ruta de acceso de Cloud Storage que especifica la ubicación de tus archivos de datos de texto. Por ejemplo, gs://mybucket/somepath/*.json
  • datastoreWriteProjectId: El ID del proyecto de Google Cloud en el que se escribirán las entidades de Datastore.
  • errorWritePath: Es el archivo de salida del registro de errores para escribir las fallas que ocurren durante el procesamiento. Por ejemplo, gs://your-bucket/errors/

Parámetros opcionales

  • javascriptTextTransformGcsPath: El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: Es el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • datastoreHintNumWorkers: Una sugerencia para la cantidad esperada de trabajadores en el paso de regulación del aumento de Datastore. La configuración predeterminada es 500.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to Datastore template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Datastore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
datastoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PATH_TO_INPUT_TEXT_FILES: Es el patrón de archivos de entrada en Cloud Storage.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Es la ruta de acceso deseada al archivo de error en Cloud Storage.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Datastore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "datastoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • PATH_TO_INPUT_TEXT_FILES: Es el patrón de archivos de entrada en Cloud Storage.
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH: Es la ruta de acceso deseada al archivo de error en Cloud Storage.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToDatastore.TextToDatastoreOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Dataflow template which reads from a Text Source and writes JSON encoded Entities into Datastore.
 * The JSON is expected to be in the format of: <a
 * href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Datastore.md">README
 * Datastore</a> or <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Firestore.md">README
 * Firestore</a> for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_Text_to_Datastore",
      category = TemplateCategory.LEGACY,
      displayName = "Text Files on Cloud Storage to Datastore [Deprecated]",
      description =
          "The Cloud Storage Text to Datastore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Datastore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "firestoreWriteProjectId",
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace",
        "firestoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-datastore",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {"Datastore must be enabled in the destination project."}),
  @Template(
      name = "GCS_Text_to_Firestore",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to Firestore (Datastore mode)",
      description =
          "The Cloud Storage Text to Firestore template is a batch pipeline that reads from text files stored in "
              + "Cloud Storage and writes JSON encoded Entities to Firestore. "
              + "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
      optionsClass = TextToDatastoreOptions.class,
      skipOptions = {
        "datastoreWriteProjectId",
        "datastoreWriteEntityKind",
        "datastoreWriteNamespace",
        "datastoreHintNumWorkers",
        "javascriptTextTransformReloadIntervalMinutes",
        // This template doesn't use neither firestoreWriteEntityKind/Namespace
        // nor datastoreWriteEntityKind/Namespace pipeline options.
        "firestoreWriteEntityKind",
        "firestoreWriteNamespace"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-firestore",
      contactInformation = "https://cloud.google.com/support",
      requirements = {"Firestore must be enabled in the destination project."})
})
public class TextToDatastore {

  public static <T> ValueProvider<T> selectProvidedInput(
      ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
    return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
  }

  /** TextToDatastore Pipeline Options. */
  public interface TextToDatastoreOptions
      extends PipelineOptions,
          FilesystemReadOptions,
          JavascriptTextTransformerOptions,
          DatastoreWriteOptions,
          ErrorWriteOptions {}

  /**
   * Runs a pipeline which reads from a Text Source, passes the Text to a Javascript UDF, writes the
   * JSON encoded Entities to a TextIO sink.
   *
   * <p>If your Text Source does not contain JSON encoded Entities, then you'll need to supply a
   * Javascript UDF which transforms your data to be JSON encoded Entities.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    TextToDatastoreOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TextToDatastoreOptions.class);

    TupleTag<String> errorTag = new TupleTag<String>("errors") {};

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(TextIO.read().from(options.getTextReadPattern()))
        .apply(
            TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
        .apply(
            WriteJsonEntities.newBuilder()
                .setProjectId(
                    selectProvidedInput(
                        options.getDatastoreWriteProjectId(), options.getFirestoreWriteProjectId()))
                .setHintNumWorkers(
                    selectProvidedInput(
                        options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
                .setErrorTag(errorTag)
                .build())
        .apply(
            LogErrors.newBuilder()
                .setErrorWritePath(options.getErrorWritePath())
                .setErrorTag(errorTag)
                .build());

    pipeline.run();
  }
}

¿Qué sigue?