Tema de Pub/Sub o suscripción a archivos de texto en la planilla de Cloud Storage
Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
El tema de Pub/Sub o la suscripción a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.
Requisitos de la canalización
El tema o la suscripción a Pub/Sub deben existir antes de la ejecución.
Los mensajes publicados en el tema deben tener formato de texto.
Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.
Parámetros de la plantilla
Parámetro
Descripción
inputTopic
El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<project-id>/topics/<topic-name>. Si se proporciona este parámetro, no se debe proporcionar inputSubscription.
inputSubscription
El tema de Pub/Sub desde el que se lee la entrada. El nombre de la suscripción debe tener el formato projects/<project-id>/subscription/<subscription-name>. Si se proporciona este parámetro, no se debe proporcionar inputTopic.
outputDirectory
La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/. El valor debe terminar con una barra.
outputFilenamePrefix
El prefijo para colocar en cada archivo con ventanas. Por ejemplo, output-.
outputFilenameSuffix
El sufijo para colocar en cada archivo con ventanas, por lo general, es una extensión de archivo como .txt o .csv.
outputShardTemplate
La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización utiliza una única fragmentación de salida para el sistema de archivo dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de fragmentos. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
windowDuration
La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida (opcional). Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. La configuración predeterminada es de 5 min, con un mínimo de 1 s. Los formatos permitidos son: [nro. entero] s (para los segundos, por ejemplo, 5 s), [nro. entero] min (para los minutos, por ejemplo, 12 min) y [nro. entero] h (para las horas, por ejemplo, 2 h).
Ejecuta la plantilla
Consola
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
Código fuente de la plantilla
Java
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Cloud_PubSub_to_GCS_Text_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records "
+ "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Pub/Sub topic or subscription must exist prior to execution.",
"The messages published to the topic must be in text format.",
"The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
},
streaming = true,
supportsAtLeastOnce = true)
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}