Dataflow를 사용하여 Pub/Sub에서 메시지 스트리밍

Dataflow는 신뢰성과 표현 능력은 그대로 유지하면서 스트림(실시간) 및 일괄 모드에서 데이터를 변환하고 강화하는 완전 관리형 서비스입니다. 소스 및 싱크 커넥터의 생태계는 물론 다양한 기간 설정과 세션 분석 기본 도구를 제공하는 Apache Beam SDK를 사용하여 간소화된 파이프라인 개발 환경을 제공합니다. 이 빠른 시작에서는 Dataflow를 사용하여 다음을 수행하는 방법을 보여줍니다.

  • Pub/Sub 주제에 게시된 메시지 읽기
  • 타임스탬프를 기준으로 메시지 기간 설정 또는 그룹화
  • Cloud Storage에 메시지 쓰기

이 빠른 시작에서는 Dataflow를 자바 및 Python으로 사용하는 방법을 소개합니다. SQL도 지원됩니다. 이 빠른 시작은 시작할 수 있도록 임시 사용자 인증 정보를 제공하는 Google Cloud Skills Boost 튜토리얼로도 제공됩니다.

커스텀 데이터 처리를 수행하지 않으려는 경우 UI 기반 Dataflow 템플릿을 사용하여 시작할 수도 있습니다.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login

Pub/Sub 프로젝트 설정

  1. 버킷, 프로젝트, 리전의 변수를 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다. 이 빠른 시작에서 명령어를 실행할 위치에 가까운 Dataflow 리전을 선택합니다. REGION 변수 값은 유효한 리전 이름이어야 합니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. 이 프로젝트가 소유한 Cloud Storage 버킷을 만듭니다.

    gcloud storage buckets create gs://$BUCKET_NAME
  3. 이 프로젝트에서 Pub/Sub 주제를 만듭니다.

    gcloud pubsub topics create $TOPIC_ID
  4. 이 프로젝트에서 Cloud Scheduler 작업을 만듭니다. 작업은 1분 간격으로 Pub/Sub 주제에 메시지를 게시합니다.

    App Engine 앱이 프로젝트에 존재하지 않는 경우 이 단계를 수행하면 App Engine 앱이 생성됩니다.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    작업을 시작합니다.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. 다음 명령어를 사용하여 빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Pub/Sub에서 Cloud Storage로 메시지 스트리밍

코드 샘플

이 샘플 코드에서는 Dataflow를 사용하여 다음을 수행합니다.

  • Pub/Sub 메시지를 읽습니다.
  • 게시 타임스탬프를 기준으로 메시지를 고정 크기 간격으로 기간을 설정 또는 그룹화합니다.
  • 각 창의 메시지를 Cloud Storage의 파일에 작성합니다.

자바


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

파이프라인 시작

파이프라인을 시작하려면 다음 명령어를 실행합니다.

자바

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

위 명령어는 로컬에서 실행되고 클라우드에서 실행되는 Dataflow 작업을 시작합니다. 명령어가 JOB_MESSAGE_DETAILED: Workers have started successfully를 반환하면 Ctrl+C를 사용하여 로컬 프로그램을 종료합니다.

작업 및 파이프라인 진행 상황 관찰

작업 진행 상황은 Dataflow 콘솔에서 확인할 수 있습니다.

Dataflow 콘솔로 이동

작업 진행 상황 관찰

작업 세부정보 보기를 열어 다음을 확인합니다.

  • 작업 구조
  • 작업 로그
  • 단계 측정항목

작업 진행 상황 관찰

Cloud Storage에서 출력 파일을 보려면 몇 분 정도 기다려야 할 수 있습니다.

작업 진행 상황 관찰

또는 아래 명령줄을 사용하여 어떤 파일이 작성되었는지 확인할 수 있습니다.

gcloud storage ls gs://${BUCKET_NAME}/samples/

다음과 유사하게 출력됩니다.

자바

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 Google Cloud 프로젝트를 삭제하면 됩니다.

  1. Cloud Scheduler 작업 삭제

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. Dataflow 콘솔에서 작업을 중지합니다. 파이프라인을 드레이닝하지 않고 파이프라인을 취소합니다.

  3. 주제를 삭제합니다.

    gcloud pubsub topics delete $TOPIC_ID
  4. 파이프라인에서 만든 파일을 삭제합니다.

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. Cloud Storage 버킷을 제거합니다.

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. 서비스 계정을 삭제합니다.
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

다음 단계