Membuat kueri berkelanjutan

Dokumen ini menjelaskan cara menjalankan layanan continuous di BigQuery.

Kueri berkelanjutan BigQuery adalah pernyataan SQL yang dijalankan secara terus-menerus. Kueri berkelanjutan memungkinkan Anda menganalisis data yang masuk dengan BigQuery secara real time, lalu ekspor hasilnya ke Bigtable atau Pub/Sub, atau menulis hasilnya ke BigQuery.

Pilih jenis akun

Anda dapat membuat dan menjalankan tugas kueri berkelanjutan dengan menggunakan akun pengguna, atau Anda dapat membuat tugas kueri berkelanjutan dengan menggunakan akun pengguna menggunakan akun layanan. Anda harus menggunakan akun layanan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.

Saat Anda menggunakan akun pengguna, kueri berkelanjutan yang berjalan selama dua hari. Saat Anda menggunakan akun layanan, kueri berkelanjutan akan berjalan hingga dibatalkan secara eksplisit. Untuk informasi selengkapnya, lihat Otorisasi.

Izin yang diperlukan

Bagian ini menjelaskan izin yang Anda perlukan untuk membuat dan menjalankan kueri berkelanjutan. Sebagai alternatif peran Identity and Access Management (IAM) menyebutkan, Anda bisa mendapatkan izin yang diperlukan melalui peran khusus.

Izin saat menggunakan akun pengguna

Bagian ini memberikan informasi tentang peran dan izin yang diperlukan membuat dan menjalankan kueri berkelanjutan menggunakan akun pengguna.

Untuk membuat tugas di BigQuery, akun pengguna harus memiliki Izin IAM bigquery.jobs.create. Setiap peran IAM berikut memberikan izin bigquery.jobs.create:

Untuk mengekspor data dari tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.export . Setiap peran IAM berikut memberikan bigquery.tables.export izin akses:

Untuk memperbarui data di tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.updateData. Setiap peran IAM berikut memberikan bigquery.tables.updateData izin akses:

Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kueri berkelanjutan, akun pengguna harus memiliki Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin) peran.

Izin saat menggunakan akun layanan

Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.

Izin akun pengguna

Untuk membuat tugas di BigQuery, akun pengguna harus memiliki Izin IAM bigquery.jobs.create. Tiap-tiap hal berikut Peran IAM memberikan izin bigquery.jobs.create:

Untuk mengirimkan tugas yang dijalankan menggunakan akun layanan, akun pengguna harus memiliki Pengguna Akun Layanan (roles/iam.serviceAccountUser) peran. Jika Anda menggunakan akun pengguna yang sama untuk membuat akun layanan, maka akun pengguna harus memiliki Admin Akun Layanan (roles/iam.serviceAccountAdmin) peran. Untuk informasi tentang cara membatasi akses pengguna ke satu akun layanan, bukan ke semua akun layanan dalam sebuah project, lihat Berikan satu peran.

Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kueri berkelanjutan, akun pengguna harus memiliki Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin) peran.

Izin akun layanan

Untuk mengekspor data dari tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.export. Setiap peran IAM berikut memberikan bigquery.tables.export izin akses:

Untuk memperbarui data di tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.updateData. Setiap peran IAM berikut memberikan bigquery.tables.updateData izin akses:

Sebelum memulai

  1. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  2. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  3. Enable the BigQuery API.

    Enable the API

Membuat pemesanan

Membuat reservasi edisi Enterprise atau Enterprise Plus, Lalu membuat tugas reservasi dengan jenis tugas CONTINUOUS.

Ekspor ke Pub/Sub

API tambahan, izin IAM, dan resource Google Cloud yang diperlukan untuk mengekspor data ke Pub/Sub. Untuk informasi selengkapnya, lihat Ekspor ke Pub/Sub.

Mengekspor ke Bigtable

API tambahan, izin IAM, dan Google Cloud resource diperlukan untuk mengekspor data ke Bigtable. Untuk selengkapnya informasi, lihat Ekspor ke Bigtable.

