Membuat kueri berkelanjutan

Dokumen ini menjelaskan cara menjalankan layanan continuous di BigQuery.

Kueri berkelanjutan BigQuery adalah pernyataan SQL yang dijalankan secara terus-menerus. Kueri berkelanjutan memungkinkan Anda menganalisis data yang masuk dengan BigQuery secara real time, lalu ekspor hasilnya ke Bigtable atau Pub/Sub, atau menulis hasilnya ke BigQuery.

Pilih jenis akun

Anda dapat membuat dan menjalankan tugas kueri berkelanjutan menggunakan akun pengguna, atau Anda dapat membuat tugas kueri berkelanjutan menggunakan akun pengguna, lalu menjalankannya menggunakan akun layanan. Anda harus menggunakan akun layanan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.

Saat Anda menggunakan akun pengguna, kueri berkelanjutan akan berjalan selama dua hari. Saat Anda menggunakan akun layanan, kueri berkelanjutan akan berjalan hingga dibatalkan secara eksplisit. Untuk informasi selengkapnya, lihat Otorisasi.

Izin yang diperlukan

Bagian ini menjelaskan izin yang Anda perlukan untuk membuat dan menjalankan kueri berkelanjutan. Sebagai alternatif peran Identity and Access Management (IAM) menyebutkan, Anda bisa mendapatkan izin yang diperlukan melalui peran khusus.

Izin saat menggunakan akun pengguna

Bagian ini memberikan informasi tentang peran dan izin yang diperlukan membuat dan menjalankan kueri berkelanjutan menggunakan akun pengguna.

Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create. Setiap peran IAM berikut memberikan izin bigquery.jobs.create:

Untuk mengekspor data dari tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.export. Setiap peran IAM berikut memberikan bigquery.tables.export izin akses:

Untuk memperbarui data di tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.updateData. Setiap peran IAM berikut memberikan bigquery.tables.updateData izin akses:

Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kueri berkelanjutan, akun pengguna harus memiliki Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin) peran.

Izin saat menggunakan akun layanan

Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.

Izin akun pengguna

Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create. Tiap-tiap hal berikut Peran IAM memberikan izin bigquery.jobs.create:

Untuk mengirimkan tugas yang berjalan menggunakan akun layanan, akun pengguna harus memiliki peran Service Account User (roles/iam.serviceAccountUser). Jika Anda menggunakan akun pengguna yang sama untuk membuat akun layanan, maka akun pengguna harus memiliki Admin Akun Layanan (roles/iam.serviceAccountAdmin) peran. Untuk informasi tentang cara membatasi akses pengguna ke satu akun layanan, bukan ke semua akun layanan dalam sebuah project, lihat Berikan satu peran.

Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kueri berkelanjutan, akun pengguna harus memiliki Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin) peran.

Izin akun layanan

Untuk mengekspor data dari tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.export. Setiap peran IAM berikut memberikan izin bigquery.tables.export:

Untuk memperbarui data di tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.updateData. Setiap peran IAM berikut memberikan bigquery.tables.updateData izin akses:

Sebelum memulai

  1. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  2. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  3. Aktifkan API BigQuery.

    Mengaktifkan API

Membuat pemesanan

Buat pemesanan edisi Enterprise atau Enterprise Plus, lalu buat penetapan pemesanan dengan jenis tugas CONTINUOUS.

Ekspor ke Pub/Sub

API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat artikel Mengekspor ke Pub/Sub.

Menyematkan atribut khusus sebagai metadata dalam pesan Pub/Sub

Anda dapat menggunakan atribut Pub/Sub untuk memberikan informasi tambahan tentang pesan, seperti prioritas, asal, tujuan, atau metadata tambahan. Anda juga dapat menggunakan atribut untuk memfilter pesan pada langganan.

Dalam hasil kueri berkelanjutan, jika kolom diberi nama _ATTRIBUTES, nilainya akan disalin ke atribut pesan Pub/Sub. Kolom yang disediakan dalam _ATTRIBUTES digunakan sebagai kunci atribut.

Kolom _ATTRIBUTES harus berjenis JSON, dalam format ARRAY<STRUCT<STRING, STRING>> atau STRUCT<STRING>.

Untuk contoh, lihat mengekspor data ke topik Pub/Sub.

Mengekspor ke Bigtable

API tambahan, izin IAM, dan resource Google Cloud diperlukan untuk mengekspor data ke Bigtable. Untuk selengkapnya informasi, lihat Ekspor ke Bigtable.

Menulis data ke tabel BigQuery

Anda dapat menulis data ke tabel BigQuery menggunakan Pernyataan INSERT.

Menggunakan fungsi AI

API tambahan, izin IAM, dan Google Cloud resource diperlukan untuk menggunakan didukung AI generatif dalam kueri berkelanjutan. Untuk informasi selengkapnya, lihat salah satu berdasarkan kasus penggunaan Anda:

Saat Anda menggunakan fungsi AI dalam kueri berkelanjutan, pertimbangkan apakah output kueri akan tetap berada dalam kuota untuk fungsi tersebut. Jika melebihi kuota, Anda mungkin harus menangani secara terpisah catatan yang tidak akan diproses.

Menjalankan kueri berkelanjutan menggunakan akun pengguna

Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun pengguna. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.

Ikuti langkah-langkah berikut untuk menjalankan kueri berkelanjutan:

Konsol

  1. Di konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, klik More.

  3. Di bagian Choose query mode, pilih Continuous query.

  4. Klik Confirm.

  5. Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

  6. Klik Run.

bq

  1. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  2. Di Cloud Shell, jalankan kueri berkelanjutan menggunakan perintah bq query dengan flag --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Ganti QUERY dengan pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

API

