Membuat kueri berkelanjutan
Dokumen ini menjelaskan cara menjalankan kueri berkelanjutan di BigQuery.
Kueri berkelanjutan BigQuery adalah pernyataan SQL yang berjalan secara berkelanjutan. Kueri berkelanjutan memungkinkan Anda menganalisis data yang masuk di BigQuery secara real time, lalu mengekspor hasilnya ke Bigtable atau Pub/Sub, atau menulis hasilnya ke tabel BigQuery.
Memilih jenis akun
Anda dapat membuat dan menjalankan tugas kueri berkelanjutan menggunakan akun pengguna, atau Anda dapat membuat tugas kueri berkelanjutan menggunakan akun pengguna, lalu menjalankannya menggunakan akun layanan. Anda harus menggunakan akun layanan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.
Saat Anda menggunakan akun pengguna, kueri berkelanjutan akan berjalan selama dua hari. Saat Anda menggunakan akun layanan, kueri berkelanjutan akan berjalan hingga dibatalkan secara eksplisit. Untuk mengetahui informasi selengkapnya, lihat Otorisasi.
Izin yang diperlukan
Bagian ini menjelaskan izin yang Anda perlukan untuk membuat dan menjalankan kueri berkelanjutan. Sebagai alternatif untuk peran Identity and Access Management (IAM) yang disebutkan, Anda bisa mendapatkan izin yang diperlukan melalui peran kustom.
Izin saat menggunakan akun pengguna
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan untuk membuat dan menjalankan kueri berkelanjutan menggunakan akun pengguna.
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Setiap peran IAM berikut memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Untuk mengekspor data dari tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.export
. Setiap peran IAM berikut memberikan izin bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Untuk memperbarui data dalam tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.updateData
. Setiap peran IAM berikut memberikan izin bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kasus penggunaan kueri berkelanjutan, akun pengguna harus memiliki peran Service Usage Admin (roles/serviceusage.serviceUsageAdmin
).
Izin saat menggunakan akun layanan
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.
Izin akun pengguna
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Setiap peran IAM berikut memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Untuk mengirimkan tugas yang berjalan menggunakan akun layanan, akun pengguna harus memiliki peran Service Account User (roles/iam.serviceAccountUser
). Jika Anda menggunakan akun pengguna yang sama untuk membuat akun layanan, akun pengguna tersebut harus memiliki peran Service Account Admin (roles/iam.serviceAccountAdmin
). Untuk informasi tentang cara membatasi akses pengguna ke satu akun layanan,
bukan ke semua akun layanan dalam project, lihat
Memberikan satu peran.
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kasus penggunaan kueri berkelanjutan, akun pengguna harus memiliki peran Service Usage Admin (roles/serviceusage.serviceUsageAdmin
).
Izin akun layanan
Untuk mengekspor data dari tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.export
. Setiap peran IAM berikut memberikan izin bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
bigquery.tables.updateData
. Setiap peran IAM berikut memberikan izin bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Sebelum memulai
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Membuat pemesanan
Buat pemesanan edisi Enterprise atau Enterprise Plus,
lalu
buat penetapan pemesanan
dengan jenis tugas CONTINUOUS
.
Saat Anda membuat penetapan reservasi untuk kueri berkelanjutan, pemesanan terkait dibatasi hingga 500 slot atau kurang, dan tidak dapat dikonfigurasi untuk menggunakan penskalaan otomatis.
Ekspor ke Pub/Sub
API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat artikel Mengekspor ke Pub/Sub.
Menyematkan atribut kustom sebagai metadata dalam pesan Pub/Sub
Anda dapat menggunakan atribut Pub/Sub untuk memberikan informasi tambahan tentang pesan, seperti prioritas, asal, tujuan, atau metadata tambahan. Anda juga dapat menggunakan atribut untuk memfilter pesan di langganan.
Dalam hasil kueri berkelanjutan, jika kolom diberi nama _ATTRIBUTES
, nilainya akan disalin ke atribut pesan Pub/Sub.
Kolom yang disediakan dalam _ATTRIBUTES
digunakan sebagai kunci atribut.
Kolom _ATTRIBUTES
harus berjenis JSON
, dalam format
ARRAY<STRUCT<STRING, STRING>>
atau STRUCT<STRING>
.
Untuk contohnya, lihat mengekspor data ke topik Pub/Sub.
Mengekspor ke Bigtable
API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Bigtable. Untuk mengetahui informasi selengkapnya, lihat Mengekspor ke Bigtable.
Menulis data ke tabel BigQuery
Anda dapat menulis data ke tabel BigQuery menggunakan pernyataan INSERT
.
Menggunakan fungsi AI
API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk menggunakan fungsi AI yang didukung dalam kueri berkelanjutan. Untuk informasi selengkapnya, lihat salah satu topik berikut, berdasarkan kasus penggunaan Anda:
- Membuat teks menggunakan fungsi
ML.GENERATE_TEXT
- Membuat penyematan teks menggunakan fungsi
ML.GENERATE_EMBEDDING
- Memahami teks dengan fungsi
ML.UNDERSTAND_TEXT
- Menerjemahkan teks dengan fungsi
ML.TRANSLATE
Saat Anda menggunakan fungsi AI dalam kueri berkelanjutan, pertimbangkan apakah output kueri akan tetap berada dalam kuota untuk fungsi tersebut. Jika melebihi kuota, Anda mungkin harus menangani data yang tidak diproses secara terpisah.
Menjalankan kueri berkelanjutan menggunakan akun pengguna
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun pengguna. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, jendela terminal, atau aplikasi tanpa mengganggu eksekusi kueri.
Ikuti langkah-langkah berikut untuk menjalankan kueri berkelanjutan:
Konsol
Di konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, klik More.
Di bagian Choose query mode, pilih Continuous query.
Klik Konfirmasi.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Di Cloud Shell, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ganti
QUERY
dengan pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
API
Jalankan kueri berkelanjutan dengan memanggil
metode jobs.insert
.
Anda harus menetapkan kolom continuous
ke true
di
JobConfigurationQuery
resource Job
yang Anda teruskan.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Ganti kode berikut:
PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Menjalankan kueri berkelanjutan menggunakan akun layanan
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun layanan. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, jendela terminal, atau aplikasi tanpa mengganggu eksekusi kueri.
Ikuti langkah-langkah berikut untuk menggunakan akun layanan guna menjalankan kueri berkelanjutan:
Konsol
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, klik More.
Di bagian Choose query mode, pilih Continuous query.
Klik Konfirmasi.
Di editor kueri, klik Lainnya > Setelan kueri.
Di bagian Continuous query, gunakan kotak Service account untuk memilih akun layanan yang Anda buat.
Klik Simpan.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Di command line, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag berikut:- Tetapkan flag
--continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan flag
--connection_property
untuk menentukan akun layanan yang akan digunakan.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ganti kode berikut:
PROJECT_ID
: project ID Anda.SERVICE_ACCOUNT_EMAIL
: email akun layanan. Anda bisa mendapatkan email akun layanan dari halaman Akun layanan di konsol Google Cloud.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
- Tetapkan flag
API
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Jalankan kueri berkelanjutan dengan memanggil metode
jobs.insert
. Tetapkan kolom berikut di resourceJobConfigurationQuery
dari resourceJob
yang Anda teruskan:- Tetapkan kolom
continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan kolom
connection_property
untuk menentukan akun layanan yang akan digunakan.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Ganti kode berikut:
PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.SERVICE_ACCOUNT_EMAIL
: email akun layanan. Anda bisa mendapatkan email akun layanan di halaman Service accounts di konsol Google Cloud.
- Tetapkan kolom
Contoh
Contoh SQL berikut menunjukkan kasus penggunaan umum untuk kueri berkelanjutan.
Mengekspor data ke topik Pub/Sub
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan memublikasikan data ke topik Pub/Sub secara real time dengan atribut pesan:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Mengekspor data ke tabel Bigtable
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan mengekspor data ke tabel Bigtable secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Menulis data ke tabel BigQuery
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter dan mengubah data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, lalu menulis data ke tabel BigQuery lain secara real time. Hal ini membuat data tersedia untuk analisis downstream lebih lanjut.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Memproses data menggunakan model Vertex AI
Contoh berikut menunjukkan kueri berkelanjutan yang menggunakan model Vertex AI untuk membuat iklan bagi penumpang taksi berdasarkan lintang dan bujur mereka saat ini, lalu mengekspor hasilnya ke topik Pub/Sub secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Memulai kueri berkelanjutan dari titik waktu tertentu
Saat Anda memulai kueri berkelanjutan, kueri akan memproses semua baris dalam tabel
yang Anda pilih, lalu memproses baris baru saat masuk. Jika ingin melewati pemrosesan beberapa atau semua data yang ada, Anda dapat menggunakan fungsi histori perubahan APPENDS
untuk memulai pemrosesan dari titik waktu tertentu.
Contoh berikut menunjukkan cara memulai kueri berkelanjutan dari titik waktu tertentu menggunakan fungsi APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Mengubah SQL kueri berkelanjutan
Anda tidak dapat memperbarui SQL yang digunakan dalam kueri berkelanjutan saat tugas kueri berkelanjutan berjalan. Anda harus membatalkan tugas kueri berkelanjutan, mengubah SQL, lalu memulai tugas kueri berkelanjutan baru dari titik tempat Anda menghentikan tugas kueri berkelanjutan asli.
Ikuti langkah-langkah berikut untuk mengubah SQL yang digunakan dalam kueri berkelanjutan:
- Lihat detail tugas untuk tugas kueri berkelanjutan yang ingin Anda perbarui, dan catat ID tugasnya.
- Jika memungkinkan, jeda pengumpulan data upstream. Jika tidak dapat melakukannya, Anda mungkin mendapatkan beberapa duplikasi data saat kueri berkelanjutan dimulai ulang.
- Batalkan kueri berkelanjutan yang ingin Anda ubah.
Dapatkan nilai
end_time
untuk tugas kueri berkelanjutan asli menggunakan tampilanJOBS
INFORMATION_SCHEMA
:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ganti kode berikut:
PROJECT_ID
: project ID Anda.REGION
: region yang digunakan oleh project Anda.JOB_ID
: ID tugas kueri berkelanjutan yang Anda identifikasi di Langkah 1.
Ubah pernyataan SQL kueri berkelanjutan untuk memulai kueri berkelanjutan dari titik waktu tertentu, menggunakan nilai
end_time
yang Anda ambil di Langkah 5 sebagai titik awal.Ubah pernyataan SQL kueri berkelanjutan untuk mencerminkan perubahan yang Anda perlukan.
Jalankan kueri berkelanjutan yang diubah.
Membatalkan kueri berkelanjutan
Anda dapat membatalkan tugas kueri berkelanjutan seperti tugas lainnya. Mungkin perlu waktu hingga satu menit agar kueri berhenti berjalan setelah tugas dibatalkan.