Stream Pub/Sub messages to Cloud Storage by using Dataflow.
Code sample
Java
Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart: Using client libraries. For more information, see the Pub/Sub Java API reference documentation.
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}
Python
Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart: Using client libraries. For more information, see the Pub/Sub Python API reference documentation.
import argparse
import datetime
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`;
            # a sketch of that variant appears after this listing.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )


class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """
        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket."""
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFns rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )


if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )