将 Cloud Storage 连接器与 Apache Spark 搭配使用

本教程介绍了如何执行将 Cloud Storage 连接器Apache Spark 搭配使用的示例代码。

目标

使用 Java、Scala 或 Python 编写简单的 wordcount Spark 作业,然后在 Dataproc 集群上运行该作业。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Compute Engine
  • Dataproc
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

运行以下步骤为运行本教程中的代码做准备。

  1. 设置项目。必要的话,请设置一个启用了 Dataproc、Compute Engine 和 Cloud Storage API 并在本地机器上安装了 Cloud SDK 的项目。

    1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
    2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

      转到“项目选择器”

    3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

    4. 启用 Dataproc, Compute Engine, and Cloud Storage API。

      启用 API

    5. 创建服务帐号:

      1. 在 Cloud Console 中,转到创建服务帐号页面。

        转到“创建服务帐号”
      2. 选择一个项目。
      3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

        服务帐号说明字段中,输入说明。例如,Service account for quickstart

      4. 点击创建并继续
      5. 点击选择角色字段。

        快速访问下,点击基本,然后点击所有者

      6. 点击继续
      7. 点击完成以完成服务帐号的创建过程。

        不要关闭浏览器窗口。您将在下一步骤中用到它。

    6. 创建服务帐号密钥:

      1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
      2. 点击密钥
      3. 依次点击添加密钥创建新密钥
      4. 点击创建。JSON 密钥文件将下载到您的计算机上。
      5. 点击关闭
    7. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

    8. 安装并初始化 Cloud SDK
    9. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

      转到“项目选择器”

    10. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

    11. 启用 Dataproc, Compute Engine, and Cloud Storage API。

      启用 API

    12. 创建服务帐号:

      1. 在 Cloud Console 中,转到创建服务帐号页面。

        转到“创建服务帐号”
      2. 选择一个项目。
      3. 服务帐号名称字段中,输入一个名称。 Cloud Console 会根据此名称填充服务帐号 ID 字段。

        服务帐号说明字段中,输入说明。例如,Service account for quickstart

      4. 点击创建并继续
      5. 点击选择角色字段。

        快速访问下,点击基本,然后点击所有者

      6. 点击继续
      7. 点击完成以完成服务帐号的创建过程。

        不要关闭浏览器窗口。您将在下一步骤中用到它。

    13. 创建服务帐号密钥:

      1. 在 Cloud Console 中,点击您创建的服务帐号的电子邮件地址。
      2. 点击密钥
      3. 依次点击添加密钥创建新密钥
      4. 点击创建。JSON 密钥文件将下载到您的计算机上。
      5. 点击关闭
    14. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。

    15. 安装并初始化 Cloud SDK

  2. 创建 Cloud Storage 存储分区。您需要使用 Cloud Storage 来保存教程数据。如果您没有可用的 Cloud Storage 存储分区,请在项目中创建新存储分区。

    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到浏览器

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入符合存储分区命名要求的名称。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,请选择一个存储类别
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建

  3. 设置本地环境变量。在本地机器上设置环境变量。设置 Google Cloud 项目 ID 以及您将在本教程中使用的 Cloud Storage 存储分区的名称。此外,还要提供现有或新 Dataproc 集群的名称和区域。您可以在下一步中创建要用于本教程的集群。

    PROJECT=project-id
    
    BUCKET_NAME=bucket-name
    
    CLUSTER=cluster-name
    
    REGION=cluster-region Example: "us-central1"
    

  4. 创建 Dataproc 集群。运行以下命令,在指定的 Compute Engine 地区中创建单节点 Dataproc 集群。

    gcloud dataproc clusters create ${CLUSTER} \
        --project=${PROJECT} \
        --region=${REGION} \
        --single-node
    

  5. 将公共数据复制到 Cloud Storage 存储分区。将公共数据“莎士比亚”文本片段复制到 Cloud Storage 存储分区的 input 文件夹中:

    gsutil cp gs://pub/shakespeare/rose.txt \
        gs://${BUCKET_NAME}/input/rose.txt
    

  6. 设置 Java (Apache Maven)Scala (SBT)Python 开发环境。

准备 Spark wordcount 作业

在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:

