Dataproc と Pub/Sub で Apache Spark DStreams を使用する

このチュートリアルでは、Apache Spark DStreams アプリを Dataproc にデプロイし、Pub/Sub からのメッセージをほぼリアルタイムに処理する方法を説明します。Spark には、ストリーミング用の API として、オリジナルの Discretized Streams API(DStreams)と、より新しい Structured Streaming API の 2 つがあります。Structured Streaming API は Spark 2.0 でアルファ版リリースとして追加され、Spark 2.2 で安定版リリースとなりました。Structured Streaming には、イベント時オペレーションデータセットとデータフレームの抽象化といった新しい重要な機能がいくつか用意されていますが、制限事項もあります。たとえば、並べ替えや複数のストリーミングの集約などのオペレーションは、Structured Streaming ではまだサポートしていません。

このチュートリアルでは、DStreams API と Google Cloud で利用可能なフルマネージド ソリューションを使用するアプリのデプロイ方法に焦点を合わせています。

目標

  • 数千件のシミュレーション ツイートを生成し、それらのツイートを個々のメッセージとして Pub/Sub にパブリッシュする。
  • Spark ストリーミング アプリを Dataproc にデプロイし、生成されたツイートをほぼリアルタイムに処理してトレンドのハッシュタグを識別できるようにする。
  • ストリーミング オペレーションの出力を Datastore に保存する。
  • HTTP 関数Cloud Functions にデプロイし、最新のトレンド ハッシュタグがウェブページ上に表示されるようにする。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

リファレンス アーキテクチャ

次の図は、システム内での情報のフローを示しています。

システム内での情報のフロー

  1. Tweet Generator プログラムが、数千件のシミュレーション ツイートを生成し、それらのツイートを Pub/Sub の tweets トピックにパブリッシュします。
  2. Pub/Sub のトピックが新しく受信したツイートを伝播し、このトピックをサブスクライブしている Spark ストリーミング アプリがそれらのツイートを pull します。
  3. Spark ストリーミング アプリは、最新のツイートを定期的に(たとえば 20 秒間隔で)pull します。その後、ハッシュタグを抽出し、スライディング ウィンドウ上(過去 60 秒間など)でカウントします。
  4. 各スライディング ウィンドウの最後に、Spark ストリーミング アプリは最新のトレンド ハッシュタグを永続保存のために Datastore に保存しています。
  5. ユーザーがウェブブラウザを開き、Cloud Functions にデプロイされている HTTP 関数が提供するページにアクセスします。
  6. HTTP 関数が、Datastore から最新のトレンド ハッシュタグを読み取ります。
  7. HTTP 関数が、トレンド ハッシュタグを表示する HTML ページを生成します。
  8. HTTP 関数が HTML ページを返し、そのページをユーザーのブラウザに表示します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    [プロジェクトの選択] ページに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Shell インスタンスを起動します。このチュートリアルでは最後まで Cloud Shell を使用します。
    Cloud Shell を開く
  5. Cloud Shell で、Dataproc、Pub/Sub、Cloud Functions、Datastore API を有効にします。
    gcloud services enable \
        dataproc.googleapis.com \
        pubsub.googleapis.com \
        cloudfunctions.googleapis.com \
        datastore.googleapis.com
  6. プロジェクトの Datastore データベースをアクティブにします。
    gcloud app create --region=us-central

    注: Datastore にはアクティブな App Engine アプリケーションが必要なので、上記のコマンドを使用してそのアプリケーションを作成する必要があります。

  7. このチュートリアルの Git リポジトリのクローンを作成します。
    git clone https://github.com/GoogleCloudPlatform/dataproc-pubsub-spark-streaming
    
  8. リポジトリのルートに移動し、このパスを後で使用できるよう環境変数に保存します。
    cd dataproc-pubsub-spark-streaming
    export REPO_ROOT=$PWD
    

Pub/Sub トピックとサブスクリプションを作成する

Pub/Sub のトピックとサブスクリプションを設定するには、次の手順に従います。

  1. tweets トピックを作成します:

    gcloud pubsub topics create tweets
  2. tweets トピックの tweets-subscription サブスクリプションを作成します。

    gcloud pubsub subscriptions create tweets-subscription --topic=tweets
    

Cloud Pub/Sub の tweets トピックが、後で Tweet Generator によってパブリッシュされる新しいメッセージを受け入れられる状態になりました。また、Pub/Sub の tweets-subscription サブスクリプションも Spark ストリーミング アプリから pull できる状態になりました。次の図は、上記のコマンドを実行した後の新しい状態を示しています。

Pub/Sub のトピックとサブスクリプションを作成した後の新しい状態

Dataproc のサービス アカウントを作成する

