使用数据操纵语言更新表数据

本页面介绍如何使用数据操纵语言 DML 更新和删除 BigQuery 表中的数据,但未涉及如何使用 DML 向现有表添加行。如需了解如何使用 DML 添加行,请参阅 DML 语法参考中的 INSERT 语句

请注意,BigQuery 中的 DML 存在一些限制。DML 也有自己的价格

更新数据

使用以下示例文件,按照下面的说明执行操作。该文件表示一个包含 IP 地址列的表,而您想要屏蔽此列以实现匿名化:

执行下列步骤,将示例数据加载到表中并更新 ip_address 列中的值:

第 1 步将 JSON 文件加载UserSessions 表中。

第 2 步:运行以下 DML 查询,以遮盖每行的 ip_address 列中的最后一个八位字节:

UPDATE sample_db.UserSessions
SET ip_address = REGEXP_REPLACE(ip_address, r"(\.[0-9]+)$", ".0")
WHERE TRUE

Java

试用此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Sample to update data in BigQuery tables using DML query
public class UpdateTableDml {

  public static void main(String[] args) throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    updateTableDml(datasetName, tableName);
  }

  public static void updateTableDml(String datasetName, String tableName)
      throws IOException, InterruptedException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Load JSON file into UserSessions table
      TableId tableId = TableId.of(datasetName, tableName);

      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId)
              .setFormatOptions(FormatOptions.json())
              .build();

      // Imports a local JSON file into a table.
      Path jsonPath =
          FileSystems.getDefault().getPath("src/test/resources", "userSessionsData.json");

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {
        Files.copy(jsonPath, stream);
      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      Job job = bigquery.getJob(jobId);
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      System.out.println(
          job.getStatistics().toString() + " userSessionsData json uploaded successfully");

      // Write a DML query to modify UserSessions table
      // To create DML query job to mask the last octet in every row's ip_address column
      String dmlQuery =
          String.format(
              "UPDATE `%s.%s` \n"
                  + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\")\n"
                  + "WHERE TRUE",
              datasetName, tableName);

      QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

      // Execute the query.
      TableResult result = bigquery.query(dmlQueryConfig);

      // Print the results.
      result.iterateAll().forEach(rows -> rows.forEach(row -> System.out.println(row.getValue())));

      System.out.println("Table updated successfully using DML");
    } catch (BigQueryException e) {
      System.out.println("Table update failed \n" + e.toString());
    }
  }
}

Python

尝试此示例之前,请按照《BigQuery 快速入门:使用客户端库》中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档

import pathlib
from typing import Dict, Optional

from google.cloud import bigquery
from google.cloud.bigquery import enums

def load_from_newline_delimited_json(
    client: bigquery.Client,
    filepath: pathlib.Path,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> None:
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = enums.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.schema = [
        bigquery.SchemaField("id", enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_id", enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("login_time", enums.SqlTypeNames.TIMESTAMP),
        bigquery.SchemaField("logout_time", enums.SqlTypeNames.TIMESTAMP),
        bigquery.SchemaField("ip_address", enums.SqlTypeNames.STRING),
    ]

    with open(filepath, "rb") as json_file:
        load_job = client.load_table_from_file(
            json_file, full_table_id, job_config=job_config
        )

    # Wait for load job to finish.
    load_job.result()

def update_with_dml(
    client: bigquery.Client, project_id: str, dataset_id: str, table_id: str
) -> int:
    query_text = f"""
    UPDATE `{project_id}.{dataset_id}.{table_id}`
    SET ip_address = REGEXP_REPLACE(ip_address, r"(\\.[0-9]+)$", ".0")
    WHERE TRUE
    """
    query_job = client.query(query_text)

    # Wait for query job to finish.
    query_job.result()

    assert query_job.num_dml_affected_rows is not None

    print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
    return query_job.num_dml_affected_rows

def run_sample(override_values: Optional[Dict[str, str]] = None) -> int:
    if override_values is None:
        override_values = {}

    client = bigquery.Client()
    filepath = pathlib.Path(__file__).parent / "user_sessions_data.json"
    project_id = client.project
    dataset_id = "sample_db"
    table_id = "UserSessions"
    load_from_newline_delimited_json(client, filepath, project_id, dataset_id, table_id)
    return update_with_dml(client, project_id, dataset_id, table_id)

删除数据

使用以下示例文件,按照下面的说明执行操作。这些文件表示一个数据集,该数据集具有多个用户会话分析数据表以及一个待删除用户表。

执行下列步骤,将数据加载到三个表中,然后删除 DeletedUsers 表中列出的用户。

第 1 步:分别加载 JSON 文件到 DeletedUsers、Users 和 UserSessions 表中。

控制台

  1. 打开 Cloud Console

  2. 浏览器面板中,展开您的项目并选择数据集。

  3. 展开 操作选项,然后点击打开

  4. 在详情面板中,点击创建表

  5. 基于以下数据创建表部分,选择上传

  6. 对于选择文件,浏览并选择您已下载的文件。

    浏览文件

  7. 对于文件格式,选择 JSON(以换行符分隔)

  8. 选择适当的表名称

  9. 架构下,点击添加字段并为表中的每列输入名称,然后选择适当的类型

    • 点击添加字段并重复该步骤,直到为表的所有列都输入了数据。
  10. 点击 Create table

示例表的架构如下:

  • DeletedUsers
    • 名称为 id,类型为 INTEGER
  • Users
    • 名称为 id,类型为 INTEGER
    • 名称为 date_joined,类型为 TIMESTAMP
  • UserSessions
    • 名称为 id,类型为 STRING
    • 名称为 user_id,类型为 INTEGER
    • 名称为 login_time,类型为 TIMESTAMP
    • 名称为 logout_time,类型为 TIMESTAMP
    • 名称为 ip_address,类型为 STRING

bq

如需使用 bq 命令行工具创建表,请使用 bq load 命令。添加 --location 标志并将其值设置为您的位置--location 标志是可选的。例如,如果您在 asia-northeast1(东京)区域使用 BigQuery,则加载命令将如下所示:

bq --location=asia-northeast1 load ...

要创建 DeleteUsers 表,请执行以下命令:

bq --location=asia-northeast1 load \
--source_format=NEWLINE_DELIMITED_JSON \
sample_db.DeletedUsers \
deletedUsersData.json \
id:integer

要创建 Users 表,请执行以下命令:

bq --location=asia-northeast1 load \
--source_format=NEWLINE_DELIMITED_JSON \
sample_db.Users \
usersData.json \
id:integer,date_joined:timestamp

要创建 UserSessions 表,请执行以下命令:

bq --location=asia-northeast1 load \
--source_format=NEWLINE_DELIMITED_JSON \
sample_db.UserSessions \
userSessionsData.json \
id:string,user_id:integer,login_time:timestamp,logout_time:timestamp,ip_address:string

第 2 步:删除与 DeletedUsers 表中的用户相关的信息。 运行以下 DML 查询

  • UsersSessions 中删除

    DELETE FROM sample_db.UserSessions
    WHERE user_id in (SELECT id from sample_db.DeletedUsers)
    
  • Users 中删除

    DELETE FROM sample_db.Users
    WHERE id in (SELECT id from sample_db.DeletedUsers)
    

表安全性

如需控制对 BigQuery 中表的访问权限,请参阅表访问权限控制简介

后续步骤

  • 请参阅 DML 参考页面。
  • 请参阅 DML 语法页面中的 DML 语法和示例。
  • 了解多语句事务,利用此事务,您可以执行多个 DML 语句并以原子方式提交结果。