从 Cloud Storage 加载 Parquet 文件,并替换表中的现有数据。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
Go
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 BigQuery Go API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
import (
"context"
"fmt"
"cloud.google.com/go/bigquery"
)
// importParquetTruncate demonstrates loading Apache Parquet data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
//
// It creates a BigQuery client for projectID, starts a load job targeting
// datasetID.tableID with the WriteTruncate disposition, and waits for the job
// to finish. Every failure is returned wrapped with the name of the step that
// produced it, so callers can still match the cause with errors.Is/errors.As.
func importParquetTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	// Describe the source object and its format.
	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true

	// WriteTruncate replaces the table's existing data instead of appending.
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return fmt.Errorf("loader.Run: %w", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("job.Wait: %w", err)
	}
	// Wait can succeed while the job itself finished in an error state.
	if err := status.Err(); err != nil {
		return fmt.Errorf("job completed with error: %w", err)
	}
	return nil
}
Java
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 BigQuery Java API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;
/**
 * Sample showing how to load a Parquet file from Cloud Storage into a BigQuery table while
 * overwriting any data the table already contains.
 */
public class LoadParquetReplaceTable {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
    String tableName = "us_states";
    loadParquetReplaceTable(datasetName, tableName, sourceUri);
  }

  /**
   * Runs a load job that imports the Parquet file at {@code sourceUri} into the given table with
   * the WRITE_TRUNCATE disposition, then prints the resulting row count.
   *
   * <p>Failures are reported on standard output rather than thrown.
   *
   * @param datasetName name of the dataset that holds the destination table
   * @param tableName name of the destination table
   * @param sourceUri Cloud Storage URI of the Parquet file to load
   */
  public static void loadParquetReplaceTable(
      String datasetName, String tableName, String sourceUri) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Imports a GCS file into a table and overwrites table data if table already exists.
      // By default this sample loads the Parquet file at:
      // https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
      TableId tableId = TableId.of(datasetName, tableName);

      // For more information on LoadJobConfiguration see:
      // https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/LoadJobConfiguration.Builder.html
      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              // Set the write disposition to overwrite existing table data.
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        // Report the error from the completed job; the original `job` handle was obtained
        // before the wait and is not the object whose status was just checked.
        System.out.println(
            "BigQuery was unable to load into the table due to an error: \n"
                + completedJob.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);
      System.out.println("GCS parquet overwrote existing table successfully.");
    } catch (BigQueryException e) {
      System.out.println("Table load job failed. \n" + e.toString());
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      System.out.println("Table load job was interrupted. \n" + e.toString());
    }
  }
}
Node.js
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Node.js 设置说明进行操作。如需了解详情,请参阅 BigQuery Node.js API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');
// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();
/**
 * This sample loads the Parquet file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';
/**
 * Imports a Parquet file from Google Cloud Storage into a BigQuery table and
 * overwrites the table data if the table already exists.
 *
 * The source object is taken from the module-level `bucketName` and
 * `filename` constants.
 *
 * TODO(developer): Pass your own dataset and table IDs, or change the
 * defaults below, before running the sample.
 *
 * @param {string} [datasetId='my_dataset'] ID of the dataset that holds the table.
 * @param {string} [tableId='my_table'] ID of the table to load into.
 * @returns {Promise<void>} Resolves once the load job has finished and its
 *     ID and write disposition have been logged.
 */
async function loadParquetFromGCSTruncate(
  datasetId = 'my_dataset',
  tableId = 'my_table'
) {
  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);
  console.log(
    `Write disposition used: ${job.configuration.load.writeDisposition}.`
  );
}
PHP
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 PHP 设置说明进行操作。如需了解详情,请参阅 BigQuery PHP API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
use Google\Cloud\BigQuery\BigQueryClient;
/**
 * Load a Parquet file from Cloud Storage into a table using the
 * WRITE_TRUNCATE write disposition.
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 */
function import_from_storage_parquet_truncate(
    string $projectId,
    string $datasetId,
    string $tableId
): void {
    // Resolve the destination table through a project-scoped client.
    $client = new BigQueryClient(['projectId' => $projectId]);
    $dataset = $client->dataset($datasetId);
    $table = $dataset->table($tableId);

    // Build the load configuration step by step.
    $sourceUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
    $config = $table->loadFromStorage($sourceUri);
    $config = $config->sourceFormat('PARQUET');
    $config = $config->writeDisposition('WRITE_TRUNCATE');

    // Run the job, then refresh its state before inspecting it.
    $job = $table->runJob($config);
    $job->reload();
    if (!$job->isComplete()) {
        throw new \Exception('Job has not yet completed', 500);
    }

    // A missing errorResult entry means the load succeeded.
    $errorResult = $job->info()['status']['errorResult'] ?? null;
    if ($errorResult === null) {
        print('Data imported successfully' . PHP_EOL);
        return;
    }

    printf('Error running job: %s' . PHP_EOL, $errorResult['message']);
}
Python
试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 BigQuery Python API 参考文档。
如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证。
import io
from google.cloud import bigquery
# Create the BigQuery client used for every request below.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

# Seed the table with one row so there is existing data to overwrite.
seed_schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("post_abbr", "STRING"),
]
job_config = bigquery.LoadJobConfig(schema=seed_schema)
body = io.BytesIO(b"Washington,WA")
seed_job = client.load_table_from_file(body, table_id, job_config=job_config)
seed_job.result()  # Waits for the seed load to complete.

# Confirm the table is non-empty before it gets replaced.
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

# Configure the Parquet load so it truncates the table instead of appending.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)  # Make an API request.
load_job.result()  # Waits for the job to complete.

# Report how many rows the table holds after the overwrite.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。