Apache Spark を使用して Pub/Sub Lite メッセージを書き込む

Pub/Sub Lite Spark Connector は、Pub/Sub Lite の Apache Spark Structured Streaming の入出力ソースとしての使用をサポートするオープンソース Java クライアント ライブラリです。コネクタは、Dataproc を含むすべての Apache Spark ディストリビューションで機能します。

このクイックスタートでは、次の方法について説明します。

  • Pub/Sub Lite からメッセージを読み取る
  • Pub/Sub Lite にメッセージを書き込む

Dataproc Spark クラスタからの PySpark の使用。

準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Pub/Sub Lite, Dataproc, Cloud Storage, Logging API を有効にします。

    API を有効にする

  5. Google Cloud CLI をインストールします。
  6. gcloud CLI を初期化するには:

    gcloud init
  7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  8. Google Cloud プロジェクトで課金が有効になっていることを確認します

  9. Pub/Sub Lite, Dataproc, Cloud Storage, Logging API を有効にします。

    API を有効にする

  10. Google Cloud CLI をインストールします。
  11. gcloud CLI を初期化するには:

    gcloud init

設定

  1. プロジェクトの変数を作成します。

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Cloud Storage バケットを作成します。Cloud Storage バケット名は、グローバルに一意である必要があります。

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. サポートされているロケーションで Pub/Sub Lite トピックとサブスクリプションを作成します。Pub/Sub Lite 予約を使用する場合は、トピックの作成をご覧ください。

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Dataproc クラスタを作成します。

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: Pub/Sub Lite トピックとサブスクリプションが存在する、サポートされている Dataproc のリージョン
    • --image-version: クラスタにインストールされている Apache Spark のバージョンを決定するクラスタのイメージ バージョン。現在、Pub/Sub Lite Spark コネクタは Apache Spark 3.xx をサポートしているため、2.xx イメージ リリース バージョンを選択します。
    • --scopes: 同じプロジェクトの Google Cloud サービスへの API アクセスを有効にします。
    • --enable-component-gateway: Apache Spark ウェブ UI へのアクセスを有効にします。
    • --bucket: クラスタジョブの依存関係、ドライバ出力、クラスタ構成ファイルの保存に使用するステージング Cloud Storage バケット。
  5. クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Pub/Sub Lite への書き込み

次の例では、以下を行います。

  • spark.sql.Row としてフォーマットされた連続する数値とタイムスタンプを生成するレートソースを作成する
  • Pub/Sub Lite Spark Connector の writeStream API によって、必要なテーブル スキーマに合わせてデータを変換する
  • 既存の Pub/Sub Lite トピックにデータを書き込む
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

書き込みジョブを Dataproc に送信するには、以下を行います。

Console

  1. PySpark スクリプトを Cloud Storage バケットにアップロードします。
    1. Cloud Storage コンソール に移動します。
    2. バケットを選択します。
    3. [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
  2. Dataproc クラスタにジョブを送信します。
    1. Dataproc コンソールに移動します。
    2. ジョブに移動します。
    3. [ジョブを送信] をクリックします。
    4. ジョブの詳細を入力します。
    5. [クラスタ] で、クラスタを選択します。
    6. [ジョブ] で、ジョブ ID に名前を付けます。
    7. [ジョブタイプ] で、PySpark を選択します。
    8. [メインの Python ファイル] の場合は、gs:// で始まるアップロード済みの PySpark スクリプトの gsutil URI を指定します。
    9. [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
    10. 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、--project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--topic_id=TOPIC_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。
    11. [プロパティ] で、キー spark.master と値 yarn を入力します。
    12. [送信] をクリックします。

gcloud

gcloud dataproc jobs submit pyspark コマンドを使用して、ジョブを Dataproc に送信します。

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: 事前に選択されている Dataproc のリージョン
  • --cluster: Dataproc のクラスタ名。
  • --jars: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。
  • --driver-log-levels: ルートレベルでロギングレベルを INFO に設定します。
  • --properties: Spark マスターに YARN リソース マネージャーを使用します。
  • --: スクリプトで必要な引数を指定します。

writeStream オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Pub/Sub Lite からの読み取り

次の例では、readStream API を使用して既存の Pub/Sub Lite サブスクリプションからメッセージを読み取ります。コネクタは、spark.sql.Row という形式の固定テーブル スキーマに準拠するメッセージを出力します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

読み取りジョブを Dataproc に送信するには、以下を行います。

Console

  1. PySpark スクリプトを Cloud Storage バケットにアップロードします。
    1. Cloud Storage コンソール に移動します。
    2. バケットを選択します。
    3. [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
  2. Dataproc クラスタにジョブを送信します。
    1. Dataproc コンソールに移動します。
    2. ジョブに移動します。
    3. [ジョブを送信] をクリックします。
    4. ジョブの詳細を入力します。
    5. [クラスタ] で、クラスタを選択します。
    6. [ジョブ] で、ジョブ ID に名前を付けます。
    7. [ジョブタイプ] で、PySpark を選択します。
    8. [メインの Python ファイル] の場合は、gs:// で始まるアップロード済みの PySpark スクリプトの gsutil URI を指定します。
    9. [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
    10. 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、--project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--subscription_id=SUBSCRIPTION_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。
    11. [プロパティ] で、キー spark.master と値 yarn を入力します。
    12. [送信] をクリックします。

gcloud

gcloud dataproc jobs submit pyspark コマンドを再び使用して、ジョブを Dataproc に送信します。

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: 事前に選択されている Dataproc のリージョン
  • --cluster: Dataproc のクラスタ名。
  • --jars: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。
  • --driver-log-levels: ルートレベルでロギングレベルを INFO に設定します。
  • --properties: Spark マスターに YARN リソース マネージャーを使用します。
  • --: スクリプトに必要な引数を指定します。

readStream オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Pub/Sub Lite のメッセージを再生、削除する

Pub/Sub Lite Spark コネクタを使用して Pub/Sub Lite から読み取る場合、Apache Spark システムがパーティション内のオフセットを独自に追跡するため、シーク オペレーションは機能しません。回避策は、ワークフローをドレイン、シークして再起動することです。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. トピックとサブスクリプションを削除します。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Dataproc クラスタを削除します。

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Cloud Storage バケットを削除します。

    gsutil rb gs://$BUCKET
    

次のステップ