Apache Spark를 사용하여 Pub/Sub 라이트 메시지 쓰기

Pub/Sub Lite Spark 커넥터는 Pub/Sub Lite를 Apache Spark Structured Streaming에 대한 입력 및 출력 소스로 사용하도록 지원하는 오픈소스 자바 클라이언트 라이브러리입니다. 커넥터는 Dataproc을 포함한 모든 Apache Spark 배포판에서 작동합니다.

이 빠른 시작에서는 다음 방법을 보여줍니다.

  • Pub/Sub Lite에서 메시지 읽기
  • Pub/Sub Lite에 메시지 쓰기

Dataproc Spark 클러스터에서 PySpark를 사용하여 위의 작업을 수행합니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. API Pub/Sub Lite, Dataproc, Cloud Storage, Logging 사용 설정

    API 사용 설정

  5. Google Cloud CLI를 설치합니다.
  6. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  7. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  8. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  9. API Pub/Sub Lite, Dataproc, Cloud Storage, Logging 사용 설정

    API 사용 설정

  10. Google Cloud CLI를 설치합니다.
  11. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init

설정

  1. 프로젝트의 변수를 만듭니다.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Cloud Storage 버킷을 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. 지원되는 위치에서 Pub/Sub Lite 주제 및 구독을 만듭니다. Pub/Sub 라이트 예약을 사용하는 경우 주제 만들기를 참조하세요.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Dataproc 클러스터를 만듭니다.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: Pub/Sub Lite 주제 및 구독이 위치한 지원되는 Dataproc 리전입니다.
    • --image-version: 클러스터에 설치된 Apache Spark 버전을 결정하는 클러스터의 이미지 버전입니다. Pub/Sub Lite Spark 커넥터는 현재 Apache Spark 3.x.x를 지원하므로 2.x.x 이미지 출시 버전을 선택합니다.
    • --scopes: 동일한 프로젝트에서 Google Cloud 서비스에 대한 API 액세스를 사용 설정합니다.
    • --enable-component-gateway: Apache Spark 웹 UI에 대한 액세스를 사용 설정합니다.
    • --bucket: 클러스터 작업 종속 항목, 드라이버 출력, 클러스터 구성 파일을 저장하는 데 사용되는 스테이징 Cloud Storage 버킷입니다.
  5. 빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Pub/Sub Lite에 쓰기

아래 예시는 다음을 수행합니다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Dataproc에 쓰기 작업을 제출하려면 다음 안내를 따르세요.

콘솔

  1. Cloud Storage 버킷에 PySpark 스크립트를 업로드합니다.
    1. Cloud Storage Console로 이동합니다.
    2. 버킷을 선택합니다.
    3. 파일 업로드를 사용하여 사용하려는 PySpark 스크립트를 업로드합니다.
  2. Dataproc 클러스터에 작업을 제출합니다.
    1. Dataproc 콘솔로 이동합니다.
    2. 작업으로 이동합니다.
    3. 작업 제출을 클릭합니다.
    4. 작업 세부정보를 입력합니다.
    5. 클러스터에서 클러스터를 선택합니다.
    6. 작업에서 작업 ID에 이름을 지정합니다.
    7. 작업 유형으로 PySpark를 선택합니다.
    8. 기본 python 파일의 경우 gs://로 시작하는 업로드된 PySpark 스크립트의 gsutil URI를 입력합니다.
    9. Jar 파일에서 Maven의 최신 Spark 커넥터 버전을 선택하고 다운로드 옵션에서 종속 항목이 있는 jar을 찾고 링크를 복사합니다.
    10. 인수의 경우 GitHub의 전체 PySpark 스크립트를 사용한다면 --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID를 입력합니다. 위의 PySpark 스크립트를 복사하고 할 일을 완료한 경우 비워 둡니다.
    11. 속성spark.master 키와 yarn 값을 입력합니다.
    12. 제출을 클릭합니다.

gcloud

