Cloud Storage コネクタと Apache Spark の併用

このチュートリアルでは、Cloud Storage コネクタApache Spark を併用するサンプルコードの実行方法について説明します。


Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Dataproc クラスタでジョブを実行します。


このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Compute Engine
  • Dataproc
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。



  1. プロジェクトを設定します。 必要に応じて、有効にした Dataproc、Compute Engine、Cloud Storage API と、ローカルマシンにインストールされた Google Cloud CLI でプロジェクトを設定します。

  2. Cloud Storage バケットを作成します。チュートリアル データを保持するには Cloud Storage が必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。

  3. ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。 Google Cloud project-id と、このチュートリアルで使用する Cloud Storage バケットの名前を設定します。既存または新規の Dataproc クラスタの名前とリージョンも指定します。次の手順で、このチュートリアルで使用するクラスタを作成できます。

    REGION=cluster-region Example: "us-central1"

  4. Dataproc クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーン単一ノードの Dataproc クラスタを作成します。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \

  5. 一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの input フォルダにコピーします。

    gcloud storage cp gs://pub/shakespeare/rose.txt \

  6. Java(Apache Maven)Scala(SBT)、または Python の開発環境を設定します。

Spark wordcount ジョブを準備する

以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。

  1. pom.xml ファイルをローカルマシンにコピーします。次の pom.xml ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。pom.xml ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns=""
          <version>Scala version, for example, 2.11.8</version>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
  2. 以下の コードをローカルマシンにコピーします。
    1. パス src/main/java/dataproc/codelab を指定して一連のディレクトリを作成します。
      mkdir -p src/main/java/dataproc/codelab
    2. をローカルマシンの src/main/java/dataproc/codelab にコピーします。
      cp src/main/java/dataproc/codelab
   は Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。

    package dataproc.codelab;
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import scala.Tuple2;
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
            (Integer count1, Integer count2) -> count1 + count2
  3. パッケージをビルドします。
    mvn clean package
    ビルドが成功すると、target/word-count-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gcloud storage cp target/word-count-1.0.jar \
  1. build.sbt ファイルをローカルマシンにコピーします。次の build.sbt ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。build.sbt ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    scalaVersion := "Scala version, for example, 2.11.8"
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
  2. word-count.scala をローカルマシンにコピーします。 これは Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    package dataproc.codelab
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        val inputPath = args(0)
        val outputPath = args(1)
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = => (word, 1)).reduceByKey(_ + _)
  3. パッケージをビルドします。
    sbt clean package
    ビルドが成功すると、target/scala-2.11/word-count_2.11-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \
  1. をローカルマシンにコピーします。 これは PySpark を使用した Python の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    #!/usr/bin/env python
    import pyspark
    import sys
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)


次の gcloud コマンドを実行して、wordcount ジョブを Dataproc クラスタに送信します。

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
gcloud dataproc jobs submit pyspark \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/


ジョブが終了したら、次の gcloud CLI コマンドを実行して wordcount の出力を表示します。

gcloud storage cat gs://${BUCKET_NAME}/output/*

wordcount の出力は、次のようになります。







Dataproc クラスタの削除


Cloud Storage バケットの削除

    gcloud storage buckets delete BUCKET_NAME