Menulis data ke tabel BigQuery

Anda dapat menulis data ke tabel BigQuery menggunakan Pernyataan INSERT.

Menggunakan fungsi AI

API tambahan, izin IAM, dan Google Cloud resource diperlukan untuk menggunakan didukung AI generatif dalam kueri berkelanjutan. Untuk informasi selengkapnya, lihat salah satu berdasarkan kasus penggunaan Anda:

Saat Anda menggunakan fungsi AI dalam kueri berkelanjutan, pertimbangkan apakah kueri tersebut output akan tetap berada dalam kuota untuk fungsi. Jika melebihi kuota, Anda mungkin harus menangani secara terpisah catatan yang tidak akan diproses.

Jalankan kueri berkelanjutan menggunakan akun pengguna

Bagian ini menjelaskan cara menjalankan kueri berkelanjutan dengan menggunakan akun pengguna. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.

Ikuti langkah-langkah berikut untuk menjalankan kueri berkelanjutan:

Konsol

  1. Di konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, klik Lainnya.

  3. Di bagian Choose query mode, pilih Continuous query.

  4. Klik Confirm.

  5. Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

  6. Klik Run.

bq

  1. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  2. Di Cloud Shell, jalankan kueri berkelanjutan dengan menggunakan Perintah bq query dengan tanda --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'
    

    Ganti QUERY dengan pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

API

Jalankan kueri berkelanjutan dengan memanggil metode Metode jobs.insert. Anda harus menetapkan kolom continuous ke true di JobConfigurationQuery dari resource Job yang Anda teruskan.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Ganti kode berikut:

  • PROJECT_ID: project ID Anda.
  • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

Jalankan kueri berkelanjutan menggunakan akun layanan

Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun layanan. Setelah kueri berkelanjutan berjalan, Anda dapat menutup konsol Google Cloud, terminal, atau aplikasi tanpa mengganggu eksekusi kueri.

Ikuti langkah-langkah berikut untuk menggunakan akun layanan guna menjalankan kueri berkelanjutan:

Konsol

  1. Membuat akun layanan
  2. Berikan hal yang diperlukan izin ke akun layanan.
  3. Di Konsol Google Cloud, buka halaman BigQuery.

    Buka BigQuery

  4. Di editor kueri, klik Lainnya.

  5. Di bagian Choose query mode, pilih Continuous query.

  6. Klik Confirm.

  7. Di editor kueri, klik Lainnya > Setelan kueri.

  8. Di bagian Kueri berkelanjutan, gunakan kotak Akun layanan untuk memilih akun layanan yang dibuat.

  9. Klik Simpan.

  10. Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

  11. Klik Run.

bq

  1. Membuat akun layanan
  2. Berikan hal yang diperlukan izin ke akun layanan.
  3. Di konsol Google Cloud, aktifkan Cloud Shell.

    Aktifkan Cloud Shell

    Di bagian bawah Google Cloud Console, Cloud Shell sesi akan terbuka dan menampilkan perintah command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi sesi.

  4. Pada command line, jalankan kueri berkelanjutan menggunakan Perintah bq query dengan flag berikut:

    • Tetapkan tanda --continuous ke true untuk membuat kueri berkelanjutan.
    • Gunakan tanda --connection_property untuk menentukan akun layanan yang akan digunakan.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'
    

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • SERVICE_ACCOUNT_EMAIL: layanan email akun Anda. Anda bisa mendapatkan email akun layanan dari Halaman Akun layanan konsol Google Cloud.
    • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.

API

  1. Membuat akun layanan
  2. Berikan hal yang diperlukan izin ke akun layanan.
  3. Jalankan kueri berkelanjutan dengan memanggil metode Metode jobs.insert. Tetapkan kolom berikut di kolom Referensi JobConfigurationQuery dari resource Job yang Anda teruskan:

    • Tetapkan kolom continuous ke true untuk membuat kueri berkelanjutan.
    • Gunakan kolom connection_property untuk menentukan akun layanan yang akan digunakan.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed
    

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • QUERY: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
    • SERVICE_ACCOUNT_EMAIL: layanan email akun Anda. Anda bisa mendapatkan email akun layanan di Halaman Akun layanan konsol Google Cloud.

