Apache Spark を使用して Pub/Sub Lite メッセージを書き込む

Pub/Sub Lite Spark Connector は、Pub/Sub Lite の Apache Spark Structured Streaming の入出力ソースとしての使用をサポートするオープンソース Java クライアント ライブラリです。コネクタは、Dataproc を含むすべての Apache Spark ディストリビューションで機能します。

このクイックスタートでは、次の方法について説明します。

  • Pub/Sub Lite からメッセージを読み取る
  • Pub/Sub Lite にメッセージを書き込む

Dataproc Spark クラスタからの PySpark の使用。

準備

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Pub/Sub Lite, Dataproc, Cloud Storage, Logging API を有効にします。

    API を有効にする

  5. Google Cloud CLI をインストールします。
  6. gcloud CLI を初期化するには:

    gcloud init
  7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  8. Google Cloud プロジェクトで課金が有効になっていることを確認します

  9. Pub/Sub Lite, Dataproc, Cloud Storage, Logging API を有効にします。

    API を有効にする

  10. Google Cloud CLI をインストールします。
  11. gcloud CLI を初期化するには:

    gcloud init

設定

  1. プロジェクトの変数を作成します。

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Cloud Storage バケットを作成します。Cloud Storage バケット名は、グローバルに一意である必要があります。

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. サポートされているロケーションで Pub/Sub Lite トピックとサブスクリプションを作成します。Pub/Sub Lite 予約を使用する場合は、トピックの作成をご覧ください。

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Dataproc クラスタを作成します。

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: Pub/Sub Lite トピックとサブスクリプションが存在する、サポートされている Dataproc のリージョン
    • --image-version: クラスタにインストールされている Apache Spark のバージョンを決定するクラスタのイメージ バージョン。現在、Pub/Sub Lite Spark コネクタは Apache Spark 3.xx をサポートしているため、2.xx イメージ リリース バージョンを選択します。
    • --scopes: 同じプロジェクトの Google Cloud サービスへの API アクセスを有効にします。
    • --enable-component-gateway: Apache Spark ウェブ UI へのアクセスを有効にします。
    • --bucket: クラスタジョブの依存関係、ドライバ出力、クラスタ構成ファイルの保存に使用するステージング Cloud Storage バケット。
  5. クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Pub/Sub Lite への書き込み

次の例では、以下を行います。

  • spark.sql.Row としてフォーマットされた連続する数値とタイムスタンプを生成するレートソースを作成する
  • Pub/Sub Lite Spark Connector の writeStream API によって、必要なテーブル スキーマに合わせてデータを変換する
  • 既存の Pub/Sub Lite トピックにデータを書き込む
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

書き込みジョブを Dataproc に送信するには、以下を行います。

Console

  1. PySpark スクリプトを Cloud Storage バケットにアップロードします。
    1. Cloud Storage コンソール に移動します。
    2. バケットを選択します。
    3. [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
  2. Dataproc クラスタにジョブを送信します。
    1. Dataproc コンソールに移動します。
    2. ジョブに移動します。
    3. [ジョブを送信] をクリックします。
    4. ジョブの詳細を入力します。
    5. [クラスタ] で、クラスタを選択します。
    6. [ジョブ] で、ジョブ ID に名前を付けます。
    7. [ジョブタイプ] で、PySpark を選択します。
    8. [メインの Python ファイル] の場合は、gs:// で始まるアップロード済みの PySpark スクリプトの gsutil URI を指定します。
    9. [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
    10. 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、--project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--topic_id=TOPIC_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。
    11. [プロパティ] で、キー spark.master と値 yarn を入力します。
    12. [送信] をクリックします。

gcloud

gcloud dataproc jobs submit pyspark コマンドを使用して、ジョブを Dataproc に送信します。

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: 事前に選択されている Dataproc のリージョン
  • --cluster: Dataproc のクラスタ名。
  • --jars: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。
  • --driver-log-levels: ルートレベルでロギングレベルを INFO に設定します。
  • --properties: Spark マスターに YARN リソース マネージャーを使用します。
  • --: スクリプトで必要な引数を指定します。

writeStream オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Pub/Sub Lite からの読み取り

次の例では、readStream API を使用して既存の Pub/Sub Lite サブスクリプションからメッセージを読み取ります。コネクタは、spark.sql.Row という形式の固定テーブル スキーマに準拠するメッセージを出力します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

読み取りジョブを Dataproc に送信するには、以下を行います。

Console

  1. PySpark スクリプトを Cloud Storage バケットにアップロードします。
    1. Cloud Storage コンソール に移動します。
    2. バケットを選択します。
    3. [ファイルをアップロード] を使用して、使用したい PySpark スクリプトをアップロードします。
  2. Dataproc クラスタにジョブを送信します。
    1. Dataproc コンソールに移動します。
    2. ジョブに移動します。
    3. [ジョブを送信] をクリックします。
    4. ジョブの詳細を入力します。
    5. [クラスタ] で、クラスタを選択します。
    6. [ジョブ] で、ジョブ ID に名前を付けます。
    7. [ジョブタイプ] で、PySpark を選択します。
    8. [メインの Python ファイル] の場合は、gs:// で始まるアップロード済みの PySpark スクリプトの gsutil URI を指定します。
    9. [Jar ファイル] で、Maven から最新の Spark コネクタ バージョンを選択し、ダウンロード オプションに依存関係がある jar を探してそのリンクをコピーします。
    10. 引数については、GitHub から完全な PySpark スクリプトを使用する場合は、--project_number=PROJECT_NUMBER--location=PUBSUBLITE_LOCATION--subscription_id=SUBSCRIPTION_ID を入力します。To-Do が完了した上記の PySpark スクリプトをコピーする場合は、空白のままにします。
    11. [プロパティ] で、キー spark.master と値 yarn を入力します。
    12. [送信] をクリックします。

gcloud

gcloud dataproc jobs submit pyspark コマンドを再び使用して、ジョブを Dataproc に送信します。

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: 事前に選択されている Dataproc のリージョン
  • --cluster: Dataproc のクラスタ名。
  • --jars: Cloud Storage の公開バケット内の依存関係を持つ Pub/Sub Lite Spark コネクタの uber jar。Maven の依存関係を含む uber jar をダウンロードするには、こちらのリンクにアクセスしてください。
  • --driver-log-levels: ルートレベルでロギングレベルを INFO に設定します。
  • --properties: Spark マスターに YARN リソース マネージャーを使用します。
  • --: スクリプトに必要な引数を指定します。

readStream オペレーションが成功すると、以下のようなログメッセージがローカルおよび Google Cloud コンソールのジョブの詳細ページに表示されます。

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Pub/Sub Lite のメッセージを再生、削除する

Pub/Sub Lite Spark コネクタを使用して Pub/Sub Lite から読み取る場合、Apache Spark システムがパーティション内のオフセットを独自に追跡するため、シーク オペレーションは機能しません。回避策は、ワークフローをドレイン、シークして再起動することです。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. トピックとサブスクリプションを削除します。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Dataproc クラスタを削除します。

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Cloud Storage バケットを削除します。

    gsutil rb gs://$BUCKET
    

次のステップ