Vorlage „Pub/Sub für Avro-Dateien in Cloud Storage“
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Vorlage „Pub/Sub für Avro-Dateien in Cloud Storage“ ist eine Streamingpipeline, die Daten aus einem Pub/Sub-Thema liest und Avro-Dateien in einen angegebenen Cloud Storage-Bucket schreibt.
Pipelineanforderungen
Das Pub/Sub-Eingabethema muss vor der Ausführung der Pipeline vorhanden sein.
Vorlagenparameter
Erforderliche Parameter
inputTopic : Das Pub/Sub-Thema, das zur Nachrichtenaufnahme abonniert werden soll. Der Themenname sollte das Format projects/<PROJECT_ID>/topics/<TOPIC_NAME> haben.
outputDirectory : Das Ausgabeverzeichnis, in dem die Avro-Ausgabedateien archiviert werden. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/.
avroTempDirectory: Verzeichnis für temporäre Avro-Dateien. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/.
Optionale Parameter
outputFilenamePrefix : Das Präfix des Ausgabedateinamens für die Avro-Dateien. Die Standardeinstellung ist "output".
outputFilenameSuffix : Das Suffix des Ausgabedateinamens für die Avro-Dateien. Die Standardeinstellung ist leer.
outputShardTemplate : Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Daher werden alle Daten in einer einzigen Datei pro Fenster ausgegeben. Für outputShardTemplate wird standardmäßig to W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.
yearPattern: Muster zur Formatierung des Jahres. Muss eines oder mehrere von y oder Y sein. Die Groß-/Kleinschreibung macht für das Jahr keinen Unterschied. Umschließen Sie das Muster wahlweise mit nicht alphanumerischen Zeichen oder dem Verzeichniszeichen ('/'). Die Standardeinstellung ist YYYY.
monthPattern: Muster zur Formatierung des Monats. Muss eines oder mehrere des Zeichens M sein. Umschließen Sie das Muster wahlweise mit nicht alphanumerischen Zeichen oder dem Verzeichniszeichen ('/'). Die Standardeinstellung ist MM.
dayPattern: Muster zur Formatierung des Tages. Muss eines oder mehrere von d für den Tag des Monats oder D für den Tag des Jahres sein. Umschließen Sie das Muster wahlweise mit nicht alphanumerischen Zeichen oder dem Verzeichniszeichen ('/'). Die Standardeinstellung ist dd.
hourPattern: Muster zum Formatieren der Stunde. Muss eines oder mehrere des Zeichens H sein. Umschließen Sie das Muster wahlweise mit nicht alphanumerischen Zeichen oder dem Verzeichniszeichen ('/'). Die Standardeinstellung ist HH.
minutePattern: Muster zum Formatieren der Minute. Muss eines oder mehrere des Zeichens m sein. Umschließen Sie das Muster wahlweise mit nicht alphanumerischen Zeichen oder dem Verzeichniszeichen ('/'). Die Standardeinstellung ist mm.
Führen Sie die Vorlage aus.
Console
Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
TOPIC_NAME ist der Name des Pub/Sub-Themas
BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.
Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
TOPIC_NAME ist der Name des Pub/Sub-Themas
BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage
Quellcode der Vorlage
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToAvro.Options;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
/**
* This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
* windowed Avro files at the specified output directory.
*
* <p>Files output will have the following schema:
*
* <pre>
* {
* "type": "record",
* "name": "AvroPubsubMessageRecord",
* "namespace": "com.google.cloud.teleport.avro",
* "fields": [
* {"name": "message", "type": {"type": "array", "items": "bytes"}},
* {"name": "attributes", "type": {"type": "map", "values": "string"}},
* {"name": "timestamp", "type": "long"}
* ]
* }
* </pre>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Avro.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_PubSub_to_Avro",
category = TemplateCategory.STREAMING,
displayName = "Pub/Sub to Avro Files on Cloud Storage",
description =
"The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub "
+ "topic and writes Avro files into the specified Cloud Storage bucket.",
optionsClass = Options.class,
skipOptions = "inputSubscription",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-avro",
contactInformation = "https://cloud.google.com/support",
requirements = {"The input Pub/Sub topic must exist prior to pipeline execution."},
streaming = true,
supportsAtLeastOnce = true)
public class PubsubToAvro {
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
@TemplateParameter.PubsubSubscription(
order = 1,
groupName = "Source",
description = "Pub/Sub input subscription",
helpText =
"Pub/Sub subscription to read the input from, in the format of"
+ " 'projects/your-project-id/subscriptions/your-subscription-name'",
example = "projects/your-project-id/subscriptions/your-subscription-name")
ValueProvider<String> getInputSubscription();
void setInputSubscription(ValueProvider<String> value);
@TemplateParameter.PubsubTopic(
order = 2,
groupName = "Source",
description = "Pub/Sub input topic",
helpText =
"The Pub/Sub topic to subscribe to for message consumption. The topic name must be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.")
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description(
"This determines whether the template reads from " + "a pub/sub subscription or a topic")
@Default.Boolean(false)
Boolean getUseSubscription();
void setUseSubscription(Boolean value);
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
description = "Output file directory in Cloud Storage",
helpText =
"The output directory where output Avro files are archived. Must contain / at the end. For example: gs://example-bucket/example-directory/")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.Text(
order = 5,
groupName = "Target",
optional = true,
description = "Output filename prefix of the files to write",
helpText = "The output filename prefix for the Avro files.",
regexes = "^[a-zA-Z\\-]+$")
@Default.String("output")
ValueProvider<String> getOutputFilenamePrefix();
void setOutputFilenamePrefix(ValueProvider<String> value);
@TemplateParameter.Text(
order = 6,
groupName = "Target",
optional = true,
description = "Output filename suffix of the files to write",
helpText = "The output filename suffix for the Avro files.")
@Default.String("")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 7,
description = "Temporary Avro write directory",
helpText =
"The directory for temporary Avro files. Must contain / at the end. For example: gs://example-bucket/example-directory/.")
@Required
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> messages = null;
/*
* Steps:
* 1) Read messages from PubSub
* 2) Window the messages into minute intervals specified by the executor.
* 3) Output the windowed data into Avro files, one per window by default.
*/
if (options.getUseSubscription()) {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(options.getInputSubscription()));
} else {
messages =
pipeline.apply(
"Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
}
messages
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
// Apply windowed file writes. Use a NestedValueProvider because the filename
// policy requires a resourceId generated from the input value at runtime.
.apply(
"Write File(s)",
AvroIO.write(AvroPubsubMessageRecord.class)
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDirectory())
.withOutputFilenamePrefix(options.getOutputFilenamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(options.getOutputFilenameSuffix())
.withYearPattern(options.getYearPattern())
.withMonthPattern(options.getMonthPattern())
.withDayPattern(options.getDayPattern())
.withHourPattern(options.getHourPattern())
.withMinutePattern(options.getMinutePattern()))
.withTempDirectory(
NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input)))
/*.withTempDirectory(FileSystems.matchNewResource(
options.getAvroTempDirectory(),
Boolean.TRUE))
*/
.withWindowedWrites()
.withNumShards(options.getNumShards()));
// Execute the pipeline and return the result.
return pipeline.run();
}
/**
* Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
* copying its fields and the timestamp of the message.
*/
static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
PubsubMessage message = context.element();
context.output(
new AvroPubsubMessageRecord(
message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
}
}
}