gcloud dataproc jobs submit pyspark 명령어를 사용하여 작업을 Dataproc에 제출합니다.

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: 미리 선택된 Dataproc 리전입니다.
  • --cluster: Dataproc 클러스터 이름입니다.
  • --jars: 공개 Cloud Storage 버킷에서 종속 항목이 있는 Pub/Sub Lite Spark 커넥터의 uber jar입니다. 이 링크에서 Maven의 종속 항목이 포함된 uber jar을 다운로드할 수도 있습니다.
  • --driver-log-levels: 루트 수준에서 로깅 수준을 INFO로 설정합니다.
  • --properties: Spark 마스터에 YARN 리소스 관리자를 사용합니다.
  • --: 스크립트에 필요한 인수를 제공합니다.

writeStream 작업이 성공하면 로컬뿐만 아니라 Google Cloud 콘솔의 작업 세부정보 페이지에도 다음과 같은 로그 메시지가 표시됩니다.

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Pub/Sub Lite에서 읽기

다음 예시는 readStream API를 사용하여 기존 Pub/Sub Lite 구독에서 메시지를 읽습니다. 커넥터는 spark.sql.Row 형식의 고정 테이블 스키마를 준수하는 메시지를 출력합니다.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Dataproc에 읽기 작업을 제출하려면 다음 안내를 따르세요.

콘솔

  1. Cloud Storage 버킷에 PySpark 스크립트를 업로드합니다.
    1. Cloud Storage Console로 이동합니다.
    2. 버킷을 선택합니다.
    3. 파일 업로드를 사용하여 사용하려는 PySpark 스크립트를 업로드합니다.
  2. Dataproc 클러스터에 작업을 제출합니다.
    1. Dataproc 콘솔로 이동합니다.
    2. 작업으로 이동합니다.
    3. 작업 제출을 클릭합니다.
    4. 작업 세부정보를 입력합니다.
    5. 클러스터에서 클러스터를 선택합니다.
    6. 작업에서 작업 ID에 이름을 지정합니다.
    7. 작업 유형으로 PySpark를 선택합니다.
    8. 기본 python 파일의 경우 gs://로 시작하는 업로드된 PySpark 스크립트의 gsutil URI를 입력합니다.
    9. Jar 파일에서 Maven의 최신 Spark 커넥터 버전을 선택하고 다운로드 옵션에서 종속 항목이 있는 jar을 찾고 링크를 복사합니다.
    10. 인수의 경우 GitHub의 전체 PySpark 스크립트를 사용한다면 --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID를 입력합니다. 위의 PySpark 스크립트를 복사하고 할 일을 완료한 경우 비워 둡니다.
    11. 속성spark.master 키와 yarn 값을 입력합니다.
    12. 제출을 클릭합니다.

gcloud

gcloud dataproc jobs submit pyspark 명령어를 다시 사용하여 작업을 Dataproc에 제출합니다.

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: 미리 선택된 Dataproc 리전입니다.
  • --cluster: Dataproc 클러스터 이름입니다.
  • --jars: 공개 Cloud Storage 버킷에서 종속 항목이 있는 Pub/Sub Lite Spark 커넥터의 uber jar입니다. 이 링크에서 Maven의 종속 항목이 포함된 uber jar을 다운로드할 수도 있습니다.
  • --driver-log-levels: 루트 수준에서 로깅 수준을 INFO로 설정합니다.
  • --properties: Spark 마스터에 YARN 리소스 관리자를 사용합니다.
  • --: 스크립트에 필요한 인수를 제공합니다.

readStream 작업이 성공하면 로컬뿐만 아니라 Google Cloud 콘솔의 작업 세부정보 페이지에도 다음과 같은 로그 메시지가 표시됩니다.

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Pub/Sub 라이트에서 메시지 재생 및 삭제

Apache Spark 시스템이 파티션 내에서 오프셋 추적을 직접 수행하므로 Pub/Sub Lite Spark 커넥터를 사용하여 Pub/Sub 라이트에서 읽을 때 탐색 작업이 작동하지 않습니다. 해결 방법은 워크플로를 드레이닝하고 탐색한 후 다시 시작하는 것입니다.

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.

  1. 주제 및 구독을 삭제합니다.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Dataproc 클러스터를 삭제합니다.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Cloud Storage 버킷을 제거합니다.

    gsutil rb gs://$BUCKET
    

다음 단계