Argomento Pub/Sub o sottoscrizione a file di testo su modello Cloud Storage
Mantieni tutto organizzato con le raccolte
Salva e classifica i contenuti in base alle tue preferenze.
Il modello di testo di argomento Pub/Sub o di sottoscrizione a Cloud Storage è una pipeline di inserimento flussi che legge i record da Pub/Sub e li salva come una serie di file Cloud Storage in formato testo. Il modello può essere utilizzato come modo rapido per salvare dati in
Pub/Sub per uso futuro. Per impostazione predefinita, il modello genera un nuovo file ogni 5 minuti.
Requisiti della pipeline
L'argomento o la sottoscrizione Pub/Sub deve esistere prima dell'esecuzione.
I messaggi pubblicati nell'argomento devono essere in formato testo.
I messaggi pubblicati nell'argomento non devono contenere caratteri di fine riga. Tieni presente che ogni messaggio Pub/Sub viene salvato come singola riga nel file di output.
Parametri del modello
Parametro
Descrizione
inputTopic
L'argomento Pub/Sub da cui leggere l'input. Il nome dell'argomento deve essere nel
formato projects/<project-id>/topics/<topic-name>. Se questo parametro viene fornito, inputSubscription non deve essere specificato.
inputSubscription
La sottoscrizione Pub/Sub da cui leggere l'input. Il nome della sottoscrizione deve
essere nel formato
projects/<project-id>/subscription/<subscription-name>. Se questo parametro viene fornito, inputTopic non deve essere specificato.
outputDirectory
Il percorso e il prefisso del nome file per la scrittura dei file di output. Ad esempio,
gs://bucket-name/path/. Questo valore deve terminare con una barra.
outputFilenamePrefix
Il prefisso da inserire in ogni file visualizzato in una finestra. Ad esempio, output-.
outputFilenameSuffix
Il suffisso da inserire in ogni file visualizzato in una finestra, in genere un'estensione del file come .txt o .csv.
outputShardTemplate
Il modello shard definisce la parte dinamica di ogni file visualizzato in finestre. Per impostazione predefinita, la pipeline utilizza un singolo shard per l'output nel file system all'interno di ogni finestra. Ciò significa che tutti i dati vengono restituiti in un unico file per finestra. Il valore predefinito di outputShardTemplate è W-P-SS-of-NN, dove W è l'intervallo di date della finestra, P è le informazioni relative al riquadro, S è il numero degli shard e N è il numero di shard. Nel caso di un solo file, la parte SS-of-NN di
outputShardTemplate è 00-of-01.
windowDuration
(Facoltativo) La durata della finestra è l'intervallo in cui i dati vengono scritti nella directory
di output. Configura la durata in base alla velocità effettiva della pipeline. Ad esempio, una velocità effettiva più elevata potrebbe richiedere finestre di dimensioni inferiori in modo che i dati rientrino nella memoria. Il valore predefinito è 5 m, con un minimo di 1 s. I formati consentiti sono: [int]s (per secondi, ad esempio 5 s), [int]m (per
minuti, ad esempio 12 m), [int]h (per ore, esempio: 2h).
il nome della versione, come 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella padre con data all'interno del bucket:
gs://dataflow-templates-REGION_NAME/
SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
BUCKET_NAME: il nome del bucket Cloud Storage
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.
il nome della versione, come 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella padre con data all'interno del bucket:
gs://dataflow-templates-REGION_NAME/
SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
BUCKET_NAME: il nome del bucket Cloud Storage
Codice sorgente del modello
Java
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Cloud_PubSub_to_GCS_Text_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records "
+ "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Pub/Sub topic or subscription must exist prior to execution.",
"The messages published to the topic must be in text format.",
"The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
},
streaming = true,
supportsAtLeastOnce = true)
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"Pub/Sub topic to read the input from, in the format of "
+ "'projects/your-project-id/topics/your-topic-name'",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime"
+ " formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file. Typically a file extension such "
+ "as .txt or .csv.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}