本示例介绍如何创建一个 Apache Beam 流处理流水线,该流水线从 Pub/Sub Lite 读取消息,使用固定大小的窗口对消息进行分组,并将结果写入 Cloud Storage。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
Java
如需向 Pub/Sub Lite 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Streaming Beam pipeline that reads messages from a Pub/Sub Lite subscription, groups them into
 * fixed-size windows, and writes the windowed elements to files (e.g. in Cloud Storage) via
 * {@code WriteOneFilePerWindow}.
 */
public class PubsubliteToGcs {

  /**
   * Configuration options for this pipeline. Add your own arguments here to have them processed by
   * the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  /** The maximum number of shards when writing output files. */
  private static final int NUM_SHARDS = 1;

  /**
   * Parses the command-line options, builds the pipeline and submits it for execution.
   *
   * @param args command-line arguments, parsed into {@link PubsubliteToGcsOptions}
   * @throws InterruptedException declared for compatibility with existing callers; the
   *     {@code pipeline.run()} call below does not wait for the pipeline to finish
   */
  public static void main(String[] args) throws InterruptedException {
    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);
    // Reading from Pub/Sub Lite is unbounded, so always run in streaming mode regardless of flags.
    options.setStreaming(true);

    // NOTE(review): presumably the subscription must be a full path accepted by
    // SubscriptionPath.parse (projects/<project>/locations/<location>/subscriptions/<id>) — confirm.
    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      // Parameterized logging avoids building the message when INFO is disabled.
                      LOG.info("Received: {}", data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      // Each output line: UTF-8 payload, a tab, then publish time in whole seconds.
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply(
            "Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), NUM_SHARDS));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。