Plantillas de utilidad proporcionadas por Google

Google proporciona un conjunto de plantillas de código abierto de Dataflow. Para obtener información general sobre las plantillas, consulta la página Descripción general. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta la página Plantillas que proporciona Google.

En esta página, se documentan las siguientes plantillas de utilidad:

Compresión masiva de archivos de Cloud Storage

La plantilla de compresión masiva de archivos de Cloud Storage es una canalización por lotes que comprime archivos en Cloud Storage en una ubicación especificada. Esta plantilla puede ser útil cuando necesites comprimir grandes lotes de archivos como parte de un proceso de archivo periódico. Los modos de compresión compatibles son BZIP2, DEFLATE y GZIP. Los archivos que se envían a la ubicación de destino seguirán un esquema de nombres que consta del nombre de archivo original anexado a la extensión del modo de compresión. Las extensiones anexadas serán .bzip2, .deflate y .gz.

Cualquier error que ocurra durante el proceso de compresión se escribirá en el archivo de falla en formato CSV de nombre de archivo, mensaje de error. Si no se producen fallas mientras se ejecuta la canalización, el archivo de error se creará, pero no tendrá registros de errores.

Requisitos para esta canalización:

  • La compresión debe tener uno de los siguientes formatos: BZIP2, DEFLATE y GZIP.
  • El directorio de salida debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/uncompressed/*.txt.
outputDirectory La ubicación de salida para escribir. Por ejemplo, gs://bucket-name/compressed/.
outputFailureFile El archivo de salida del registro de errores para escribir fallas que ocurran durante el proceso de compresión. Por ejemplo, gs://bucket-name/compressed/failed.csv. Si no se encuentran fallas, el archivo se creará, pero estará vacío. El contenido del archivo tiene el formato CSV (nombre de archivo, error) y consta de una línea por cada archivo con errores de compresión.
compression El algoritmo de compresión que se utiliza para comprimir los archivos coincidentes. El modo debe ser uno de los siguientes: BZIP2, DEFLATE o GZIP.

Ejecuta la plantilla de compresión masiva de archivos de Cloud Storage

CONSOLE

Realiza la ejecución desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Bulk Compress Cloud Storage Files template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecútala desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
  • Reemplaza COMPRESSION con el algoritmo de compresión que elijas.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

API

Ejecútala desde la API de REST

Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files

Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
  • Reemplaza COMPRESSION con el algoritmo de compresión que elijas.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/compressed",
       "outputFailureFile": "gs://YOUR_BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Descompresión masiva de archivos de Cloud Storage

La plantilla de descompresión masiva de archivos de Cloud Storage es una canalización por lotes que descomprime archivos de Cloud Storage en una ubicación especificada. Esta función es útil cuando deseas utilizar datos comprimidos para minimizar los costos del ancho de banda de la red durante una migración, pero quieres maximizar la velocidad de procesamiento analítico trabajando con datos no comprimidos luego de la migración. La canalización controla de forma automática los diferentes modos de compresión en una sola ejecución y determina el modo de descompresión que utilizará según la extensión de los archivos (.bzip2, .deflate, .gz, .zip).

Requisitos para esta canalización:

  • Los archivos que se descomprimirán deben tener uno de los siguientes formatos: Bzip2, Deflate, Gzip o Zip.
  • El directorio de salida debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/compressed/*.gz.
outputDirectory La ubicación de salida para escribir. Por ejemplo, gs://bucket-name/decompressed.
outputFailureFile El archivo de salida del registro de errores para escribir fallas que ocurran durante el proceso de descompresión. Por ejemplo, gs://bucket-name/decompressed/failed.csv. Si no se encuentran fallas, el archivo se creará, pero estará vacío. El contenido del archivo tiene el formato CSV (nombre de archivo, error) y consta de una línea por cada archivo con errores de descompresión.

Ejecuta la plantilla de descompresión masiva de archivos de Cloud Storage

CONSOLE

Realiza la ejecución desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Bulk Decompress Cloud Storage Files template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecútala desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.

Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
  • Reemplaza OUTPUT_FAILURE_FILE_PATH con tu ruta de acceso al archivo que contiene la información de falla.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

API

Ejecútala desde la API de REST

Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files

Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
  • Reemplaza OUTPUT_FAILURE_FILE_PATH con tu ruta de acceso al archivo que contiene la información de falla.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://YOUR_BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Borrado masivo de Datastore

La plantilla de borrado masivo de Datastore es una canalización que lee entidades de Datastore con una consulta de GQL determinada y, luego, borra todas las entidades coincidentes en el proyecto de destino seleccionado. De forma opcional, la canalización puede pasar las entidades de Datastore codificadas en JSON a tu UDF de JavaScript, que puedes usar para filtrar entidades mostrando valores nulos.

Requisitos para esta canalización:

  • Datastore debe configurarse en el proyecto antes de ejecutar la plantilla.
  • Si se realiza la lectura y el borrado desde instancias de Datastore diferentes, la cuenta de servicio del controlador de Dataflow debe tener permiso para leer desde una instancia y borrar desde la otra.

Parámetros de la plantilla

Parámetro Descripción
datastoreReadGqlQuery Consulta de GQL que especifica las entidades que deben coincidir para la eliminación. Por ejemplo, "SELECT * FROM MyKind".
datastoreReadProjectId ID del proyecto de GCP de la instancia de Datastore desde la que deseas leer las entidades (mediante tu consulta de GQL) que se usan para las coincidencias.
datastoreDeleteProjectId ID del proyecto de GCP de la instancia de Datastore desde la cual borrar las entidades coincidentes. Esto puede ser igual a datastoreReadProjectId si deseas leer y borrar dentro de la misma instancia de Datastore.
datastoreReadNamespace Espacio de nombres de las entidades solicitadas [opcional]. Configurado como "" para el espacio de nombres predeterminado.
javascriptTextTransformGcsPath Ruta de acceso de Cloud Storage que contiene todo el código de JavaScript. Por ejemplo, "gs://mybucket/mytransforms/*.js" [opcional]. Si no quieres usar una UDF, deja este campo en blanco.
javascriptTextTransformFunctionName [Opcional] Nombre de la función para llamar. Si esta función muestra un valor no definido o nulo para una entidad de Datastore determinada, esa entidad no se borrará. Si tienes el código de JavaScript "function myTransform(inJson) { …dostuff…}", el nombre de la función será "myTransform". Si no quieres usar una UDF, deja este campo en blanco.

Ejecuta la plantilla de borrado masivo de Datastore

CONSOLE

Realiza la ejecución desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Datastore Bulk Delete template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecútala desde la herramienta de línea de comandos de gcloud

Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.

Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza GQL_QUERY con la consulta que usarás para borrar las entidades coincidentes.
  • Reemplaza DATASTORE_READ_AND_DELETE_PROJECT_ID con tu ID del proyecto de la instancia de Datastore. En este ejemplo, se realizan operaciones de lectura y de eliminación desde la misma instancia de Datastore.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

API

Ejecútala desde la API de REST

Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete

Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza GQL_QUERY con la consulta que usarás para borrar las entidades coincidentes.
  • Reemplaza DATASTORE_READ_AND_DELETE_PROJECT_ID con tu ID del proyecto de la instancia de Datastore. En este ejemplo, se realizan operaciones de lectura y de eliminación desde la misma instancia de Datastore.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "READ_PROJECT_ID",
       "datastoreDeleteProjectId": "DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Generador de datos de transmisión a Pub/Sub

Durante el desarrollo de canalizaciones de Dataflow, uno de los requisitos comunes es ejecutar una comparativa de rendimiento cada cierta cantidad de consultas por segundo (QPS). Esta plantilla se puede usar a fin de generar mensajes JSON falsos basados en el esquema proporcionado por el usuario a la velocidad especificada para un tema de Pub/Sub.

La biblioteca del Generador de datos JSON que usa la canalización permite usar varias funciones de este tipo para cada campo de esquema. Consulta los documentos para obtener más información sobre estas funciones y el formato del esquema.

Requisitos para esta canalización:

  • Crea un archivo de esquema y almacénalo en la ubicación de Cloud Storage
  • El tema de Pub/Sub de salida debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetro Descripción
schemaLocation Ubicación del archivo de esquema. Por ejemplo: gs://mybucket/filename.json.
topic Nombre del tema de Pub/Sub en el que la canalización debe publicar datos. Por ejemplo: projects/<project-id>/topics/<topic-name>
qps Cantidad de mensajes que se publicarán por segundo. Por ejemplo: 100.
autoscalingAlgorithm Algoritmo que se usa para el ajuste de escala automático de los trabajadores (opcional). Los valores posibles son THROUGHPUT_BASED, para habilitar el ajuste de escala automático, o NONE, si deseas inhabilitarlo.
maxNumWorkers Cantidad máxima de máquinas de trabajador (opcional). Por ejemplo: 10.

Ejecuta la plantilla de Generador de datos de transmisión

CONSOLE

Ejecuta desde Google Cloud Console
  1. Ve a la página de Dataflow en Cloud Console.
  2. Ir a la página de Dataflow
  3. Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
  4. Botón Crear trabajo a partir de una plantilla de Cloud Platform Console
  5. Selecciona the Streaming Data Generator template en el menú desplegable Plantilla de Dataflow.
  6. Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  7. Ingresa los valores de tus parámetros en los campos de parámetros provistos.
  8. Haz clic en Run Job (Ejecutar trabajo).

GCLOUD

Ejecuta desde la herramienta de línea de comandos de gcloud

Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza REGION_NAME por el nombre de la región de Dataflow. Por ejemplo: us-central1.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza SCHEMA_LOCATION por la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • Reemplaza PUBSUB_TOPIC por el tema de Pub/Sub de salida. Por ejemplo: projects/<project-id>/topics/<topic-name>.
  • Reemplaza QPS por la cantidad de mensajes que se publicarán por segundo.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
topic=PUBSUB_TOPIC,\
qps=QPS
  

API

Ejecuta desde la API de REST

Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:

gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator

Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.

En este ejemplo, debes reemplazar los siguientes valores:

  • Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
  • Reemplaza LOCATION por el nombre de la región de Dataflow. Por ejemplo: us-central1.
  • Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Reemplaza SCHEMA_LOCATION por la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
  • Reemplaza PUBSUB_TOPIC por el tema de Pub/Sub de salida. Por ejemplo: projects/<project-id>/topics/<topic-name>.
  • Reemplaza QPS por la cantidad de mensajes que se publicarán por segundo.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "topic": "PUBSUB_TOPIC",
          "qps": "QPS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator",
   }
}