目标
使用 Java、Scala 或 Python 编写简单的 wordcount Spark 作业,然后在 Dataproc 集群上运行该作业。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Compute Engine
- Dataproc
- Cloud Storage
准备工作
运行以下步骤为运行本教程中的代码做准备。
设置项目。如有必要,请设置一个启用了 Dataproc、Compute Engine 和 Cloud Storage API 并在本地机器上安装了 Google Cloud CLI 的项目。
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
启用 Dataproc, Compute Engine, and Cloud Storage API。
-
创建服务帐号:
-
在 Google Cloud 控制台中,转到创建服务帐号页面。
转到“创建服务帐号” - 选择您的项目。
-
在服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。
在服务帐号说明字段中,输入说明。例如,
Service account for quickstart
。 - 点击创建并继续。
-
将 Project > Owner 角色授予服务帐号。
如需授予该角色,请找到选择角色列表,然后选择 Project > Owner。
- 点击继续。
-
点击完成以完成服务帐号的创建过程。
不要关闭浏览器窗口。您将在下一步骤中用到它。
-
-
创建服务帐号密钥:
- 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
- 点击密钥。
- 点击添加密钥,然后点击创建新密钥。
- 点击创建。JSON 密钥文件将下载到您的计算机上。
- 点击关闭。
-
将环境变量
GOOGLE_APPLICATION_CREDENTIALS
设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。 - 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目。
-
启用 Dataproc, Compute Engine, and Cloud Storage API。
-
创建服务帐号:
-
在 Google Cloud 控制台中,转到创建服务帐号页面。
转到“创建服务帐号” - 选择您的项目。
-
在服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。
在服务帐号说明字段中,输入说明。例如,
Service account for quickstart
。 - 点击创建并继续。
-
将 Project > Owner 角色授予服务帐号。
如需授予该角色,请找到选择角色列表,然后选择 Project > Owner。
- 点击继续。
-
点击完成以完成服务帐号的创建过程。
不要关闭浏览器窗口。您将在下一步骤中用到它。
-
-
创建服务帐号密钥:
- 在 Google Cloud 控制台中,点击您创建的服务帐号的电子邮件地址。
- 点击密钥。
- 点击添加密钥,然后点击创建新密钥。
- 点击创建。JSON 密钥文件将下载到您的计算机上。
- 点击关闭。
-
将环境变量
GOOGLE_APPLICATION_CREDENTIALS
设置为包含凭据的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。 - 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
创建 Cloud Storage 存储桶。您需要使用 Cloud Storage 来保存教程数据。如果您没有可用的 Cloud Storage 存储桶,请在项目中创建新存储桶。
设置本地环境变量。在本地机器上设置环境变量。设置 Google Cloud 项目 ID 以及您将在本教程中使用的 Cloud Storage 存储桶的名称。此外,还要提供现有或新 Dataproc 集群的名称和区域。您可以在下一步中创建要用于本教程的集群。
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
创建 Dataproc 集群。运行以下命令,在指定的 Compute Engine 地区中创建单节点 Dataproc 集群。
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node
将公共数据复制到 Cloud Storage 存储桶。将公共数据“莎士比亚”文本片段复制到 Cloud Storage 存储桶的
input
文件夹中:gsutil cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txt
设置 Java (Apache Maven)、Scala (SBT) 或 Python 开发环境。
准备 Spark wordcount 作业
在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:
- Java 中的 Spark 作业使用 Apache Maven 创建一个 JAR 软件包
- Scala 中的 Spark 作业使用 SBT 创建一个 JAR 软件包
- Python (PySpark) 中的 Spark 作业
Java
- 将
pom.xml
文件复制到本地机器。 以下pom.xml
文件指定 Scala 和 Spark 库依赖项,它们具有provided
范围,指示 Dataproc 集群将在运行时提供这些库。pom.xml
文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为gs://
的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - 将下面列出的
WordCount.java
代码复制到本地机器。- 创建一组路径为
src/main/java/dataproc/codelab
的目录:mkdir -p src/main/java/dataproc/codelab
- 将
WordCount.java
复制到您的本地机器的src/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java 是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- 创建一组路径为
- 构建软件包。
mvn clean package
如果构建成功,则会创建target/word-count-1.0.jar
。 - 将软件包暂存到 Cloud Storage。
gsutil cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- 将
build.sbt
文件复制到本地机器。 以下build.sbt
文件指定 Scala 和 Spark 库依赖项,它们具有provided
范围,指示 Dataproc 集群将在运行时提供这些库。build.sbt
文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为gs://
的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件scalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - 将
word-count.scala
复制到本地机器。 这是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- 构建软件包。
sbt clean package
如果构建成功,则会创建target/scala-2.11/word-count_2.11-1.0.jar
。 - 将软件包暂存到 Cloud Storage。
gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- 将
word-count.py
复制到本地机器。 这是 Python 中一个简单的使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
提交作业
运行以下 gcloud
命令,将 Wordcount 作业提交到 Dataproc 集群。
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
查看输出
该作业完成后,运行以下 gcloud CLI gsutil
命令以查看 wordcount 输出。
gsutil cat gs://${BUCKET_NAME}/output/*
字数统计输出应类似于以下内容:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
清理
完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。
删除项目
若要避免产生费用,最简单的方法是删除您为本教程创建的项目。
要删除项目,请执行以下操作:
- 在 Google Cloud 控制台中,进入管理资源页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关闭以删除项目。
删除 Dataproc 集群
您可能希望只删除项目中的集群,而不是项目。
删除 Cloud Storage 存储桶
Google Cloud 控制台
- 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击 删除,然后按照说明操作。
命令行
-
删除存储分区:
gcloud storage buckets delete BUCKET_NAME
后续步骤
- 请参阅 Spark 作业微调提示