이 샘플은 Pub/Sub Lite에서 메시지를 읽고, 고정 크기 윈도우 함수를 사용하여 메시지를 그룹화하고, Cloud Storage에 쓰는 Apache Beam 스트리밍 파이프라인을 만드는 방법을 보여줍니다.
더 살펴보기
이 코드 샘플이 포함된 자세한 문서는 다음을 참조하세요.
코드 샘플
Java
Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Streaming Apache Beam pipeline that reads messages from a Pub/Sub Lite subscription, groups
 * them into fixed-size windows, and hands each window to {@code WriteOneFilePerWindow} using the
 * configured output filename prefix.
 *
 * <p>Each message is rendered as one line of text: the UTF-8 payload, a tab character, and the
 * publish time in whole seconds.
 */
public class PubsubliteToGcs {

  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  /** Command-line options understood by this pipeline. */
  public interface PubsubliteToGcsOptions extends StreamingOptions {

    /** Returns the Pub/Sub Lite subscription to read from (required). */
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    /** Returns the window size in minutes; defaults to 1. */
    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    /** Returns the filename prefix used for output files (required). */
    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  /**
   * Builds the pipeline from the command-line arguments and starts it.
   *
   * @param args command-line arguments, parsed and validated into {@link PubsubliteToGcsOptions}
   * @throws InterruptedException declared for compatibility with existing callers
   */
  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    // This pipeline always runs in streaming mode, regardless of what was passed on the
    // command line.
    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      // Parameterized logging: the message is only formatted when INFO is
                      // enabled, which matters because this runs once per element.
                      LOG.info("Received: {}", data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      // Output line format: <payload>\t<publish time in seconds>.
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}
다음 단계
다른 Google Cloud 제품의 코드 샘플을 검색하고 필터링하려면 Google Cloud 샘플 브라우저를 참조하세요.