在 Cloud Dataproc 集群上编写并运行 Spark Scala 作业

本教程说明了创建 Spark Scala 作业并将作业提交到 Cloud Dataproc 集群的不同方法,教程中介绍了以下过程:

  • 使用 Scala REPL(Read-Evaluate-Print-Loop 或交互式解析器)、SBT 构建工具或包含了 Eclipse 的 Scala IDE 插件的 Eclipse IDE,通过命令行在本地机器上编写和编译 Spark Scala“Hello World”应用
  • 将经过编译的 Scala 类打包成一个包含清单的 jar 文件
  • 将 Scala jar 提交到您的 Cloud Dataproc 集群上运行的 Spark 作业
  • 从 Google Cloud Platform Console 检查 Scala 作业输出

本教程还向您展示了如何执行以下操作:

  • 使用 spark-shell REPL 直接在 Cloud Dataproc 集群上编写并运行 Spark Scala“WordCount”mapreduce 作业

  • 在集群上运行预先安装的 Apache Spark 和 Hadoop 示例

设置 Google Cloud Platform 项目

如果您尚未对项目进行设置,请按以下步骤进行操作:

  1. 设置项目
  2. 创建 Cloud Storage 存储分区
  3. 创建 Cloud Dataproc 集群

在本地编写和编译 Scala 代码

作为本教程的一个简单练习,请在开发机器上本地使用 Scala REPLSBT 命令行界面或 Eclipse 的 Scala IDE 插件编写“Hello World”Scala 应用。

使用 Scala

  1. Scala 安装页面下载 Scala 二进制文件
  2. Scala 安装说明中所示,解压缩该文件,设置 SCALA_HOME 环境变量,并将其添加到您的路径中。例如:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. 启动 Scala REPL

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. HelloWorld 代码复制并粘贴到 Scala REPL 中

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    

  5. 保存 HelloWorld.scala 并退出 REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. 使用 scalac 进行编译

    $ scalac HelloWorld.scala
    

  7. 列出已编译的 .class 文件

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

使用 SBT

  1. 下载 SBT

  2. 创建一个“HelloWorld”项目,如下所示

    $ mkdir hello
    $ cd hello
    $ echo \
      'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
      HelloWorld.scala
    

  3. 创建一个 sbt.build 配置文件,以将 artifactName(您将在下面生成的 jar 文件的名称)设置为“HelloWorld.jar”(请参阅修改默认软件工件

    echo \
      'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
      "HelloWorld.jar" }' > \
      build.sbt
    

  4. 启动 SBT 并运行代码

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. 将代码打包成带有指定了主类入口点 (HelloWorld) 的清单jar 文件,然后退出

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

使用 Eclipse 的 SBT 插件

  1. 安装 Eclipse 的 SBT 插件

  2. 选择/创建一个工作区并启动一个新的 Scala 项目

  3. 创建 Scala 项目页面上,命名“HelloWorld”项目,然后点击“完成”以接受默认设置
  4. 选择左窗格 (Package Explorer) 中的 src 文件夹,然后选择“新建→Scala 对象”
  5. 新建文件对话框中,将“HelloWorld”作为 Scala 对象名称插入,然后点击“完成”
  6. HelloWorld.scala 文件在创建时附带了一个 HelloWorld 对象模板。请填写模板以完成 HelloWorld 对象定义,如下所示:
    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    
  7. 点击 HelloWorld.scala 文档,或在左窗格 (Package Explorer) 中选择文档名称,然后右键点击并选择“运行方式→Scala 应用”以构建并运行应用
  8. Console 将显示已成功构建并运行该应用
  9. 已编译的类文件位于 Eclipse 工作区中的项目 bin 文件夹内。请使用这些文件创建 jar - 请参阅创建 jar

创建 jar

使用 SBT 或使用 jar 命令创建 jar 文件

使用 SBT

SBT 软件包命令可创建 jar 文件(请参阅使用 SBT)。

手动创建 jar

  1. 将目录 (cd) 更改为包含已编译的 HelloWorld*.class 文件的目录,然后运行以下命令将类文件打包到具有指定了主类入口点 (HelloWorld) 的清单的 jar 中。
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

将 jar 复制到 Cloud Storage

  1. 使用 gsutil 命令将 jar 复制到项目中的 Cloud Storage 存储分区
$ gsutil cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

