DML クエリを使用して、BigQuery テーブルのデータを更新します。
コードサンプル
Java
このサンプルを試す前に、「クライアント ライブラリを使用した BigQuery クイックスタート」の Java の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、「ローカル開発環境の認証を設定する」をご覧ください。
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
// Sample to update data in BigQuery tables using DML query
public class UpdateTableDml {

  /**
   * Runs the sample against placeholder dataset and table names.
   *
   * @param args unused
   * @throws IOException if the local JSON file cannot be read or streamed
   * @throws InterruptedException if interrupted while waiting for a BigQuery job
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    updateTableDml(datasetName, tableName);
  }

  /**
   * Loads a local newline-delimited JSON file into the given table and then runs a DML UPDATE
   * that masks the last octet of every row's {@code ip_address} column.
   *
   * <p>Progress and failures are reported on standard output; a {@link BigQueryException} is
   * caught and printed rather than propagated.
   *
   * @param datasetName dataset that contains the target table
   * @param tableName table to load into and update
   * @throws IOException if the local JSON file cannot be read or streamed
   * @throws InterruptedException if interrupted while waiting for a BigQuery job
   */
  public static void updateTableDml(String datasetName, String tableName)
      throws IOException, InterruptedException {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Load JSON file into UserSessions table
      TableId tableId = TableId.of(datasetName, tableName);
      WriteChannelConfiguration writeChannelConfiguration =
          WriteChannelConfiguration.newBuilder(tableId)
              .setFormatOptions(FormatOptions.json())
              .build();

      // Imports a local JSON file into a table.
      Path jsonPath =
          FileSystems.getDefault().getPath("src/test/resources", "userSessionsData.json");

      // The location and JobName must be specified; other fields can be auto-detected.
      String jobName = "jobId_" + UUID.randomUUID().toString();
      JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

      // Closing the writer (end of the try block) is what finalizes the upload.
      try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
          OutputStream stream = Channels.newOutputStream(writer)) {
        Files.copy(jsonPath, stream);
      }

      // Get the Job created by the TableDataWriteChannel and wait for it to complete.
      // getJob may return null when the job cannot be found, so guard before waiting.
      Job job = bigquery.getJob(jobId);
      Job completedJob = (job == null) ? null : job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      }
      // Read status and statistics from the completed job: the pre-wait `job` object is a stale
      // snapshot and does not reflect the final outcome.
      if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load local file to the table due to an error: \n"
                + completedJob.getStatus().getError());
        return;
      }
      System.out.println(
          completedJob.getStatistics().toString()
              + " userSessionsData json uploaded successfully");

      // Write a DML query to modify UserSessions table
      // To create DML query job to mask the last octet in every row's ip_address column
      String dmlQuery =
          String.format(
              "UPDATE `%s.%s` \n"
                  + "SET ip_address = REGEXP_REPLACE(ip_address, r\"(\\.[0-9]+)$\", \".0\")\n"
                  + "WHERE TRUE",
              datasetName, tableName);
      QueryJobConfiguration dmlQueryConfig = QueryJobConfiguration.newBuilder(dmlQuery).build();

      // Execute the query.
      TableResult result = bigquery.query(dmlQueryConfig);

      // Print the results.
      result.iterateAll().forEach(rows -> rows.forEach(row -> System.out.println(row.getValue())));

      System.out.println("Table updated successfully using DML");
    } catch (BigQueryException e) {
      System.out.println("Table update failed \n" + e.toString());
    }
  }
}
Python
このサンプルを試す前に、「クライアント ライブラリを使用した BigQuery クイックスタート」の Python の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、「ローカル開発環境の認証を設定する」をご覧ください。
import pathlib
from typing import Dict, Optional
from google.cloud import bigquery
from google.cloud.bigquery import enums
def load_from_newline_delimited_json(
    client: bigquery.Client,
    filepath: pathlib.Path,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> None:
    """Load a local newline-delimited JSON file into a BigQuery table.

    The destination is ``project_id.dataset_id.table_id`` and the load uses
    an explicit five-column schema (id, user_id, login_time, logout_time,
    ip_address). The call blocks until the load job has finished.

    Args:
        client: BigQuery client used to submit the load job.
        filepath: Path of the local JSON file to upload.
        project_id: Project that owns the destination table.
        dataset_id: Dataset that contains the destination table.
        table_id: Name of the destination table.
    """
    destination = "{}.{}.{}".format(project_id, dataset_id, table_id)

    # Column name / SQL type pairs, in table order.
    columns = (
        ("id", enums.SqlTypeNames.STRING),
        ("user_id", enums.SqlTypeNames.INTEGER),
        ("login_time", enums.SqlTypeNames.TIMESTAMP),
        ("logout_time", enums.SqlTypeNames.TIMESTAMP),
        ("ip_address", enums.SqlTypeNames.STRING),
    )
    load_config = bigquery.LoadJobConfig(
        source_format=enums.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=[bigquery.SchemaField(name, sql_type) for name, sql_type in columns],
    )

    with open(filepath, "rb") as source:
        pending_load = client.load_table_from_file(
            source, destination, job_config=load_config
        )

    # Block until the load job completes.
    pending_load.result()
def update_with_dml(
    client: bigquery.Client, project_id: str, dataset_id: str, table_id: str
) -> int:
    """Mask the last octet of every ip_address in the table using a DML UPDATE.

    Runs the UPDATE against ``project_id.dataset_id.table_id``, waits for the
    query job to finish, prints the number of modified rows and returns it.

    Args:
        client: BigQuery client used to run the query.
        project_id: Project that owns the table.
        dataset_id: Dataset that contains the table.
        table_id: Name of the table to update.

    Returns:
        The number of rows affected by the DML statement.
    """
    # The SQL body is kept flush-left so the text sent to BigQuery is unchanged.
    masking_sql = f"""
UPDATE `{project_id}.{dataset_id}.{table_id}`
SET ip_address = REGEXP_REPLACE(ip_address, r"(\\.[0-9]+)$", ".0")
WHERE TRUE
"""
    dml_job = client.query(masking_sql)

    # Block until the query job completes.
    dml_job.result()

    affected_rows = dml_job.num_dml_affected_rows
    assert affected_rows is not None
    print(f"DML query modified {affected_rows} rows.")
    return affected_rows
def run_sample(override_values: Optional[Dict[str, str]] = None) -> int:
    """Load the sample session data and mask its IP addresses with DML.

    Args:
        override_values: Optional replacements for the sample's defaults.
            The keys ``"dataset_id"`` and ``"table_id"`` are honoured; any
            key that is absent falls back to ``"sample_db"`` and
            ``"UserSessions"`` respectively.

    Returns:
        The number of rows modified by the DML UPDATE.
    """
    if override_values is None:
        override_values = {}
    client = bigquery.Client()
    # The data file is expected to sit next to this module.
    filepath = pathlib.Path(__file__).parent / "user_sessions_data.json"
    project_id = client.project
    # Previously override_values was accepted but ignored; apply it here so
    # callers (e.g. a test harness) can redirect the sample to another table.
    dataset_id = override_values.get("dataset_id", "sample_db")
    table_id = override_values.get("table_id", "UserSessions")
    load_from_newline_delimited_json(client, filepath, project_id, dataset_id, table_id)
    return update_with_dml(client, project_id, dataset_id, table_id)
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。