Java

  1. pom.xml 文件复制到本地机器。 以下 pom.xml 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。pom.xml 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>dataproc.codelab</groupId>
      <artifactId>word-count</artifactId>
      <version>1.0</version>
    
      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>Scala version, for example, 2.11.8</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_Scala major.minor.version, for example, 2.11</artifactId>
          <version>Spark version, for example, 2.3.1</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
    
  2. 将下面列出的 WordCount.java 代码复制到您的本地机器
    1. 创建一组路径为 src/main/java/dataproc/codelab 的目录:
      mkdir -p src/main/java/dataproc/codelab
      
    2. WordCount.java 复制到您的本地机器的 src/main/java/dataproc/codelab
      cp WordCount.java src/main/java/dataproc/codelab
      

    WordCount.java 是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。

    package dataproc.codelab;
    
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    public class WordCount {
      public static void main(String[] args) {
        if (args.length != 2) {
          throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>");
        }
        String inputPath = args[0];
        String outputPath = args[1];
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count"));
        JavaRDD<String> lines = sparkContext.textFile(inputPath);
        JavaRDD<String> words = lines.flatMap(
            (String line) -> Arrays.asList(line.split(" ")).iterator()
        );
        JavaPairRDD<String, Integer> wordCounts = words.mapToPair(
            (String word) -> new Tuple2<>(word, 1)
        ).reduceByKey(
            (Integer count1, Integer count2) -> count1 + count2
        );
        wordCounts.saveAsTextFile(outputPath);
      }
    }
    
  3. 构建软件包
    mvn clean package
    
    如果构建成功,则系统会创建一个 target/spark-with-gcs-1.0-SNAPSHOT.jar
  4. 将软件包暂存到 Cloud Storage
    gsutil cp target/word-count-1.0.jar \
        gs://${BUCKET_NAME}/java/word-count-1.0.jar
    

Scala

  1. build.sbt 文件复制到本地机器。 以下 build.sbt 文件指定 Scala 和 Spark 库依赖项,它们具有 provided 范围,指示 Dataproc 集群将在运行时提供这些库。build.sbt 文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为 gs:// 的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件
    scalaVersion := "Scala version, for example, 2.11.8"
    
    name := "word-count"
    organization := "dataproc.codelab"
    version := "1.0"
    
    libraryDependencies ++= Seq(
      "org.scala-lang" % "scala-library" % scalaVersion.value % "provided",
      "org.apache.spark" %% "spark-core" % "Spark version, for example, 2.3.1" % "provided"
    )
    
    
  2. word-count.scala 复制到本地机器。 这是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    package dataproc.codelab
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length != 2) {
          throw new IllegalArgumentException(
              "Exactly 2 arguments are required: <inputPath> <outputPath>")
        }
    
        val inputPath = args(0)
        val outputPath = args(1)
    
        val sc = new SparkContext(new SparkConf().setAppName("Word Count"))
        val lines = sc.textFile(inputPath)
        val words = lines.flatMap(line => line.split(" "))
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.saveAsTextFile(outputPath)
      }
    }
    
    
  3. 构建软件包
    sbt clean package
    
    如果构建成功,则系统会创建一个 target/scala-2.11/word-count_2.11-1.0.jar
  4. 将软件包暂存到 Cloud Storage
    gsutil cp target/scala-2.11/word-count_2.11-1.0.jar \
        gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
    

Python

  1. word-count.py 复制到本地机器。 这是 Python 中一个简单的使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
    #!/usr/bin/env python
    
    import pyspark
    import sys
    
    if len(sys.argv) != 3:
      raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
    
    inputUri=sys.argv[1]
    outputUri=sys.argv[2]
    
    sc = pyspark.SparkContext()
    lines = sc.textFile(sys.argv[1])
    words = lines.flatMap(lambda line: line.split())
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
    wordCounts.saveAsTextFile(sys.argv[2])
    

提交作业

运行以下 gcloud 命令,将 Wordcount 作业提交到 Dataproc 集群。

Java

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Scala

gcloud dataproc jobs submit spark \
    --cluster=${CLUSTER} \
    --class=dataproc.codelab.WordCount \
    --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

Python

gcloud dataproc jobs submit pyspark word-count.py \
    --cluster=${CLUSTER} \
    --region=${REGION} \
    -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/

查看输出

作业完成后,运行以下 Cloud SDK gsutil 命令以查看字数统计输出。

gsutil cat gs://${BUCKET_NAME}/output/*

字数统计输出应类似于以下内容:

(a,2)
(call,1)
(What's,1)
(sweet.,1)
(we,1)
(as,1)
(name?,1)
(any,1)
(other,1)
(rose,1)
(smell,1)
(name,1)
(would,1)
(in,1)
(which,1)
(That,1)
(By,1)

清除数据

完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除 Dataproc 集群

您可能希望只删除项目中的集群,而不是项目。

删除 Cloud Storage 存储分区

Cloud Console

  1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

    转到浏览器

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。

命令行

    删除存储分区:
    gsutil rb BUCKET_NAME

后续步骤