Jalankan kueri berkelanjutan dengan memanggil metode jobs.insert. Anda harus menetapkan kolom continuous ke true di JobConfigurationQuery dari resource Job yang Anda teruskan.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Ganti kode berikut:

  • PROJECT_ID: project ID Anda.
  • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

Jalankan kueri berkelanjutan menggunakan akun layanan

Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun layanan. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.

Ikuti langkah-langkah berikut untuk menggunakan akun layanan guna menjalankan kueri berkelanjutan:

Konsol

  1. Membuat akun layanan
  2. Berikan izin yang diperlukan ke akun layanan.
  3. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  4. Di editor kueri, klik More.

  5. Di bagian Choose query mode, pilih Continuous query.

  6. Klik Confirm.

  7. Di editor kueri, klik Lainnya > Setelan kueri.

  8. Di bagian Kueri berkelanjutan, gunakan kotak Akun layanan untuk memilih akun layanan yang dibuat.

  9. Klik Simpan.

  10. Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

  11. Klik Run.

bq

  1. Membuat akun layanan
  2. Berikan hal yang diperlukan izin ke akun layanan.
  3. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  4. Di command line, jalankan kueri berkelanjutan menggunakan perintah bq query dengan flag berikut:

    • Tetapkan flag --continuous ke true untuk membuat kueri berkelanjutan.
    • Gunakan tanda --connection_property untuk menentukan akun layanan yang akan digunakan.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • SERVICE_ACCOUNT_EMAIL: layanan email akun Anda. Anda bisa mendapatkan email akun layanan dari halaman Akun layanan di konsol Google Cloud.
    • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

API

  1. Membuat akun layanan
  2. Berikan izin yang diperlukan ke akun layanan.
  3. Jalankan kueri berkelanjutan dengan memanggil metode Metode jobs.insert. Tetapkan kolom berikut di resource JobConfigurationQuery dari resource Job yang Anda teruskan:

    • Tetapkan kolom continuous ke true untuk membuat kueri berkelanjutan.
    • Gunakan kolom connection_property untuk menentukan akun layanan yang akan digunakan.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
    • SERVICE_ACCOUNT_EMAIL: email akun layanan. Anda bisa mendapatkan email akun layanan di Halaman Akun layanan konsol Google Cloud.

Contoh

Contoh SQL berikut menunjukkan kasus penggunaan umum untuk kueri berkelanjutan.

Mengekspor data ke topik Pub/Sub

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari Tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan memublikasikan data ke topik Pub/Sub secara real time dengan atribut pesan:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Mengekspor data ke tabel Bigtable

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan mengekspor data ke tabel Bigtable secara real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Menulis data ke tabel BigQuery

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter dan mengubah data dari tabel BigQuery yang menerima layanan taksi online informasi, dan kemudian menulis data ke tabel BigQuery lainnya secara real-time. Hal ini membuat data tersedia untuk analisis downstream lebih lanjut.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Memproses data menggunakan model Vertex AI

Contoh berikut menunjukkan kueri berkelanjutan yang menggunakan Model Vertex AI untuk membuat iklan bagi penumpang taksi berdasarkan lintang dan bujurnya saat ini, lalu mengekspor hasilnya ke topik Pub/Sub secara real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Memulai kueri berkelanjutan dari titik waktu tertentu

Saat Anda memulai kueri berkelanjutan, semua baris dalam tabel akan diproses yang Anda pilih, dan kemudian memproses baris baru saat mereka masuk. Jika yang ingin dilewati pemrosesan beberapa atau semua data yang ada, Anda dapat menggunakan APPENDS fungsi riwayat perubahan untuk mulai memproses dari titik waktu tertentu.

Contoh berikut menunjukkan cara memulai kueri berkelanjutan dari titik waktu tertentu menggunakan fungsi APPENDS:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Mengubah SQL kueri berkelanjutan

Anda tidak dapat memperbarui SQL yang digunakan dalam kueri berkelanjutan saat kueri berkelanjutan tugas berjalan. Anda harus membatalkan tugas kueri berkelanjutan, memodifikasi SQL, lalu memulai tugas kueri berkelanjutan baru dari titik di mana Anda berhenti tugas kueri berkelanjutan asli.

Ikuti langkah-langkah berikut untuk mengubah SQL yang digunakan dalam kueri berkelanjutan:

  1. Lihat detail tugas untuk tugas kueri berkelanjutan yang ingin Anda perbarui, dan catat ID tugasnya.
  2. Jika memungkinkan, jeda pengumpulan data upstream. Jika tidak dapat melakukannya, Anda mungkin mendapatkan beberapa duplikasi data saat kueri berkelanjutan dimulai ulang.
  3. Batalkan kueri berkelanjutan yang ingin Anda ubah.
  4. Dapatkan nilai end_time untuk tugas kueri berkelanjutan asli menggunakan tampilan JOBS INFORMATION_SCHEMA:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • REGION: region yang digunakan oleh project Anda.
    • JOB_ID: ID tugas kueri berkelanjutan yang Anda identifikasi di Langkah 1.
  5. Ubah pernyataan SQL kueri berkelanjutan untuk memulai kueri berkelanjutan dari titik waktu tertentu, menggunakan nilai end_time yang Anda ambil di Langkah 5 sebagai titik awal.

  6. Ubah pernyataan SQL kueri berkelanjutan untuk mencerminkan perubahan yang Anda butuhkan.

  7. Jalankan kueri berkelanjutan yang telah diubah.

Membatalkan kueri berkelanjutan

Anda dapat membatalkan tugas kueri berkelanjutan seperti tugas lainnya. Mungkin perlu waktu hingga satu menit agar kueri berhenti berjalan setelah tugas dibatalkan.

Langkah selanjutnya