Cloud Storage コネクタと Apache Spark の併用

このチュートリアルでは、Cloud Storage コネクタApache Spark を併用するサンプルコードの実行方法について説明します。

目標

Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Dataproc クラスタでジョブを実行します。

料金

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Compute Engine
  • Dataproc
  • Cloud Storage

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Cloud Platform を初めて使用する方は、無料トライアルをご利用いただけます。

始める前に

以下の手順を実行して、このチュートリアルでコードを実行する準備をします。

  1. プロジェクトを設定します。 必要に応じて、有効にした Dataproc、Compute Engine、Cloud Storage API と、ローカルマシンにインストールされた Cloud SDK でプロジェクトを設定します。

    1. Google アカウントにログインします。

      Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

    2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタのページに移動

    3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

    4. Dataproc, Compute Engine, and Cloud Storage API を有効にします。

      API を有効にする

    5. Cloud SDK をインストールして初期化します。

  2. Cloud Storage バケットを作成します。チュートリアル データを保持するには Cloud Storage が必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。

    1. Cloud Console で、[Cloud Storage ブラウザ] ページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ダイアログ内で、以下の属性を指定します。
    4. [作成] をクリックします。

  3. ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。このチュートリアルで使用する Google Cloud プロジェクト ID と Cloud Storage バケットの名前を設定します。既存または新規の Dataproc クラスタの名前とリージョンも指定します。次の手順で、このチュートリアルで使用するクラスタを作成できます。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. Dataproc クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーン単一ノードの Dataproc クラスタを作成します。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの input フォルダにコピーします。

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. Java(Apache Maven)Scala(SBT)、または Python の開発環境を設定します。

Spark wordcount ジョブを準備する

以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。

Java

  1. pom.xml ファイルをローカルマシンにコピーします。次の pom.xml ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。pom.xml ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. 以下の WordCount.java コードをローカルマシンにコピーします。
    1. パス src/main/java/dataproc/codelab を指定して一連のディレクトリを作成します。
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java をローカルマシンの src/main/java/dataproc/codelab にコピーします。
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java は Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. パッケージをビルドします。
    mvn clean package
    
    ビルドが成功すると、target/spark-with-gcs-1.0-SNAPSHOT.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt ファイルをローカルマシンにコピーします。次の build.sbt ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示す provided スコープが指定されます。build.sbt ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs:// で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. word-count.scala をローカルマシンにコピーします。 これは Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. パッケージをビルドします。
    sbt clean package
    
    ビルドが成功すると、target/scala-2.11/word-count_2.11-1.0.jar が作成されます。
  4. パッケージを Cloud Storage にステージングします。
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py をローカルマシンにコピーします。 これは PySpark を使用した Python の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

ジョブを送信する

次の gcloud コマンドを実行して、wordcount ジョブを Dataproc クラスタに送信します。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

出力を表示する

ジョブが終了したら、次の Cloud SDK gsutil コマンドを実行して wordcount の出力を表示します。

gsutil cat gs://${BUCKET_NAME}/output/*

wordcount の出力は、次のようになります。

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

クリーンアップ

Dataproc を使用するのチュートリアルが終了したら、Google Cloud で作成したリソースをクリーンアップして、今後割り当ての消費や課金が発生しないようにします。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

Dataproc クラスタの削除

プロジェクトを削除する代わりに、プロジェクト内のクラスタのみを削除できます。

Cloud Storage バケットの削除

Cloud Console

  1. Cloud Console で、[Cloud Storage ブラウザ] ページに移動します。

    Cloud Storage ブラウザページに移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. [削除] をクリックして、バケットを削除します。

コマンドライン

    バケットを削除します。
    gsutil rb [BUCKET_NAME]