このセクションでは、Dataproc クラスタが使用できるサービス アカウントを作成します。また、クラスタ インスタンスが Pub/Sub と Datastore にアクセスするために必要な権限を割り当てます。

  1. サービス アカウントを作成します。

    export SERVICE_ACCOUNT_NAME="dataproc-service-account"
    gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
    
  2. サービス アカウントがクラスタを作成してジョブを実行できるように、Dataproc ワーカーの IAM ロールを追加します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    
  3. サービス アカウントがデータベースに対する読み書きを行えるように、Datastore ユーザーの IAM ロールを追加します。

    gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/datastore.user \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    
  4. サービス アカウントが Pub/Sub の tweets-subscription サブスクリプションに登録できるように Pub/Sub サブスクライバー IAM ロールを追加します。

    gcloud beta pubsub subscriptions add-iam-policy-binding \
        tweets-subscription \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"
    

Dataproc クラスタの作成

Dataproc クラスタを作成するときは、次の情報を提供する必要があります。

  • pubsubdatastoreアクセス スコープ。これらにより、クラスタ インスタンスは Pub/Sub と Datastore の対応する API にアクセスできます。
  • 前のセクションで作成したサービス アカウント。Dataproc はこのサービス アカウントをクラスタ内のすべてのインスタンスに割り当て、すべてのインスタンスがアプリを実行するための適切な権限を取得できるようにします。
  • Dataproc クラスタ イメージ バージョン 1.2。このイメージには、Dataproc がサポートし、このチュートリアルのアプリに必要となる Spark 2.2 の最新パッチ バージョンが含まれています。

次のコマンドを実行します。

gcloud dataproc clusters create demo-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.2 \
    --service-account="$SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com"

Spark ストリーミング ジョブを送信する

次の図は、Spark ストリーミング アプリケーションが使用するスライディング ウィンドウのメカニズムを示しています。

Spark ストリーミング アプリケーションが使用するスライディング ウィンドウのメカニズム

Spark ストリーミング アプリは、20 秒ごとに、Pub/Sub の tweets トピックから新しいツイートのパイプライン実行結果を収集します。新しいツイートと 60 秒間に収集したすべてのツイートを併せて処理します。次のコードで、Pub/Sub からのストリームを設定します。

val sparkConf = new SparkConf().setAppName("TrendingHashtags")
val ssc = new StreamingContext(sparkConf, Seconds(slidingInterval.toInt))

// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)