Contoh

Contoh SQL berikut menunjukkan kasus penggunaan umum untuk kueri berkelanjutan.

Mengekspor data ke topik Pub/Sub

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari Tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan memublikasikan data ke topik Pub/Sub secara real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Mengekspor data ke tabel Bigtable

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari Tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan mengekspor data ke tabel Bigtable secara real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Menulis data ke tabel BigQuery

Contoh berikut menunjukkan kueri berkelanjutan yang memfilter dan mengubah data dari tabel BigQuery yang menerima layanan taksi online informasi, dan kemudian menulis data ke tabel BigQuery lainnya secara real-time. Hal ini membuat data tersedia untuk analisis downstream lebih lanjut.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Memproses data menggunakan model Vertex AI

Contoh berikut menunjukkan kueri berkelanjutan yang menggunakan Model Vertex AI untuk membuat iklan bagi penumpang taksi berdasarkan lintang dan bujurnya saat ini, lalu mengekspor hasilnya ke topik Pub/Sub secara real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Memulai kueri berkelanjutan dari titik waktu tertentu

Saat Anda memulai kueri berkelanjutan, semua baris dalam tabel akan diproses yang Anda pilih, dan kemudian memproses baris baru saat mereka masuk. Jika Anda ingin melewatkan pemrosesan beberapa atau semua data yang ada, Anda dapat menggunakan APPENDS fungsi riwayat perubahan untuk mulai memproses dari titik waktu tertentu.

Contoh berikut menampilkan cara memulai kueri berkelanjutan dari titik waktu dengan menggunakan fungsi APPENDS:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Mengubah SQL kueri berkelanjutan

Anda tidak dapat memperbarui SQL yang digunakan dalam kueri berkelanjutan saat kueri berkelanjutan tugas berjalan. Anda harus membatalkan tugas kueri berkelanjutan, memodifikasi SQL, lalu mulai tugas kueri berkelanjutan baru dari titik di mana Anda berhenti tugas kueri berkelanjutan asli.

Ikuti langkah-langkah ini untuk memodifikasi SQL yang digunakan dalam kueri berkelanjutan:

  1. Melihat detail lowongan untuk tugas kueri berkelanjutan yang ingin Anda perbarui, dan catat ID tugas.
  2. Jika memungkinkan, jeda pengumpulan data upstream. Jika Anda tidak dapat melakukan ini, Anda mungkin mendapatkan beberapa duplikasi data ketika kueri berkelanjutan dimulai ulang.
  3. Membatalkan kueri berkelanjutan yang ingin Anda modifikasi.
  4. Dapatkan nilai end_time untuk tugas kueri berkelanjutan asli dengan menggunakan INFORMATION_SCHEMA Tampilan JOBS:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';
    

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • REGION: region yang digunakan oleh project Anda.
    • JOB_ID: ID tugas kueri berkelanjutan yang Anda yang diidentifikasi pada Langkah 1.
  5. Ubah pernyataan SQL kueri berkelanjutan menjadi memulai kueri berkelanjutan dari titik waktu tertentu, menggunakan nilai end_time yang Anda ambil di Langkah 5 sebagai permulaan poin.

  6. Ubah pernyataan SQL kueri berkelanjutan untuk mencerminkan perubahan yang Anda butuhkan.

  7. Jalankan kueri berkelanjutan yang telah diubah.

Membatalkan kueri berkelanjutan

Anda dapat membatalkan kueri berkelanjutan pekerjaan sama seperti pekerjaan lainnya. Kueri mungkin memerlukan waktu hingga satu menit berhenti berjalan setelah tugas dibatalkan.