目標
Java、Scala、または Python で簡単な wordcount Spark ジョブを作成し、Dataproc クラスタでジョブを実行します。
費用
このドキュメントでは、課金対象である次の Google Cloud コンポーネントを使用します。
- Compute Engine
- Dataproc
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
始める前に
以下の手順を実行して、このチュートリアルでコードを実行する準備をします。
プロジェクトを設定します。 必要に応じて、有効にした Dataproc、Compute Engine、Cloud Storage API と、ローカルマシンにインストールされた Google Cloud CLI でプロジェクトを設定します。
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Dataproc, Compute Engine, and Cloud Storage API を有効にします。
-
サービス アカウントを作成します。
-
Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。
[サービス アカウントの作成] に移動 - プロジェクトを選択します。
-
[サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。
[サービス アカウントの説明] フィールドに説明を入力します。例:
Service account for quickstart
- [作成して続行] をクリックします。
-
サービス アカウントに Project > Owner ロールを付与します。
ロールを付与するには、[ロールを選択] リストで [Project > Owner] を選択します。
- [続行] をクリックします。
-
[完了] をクリックして、サービス アカウントの作成を完了します。
ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
-
-
サービス アカウント キーを作成します。
- Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
- [キー] をクリックします。
- [鍵を追加]、[新しい鍵を作成] の順にクリックします。
- [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
- [閉じる] をクリックします。
-
環境変数
GOOGLE_APPLICATION_CREDENTIALS
を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。 - Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Dataproc, Compute Engine, and Cloud Storage API を有効にします。
-
サービス アカウントを作成します。
-
Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。
[サービス アカウントの作成] に移動 - プロジェクトを選択します。
-
[サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。
[サービス アカウントの説明] フィールドに説明を入力します。例:
Service account for quickstart
- [作成して続行] をクリックします。
-
サービス アカウントに Project > Owner ロールを付与します。
ロールを付与するには、[ロールを選択] リストで [Project > Owner] を選択します。
- [続行] をクリックします。
-
[完了] をクリックして、サービス アカウントの作成を完了します。
ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
-
-
サービス アカウント キーを作成します。
- Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
- [キー] をクリックします。
- [鍵を追加]、[新しい鍵を作成] の順にクリックします。
- [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
- [閉じる] をクリックします。
-
環境変数
GOOGLE_APPLICATION_CREDENTIALS
を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。 - Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
Cloud Storage バケットを作成します。チュートリアル データを保持するには Cloud Storage が必要です。使用できるバケットがない場合は、プロジェクトに新しいバケットを作成します。
- Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。
- [バケットを作成] をクリックします。
- [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
- [作成] をクリックします。
ローカル環境変数を設定します。ローカルマシンで環境変数を設定します。このチュートリアルで使用する Google Cloud プロジェクト ID と Cloud Storage バケットの名前を設定します。既存または新規の Dataproc クラスタの名前とリージョンも指定します。次の手順で、このチュートリアルで使用するクラスタを作成できます。
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
Dataproc クラスタを作成します。以下のコマンドを実行して、指定された Compute Engine ゾーンに単一ノードの Dataproc クラスタを作成します。
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node
一般公開データを Cloud Storage バケットにコピーします。一般公開データであるシェイクスピアのテキスト スニペットを Cloud Storage バケットの
input
フォルダにコピーします。gsutil cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txt
Java(Apache Maven)、Scala(SBT)、または Python の開発環境を設定します。
Spark wordcount ジョブを準備する
以下のタブを選択し、手順に沿ってクラスタに送信するジョブ パッケージまたはファイルを準備します。次のいずれかのジョブタイプを準備できます。
- Java の Spark ジョブ。JAR パッケージを構築する際に Apache Maven を使用
- Scala の Spark ジョブ。JAR パッケージを構築する際に SBT を使用
- Python の Spark ジョブ(PySpark)
Java
pom.xml
ファイルをローカルマシンにコピーします。次のpom.xml
ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示すprovided
スコープが指定されます。pom.xml
ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs://
で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project>- 以下の
WordCount.java
コードをローカルマシンにコピーします。- パス
src/main/java/dataproc/codelab
を指定して一連のディレクトリを作成します。mkdir -p src/main/java/dataproc/codelab
WordCount.java
をローカルマシンのsrc/main/java/dataproc/codelab
にコピーします。cp WordCount.java src/main/java/dataproc/codelab
WordCount.java は Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。
package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- パス
- パッケージをビルドします。
mvn clean package
ビルドが成功すると、target/word-count-1.0.jar
が作成されます。 - パッケージを Cloud Storage にステージングします。
gsutil cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
build.sbt
ファイルをローカルマシンにコピーします。次のbuild.sbt
ファイルで、Scala ライブラリと Spark ライブラリの依存関係を指定します。この依存関係には、実行時に Dataproc クラスタがこれらのライブラリを提供することを示すprovided
スコープが指定されます。build.sbt
ファイルでは、コネクタが標準の HDFS インターフェースを実装しているため、Cloud Storage の依存関係は指定されません。Spark ジョブが Cloud Storage クラスタ ファイル(gs://
で始まる URI を持つファイル)にアクセスすると、システムは自動的に Cloud Storage コネクタを使用して Cloud Storage のファイルにアクセスします。scalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" )word-count.scala
をローカルマシンにコピーします。 これは Java の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- パッケージをビルドします。
sbt clean package
ビルドが成功すると、target/scala-2.11/word-count_2.11-1.0.jar
が作成されます。 - パッケージを Cloud Storage にステージングします。
gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
word-count.py
をローカルマシンにコピーします。 これは PySpark を使用した Python の単純な Spark ジョブであり、Cloud Storage からテキスト ファイルを読み込んで単語数を計算し、そのテキスト ファイルの結果を Cloud Storage に書き込みます。#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
ジョブを送信する
次の gcloud
コマンドを実行して、wordcount ジョブを Dataproc クラスタに送信します。
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
出力を表示する
ジョブが終了したら、次の gcloud CLI gsutil
コマンドを実行して wordcount の出力を表示します。
gsutil cat gs://${BUCKET_NAME}/output/*
wordcount の出力は、次のようになります。
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
クリーンアップ
チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。
プロジェクトの削除
課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
プロジェクトを削除するには:
- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
Dataproc クラスタの削除
プロジェクトを削除する代わりに、プロジェクト内のクラスタのみを削除できます。
Cloud Storage バケットの削除
Google Cloud コンソール
- Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。
- 削除するバケットのチェックボックスをクリックします。
- バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。
コマンドライン
-
バケットを削除します。
gsutil rb BUCKET_NAME
次のステップ
- Spark ジョブ調整のヒントを見る。