DML でテーブルを更新する

DML クエリを使用して、BigQuery テーブルのデータを更新します。

コードサンプル

Java

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートにある Java の設定手順を完了してください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Sample to update data in BigQuery tables using DML query
public class UpdateTableDml {

  public static void main(String[] args) throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    updateTableDml(datasetName, tableName);
  }

  public static void updateTableDml(String datasetName, String tableName)
      throws IOException, InterruptedException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Load JSON file into UserSessions table
      TableId tableId = TableId.of(datasetName, tableName);

      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId)
              .setFormatOptions(FormatOptions.json())
              .build();

      // Imports a local JSON file into a table.
      Path jsonPath =
          FileSystems.getDefault().getPath("src/test/resources", "userSessionsData.json");

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {
        Files.copy(jsonPath, stream);
      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      Job job = bigquery.getJob(jobId);
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      System.out.println(
          job.getStatistics().toString() + " userSessionsData json uploaded successfully");

      // Write a DML query to modify UserSessions table
      // To create DML query job to mask the last octet in every row's ip_address column
      String dmlQuery =
          String.format(
              "UPDATE `%s.%s` \n"
                  + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\")\n"
                  + "WHERE TRUE",
              datasetName, tableName);

      QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

      // Execute the query.
      TableResult result = bigquery.query(dmlQueryConfig);

      // Print the results.
      result.iterateAll().forEach(rows -> rows.forEach(row -> System.out.println(row.getValue())));

      System.out.println("Table updated successfully using DML");
    } catch (BigQueryException e) {
      System.out.println("Table update failed \n" + e.toString());
    }
  }
}

Python

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートにある Python の設定手順を完了してください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。

import pathlib
from typing import Dict, Optional

from google.cloud import bigquery
from google.cloud.bigquery import enums


def load_from_newline_delimited_json(
    client: bigquery.Client,
    filepath: pathlib.Path,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> None:
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = enums.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.schema = [
        bigquery.SchemaField("id", enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_id", enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("login_time", enums.SqlTypeNames.TIMESTAMP),
        bigquery.SchemaField("logout_time", enums.SqlTypeNames.TIMESTAMP),
        bigquery.SchemaField("ip_address", enums.SqlTypeNames.STRING),
    ]

    with open(filepath, "rb") as json_file:
        load_job = client.load_table_from_file(
            json_file, full_table_id, job_config=job_config
        )

    # Wait for load job to finish.
    load_job.result()


def update_with_dml(
    client: bigquery.Client, project_id: str, dataset_id: str, table_id: str
) -> int:
    query_text = f"""
    UPDATE `{project_id}.{dataset_id}.{table_id}`
    SET ip_address = REGEXP_REPLACE(ip_address, r"(\\.[0-9]+)$", ".0")
    WHERE TRUE
    """
    query_job = client.query(query_text)

    # Wait for query job to finish.
    query_job.result()

    assert query_job.num_dml_affected_rows is not None

    print(f"DML query modified {query_job.num_dml_affected_rows} rows.")
    return query_job.num_dml_affected_rows


def run_sample(override_values: Optional[Dict[str, str]] = None) -> int:
    if override_values is None:
        override_values = {}

    client = bigquery.Client()
    filepath = pathlib.Path(__file__).parent / "user_sessions_data.json"
    project_id = client.project
    dataset_id = "sample_db"
    table_id = "UserSessions"
    load_from_newline_delimited_json(client, filepath, project_id, dataset_id, table_id)
    return update_with_dml(client, project_id, dataset_id, table_id)

次のステップ

他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプル ブラウザをご覧ください。