Cloud Storage コネクタと Apache Spark の併用

このチュートリアルでは、Cloud Storage コネクタApache Spark を併用するサンプルコードの実行方法について説明します。

目標

Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Dataproc クラスタでジョブを実行します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Compute Engine
  • Dataproc
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

以下の手順を実行して、このチュートリアルでコードを実行する準備をします。

  1. プロジェクトを設定します。 必要に応じて、有効にした Dataproc、Compute Engine、Cloud Storage API と、ローカルマシンにインストールされた Cloud SDK でプロジェクトを設定します。

    1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
    2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

    4. Dataproc, Compute Engine, and Cloud Storage API を有効にします。

      API を有効にする

    5. サービス アカウントを作成します。

      1. Cloud Console で [サービス アカウントの作成] ページに移動します。

        [サービス アカウントの作成] に移動
      2. プロジェクトを選択します。
      3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

        [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

      4. [作成して続行] をクリックします。
      5. [ロールを選択] フィールドをクリックします。

        [クイック アクセス] で [基本]、[オーナー] の順にクリックします。

      6. [続行] をクリックします。
      7. [完了] をクリックして、サービス アカウントの作成を完了します。

        ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

    6. サービス アカウント キーを作成します。

      1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
      2. [キー] をクリックします。
      3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
      4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
      5. [閉じる] をクリックします。
    7. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

    8. Cloud SDK をインストールして初期化します。
    9. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    10. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

    11. Dataproc, Compute Engine, and Cloud Storage API を有効にします。

      API を有効にする

    12. サービス アカウントを作成します。

      1. Cloud Console で [サービス アカウントの作成] ページに移動します。

        [サービス アカウントの作成] に移動
      2. プロジェクトを選択します。
      3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

        [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

      4. [作成して続行] をクリックします。
      5. [ロールを選択] フィールドをクリックします。

        [クイック アクセス] で [基本]、[オーナー] の順にクリックします。

      6. [続行] をクリックします。
      7. [完了] をクリックして、サービス アカウントの作成を完了します。

        ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

    13. サービス アカウント キーを作成します。

      1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
      2. [キー] をクリックします。
      3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
      4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
      5. [閉じる] をクリックします。
    14. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

    15. Cloud SDK をインストールして初期化します。

  2. Cloud Storage バケットを作成します。チュートリアル データを保持するには Cloud Storage が必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。

    1. Cloud Console で、Cloud Storage ブラウザページに移動します。

      [ブラウザ] に移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、ストレージ クラスを選択します。
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。

  3. ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。このチュートリアルで使用する Google Cloud プロジェクト ID と Cloud Storage バケットの名前を設定します。既存または新規の Dataproc クラスタの名前とリージョンも指定します。次の手順で、このチュートリアルで使用するクラスタを作成できます。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Dataproc クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーン単一ノードの Dataproc クラスタを作成します。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの input フォルダにコピーします。

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Java(Apache Maven)Scala(SBT)、または Python の開発環境を設定します。

Spark wordcount ジョブを準備する

以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。

Java

  1. pom.xml ファイルをローカルマシンにコピーします。次の pom.xml ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。pom.xml ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. 以下の WordCount.java コードをローカルマシンにコピーします。
    1. パス src/main/java/dataproc/codelab を指定して一連のディレクトリを作成します。
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java をローカルマシンの src/main/java/dataproc/codelab にコピーします。
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java は Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. パッケージをビルドします。
    mvn clean package
    
    ビルドが成功すると、target/spark-with-gcs-1.0-SNAPSHOT.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt ファイルをローカルマシンにコピーします。次の build.sbt ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。build.sbt ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. word-count.scala をローカルマシンにコピーします。 これは Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. パッケージをビルドします。
    sbt clean package
    
    ビルドが成功すると、target/scala-2.11/word-count_2.11-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py をローカルマシンにコピーします。 これは PySpark を使用した Python の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

ジョブを送信する

次の gcloud コマンドを実行して、wordcount ジョブを Dataproc クラスタに送信します。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

出力を表示する

ジョブが終了したら、次の Cloud SDK gsutil コマンドを実行して wordcount の出力を表示します。

gsutil cat gs://${BUCKET_NAME}/output/*

wordcount の出力は、次のようになります。

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

クリーンアップ

チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

Dataproc クラスタの削除

プロジェクトを削除する代わりに、プロジェクト内のクラスタのみを削除できます。

Cloud Storage バケットの削除

Cloud Console

  1. Cloud Console の Cloud Storage ブラウザページに移動します。

    [ブラウザ] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

コマンドライン

    バケットを削除します。
    gsutil rb BUCKET_NAME

次のステップ