连接到 Apache Spark

作为 BigQuery 管理员,您可以创建一个连接,以使数据分析师能够运行 Apache Spark 的存储过程

准备工作

位置注意事项

在选择数据的位置时,请考虑以下事项:

多区域

您必须指定位于同一大型地理区域中的 Google Cloud 资源:

  • BigQuery 美国多区域中的连接可以引用该美国地理区域的任何区域中的 Spark History ServerDataproc Metastore,例如 us-central1us-east4us-west2

  • BigQuery 欧盟多区域中的连接可以引用欧盟成员国中的 Spark History Server 或 Dataproc Metastore,例如 europe-north1europe-west3

单区域

单区域的连接只能引用同一区域中的 Google Cloud 资源。例如,单区域 us-east4 中的连接只能引用 us-east4 中的 Spark History Server 或 Dataproc Metastore。

创建连接

从下列选项中选择一项:

控制台

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 如需创建连接,请点击“添加”图标 添加,然后点击与外部数据源的连接

  3. 连接类型列表中,选择 Apache Spark

  4. 连接 ID 字段中,输入连接的名称,例如 spark_connection

  5. 数据位置列表中,选择一个区域。

    您可以在支持 BigQuery 的区域和多区域中创建连接。如需了解详情,请参阅位置注意事项

  6. 可选:从 Metastore 服务列表中,选择 Dataproc Metastore

  7. 可选:在 History Server 集群字段中,输入 Dataproc Persistent History Server

  8. 点击创建连接

  9. 点击转到连接

  10. 连接信息窗格中,复制服务账号 ID 以在后续步骤中使用。

bq

  1. 在命令行环境中,使用 bq mk 命令创建连接:

    bq mk --connection --connection_type='SPARK' \
     --properties=PROPERTIES \
     --project_id=PROJECT_ID \
     --location=LOCATION
     CONNECTION_ID
    

    请替换以下内容:

    • PROPERTIES:一个键值对,可提供 JSON 格式的连接专用参数

      例如:

      --properties='{
      "metastoreServiceConfig": {"metastoreService": "METASTORE_SERVICE_NAME"},
      "sparkHistoryServerConfig": {"dataprocCluster": "DATAPROC_CLUSTER_NAME"}
      }'
      

      请替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID

    • LOCATION:您要在其中存储连接的位置,例如 US

    • CONNECTION_ID:连接 ID,例如 myconnection

      当您在 Google Cloud 控制台中查看连接详情时,连接 ID 是连接 ID 中显示的完全限定连接 ID 的最后一部分中的值,例如 projects/.../locations/.../connections/myconnection

  2. 检索并复制该服务账号 ID,因为您需要在另一个步骤中用到它:

    bq show --location=LOCATION --connection PROJECT_ID.LOCATION.CONNECTION_ID
    

    输出类似于以下内容:

    Connection myproject.us.myconnection
    
           name           type                    properties
    ---------------------- ------- ---------------------------------------------------
    myproject.us.myconnection  SPARK   {"serviceAccountId": "bqserver@example.iam.gserviceaccount.com"}
    

如需了解如何管理连接,请参阅管理连接

向服务账号授予访问权限

