Membuat kueri berkelanjutan
Dokumen ini menjelaskan cara menjalankan layanan continuous di BigQuery.
Kueri berkelanjutan BigQuery adalah pernyataan SQL yang dijalankan secara terus-menerus. Kueri berkelanjutan memungkinkan Anda menganalisis data yang masuk dengan BigQuery secara real time, lalu ekspor hasilnya ke Bigtable atau Pub/Sub, atau menulis hasilnya ke BigQuery.
Pilih jenis akun
Anda dapat membuat dan menjalankan tugas kueri berkelanjutan menggunakan akun pengguna, atau Anda dapat membuat tugas kueri berkelanjutan menggunakan akun pengguna, lalu menjalankannya menggunakan akun layanan. Anda harus menggunakan akun layanan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.
Saat Anda menggunakan akun pengguna, kueri berkelanjutan akan berjalan selama dua hari. Saat Anda menggunakan akun layanan, kueri berkelanjutan akan berjalan hingga dibatalkan secara eksplisit. Untuk informasi selengkapnya, lihat Otorisasi.
Izin yang diperlukan
Bagian ini menjelaskan izin yang Anda perlukan untuk membuat dan menjalankan kueri berkelanjutan. Sebagai alternatif peran Identity and Access Management (IAM) menyebutkan, Anda bisa mendapatkan izin yang diperlukan melalui peran khusus.
Izin saat menggunakan akun pengguna
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan membuat dan menjalankan kueri berkelanjutan menggunakan akun pengguna.
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Setiap peran IAM berikut memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - Admin BigQuery (
roles/bigquery.admin
)
Untuk mengekspor data dari tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.export
. Setiap
peran IAM berikut memberikan bigquery.tables.export
izin akses:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - Editor Data BigQuery (
roles/bigquery.dataEditor
) - Pemilik Data BigQuery (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
Untuk memperbarui data di tabel BigQuery, akun pengguna harus memiliki
izin IAM bigquery.tables.updateData
. Setiap
peran IAM berikut memberikan bigquery.tables.updateData
izin akses:
- Editor Data BigQuery (
roles/bigquery.dataEditor
) - Pemilik Data BigQuery (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk
kueri berkelanjutan, akun pengguna harus memiliki
Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin
)
peran.
Izin saat menggunakan akun layanan
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.
Izin akun pengguna
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Tiap-tiap hal berikut
Peran IAM memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
Untuk mengirimkan tugas yang berjalan menggunakan akun layanan, akun pengguna harus memiliki peran Service Account User (roles/iam.serviceAccountUser
). Jika Anda menggunakan akun pengguna yang
sama untuk membuat akun layanan,
maka akun pengguna harus memiliki
Admin Akun Layanan (roles/iam.serviceAccountAdmin
)
peran. Untuk informasi tentang cara membatasi akses pengguna ke satu akun layanan,
bukan ke semua akun layanan dalam sebuah project, lihat
Berikan satu peran.
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk
kueri berkelanjutan, akun pengguna harus memiliki
Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin
)
peran.
Izin akun layanan
Untuk mengekspor data dari tabel BigQuery, akun layanan harus
memiliki izin IAM bigquery.tables.export
. Setiap peran IAM berikut memberikan izin bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - Editor Data BigQuery (
roles/bigquery.dataEditor
) - Pemilik Data BigQuery (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
. Setiap
peran IAM berikut memberikan bigquery.tables.updateData
izin akses:
- Editor Data BigQuery (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
Sebelum memulai
-
Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.
-
Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.
-
Aktifkan API BigQuery.
Membuat pemesanan
Buat pemesanan edisi Enterprise atau Enterprise Plus,
lalu
buat penetapan pemesanan
dengan jenis tugas CONTINUOUS
.
Ekspor ke Pub/Sub
API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat artikel Mengekspor ke Pub/Sub.
Menyematkan atribut khusus sebagai metadata dalam pesan Pub/Sub
Anda dapat menggunakan atribut Pub/Sub untuk memberikan informasi tambahan tentang pesan, seperti prioritas, asal, tujuan, atau metadata tambahan. Anda juga dapat menggunakan atribut untuk memfilter pesan pada langganan.
Dalam hasil kueri berkelanjutan, jika kolom diberi nama _ATTRIBUTES
, nilainya akan disalin ke atribut pesan Pub/Sub.
Kolom yang disediakan dalam _ATTRIBUTES
digunakan sebagai kunci atribut.
Kolom _ATTRIBUTES
harus berjenis JSON
, dalam format
ARRAY<STRUCT<STRING, STRING>>
atau STRUCT<STRING>
.
Untuk contoh, lihat mengekspor data ke topik Pub/Sub.
Mengekspor ke Bigtable
API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Bigtable. Untuk selengkapnya informasi, lihat Ekspor ke Bigtable.
Menulis data ke tabel BigQuery
Anda dapat menulis data ke tabel BigQuery menggunakan
Pernyataan INSERT
.
Menggunakan fungsi AI
API tambahan, izin IAM, dan Google Cloud resource diperlukan untuk menggunakan didukung AI generatif dalam kueri berkelanjutan. Untuk informasi selengkapnya, lihat salah satu berdasarkan kasus penggunaan Anda:
- Membuat teks menggunakan fungsi
ML.GENERATE_TEXT
- Membuat penyematan teks menggunakan fungsi
ML.GENERATE_EMBEDDING
- Memahami teks dengan fungsi
ML.UNDERSTAND_TEXT
- Menerjemahkan teks dengan fungsi
ML.TRANSLATE
Saat Anda menggunakan fungsi AI dalam kueri berkelanjutan, pertimbangkan apakah output kueri akan tetap berada dalam kuota untuk fungsi tersebut. Jika melebihi kuota, Anda mungkin harus menangani secara terpisah catatan yang tidak akan diproses.
Menjalankan kueri berkelanjutan menggunakan akun pengguna
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun pengguna. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.
Ikuti langkah-langkah berikut untuk menjalankan kueri berkelanjutan:
Konsol
Di konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, klik More.
Di bagian Choose query mode, pilih Continuous query.
Klik Confirm.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
-
Di konsol Google Cloud, aktifkan Cloud Shell.
Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.
Di Cloud Shell, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ganti
QUERY
dengan pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
API
Jalankan kueri berkelanjutan dengan memanggil
metode jobs.insert
.
Anda harus menetapkan kolom continuous
ke true
di
JobConfigurationQuery
dari resource Job
yang Anda teruskan.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Ganti kode berikut:
PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Jalankan kueri berkelanjutan menggunakan akun layanan
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun layanan. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.
Ikuti langkah-langkah berikut untuk menggunakan akun layanan guna menjalankan kueri berkelanjutan:
Konsol
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Di Konsol Google Cloud, buka halaman BigQuery.
Di editor kueri, klik More.
Di bagian Choose query mode, pilih Continuous query.
Klik Confirm.
Di editor kueri, klik Lainnya > Setelan kueri.
Di bagian Kueri berkelanjutan, gunakan kotak Akun layanan untuk memilih akun layanan yang dibuat.
Klik Simpan.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
- Membuat akun layanan
- Berikan hal yang diperlukan izin ke akun layanan.
-
Di konsol Google Cloud, aktifkan Cloud Shell.
Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.
Di command line, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag berikut:- Tetapkan flag
--continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan tanda
--connection_property
untuk menentukan akun layanan yang akan digunakan.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ganti kode berikut:
PROJECT_ID
: project ID Anda.SERVICE_ACCOUNT_EMAIL
: layanan email akun Anda. Anda bisa mendapatkan email akun layanan dari halaman Akun layanan di konsol Google Cloud.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
- Tetapkan flag
API
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Jalankan kueri berkelanjutan dengan memanggil metode Metode
jobs.insert
. Tetapkan kolom berikut di resourceJobConfigurationQuery
dari resourceJob
yang Anda teruskan:- Tetapkan kolom
continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan kolom
connection_property
untuk menentukan akun layanan yang akan digunakan.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Ganti kode berikut:
PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.SERVICE_ACCOUNT_EMAIL
: email akun layanan. Anda bisa mendapatkan email akun layanan di Halaman Akun layanan konsol Google Cloud.
- Tetapkan kolom
Contoh
Contoh SQL berikut menunjukkan kasus penggunaan umum untuk kueri berkelanjutan.
Mengekspor data ke topik Pub/Sub
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari Tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan memublikasikan data ke topik Pub/Sub secara real time dengan atribut pesan:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Mengekspor data ke tabel Bigtable
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan mengekspor data ke tabel Bigtable secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Menulis data ke tabel BigQuery
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter dan mengubah data dari tabel BigQuery yang menerima layanan taksi online informasi, dan kemudian menulis data ke tabel BigQuery lainnya secara real-time. Hal ini membuat data tersedia untuk analisis downstream lebih lanjut.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Memproses data menggunakan model Vertex AI
Contoh berikut menunjukkan kueri berkelanjutan yang menggunakan Model Vertex AI untuk membuat iklan bagi penumpang taksi berdasarkan lintang dan bujurnya saat ini, lalu mengekspor hasilnya ke topik Pub/Sub secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Memulai kueri berkelanjutan dari titik waktu tertentu
Saat Anda memulai kueri berkelanjutan, semua baris dalam tabel akan diproses
yang Anda pilih, dan kemudian
memproses baris baru saat mereka masuk. Jika
yang ingin dilewati pemrosesan beberapa atau semua data yang ada, Anda dapat menggunakan
APPENDS
fungsi riwayat perubahan untuk mulai
memproses dari titik waktu tertentu.
Contoh berikut menunjukkan cara memulai kueri berkelanjutan dari titik waktu tertentu menggunakan fungsi APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Mengubah SQL kueri berkelanjutan
Anda tidak dapat memperbarui SQL yang digunakan dalam kueri berkelanjutan saat kueri berkelanjutan tugas berjalan. Anda harus membatalkan tugas kueri berkelanjutan, memodifikasi SQL, lalu memulai tugas kueri berkelanjutan baru dari titik di mana Anda berhenti tugas kueri berkelanjutan asli.
Ikuti langkah-langkah berikut untuk mengubah SQL yang digunakan dalam kueri berkelanjutan:
- Lihat detail tugas untuk tugas kueri berkelanjutan yang ingin Anda perbarui, dan catat ID tugasnya.
- Jika memungkinkan, jeda pengumpulan data upstream. Jika tidak dapat melakukannya, Anda mungkin mendapatkan beberapa duplikasi data saat kueri berkelanjutan dimulai ulang.
- Batalkan kueri berkelanjutan yang ingin Anda ubah.
Dapatkan nilai
end_time
untuk tugas kueri berkelanjutan asli menggunakan tampilanJOBS
INFORMATION_SCHEMA
:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ganti kode berikut:
PROJECT_ID
: project ID Anda.REGION
: region yang digunakan oleh project Anda.JOB_ID
: ID tugas kueri berkelanjutan yang Anda identifikasi di Langkah 1.
Ubah pernyataan SQL kueri berkelanjutan untuk memulai kueri berkelanjutan dari titik waktu tertentu, menggunakan nilai
end_time
yang Anda ambil di Langkah 5 sebagai titik awal.Ubah pernyataan SQL kueri berkelanjutan untuk mencerminkan perubahan yang Anda butuhkan.
Jalankan kueri berkelanjutan yang telah diubah.
Membatalkan kueri berkelanjutan
Anda dapat membatalkan tugas kueri berkelanjutan seperti tugas lainnya. Mungkin perlu waktu hingga satu menit agar kueri berhenti berjalan setelah tugas dibatalkan.