将 jar 提交到 Cloud Dataproc Spark 作业

  1. 使用 GCP Console 将 jar 文件提交到您的 Cloud Dataproc Spark 作业

  2. 填写提交作业页面上的字段,如下所示:

    • 集群:从集群列表中选择您的集群的名称
    • 作业类型:Spark
    • 主类或 jar:指定您的 HelloWorld jar 的 Cloud Storage URI 路径 (gs://your-bucket-name/HelloWorld.jar)。如果您的 jar 没有包含指定了代码入口点(“Main-Class: HelloWorld”)的清单,则“主类或 jar”字段应声明您的主类的名称(“HelloWorld”),而您应使用您的 jar 文件的 URI 路径 (gs://your-bucket-name/HelloWorld.jar) 填写“Jar 文件”字段。
  3. 点击提交以启动作业。作业启动后,它会被添加到作业列表中

  4. 点击“作业 ID”以打开作业页面,您可以在该页面中查看作业的驱动程序输出

使用集群的 spark-shell REPL 编写并运行 Spark Scala 代码

建议您直接在您的 Cloud Dataproc 集群上开发 Scala 应用。Hadoop 和 Spark 预先安装在 Cloud Dataproc 集群上,并且它们配置了 Cloud Storage 连接器,该连接器允许您的代码直接从 Cloud Storage 读取数据并将数据写入 Cloud Storage。

此示例向您展示了如何通过 SSH 连接到项目的 Cloud Dataproc 集群主实例节点,然后使用 spark-shell REPL 创建并运行 Scala wordcount mapreduce 应用。

  1. 通过 SSH 连接到 Cloud Dataproc 集群的主实例节点
    1. 在 GCP Console 中转到项目的 Cloud Dataproc 集群页面,然后点击您的集群名称
    2. 在集群详情页面上,选择“VM 实例”标签,然后点击显示在集群主实例节点名称右侧的 SSH 选项
      此时会打开一个浏览器窗口并显示主实例节点上的主目录
      Connected, host fingerprint: ssh-rsa 2048 ...
      ...
      user@clusterName-m:~$
      
  2. 启动 spark-shell
    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    
  3. 从位于公共 Cloud Storage 中的莎士比亚作品文本片段创建 RDD(弹性分布式数据集)
    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    
  4. 在该文本上运行 wordcount mapreduce,然后显示 wordcounts 结果
    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
      (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    
  5. 将计数保存在 Cloud Storage 内的 <bucket-name>/wordcounts-out 中,然后退出 scala-shell
    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    
  6. 使用 gsutil 列出输出文件并显示文件内容
    $ gsutil ls gs://<bucket-name>/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    
  7. 检查 gs://<bucket-name>/wordcounts-out/part-00000 内容
    $ gsutil cat gs://<bucket-name>/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

运行预先安装的示例代码

Cloud Dataproc 主实例节点包含带有标准 Apache Hadoop 和 Spark 示例的可运行 jar 文件。

Jar 类型 Master node /usr/lib/ location GitHub 来源 Apache 文档
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar 来源链接 MapReduce 教程
Spark spark/lib/spark-examples.jar 来源链接 Spark 示例

通过命令行向集群提交示例

您可以使用 Cloud SDK gcloud 命令行工具从本地开发机器提交示例(请参阅使用 Google Cloud Platform Console 以从 GCP Console 提交作业)。

Hadoop WordCount 示例

gcloud dataproc jobs submit hadoop --cluster <cluster-name> \
  --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
  --class org.apache.hadoop.examples.WordCount \
  <URI of input file> <URI of output file>

Spark WordCount 示例

gcloud dataproc jobs submit spark --cluster <cluster-name> \
  --jar file:///usr/lib/spark/lib/spark-examples.jar \
  --class org.apache.spark.examples.JavaWordCount
  <URI of input file>

关闭集群

为避免不断产生费用,请关闭本教程使用的集群并删除 Cloud Storage 资源(Cloud Storage 存储分区和文件)。

要关闭集群,请运行以下命令:

gcloud dataproc clusters delete <cluster-name>

要删除 Cloud Storage jar 文件,请运行以下命令:

gsutil rm gs://<bucket-name>/HelloWorld.jar

您可以使用以下命令删除存储分区及其所有文件夹和文件:

gsutil rm -r gs://<bucket-name>/

此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
Cloud Dataproc 文档
需要帮助?请访问我们的支持页面