将 Cloud Storage 连接器与 Apache Spark 搭配使用

本教程介绍了如何执行将 Cloud Storage 连接器Apache Spark 搭配使用的示例代码。

目标

使用 Java、Scala 或 Python 编写简单的 wordcount Spark 作业,然后在 Dataproc 集群上运行该作业。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Compute Engine
  • Dataproc
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

运行以下步骤为运行本教程中的代码做准备。

  1. 设置项目。如有必要,请设置一个启用了 Dataproc、Compute Engine 和 Cloud Storage API 并在本地机器上安装了 Google Cloud CLI 的项目。

    1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
    2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

      转到“项目选择器”

    3. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

    4. 启用 Dataproc, Compute Engine, and Cloud Storage API。

      启用 API

    5. 创建服务帐号:

      1. 在控制台中,打开创建服务帐号页面。

        打开“创建服务帐号”
      2. 选择您的项目。
      3. 服务帐号名称字段中,输入一个名称。控制台会根据此名称填充服务帐号 ID 字段。

        服务帐号说明字段中,输入说明。例如,Service account for quickstart

      4. 点击创建并继续
      5. 如需提供对项目的访问权限,请向服务帐号授予以下角色:Project > Owner

        选择角色列表中,选择一个角色。

        如需添加其他角色,请点击 添加其他角色,然后添加其他各个角色。

      6. 点击继续
      7. 点击完成以完成服务帐号的创建过程。

        不要关闭浏览器窗口。您将在下一步骤中用到它。

    6. 创建服务帐号密钥:

      1. 在控制台中,点击您创建的服务帐号的电子邮件地址。
      2. 点击密钥
      3. 点击添加密钥,然后点击创建新密钥
      4. 点击创建。JSON 密钥文件将下载到您的计算机上。
      5. 点击关闭
    7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

    8. 安装初始化 Google Cloud CLI。
    9. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

      转到“项目选择器”

    10. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

    11. 启用 Dataproc, Compute Engine, and Cloud Storage API。

      启用 API

    12. 创建服务帐号:

      1. 在控制台中,打开创建服务帐号页面。

        打开“创建服务帐号”
      2. 选择您的项目。
      3. 服务帐号名称字段中,输入一个名称。控制台会根据此名称填充服务帐号 ID 字段。

        服务帐号说明字段中,输入说明。例如,Service account for quickstart

      4. 点击创建并继续
      5. 如需提供对项目的访问权限,请向服务帐号授予以下角色:Project > Owner

        选择角色列表中,选择一个角色。

        如需添加其他角色,请点击 添加其他角色,然后添加其他各个角色。

      6. 点击继续
      7. 点击完成以完成服务帐号的创建过程。

        不要关闭浏览器窗口。您将在下一步骤中用到它。

    13. 创建服务帐号密钥:

      1. 在控制台中,点击您创建的服务帐号的电子邮件地址。
      2. 点击密钥
      3. 点击添加密钥,然后点击创建新密钥
      4. 点击创建。JSON 密钥文件将下载到您的计算机上。
      5. 点击关闭
    14. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

    15. 安装初始化 Google Cloud CLI。

  2. 创建 Cloud Storage 存储分区。您需要使用 Cloud Storage 来保存教程数据。如果您没有可用的 Cloud Storage 存储分区,请在项目中创建新存储分区。

    1. 在控制台中,打开 Cloud Storage 浏览器页面。

      打开浏览器

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入符合存储分区命名要求的名称。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,请选择一个存储类别
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建

  3. 设置本地环境变量。在本地机器上设置环境变量。设置 Google Cloud 项目 ID 以及您将在本教程中使用的 Cloud Storage 存储分区的名称。此外,还要提供现有或新 Dataproc 集群的名称和区域。您可以在下一步中创建要用于本教程的集群。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. 创建 Dataproc 集群。运行以下命令,在指定的 Compute Engine 地区中创建单节点 Dataproc 集群。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 将公共数据复制到 Cloud Storage 存储分区。将公共数据“莎士比亚”文本片段复制到 Cloud Storage 存储分区的 input 文件夹中:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. 设置 Java (Apache Maven)Scala (SBT)Python 开发环境。

准备 Spark wordcount 作业

在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:

Java

  1. pom.xml 文件复制到本地机器。 以下 pom.xml 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。pom.xml 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. 将下面列出的 WordCount.java 代码复制到您的本地机器
    1. 创建一组路径为 src/main/java/dataproc/codelab 的目录:
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java 复制到您的本地机器的 src/main/java/dataproc/codelab
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java 是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. 构建软件包
    mvn clean package
    
    如果构建成功,则系统会创建一个 target/spark-with-gcs-1.0-SNAPSHOT.jar
  4. 将软件包暂存到 Cloud Storage
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 文件复制到本地机器。 以下 build.sbt 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。build.sbt 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. word-count.scala 复制到本地机器。 这是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. 构建软件包
    sbt clean package
    
    如果构建成功,则系统会创建一个 target/scala-2.11/word-count_2.11-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 复制到本地机器。 这是 Python 中一个简单的使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

提交作业

运行以下 gcloud 命令,将 Wordcount 作业提交到 Dataproc 集群。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看输出

作业完成后,运行以下 gcloud CLI gsutil 命令以查看单词数输出。

gsutil cat gs://${BUCKET_NAME}/output/*

字数统计输出应类似于以下内容:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

清除数据

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在控制台中,打开管理资源页面。

    打开“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除 Dataproc 集群

您可能希望只删除项目中的集群,而不是项目。

删除 Cloud Storage 存储分区

控制台

  1. 在控制台中,转到 Cloud Storage 浏览器页面。

    打开浏览器

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。

命令行

    删除存储分区:
    gsutil rb BUCKET_NAME

后续步骤