Kubernetes Engine で Spark を使用して BigQuery のデータを処理する

このチュートリアルでは、BigQuery を使用してデータを格納するデータ パイプラインを作成し、これによって生成されたデータを Google Kubernetes Engine(GKE)上の Spark で処理します。このパイプラインは、GKE でコンピューティング インフラストラクチャを標準化し、既存のワークフローを移植する方法を探しているチームに便利です。ほとんどのチームにとって、Cloud Dataproc で Spark を実行するのが、Spark アプリケーションを実行する最も簡単でスケーラブルな方法です。このチュートリアルでは、パブリック BigQuery データセットの GitHub データを評価し、投稿が必要なプロジェクトを特定します。このチュートリアルは、GKE と Apache Spark に精通していることを前提としています。次のアーキテクチャ図は、ここで使用するテクノロジーの概要を表しています。

アーキテクチャ図

GitHub の多くのプロジェクトは Go で作成されていますが、ヘルプが必要なプロジェクトや、コードベースで最も注意が必要な場所を表す指標はほとんどありません。

このチュートリアルでは、次の指標を使用して、プロジェクトに投稿が必要かどうかを判断します。

  • 未解決の問題の数。
  • 投稿者の数。
  • プロジェクトのパッケージが他のプロジェクトによってインポートされた回数。
  • FIXME または TODO コメントの頻度。

次の図は、Spark アプリケーションのパイプラインを示しています。

Spark アプリケーション パイプライン

目標

  • Spark アプリケーションを実行する Kubernetes Engine クラスタを作成する。
  • Kubernetes Engine に Spark アプリケーションをデプロイする。
  • Spark アプリケーションから BigQuery テーブルにクエリを実行し、書き込みを行う。
  • BigQuery を使用して結果を分析する。

費用

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

料金計算ツールを使用すると、予想使用量に基づいて費用の見積もりを作成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Kubernetes Engine and BigQuery API を有効にします。

    API を有効にする

環境設定

このセクションでは、チュートリアルを実行するために必要なプロジェクト設定を行います。

Cloud Shell インスタンスの起動

Cloud Shell を開く

Cloud Shell のチュートリアルの残りの部分を行います。

手動でパイプラインを実行する

以下では、BigQuery を使用して、[bigquery-public-data:github_repos.files] のサブセットである sample_files テーブルから .go 拡張子のファイルをすべて抽出します。データのサブセットを使用することで、低コストで処理を行うことができます。

  1. Cloud Shell で次のコマンドを実行し、BigQuery でクエリの中間結果を格納する新しいデータセットとテーブルを作成します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    bq mk --project_id $PROJECT spark_on_k8s_manual
    bq mk --project_id $PROJECT spark_on_k8s_manual.go_files
    
  2. GitHub リポジトリ データセットから Go ファイルのサンプルを確認し、--destination_table オプションを指定して中間テーブルにファイルを格納します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    bq query --project_id $PROJECT --replace \
             --destination_table spark_on_k8s_manual.go_files \
        'SELECT id, repo_name, path FROM
    [bigquery-public-data:github_repos.sample_files]
         WHERE RIGHT(path, 3) = ".go"'
    

    ファイルのパスは、元のリポジトリと一緒に表示されます。例:

    Waiting on bqjob_r311c807f17003279_0000015fb8007c47_1 ... (0s) Current status: DONE
    +------------------------------------------+------------------+-------------------------+
    |                    id                    |    repo_name     |          path           |
    +------------------------------------------+------------------+-------------------------+
    | 31a4559c1e636e | mandelsoft/spiff | spiff++/spiff.go        |
    | 15f7611dd21a89 | bep/gr           | examples/router/main.go |
    | 15cbb0b0f096a2 | knq/xo           | internal/fkmode.go      |
    +------------------------------------------+------------------+-------------------------+
    

    識別された Go ファイルのリストが spark_on_k8s_manual.go_files テーブルに格納されます。

  3. 次のクエリを実行して、各ファイルの最初の 10 文字を表示します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    bq query --project_id $PROJECT 'SELECT sample_repo_name as
    repo_name, SUBSTR(content, 0, 10) FROM
    [bigquery-public-data:github_repos.sample_contents] WHERE id IN
    (SELECT id FROM spark_on_k8s_manual.go_files)'
    

Kubernetes の Spark でパイプラインを実行する

次に、spark-bigquery コネクタを使用する Spark アプリケーションで同様の手順を自動化し、BigQuery に直接 SQL クエリを実行します。このアプリケーションは Spark SQL と DataFrames API を使用して結果を操作し、BigQuery に保存します。

Kubernetes Engine クラスタの作成

Spark とサンプル アプリケーションをデプロイするには、次のコマンドを実行して Kubernetes Engine クラスタを作成します。

gcloud config set compute/zone us-central1-f
gcloud container clusters create spark-on-gke --machine-type n1-standard-2

サンプルコードのダウンロード

サンプル アプリケーション リポジトリのクローンを作成します。

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-gcp-examples.git
cd spark-on-k8s-gcp-examples/github-insights

Identity and Access Management を構成する

