Vorlage „Pub/Sub-Thema oder -Abo für Textdateien in Cloud Storage“
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Vorlage "Pub/Sub-Thema oder -Abo für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.
Pipelineanforderungen
Das Pub/Sub-Thema oder -Abo muss vor der Ausführung vorhanden sein.
Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.
Vorlagenparameter
Parameter
Beschreibung
inputTopic
Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben. Wenn dieser Parameter angegeben wird, sollte inputSubscription nicht angegeben werden.
inputSubscription
Das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll. Der Aboname muss das Format projects/<project-id>/subscription/<subscription-name> haben. Wenn dieser Parameter angegeben wird, sollte inputTopic nicht angegeben werden.
outputDirectory
Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix
Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. z. B. output-.
outputFilenameSuffix
Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate
Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzigen Datei pro Fenster ausgegeben werden. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.
windowDuration
(Optional) Die Fensterdauer ist das Intervall, in dem Daten in das Ausgabeverzeichnis geschrieben werden. Konfigurieren Sie die Dauer basierend auf dem Durchsatz der Pipeline. Bei einem höheren Durchsatz können beispielsweise kleinere Fenstergrößen erforderlich sein, damit die Daten in den Speicher passen. Die Standardeinstellung ist „5m”, mit mindestens 1 s. Zulässige Formate sind: [int]s (für Sekunden, Beispiel: 5s), [int]m (für Minuten, Beispiel: 12m), [int]h (für Stunden, Beispiel: 2h).
Führen Sie die Vorlage aus.
Console
Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
Quellcode der Vorlage
Java
/*
* Copyright (C) 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.pubsubtotext;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed files at the specified output directory.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Cloud_PubSub_to_GCS_Text_Flex.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_PubSub_to_GCS_Text_Flex",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
description =
"The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records "
+ "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.",
optionsClass = Options.class,
flexContainerName = "pubsub-to-text",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Pub/Sub topic or subscription must exist prior to execution.",
"The messages published to the topic must be in text format.",
"The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
},
streaming = true,
supportsAtLeastOnce = true)
public class PubsubToText {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubTopic(
order = 1,
optional = true,
description = "Pub/Sub input topic",
helpText =
"The Pub/Sub topic to read the input from. The topic name should be in the format "
+ "`projects/<PROJECT_ID>/topics/<TOPIC_NAME>`. If this parameter is provided "
+ "don't use `inputSubscription`.",
example = "projects/your-project-id/topics/your-topic-name")
String getInputTopic();
void setInputTopic(String value);
@TemplateParameter.PubsubSubscription(
order = 2,
optional = true,
description = "Pub/Sub input subscription",
helpText =
"The Pub/Sub subscription to read the input from. The subscription name uses the format "
+ "`projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>`. If this parameter is "
+ "provided, don't use `inputTopic`.",
example = "projects/your-project-id/subscriptions/your-subscription-name")
String getInputSubscription();
void setInputSubscription(String value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix to write write output files to. "
+ "This value must end in a slash.",
example = "gs://your-bucket/your-path")
@Required
String getOutputDirectory();
void setOutputDirectory(String value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "User provided temp location",
helpText =
"The user provided directory to output temporary files to. Must end with a slash.")
String getUserTempLocation();
void setUserTempLocation(String value);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The prefix to place on each windowed file.",
example = "output-")
@Default.String("output")
@Required
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String value);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Output filename suffix of the files to write",
helpText =
"The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`.",
example = ".txt")
@Default.String("")
String getOutputFilenameSuffix();
void setOutputFilenameSuffix(String value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
if (useInputSubscription == useInputTopic) {
throw new IllegalArgumentException(
"Either input topic or input subscription must be provided, but not both.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages = null;
/*
* Steps:
* 1) Read string messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed files to GCS
*/
if (useInputSubscription) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
}
messages
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible(
maybeUseUserTempLocation(
options.getUserTempLocation(), options.getOutputDirectory()))));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
* when output bucket is locked and temporary data cannot be deleted.
*
* @param userTempLocation user provided temp location
* @param outputLocation user provided outputDirectory to be used as the default temp location
* @return userTempLocation if available, otherwise outputLocation is returned.
*/
private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
}
}