为了让 Apache Spark 的存储过程能够访问您的 Google Cloud 资源,您需要向与该存储过程的连接关联的服务账号授予必要的 IAM 权限。 或者,您也可以使用自定义服务账号进行数据访问。

  • 如需针对 BigQuery 执行数据读写操作,您需要向服务账号授予以下 IAM 权限:

    • 针对 BigQuery 表的 bigquery.tables.* 权限
    • 针对您的项目的 bigquery.readsessions.* 权限

    roles/bigquery.admin IAM 角色可提供服务账号从 BigQuery 读取数据以及将数据写入 BigQuery 所需的权限。

  • 如需针对 Cloud Storage 执行数据读写操作,您需要向服务账号授予针对 Cloud Storage 对象的 storage.objects.* 权限。

    roles/storage.objectAdmin IAM 角色可提供服务账号从 Cloud Storage 读取数据以及将数据写入 Cloud Storage 所需的权限。

  • 如果您在创建连接时指定 Dataproc Metastore,则为了使 BigQuery 能够检索有关 Metastore 配置的详细信息,您需要向服务账号授予针对 Dataproc Metastore 的 metastore.services.get 权限。

    预定义的 roles/metastore.metadataViewer 角色包含服务账号检索 Metastore 配置的详细信息所需的权限。

    您还需要向服务账号授予针对 Cloud Storage 存储桶的 roles/storage.objectAdmin 角色,以便存储过程可以访问 Dataproc Metastore 的 Hive 仓库目录 (hive.metastore.warehouse.dir)。如果存储过程要针对 Metastore 执行操作,则可能需要授予其他权限。如需详细了解 Dataproc Metastore 中的 IAM 角色和权限,请参阅 Dataproc Metastore 预定义角色和权限

  • 如果您在创建连接时指定 Dataproc Persistent History Server,则需要向服务账号授予以下角色:

    • 针对 Dataproc Persistent History Server 的 roles/dataproc.viewer 角色,其包含 dataproc.clusters.get 权限。
    • 针对您在创建 Dataproc Persistent History Server 时为属性 spark:spark.history.fs.logDirectory 指定的 Cloud Storage 存储桶的 roles/storage.objectAdmin 角色。

    如需了解详情,请参阅 Dataproc Persistent History ServerDataproc 角色和权限

与用户共享连接

您可以授予以下角色,以使用户可以查询数据并管理连接:

  • roles/bigquery.connectionUser:允许用户使用连接与外部数据源建立连接,并对其运行查询。

  • roles/bigquery.connectionAdmin:允许用户管理连接。

如需详细了解 BigQuery 中的 IAM 角色和权限,请参阅预定义的角色和权限

从下列选项中选择一项:

控制台

  1. 转到 BigQuery 页面。

    转到 BigQuery

    连接列在项目的外部连接组中。

  2. 探索器窗格中,点击您的项目名称 > 外部连接 > 连接

  3. 详细信息窗格中,点击共享以共享连接。之后,执行以下操作:

    1. 连接权限对话框中,通过添加或修改主账号与其他主账号共享连接。

    2. 点击保存

bq

您不能使用 bq 命令行工具共享连接。如需共享连接,请使用 Google Cloud 控制台或 BigQuery Connections API 方法共享连接。

API

使用 BigQuery Connections REST API 参考文档部分中的 projects.locations.connections.setIAM 方法,并提供一个 policy 资源实例。

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.api.resourcenames.ResourceName;
import com.google.cloud.bigquery.connection.v1.ConnectionName;
import com.google.cloud.bigqueryconnection.v1.ConnectionServiceClient;
import com.google.iam.v1.Binding;
import com.google.iam.v1.Policy;
import com.google.iam.v1.SetIamPolicyRequest;
import java.io.IOException;

// Sample to share connections
public class ShareConnection {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String location = "MY_LOCATION";
    String connectionId = "MY_CONNECTION_ID";
    shareConnection(projectId, location, connectionId);
  }

  static void shareConnection(String projectId, String location, String connectionId)
      throws IOException {
    try (ConnectionServiceClient client = ConnectionServiceClient.create()) {
      ResourceName resource = ConnectionName.of(projectId, location, connectionId);
      Binding binding =
          Binding.newBuilder()
              .addMembers("group:example-analyst-group@google.com")
              .setRole("roles/bigquery.connectionUser")
              .build();
      Policy policy = Policy.newBuilder().addBindings(binding).build();
      SetIamPolicyRequest request =
          SetIamPolicyRequest.newBuilder()
              .setResource(resource.toString())
              .setPolicy(policy)
              .build();
      client.setIamPolicy(request);
      System.out.println("Connection shared successfully");
    }
  }
}

后续步骤