Spark に BigQuery へのアクセスを許可するには、Identity and Access Management(IAM)サービス アカウントを作成する必要があります。

  1. サービス アカウントを作成します。

    gcloud iam service-accounts create spark-bq --display-name spark-bq
    
  2. 後のコマンドで使用するために、サービス アカウントのメールアドレスと現在のプロジェクト ID を環境変数に格納します。

    export SA_EMAIL=$(gcloud iam service-accounts list --filter="displayName:spark-bq" --format='value(email)')
    export PROJECT=$(gcloud info --format='value(config.project)')
    
  3. サンプル アプリケーションでは、BigQuery データセットとテーブルを作成して操作し、Cloud Storage からアーティファクトを削除します。bigquery.dataOwnerbigQuery.jobUserstorage.admin のロールをサービス アカウントにバインドします。

    gcloud projects add-iam-policy-binding $PROJECT --member serviceAccount:$SA_EMAIL --role roles/storage.admin
    gcloud projects add-iam-policy-binding $PROJECT --member serviceAccount:$SA_EMAIL --role roles/bigquery.dataOwner
    gcloud projects add-iam-policy-binding $PROJECT --member serviceAccount:$SA_EMAIL --role roles/bigquery.jobUser
    
  4. サービス アカウントの JSON キーをダウンロードして、Kubernetes シークレットに保存します。Spark のドライバとエグゼキュータは、このシークレットを使用して BigQuery の認証を行います。

    gcloud iam service-accounts keys create spark-sa.json --iam-account $SA_EMAIL
    kubectl create secret generic spark-sa --from-file=spark-sa.json
    
  5. Kubernetes クラスタでジョブを起動できるように Spark の権限を追加します。

    kubectl create clusterrolebinding user-admin-binding --clusterrole=cluster-admin --user=$(gcloud config get-value account)
    kubectl create clusterrolebinding --clusterrole=cluster-admin --serviceaccount=default:default spark-admin
    

Spark アプリケーションの設定と実行

Spark をダウンロードしてインストールし、Kubernetes Engine クラスタで Spark サンプル アプリケーションを実行します。

  1. サンプル アプリケーションのビルドプロセスを管理するため、Maven をインストールします。

    sudo apt-get install -y maven
  2. サンプル アプリケーションの jar をビルドします。

    mvn clean package
  3. アプリケーションの jar と Spark パイプラインの結果を格納する Cloud Storage バケットを作成します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    gsutil mb gs://$PROJECT-spark-on-k8s
    
  4. アプリケーションの jar を Cloud Storage バケットにアップロードします。

    gsutil cp target/github-insights-1.0-SNAPSHOT-jar-with-dependencies.jar \
                   gs://$PROJECT-spark-on-k8s/jars/
    
  5. 新しい BigQuery データセットを作成します。

    bq mk --project_id $PROJECT spark_on_k8s
  6. 公式の Spark 2.3 ディストリビューションをダウンロードして、アーカイブを解除します。

    wget https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
    tar xvf spark-2.3.0-bin-hadoop2.7.tgz
    cd spark-2.3.0-bin-hadoop2.7
    
  7. プロジェクト固有の情報を含むプロパティ ファイルを作成して、Spark アプリケーションを構成します。

    cat > properties << EOF
    spark.app.name  github-insights
    spark.kubernetes.namespace default
    spark.kubernetes.driverEnv.GCS_PROJECT_ID $PROJECT
    spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS /mnt/secrets/spark-sa.json
    spark.kubernetes.container.image gcr.io/cloud-solutions-images/spark:v2.3.0-gcs
    spark.kubernetes.driver.secrets.spark-sa  /mnt/secrets
    spark.kubernetes.executor.secrets.spark-sa /mnt/secrets
    spark.driver.cores 0.1
    spark.executor.instances 3
    spark.executor.cores 1
    spark.executor.memory 512m
    spark.executorEnv.GCS_PROJECT_ID    $PROJECT
    spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS /mnt/secrets/spark-sa.json
    spark.hadoop.google.cloud.auth.service.account.enable true
    spark.hadoop.google.cloud.auth.service.account.json.keyfile /mnt/secrets/spark-sa.json
    spark.hadoop.fs.gs.project.id $PROJECT
    EOF
    
  8. 次のコマンドを使用して、サンプルの GitHub データセットに Spark アプリケーションを実行します。

    export KUBERNETES_MASTER_IP=$(gcloud container clusters list --filter name=spark-on-gke --format='value(MASTER_IP)')
    bin/spark-submit \
    --properties-file properties \
    --deploy-mode cluster \
    --class spark.bigquery.example.github.NeedingHelpGoPackageFinder \
    --master k8s://https://$KUBERNETES_MASTER_IP:443 \
    --jars http://central.maven.org/maven2/com/databricks/spark-avro_2.11/4.0.0/spark-avro_2.11-4.0.0.jar \
    gs://$PROJECT-spark-on-k8s/jars/github-insights-1.0-SNAPSHOT-jar-with-dependencies.jar \
    $PROJECT spark_on_k8s $PROJECT-spark-on-k8s --usesample
    
  9. [Cloud Shell セッションを追加] ボタンをクリックして、新しい Cloud Shell セッションを開始します。

    [Cloud Shell セッションを追加] ボタン

  10. 新しい Cloud Shell セッションで、次のコマンドを使用してアプリケーションの進行状況を追跡し、ログを表示します。アプリケーションの実行には 5 分ほどかかります。

    kubectl logs -l spark-role=driver
  11. アプリケーションの実行が終了したら、次のコマンドを実行して、最も利用されている 10 個のパッケージを確認します。

    bq query "SELECT * FROM spark_on_k8s.popular_go_packages
    ORDER BY popularity DESC LIMIT 10"
    

ステップ 8 で --usesample オプションを削除すると、GitHub データセット内のすべてのテーブルに同じパイプラインを実行できます。完全なデータセットのサイズはサンプル データセットよりも大きいため、妥当な時間内にパイプラインを完了するには、より大きなクラスタが必要になる可能性があります。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ

  • Spark で BigQuery や Dataproc を使用する別の例を確認する。
  • このチュートリアルで、Cloud Dataproc、BigQuery、Apache Spark ML を使用して機械学習を実行する方法を確認する。

  • Google Cloud に関するリファレンス アーキテクチャ、図、チュートリアル、ベスト プラクティスを確認する。Cloud Architecture Center を確認します。