Dataflow를 사용하여 Pub/Sub Lite 메시지 스트리밍

자체 데이터 처리 프로그램을 작성하고 실행하는 대신 Apache Beam용 Pub/Sub 라이트 I/O 커넥터와 함께 Dataflow를 사용할 수 있습니다. Dataflow는 신뢰성과 표현 능력은 그대로 유지하면서 스트리밍(실시간) 및 일괄 모드에서 데이터를 변환하고 강화하는 완전 관리형 서비스입니다. 확장 가능한 강력한 스테이트풀(Stateful) 처리 추상화 세트와 다른 스트리밍 및 일괄 시스템에 대한 I/O 커넥터가 있는 Apache Beam SDK를 사용하여 개발된 프로그램을 안정적으로 실행합니다.

이 빠른 시작에서는 다음과 같은 작업을 수행하는 Apache Beam 파이프라인을 작성하는 방법을 보여줍니다.

  • Pub/Sub 라이트에서 메시지 읽기
  • 게시 타임스탬프를 기준으로 메시지 기간 설정 또는 그룹화
  • Cloud Storage에 메시지 쓰기

또한 다음을 수행하는 방법도 보여줍니다.

  • Dataflow에서 실행할 파이프라인 제출
  • 파이프라인에서 Dataflow Flex 템플릿 만들기

이 튜토리얼에는 Maven이 필요하지만 프로젝트 예시를 Maven에서 Gradle로 변환할 수도 있습니다. 자세한 내용은 선택사항: Maven에서 Gradle로 변환을 참조하세요.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login

Pub/Sub Lite 프로젝트 설정

  1. Cloud Storage 버킷, 프로젝트, Dataflow 리전의 변수를 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다. Dataflow 리전은 작업을 실행할 수 있는 유효한 리전이어야 합니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
  2. 이 프로젝트가 소유한 Cloud Storage 버킷을 만듭니다.

       gcloud storage buckets create gs://$BUCKET

Pub/Sub 라이트 영역별 라이트 주제 및 구독 만들기

영역별 라이트 Pub/Sub 라이트 주제 및 라이트 구독을 만듭니다.

Lite 위치에 지원되는 Pub/Sub Lite 위치를 선택합니다. 리전의 영역도 지정해야 합니다. 예를 들면 us-central1-a입니다.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Dataflow로 메시지 스트리밍

빠른 시작 샘플 코드 다운로드

빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

샘플 코드

이 샘플 코드에서는 Dataflow를 사용하여 다음을 수행합니다.

  • Pub/Sub 라이트 구독의 메시지를 제한되지 않은 소스로 읽습니다.
  • 고정 시간 기간기본 트리거를 사용하여 게시 타임스탬프를 기준으로 메시지를 그룹화합니다.
  • 그룹화된 메시지를 Cloud Storage의 파일에 작성합니다.

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Dataflow 파이프라인 시작

Dataflow에서 파이프라인을 시작하려면 다음 명령어를 실행하세요.

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

앞의 명령어는 Dataflow 작업을 시작합니다. 콘솔 출력의 링크를 따라 Dataflow 모니터링 콘솔에서 작업에 액세스합니다.

작업 진행률 관찰

Dataflow 콘솔에서 작업의 진행률을 관찰합니다.

Dataflow 콘솔로 이동

작업 세부정보 보기를 열어 다음을 확인합니다.

  • 작업 그래프
  • 실행 세부정보
  • 작업 측정항목

라이트 주제에 일부 메시지를 게시합니다.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

작업자 로그에서 메시지를 보려면 몇 분 정도 기다려야 할 수 있습니다.

아래 명령어를 사용하여 Cloud Storage에 기록된 파일을 확인합니다.

gcloud storage ls "gs://$BUCKET/samples/"

다음과 유사하게 출력됩니다.

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

아래 명령어를 사용하여 파일의 콘텐츠를 확인합니다.

gcloud storage cat "gs://$BUCKET/samples/your-filename"

선택사항: Dataflow 템플릿 만들기

파이프라인을 기반으로 커스텀 Dataflow Flex 템플릿을 만들 수 있습니다. Dataflow 템플릿을 사용하면 전체 자바 개발 환경을 설정하지 않고도 Google Cloud 콘솔 또는 명령줄에서 다른 입력 매개변수를 사용하여 작업을 실행할 수 있습니다.

  1. 파이프라인의 모든 종속 항목이 포함된 팻 JAR을 만듭니다. 명령어가 실행되면 target/pubsublite-streaming-bundled-1.0.jar이 표시됩니다.

    mvn clean package -DskipTests=true
  2. 템플릿 파일 및 템플릿 컨테이너 이미지의 이름과 위치를 제공합니다.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. 커스텀 Flex 템플릿을 빌드합니다. 작업을 실행하는 데 필요한 사양이 포함된 필수 metadata.json 파일이 예시와 함께 제공됩니다.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. 커스텀 Flex 템플릿을 사용하여 작업을 실행합니다.

Console

  1. 템플릿에서 작업 만들기.

  2. 작업 이름을 입력합니다.

  3. Dataflow 리전을 입력합니다.

  4. 커스텀 템플릿을 선택합니다.

  5. 템플릿 경로를 입력합니다.

  6. 필수 매개변수를 입력합니다.

  7. 작업 실행을 클릭합니다.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \ 
     --serviceAccount=$SERVICE_ACCOUNT

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.

  1. Dataflow 콘솔에서 작업을 중지합니다. 파이프라인을 드레이닝하는 대신 파이프라인을 취소합니다.

  2. 주제 및 구독을 삭제합니다.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
  3. 파이프라인에서 만든 파일을 삭제합니다.

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. 템플릿 이미지와 템플릿 파일(있는 경우)을 삭제합니다.

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. Cloud Storage 버킷을 제거합니다.

    gcloud storage rm gs://$BUCKET --recursive

  6. 서비스 계정을 삭제합니다.
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

다음 단계