Quickstart: Stream processing with Dataflow

Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session-analysis primitives as well as an ecosystem of source and sink connectors. This quickstart shows you how to use Dataflow to do the following:

  • Read messages published to a Pub/Sub topic
  • Window (or group) the messages by timestamp
  • Write the messages to Cloud Storage

This quickstart shows you how to use Dataflow in Java and Python. SQL is also supported.

If you don't plan on doing custom data processing, you can also get started by using the UI-based Dataflow templates.

Before you begin

  1. Follow the instructions to install and initialize the Cloud SDK.
  2. Enable billing for your project.
  3. To complete this quickstart, enable the following APIs: Dataflow, Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Enable the APIs

    It might take a few moments before the APIs appear in the console.
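
    If you prefer the command line, you can enable the same APIs with gcloud services enable. This is only a sketch: the service IDs below are assumed mappings of the API names above, so verify them with gcloud services list --available before running the command.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com \
      logging.googleapis.com monitoring.googleapis.com \
      storage-component.googleapis.com storage-api.googleapis.com \
      pubsub.googleapis.com cloudscheduler.googleapis.com \
      cloudresourcemanager.googleapis.com appengine.googleapis.com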

  4. Create a service account key:

    Create a service account key

    1. From the Service account list, select New service account.
    2. In the Service account name field, enter a name.
    3. From the Role list, select Project > Owner.
    4. Click Create.

    The key is sent to your browser's default downloads folder.
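
    Alternatively, the same step can be done from the command line. This is a minimal sketch; the account name quickstart-sa and the key file credentials.json are placeholders, not values used elsewhere in this quickstart.

    # Create the service account (the name is a placeholder).
    gcloud iam service-accounts create quickstart-sa

    # Grant it the Owner role on the current project, as in the console steps above.
    PROJECT_ID=$(gcloud config get-value project)
    gcloud projects add-iam-policy-binding $PROJECT_ID \
      --member="serviceAccount:quickstart-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
      --role="roles/owner"

    # Download a JSON key for the account.
    gcloud iam service-accounts keys create credentials.json \
      --iam-account="quickstart-sa@${PROJECT_ID}.iam.gserviceaccount.com"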

  5. Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to the service account key.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Create variables for your bucket and project. Cloud Storage bucket names must be globally unique.

    BUCKET_NAME=bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    
  7. Create a Cloud Storage bucket owned by this project:

    gsutil mb gs://$BUCKET_NAME
    
  8. Create a Pub/Sub topic in this project:

    gcloud pubsub topics create cron-topic
    
  9. Create a Cloud Scheduler job in this project. The job publishes a message to a Cloud Pub/Sub topic at one-minute intervals.

    If an App Engine app does not exist for the project, this step will create one.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"
    

    Start the job.

    gcloud scheduler jobs run publisher-job
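
    Optionally, to confirm that messages are being published, attach a temporary subscription and pull from it. The subscription name cron-sub is a placeholder:

    # Wait a minute or two after creating the subscription so the next scheduled publish is captured.
    gcloud pubsub subscriptions create cron-sub --topic=cron-topic
    gcloud pubsub subscriptions pull cron-sub --limit=3 --auto-ack
    # Delete the temporary subscription once you have verified the messages.
    gcloud pubsub subscriptions delete cron-sub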
    
  10. Use the following commands to clone the quickstart repository and navigate to the sample code directory:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Stream messages from Pub/Sub to Cloud Storage

Code sample

This sample code uses Dataflow to do the following:

  • Read Pub/Sub messages.
  • Window (or group) the messages into fixed-size intervals by publish timestamps.
  • Write each window of messages to a file in Cloud Storage.

Java

import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.io.IOException;

public class PubSubToGCS {
  /*
  * Define your own configuration options. Add your own arguments to be processed
  * by the command-line parser, and specify default values for them.
  */
  public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();
    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGCSOptions options = PipelineOptionsFactory
      .fromArgs(args)
      .withValidation()
      .as(PubSubToGCSOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
      // 1) Read string messages from a Pub/Sub topic.
      .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      // 2) Group the messages into fixed-sized minute intervals.
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
      // 3) Write one file to GCS for every window of messages.
      .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }

class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )

if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Start the pipeline

To start the pipeline, run the following command:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGCS \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp
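
If you want to smoke-test the sample before submitting it to Dataflow, Apache Beam's DirectRunner can run the same pipeline locally. The following is a sketch for the Python sample, assuming the variables and credentials set up above; it swaps the runner and drops the Dataflow-specific temp location:

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DirectRunner \
  --window_size=2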

Observe job and pipeline progress

You can view the progress of your job in the Dataflow console.

Go to the Dataflow console

Open the job details view to see the following:

  • Job structure
  • Job logs
  • Stage metrics

You might have to wait a few minutes before the output files appear in Cloud Storage.

Alternatively, you can use the command line below to see which files have been written out.

gsutil ls gs://${BUCKET_NAME}/samples/

The output should look like the following:

Java

gs://${BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://${BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://${BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://${BUCKET_NAME}/samples/output-22:30-22:32
gs://${BUCKET_NAME}/samples/output-22:32-22:34
gs://${BUCKET_NAME}/samples/output-22:34-22:36
gs://${BUCKET_NAME}/samples/output-22:36-22:38
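
To inspect the contents of the output files, you can use gsutil cat with a wildcard; the path below simply matches whatever files your run produced:

gsutil cat "gs://${BUCKET_NAME}/samples/output*"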

Clean up

  1. Delete the Cloud Scheduler job.

    gcloud scheduler jobs delete publisher-job
    
  2. Use Ctrl+C to stop the program in your terminal.

  3. In the Dataflow console, stop the job. Cancel the pipeline instead of draining it.

  4. Delete the topic.

    gcloud pubsub topics delete cron-topic
    
  5. Delete the files created by the pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    
  6. Remove the Cloud Storage bucket.

    gsutil rb gs://${BUCKET_NAME}
    

What's next