// Create stream
val messagesStream: DStream[String] = PubsubUtils
  .createStream(
    ssc,
    projectID,
    None,
    "tweets-subscription",  // Cloud Pub/Sub subscription for incoming tweets
    SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
  .map(message => new String(message.getData(), StandardCharsets.UTF_8))

Spark アプリは、この単純なパイプラインを使用して、すべてのハッシュタグを抽出し、カウントします。

private[demo] def extractTrendingTags(input: RDD[String]): RDD[Popularity] =
  input.flatMap(_.split("\\s+")) // Split on any white character
    .filter(_.startsWith("#")) // Keep only the hashtags
    // Remove punctuation, force to lowercase
    .map(_.replaceAll("[,.!?:;]", "").toLowerCase)
    // Remove the first #
    .map(_.replaceFirst("^#", ""))
    .filter(!_.isEmpty) // Remove any non-words
    .map((_, 1)) // Create word count pairs
    .reduceByKey(_ + _) // Count occurrences
    .map(r => Popularity(r._1, r._2))
    // Sort hashtags by descending number of occurrences
    .sortBy(r => (-r.amount, r.tag), ascending = true)

Spark アプリは、上位 10 件のトレンド ハッシュタグを Datastore 内の新しいデータベース表に保存します。

private[demo] def convertToEntity(hashtags: Array[Popularity],
                                  keyFactory: String => KeyFactory): FullEntity[IncompleteKey] = {
  val hashtagKeyFactory: KeyFactory = keyFactory("Hashtag")

  val listValue = hashtags.foldLeft[ListValue.Builder](ListValue.newBuilder())(
    (listValue, hashTag) => listValue.addValue(convertToDatastore(hashtagKeyFactory, hashTag))
  )

  val rowKeyFactory: KeyFactory = keyFactory("TrendingHashtags")

  FullEntity.newBuilder(rowKeyFactory.newKey())
    .set("datetime", Timestamp.now())
    .set("hashtags", listValue.build())
    .build()
}

詳細については、Spark ストリーミング アプリの完全なコードをご覧ください。

アプリを実行する手順は以下のとおりです。

  1. アプリケーション ディレクトリに移動します。

    cd $REPO_ROOT/spark
  2. Java Development Kit(JDK)バージョンを 1.8 に変更します。

    sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre
  3. アプリケーション アーカイブを作成します。

    mvn clean package

    spark/target ディレクトリ内に、アプリケーション コードと依存関係を含む spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar アーカイブが作成されます。

  4. Spark ストリーミング アプリのジョブを送信します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
    export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
    export ARGUMENTS="$PROJECT 60 20 60 hdfs:///user/spark/checkpoint"
    
    gcloud dataproc jobs submit spark \
    --cluster demo-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
    

    :

    • ジョブの引数は次のとおりです。
      • Google Cloud プロジェクトの ID です。
      • ウィンドウの期間(秒数)。
      • ストリーミング オペレーションの実行間隔(秒数)。
      • ジョブの合計実行時間(分数)。「0」を指定すると、ジョブは明示的に終了されるまで無期限に実行されます。
      • フォールト トレランスを向上させて、データ損失を回避できるよう、定期的なチェックポイント データを保管するためのディレクトリ。短時間のチェックポイント間隔を使用する場合、このディレクトリを HDFS 内に確保することをおすすめします。
    • --async パラメータを使用すると、ジョブが非同期で実行されます。この場合、ジョブの実行中でも Cloud Shell を引き続き使用できます。
    • --jar パラメータは、前の手順で作成したアプリケーション アーカイブを指します。Dataproc は、ジョブが開始する直前に、このアーカイブをクラスタ インスタンスにアップロードします。
    • --max-failures-per-hour パラメータを使用すると、障害の発生時にジョブが再起動されるため、復元力を向上させることができます。詳細については、再実行可能なジョブに関するドキュメントをご覧ください。
    • spark.dynamicAllocation.enabled=false プロパティは、Dataproc クラスタでデフォルトで有効にされている動的リソース割り当てを無効にします。動的リソース割り当てでは、ワークロードに応じてエグゼキュータの数が調整されます。この機能はストリーミングの場合には効果がなく、データ損失の原因になる可能性があります。
    • spark.streaming.receiver.writeAheadLog.enabled=true プロパティは、ログ先行書き込みを有効にします。したがって、フォールト トレランスが向上し、データ損失の回避に役立ちます。
  5. アクティブなジョブのリストを表示して、該当するジョブの JOB_ID 値を記録します。

    gcloud dataproc jobs list --region=us-central1 --state-filter=active

    出力は次のようになります。

    JOB_ID                            TYPE   STATUS
    473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
    
  6. ジョブの出力を表示するために、ブラウザで次の URL を開きます。[JOB_ID] は、前のステップで記録した値で置き換えます。

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
    

    出力は次のようになります。

    -------------------------
    Window ending Wed Apr 11 22:03:00 UTC 2018 for the past 60 seconds
    
    No trending hashtags in this window.
    
    -------------------------
    Window ending Wed Apr 11 22:03:20 UTC 2018 for the past 60 seconds
    
    No trending hashtags in this window.
    

Spark ストリーミング ジョブは Dataproc 上で実行され、Pub/Sub の tweets トピックから新しいツイートが伝播されていないかどうかを定期的にチェックします。この時点ではツイートは生成されていないため、アプリはまだデータを処理しません。次の図は、この新しい状態を示しています。

Dataproc クラスタと Datastore データベースを作成した後の新しい状態

HTTP 関数をデプロイする

HTTP 関数は、最新のトレンド ハッシュタグを表示するシンプルなユーザー インターフェースを提供します。この関数の URL をウェブブラウザで開くと、Datastore から最新のトレンド ハッシュタグを読み取り、それらをシンプルなウェブページに返します。詳細については、関数のコードをご覧ください。

HTTP 関数をデプロイしてテストする手順は以下のとおりです。

  1. HTTP 関数をデプロイします。デプロイが完了するまでに最大で 2 分かかることがあります。

    cd $REPO_ROOT/http_function
    gcloud functions deploy http_function \
        --trigger-http \
        --runtime nodejs10 \
        --allow-unauthenticated \
        --region=us-central1
    
  2. ハッシュタグが表示されていないことを確認するために、ブラウザで次の URL を開きます。[PROJECT] は、プロジェクト ID で置き換えてください。

    https://us-central1-[PROJECT].cloudfunctions.net/http_function
    

    まだツイートが生成されていないため、ページに「現在のところ、トレンド ハッシュタグはありません...しばらくしてからもう一度お試しください」というテキストが表示されます。

次の図は、前の手順で有効にした情報のフローを示しています。HTTP 関数がデプロイされ、ユーザー リクエストを受け取って Datastore データベースをクエリできる状態になっています。

有効にした情報のフロー

シミュレーション ツイートを生成する

次の手順に沿って、数千件のシミュレートされたツイートを Pub/Sub にパブリッシュします。

  1. Python 仮想環境を作成してアクティブにします。

    cd $REPO_ROOT/tweet-generator
    virtualenv venv
    source venv/bin/activate
    
  2. Python 依存関係をインストールします。

    pip install -r requirements.txt
  3. Tweet Generator を実行して数千件のシミュレーション ツイートを生成します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    python tweet-generator.py $PROJECT 15 1000 &
    

    :

    • 引数は次のとおりです。
      • オブジェクトの ID。
      • Tweet Generator の合計実行時間(分数)。
      • 1 分ごとに生成されるツイートの数。
    • 渡された引数に基づいて、このプログラムは 15 分間実行され、1 分ごとに 1,000 件のツイート(合計 15,000 件のツイート)を生成します。
    • & 文字を使用すると、プログラムがバックグランドで実行されるので、プログラムの実行中も引き続き Cloud Shell を使用できます。

    以下に、プログラムによって生成されるツイートの例をいくつか記載します。

    "Piece let #outside carry identify. Management great whom
    surface. Sure away couple build seat."
    
    "South wonder hotel those. Machine among source manager #likely
    anyone. Ground enough big no know."
    

    Tweet Generator の動作について詳しくは、ソースコードをご覧ください。

  4. ジョブの出力を表示するために、ブラウザで次のページにアクセスします。[JOB_ID] は、前のステップで記録した値で置き換えます。

    https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
    

    出力は次のようになります。

    -------------------------
    Window ending 2018-04-11T22:13:40.053000000Z for the past 30 seconds
    
    Trending hashtags in this window:
    #outside, 22
    #likely, 18
    #information, 18
    #support, 17
    #national, 12
    #knowledge, 11
    #painting, 11
    #marriage, 8
    #remain, 7
    #finish, 7
    

    最新の上位 10 件のトレンド ハッシュタグと、現在のストリーム ウィンドウ内での各ハッシュタグの出現回数が表示されます。

  5. 結果が正常にデータベースに保存されていることを確認するために、次の GQL クエリを実行します。

    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST \ -H "Authorization: Bearer "$ACCESS_TOKEN \ -H "Content-Type: application/json; charset=utf-8" \ --data "{ 'gqlQuery': { 'allowLiterals': true, 'queryString': ' SELECT * from TrendingHashtags ORDER BY datetime DESC LIMIT 1' } }" \ "https://datastore.googleapis.com/v1beta3/projects/$PROJECT:runQuery"

    出力は次のようになります(簡潔にするために省略されています)。

    {
      "batch": {
        "entityResultType": "FULL",
        "entityResults": [
          {
            "entity": {
              "key": {
                "partitionId": {
                  "projectId": "[PROJECT]"
                },
                "path": [
                  {
                    "kind": "TrendingHashtags",
                    "id": "5681777339269120"
                  }
                ]
              },
              "properties": {
                "datetime": {
                  "timestampValue": "2018-04-11T15:15:20.253Z"
                },
                "hashtags": {
                  "arrayValue": {
                    "values": [
                      {
                        "entityValue": {
                          "key": {
                            "partitionId": {
                              "projectId": "[PROJECT]"
                            },
                            "path": [
                              {
                                "k
                            ]
                          },
                          "properties": {
                            "name": {
                              "stringValue": "#outside"
                            },
                            "occurrences": {
                              "integerValue": "24"
                            }
                          }
                            }
                      },
        ...
    
  6. ブラウザのアドレスバーに次の URL を入力し、HTTP 関数のウェブページを開きます。[PROJECT] は、プロジェクト ID で置き換えてください。

    https://us-central1-[PROJECT].cloudfunctions.net/http_function
    

このページには、最新のトレンド ハッシュタグが表示されます。ページは数秒ごとに更新されます。

ジョブを終了する

ジョブは数分後に自動的に終了するように設定されていますが、次のコマンドを実行することで、今すぐジョブを終了できます。[JOB_ID] は、前に記録したジョブ ID で置き換えてください。

gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Tweet Generator を停止するには、次のコマンドを実行します。

pkill -9 -f "tweet-generator.py"

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

  • 作成したすべてのリソースをクリーンアップして、これ以後リソースの料金が発生しないようにします。課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
  • あるいは、リソースを個別に削除することもできます。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

プロジェクト全体ではなく個々のリソースを削除するには、次のコマンドを実行します。

gcloud dataproc clusters delete demo-cluster --quiet
gcloud functions delete http_function --quiet
gcloud pubsub topics delete tweets --quiet
gcloud pubsub subscriptions delete tweets-subscription --quiet
gcloud iam service-accounts delete $SERVICE_ACCOUNT_NAME@$PROJECT.iam.gserviceaccount.com --quiet

最後に、Datastore データベースからすべての TrendingHashtags エンティティを一括削除する方法に関するドキュメントをご覧ください。

次のステップ