Apache Spark를 사용하여 Pub/Sub 라이트 메시지 쓰기
Pub/Sub 라이트 Spark 커넥터는 Pub/Sub 라이트를 Apache Spark Structured Streaming에 대한 입력 및 출력 소스로 사용하도록 지원하는 오픈소스 Java 클라이언트 라이브러리입니다. 커넥터는 Dataproc을 포함한 모든 Apache Spark 배포판에서 작동합니다.
이 빠른 시작에서는 다음 방법을 보여줍니다.
- Pub/Sub Lite에서 메시지 읽기
- Pub/Sub Lite에 메시지 쓰기
Dataproc Spark 클러스터에서 PySpark를 사용하여 위의 작업을 수행합니다.
시작하기 전에
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
설정
프로젝트의 변수를 만듭니다.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Cloud Storage 버킷을 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
지원되는 위치에서 Pub/Sub Lite 주제 및 구독을 만듭니다. Pub/Sub 라이트 예약을 사용하는 경우 주제 만들기를 참조하세요.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Dataproc 클러스터를 만듭니다.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: Pub/Sub Lite 주제 및 구독이 위치한 지원되는 Dataproc 리전입니다.--image-version
: 클러스터에 설치된 Apache Spark 버전을 결정하는 클러스터의 이미지 버전입니다. Pub/Sub Lite Spark 커넥터는 현재 Apache Spark 3.x.x를 지원하므로 2.x.x 이미지 출시 버전을 선택합니다.--scopes
: 동일한 프로젝트에서 Google Cloud 서비스에 대한 API 액세스를 사용 설정합니다.--enable-component-gateway
: Apache Spark 웹 UI에 대한 액세스를 사용 설정합니다.--bucket
: 클러스터 작업 종속 항목, 드라이버 출력, 클러스터 구성 파일을 저장하는 데 사용되는 스테이징 Cloud Storage 버킷입니다.
빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Pub/Sub Lite에 쓰기
아래 예시는 다음을 수행합니다.
- 연속된 숫자와 타임스탬프를
spark.sql.Row
형식으로 생성하는 비율 소스 만들기 - Pub/Sub 라이트 Spark 커넥터의
writeStream
API를 통해 필요한 테이블 스키마와 일치하도록 데이터 변환 - 기존 Pub/Sub Lite 주제에 데이터 쓰기
Dataproc에 쓰기 작업을 제출하려면 다음 안내를 따르세요.
콘솔
- Cloud Storage 버킷에 PySpark 스크립트를 업로드합니다.
- Cloud Storage Console로 이동합니다.
- 버킷을 선택합니다.
- 파일 업로드를 사용하여 사용하려는 PySpark 스크립트를 업로드합니다.
- Dataproc 클러스터에 작업을 제출합니다.
- Dataproc 콘솔로 이동합니다.
- 작업으로 이동합니다.
- 작업 제출을 클릭합니다.
- 작업 세부정보를 입력합니다.
- 클러스터에서 클러스터를 선택합니다.
- 작업에서 작업 ID에 이름을 지정합니다.
- 작업 유형으로 PySpark를 선택합니다.
- 기본 python 파일의 경우
gs://
로 시작하는 업로드된 PySpark 스크립트의 gcloud storage URI를 입력합니다. - jar 파일의 경우 Maven의 최신 Spark 커넥터 버전을 선택하고 다운로드 옵션에서 종속 항목이 있는 jar을 찾고 링크를 복사합니다.
- 인수의 경우 GitHub의 전체 PySpark 스크립트를 사용한다면
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID를 입력합니다. 위의 PySpark 스크립트를 복사하고 할 일을 완료한 경우 비워 둡니다. - 속성에
spark.master
키와yarn
값을 입력합니다. - 제출을 클릭합니다.
gcloud
gcloud dataproc jobs submit pyspark 명령어를 사용하여 작업을 Dataproc에 제출합니다.
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: 미리 선택된 Dataproc 리전입니다.--cluster
: Dataproc 클러스터 이름입니다.--jars
: 공개 Cloud Storage 버킷에서 종속 항목이 있는 Pub/Sub Lite Spark 커넥터의 uber jar입니다. 이 링크에서 Maven의 종속 항목이 포함된 uber jar을 다운로드할 수도 있습니다.--driver-log-levels
: 루트 수준에서 로깅 수준을 INFO로 설정합니다.--properties
: Spark 마스터에 YARN 리소스 관리자를 사용합니다.--
: 스크립트에 필요한 인수를 제공합니다.
writeStream
작업이 성공하면 로컬뿐만 아니라 Google Cloud 콘솔의 작업 세부정보 페이지에도 다음과 같은 로그 메시지가 표시됩니다.
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Pub/Sub Lite에서 읽기
다음 예시는 readStream
API를 사용하여 기존 Pub/Sub 라이트 구독에서 메시지를 읽습니다. 커넥터는 spark.sql.Row
형식의 고정 테이블 스키마를 준수하는 메시지를 출력합니다.
Dataproc에 읽기 작업을 제출하려면 다음 안내를 따르세요.
콘솔
- Cloud Storage 버킷에 PySpark 스크립트를 업로드합니다.
- Cloud Storage Console로 이동합니다.
- 버킷을 선택합니다.
- 파일 업로드를 사용하여 사용하려는 PySpark 스크립트를 업로드합니다.
- Dataproc 클러스터에 작업을 제출합니다.
- Dataproc 콘솔로 이동합니다.
- 작업으로 이동합니다.
- 작업 제출을 클릭합니다.
- 작업 세부정보를 입력합니다.
- 클러스터에서 클러스터를 선택합니다.
- 작업에서 작업 ID에 이름을 지정합니다.
- 작업 유형으로 PySpark를 선택합니다.
- 기본 python 파일의 경우
gs://
로 시작하는 업로드된 PySpark 스크립트의 gcloud storage URI를 입력합니다. - jar 파일의 경우 Maven의 최신 Spark 커넥터 버전을 선택하고 다운로드 옵션에서 종속 항목이 있는 jar을 찾고 링크를 복사합니다.
- 인수의 경우 GitHub의 전체 PySpark 스크립트를 사용한다면
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID를 입력합니다. 위의 PySpark 스크립트를 복사하고 할 일을 완료한 경우 비워 둡니다. - 속성에
spark.master
키와yarn
값을 입력합니다. - 제출을 클릭합니다.
gcloud
gcloud dataproc jobs submit pyspark 명령어를 다시 사용하여 작업을 Dataproc에 제출합니다.
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: 미리 선택된 Dataproc 리전입니다.--cluster
: Dataproc 클러스터 이름입니다.--jars
: 공개 Cloud Storage 버킷에서 종속 항목이 있는 Pub/Sub Lite Spark 커넥터의 uber jar입니다. 이 링크에서 Maven의 종속 항목이 포함된 uber jar을 다운로드할 수도 있습니다.--driver-log-levels
: 루트 수준에서 로깅 수준을 INFO로 설정합니다.--properties
: Spark 마스터에 YARN 리소스 관리자를 사용합니다.--
: 스크립트에 필요한 인수를 제공합니다.
readStream
작업이 성공하면 로컬뿐만 아니라 Google Cloud 콘솔의 작업 세부정보 페이지에도 다음과 같은 로그 메시지가 표시됩니다.
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Pub/Sub 라이트에서 메시지 재생 및 삭제
Apache Spark 시스템이 파티션 내에서 오프셋 추적을 직접 수행하므로 Pub/Sub Lite Spark 커넥터를 사용하여 Pub/Sub 라이트에서 읽을 때 탐색 작업이 작동하지 않습니다. 해결 방법은 워크플로를 드레이닝하고 탐색한 후 다시 시작하는 것입니다.
삭제
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.
주제 및 구독을 삭제합니다.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Dataproc 클러스터를 삭제합니다.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Cloud Storage 버킷을 제거합니다.
gcloud storage rm gs://$BUCKET
다음 단계
Pub/Sub 라이트 Spark 커넥터의 Java 단어 수 예시 알아보기
Dataproc 작업 드라이버 출력에 액세스하는 방법 알아보기
Google Cloud 제품별 기타 Spark 커넥터: BigQuery 커넥터, Bigtable 커넥터, Cloud Storage 커넥터