Menggunakan pipeline data Dataflow

Ringkasan

Anda dapat menggunakan pipeline data Dataflow untuk tugas berikut:

  • Membuat jadwal tugas berulang.
  • Memahami di mana resource dihabiskan selama beberapa eksekusi tugas.
  • Tentukan dan kelola tujuan keaktualan data.
  • Melihat perincian tahap pipeline individual untuk memperbaiki dan mengoptimalkan pipeline Anda.

Untuk dokumentasi API, lihat Referensi Pipeline Data.

Fitur

  • Buat pipeline batch yang berulang untuk menjalankan tugas batch sesuai jadwal.
  • Buat pipeline batch inkremental berulang untuk menjalankan tugas batch terhadap data input versi terbaru.
  • Gunakan kartu skor ringkasan pipeline untuk melihat penggunaan kapasitas gabungan dan konsumsi resource dari pipeline.
  • Melihat keaktualan data pipeline streaming. Metrik ini, yang berkembang seiring waktu, dapat dikaitkan dengan pemberitahuan yang memberi tahu Anda saat keaktualan turun lebih rendah dari tujuan yang ditentukan.
  • Menggunakan grafik metrik pipeline untuk membandingkan tugas pipeline batch dan menemukan anomali.

Batasan

  • Ketersediaan regional: Anda dapat membuat pipeline data di region Cloud Scheduler yang tersedia.

  • Kuota:

    • Jumlah default pipeline per project: 500
    • Jumlah default pipeline per organisasi: 2.500

      Kuota tingkat organisasi dinonaktifkan secara default. Anda dapat memilih ikut serta dalam kuota tingkat organisasi. Jika melakukannya, setiap organisasi dapat memiliki maksimal 2.500 pipeline secara default.

  • Label: Anda tidak dapat menggunakan label yang ditentukan pengguna untuk memberi label pada pipeline data Dataflow. Namun, saat Anda menggunakan kolom additionalUserLabels, nilai tersebut akan diteruskan ke tugas Dataflow Anda. Untuk mengetahui informasi selengkapnya tentang cara label diterapkan ke setiap tugas Dataflow, lihat Opsi pipeline.

Jenis pipeline data

Dataflow memiliki dua jenis pipeline data, streaming dan batch. Kedua jenis tugas yang dijalankan pipeline yang ditentukan dalam template Dataflow.

Streaming data pipeline
Pipeline data streaming menjalankan tugas streaming Dataflow segera setelah dibuat.
Pipeline data batch

Pipeline data batch menjalankan tugas batch Dataflow pada jadwal yang ditentukan pengguna. Nama file input pipeline batch dapat diparameterisasi untuk memungkinkan pemrosesan pipeline batch inkremental.

Pipeline batch inkremental

Anda dapat menggunakan placeholder tanggal dan waktu untuk menentukan format file input inkremental untuk pipeline batch.

  • Placeholder untuk tahun, bulan, tanggal, jam, menit, dan detik dapat digunakan, dan harus mengikuti format strftime(). Placeholder didahului dengan simbol persentase (%).
  • Pemformatan parameter tidak diverifikasi selama pembuatan pipeline.
    • Contoh: Jika Anda menetapkan "gs://bucket/Y" sebagai jalur input berparameter, parameter ini akan dievaluasi sebagai "gs://bucket/Y", karena "Y" tanpa "%" yang mendahului tidak dipetakan ke format strftime().

Pada setiap waktu eksekusi pipeline batch yang dijadwalkan, bagian placeholder dari jalur input dievaluasi ke tanggal waktu saat ini (atau perubahan waktu). Nilai tanggal dievaluasi menggunakan tanggal saat ini dalam zona waktu tugas terjadwal. Jika jalur yang dievaluasi cocok dengan jalur file input, file tersebut akan diambil untuk diproses oleh pipeline batch pada waktu yang dijadwalkan.

  • Contoh: Pipeline batch dijadwalkan untuk diulang di awal setiap PST jam. Jika Anda membuat parameter jalur input sebagai gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadi gs://bucket-name/2021-04-15/prefix-18_00.csv.

Menggunakan parameter pergeseran waktu

Anda dapat menggunakan parameter pergeseran waktu + atau - menit atau jam. Untuk mendukung pencocokan jalur input dengan tanggal dan waktu yang dievaluasi yang digeser sebelum atau setelah tanggal dan waktu saat ini dari jadwal pipeline, apit parameter tersebut dalam tanda kurung kurawal. Gunakan format {[+|-][0-9]+[m|h]}. Pipeline batch akan terus diulang pada waktu yang dijadwalkan, tetapi jalur input dievaluasi dengan offset waktu yang ditentukan.

  • Contoh: Pipeline batch dijadwalkan untuk diulang di awal setiap PST jam. Jika Anda membuat parameter jalur input sebagai gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadi gs://bucket-name/2021-04-15/prefix-16_00.csv.

Peran pipeline data

Agar operasi pipeline data Dataflow berhasil, Anda memerlukan peran IAM yang diperlukan, sebagai berikut:

  1. Anda memerlukan peran yang sesuai untuk melakukan operasi:

  2. Akun layanan yang digunakan oleh Cloud Scheduler harus memiliki peran roles/iam.serviceAccountUser, baik akun layanan ditentukan oleh pengguna maupun akun layanan Compute Engine default. Untuk mengetahui informasi selengkapnya, lihat Peran pipeline data.

  3. Anda harus dapat bertindak sebagai akun layanan yang digunakan oleh Cloud Scheduler dan Dataflow dengan diberi peran roles/iam.serviceAccountUser di akun tersebut. Jika Anda tidak memilih akun layanan untuk Cloud Scheduler dan Dataflow, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data

Anda dapat membuat pipeline data Dataflow dengan dua cara:

  1. Mengimpor tugas, atau
  2. Membuat pipeline data

Halaman penyiapan pipeline data: Saat Anda pertama kali mengakses fitur pipeline Dataflow di Konsol Google Cloud, halaman penyiapan akan terbuka. Aktifkan API yang tercantum untuk membuat pipeline data.

Mengimpor tugas

Anda dapat mengimpor tugas streaming atau batch Dataflow yang didasarkan pada template klasik atau fleksibel dan menjadikannya pipeline data.

  1. Di konsol Google Cloud, buka halaman Jobs Dataflow.

    Buka Tugas

  2. Pilih tugas yang sudah selesai, lalu di halaman Job Details, pilih +Import as a pipeline.

  3. Di halaman Buat pipeline dari template, parameter diisi dengan opsi tugas yang diimpor.

  4. Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Menyediakan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data

  1. Di konsol Google Cloud, buka halaman Data pipelines Dataflow.

    Buka Pipeline data

  2. Pilih +Create data pipeline.

  3. Di halaman Create pipeline from template, berikan nama pipeline, lalu isi kolom pemilihan template dan parameter lainnya.

  4. Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Menyediakan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika nilai tidak ditentukan, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data batch

Untuk membuat contoh pipeline data batch ini, Anda harus memiliki akses ke resource berikut dalam project Anda:

Contoh pipeline ini menggunakan template pipeline batch Cloud Storage Text to BigQuery. Template ini membaca file dalam format CSV dari Cloud Storage, menjalankan transformasi, lalu menyisipkan nilai ke dalam tabel BigQuery dengan tiga kolom.

  1. Buat file berikut di drive lokal Anda:

    1. File bq_three_column_table.json yang berisi skema tabel BigQuery tujuan berikut.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. File JavaScript split_csv_3cols.js, yang menerapkan transformasi sederhana pada data input sebelum penyisipan ke BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. File CSV file01.csv dengan beberapa catatan yang disisipkan ke dalam tabel BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Gunakan perintah gcloud storage cp untuk menyalin file ke folder di bucket Cloud Storage di project Anda, sebagai berikut:

    1. Salin bq_three_column_table.json dan split_csv_3cols.js ke gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Salin file01.csv ke gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. Di konsol Google Cloud, buka halaman Buckets Cloud Storage.

    Buka Buckets

  4. Untuk membuat folder tmp di bucket Cloud Storage, pilih nama folder Anda untuk membuka halaman detail Bucket, lalu klik Create folder.

    Tombol Buat folder pada halaman detail Bucket.

  5. Di konsol Google Cloud, buka halaman Data pipelines Dataflow.

    Buka Pipeline data

  6. Pilih Buat pipeline data. Masukkan atau pilih item berikut di halaman Create pipeline from template:

    1. Untuk Pipeline name, masukkan text_to_bq_batch_data_pipeline.
    2. Untuk Regional endpoint, pilih region Compute Engine. Region sumber dan tujuan harus cocok. Oleh karena itu, bucket Cloud Storage dan tabel BigQuery harus berada di region yang sama.
    3. Untuk Dataflow template, di Process Data in Bulk (batch), pilih Text Files on Cloud Storage to BigQuery.

    4. Untuk Jadwalkan pipeline Anda, pilih jadwal, seperti Per jam pada menit 25, di zona waktu Anda. Anda dapat mengedit jadwal setelah mengirimkan pipeline. Menyediakan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.

    5. Di Parameter yang diperlukan, masukkan hal berikut:

      1. Untuk jalur UDF JavaScript di Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. Untuk jalur JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. Untuk nama UDF JavaScript: transform
      4. Untuk tabel output BigQuery:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. Untuk Jalur input Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. Untuk Direktori BigQuery sementara:
        BUCKET_ID/tmp
        
      7. Untuk Lokasi sementara:
        BUCKET_ID/tmp
        
    6. Klik Create pipeline.

  7. Konfirmasi informasi pipeline dan template serta lihat histori saat ini dan sebelumnya dari halaman Detail pipeline.

    Halaman detail pipeline.

Anda juga dapat menjalankan pipeline batch sesuai permintaan menggunakan tombol Run di konsol Dataflow Pipelines.

Membuat contoh pipeline data streaming

Anda dapat membuat contoh pipeline data streaming dengan mengikuti petunjuk contoh pipeline batch, dengan perbedaan berikut:

  • Untuk Jadwal pipeline, jangan tentukan jadwal untuk pipeline data streaming. Tugas streaming Dataflow akan segera dimulai.
  • Untuk Dataflow template, di Process Data Continuously (stream), pilih Text Files on Cloud Storage to BigQuery.
  • Untuk Jenis mesin pekerja, pipeline memproses kumpulan file awal yang cocok dengan pola gs://BUCKET_ID/inputs/file01.csv dan file tambahan apa pun yang cocok dengan pola ini, yang Anda upload ke folder inputs/. Jika ukuran file CSV melebihi beberapa GB, untuk menghindari kemungkinan error kehabisan memori, pilih jenis mesin dengan memori yang lebih tinggi daripada jenis mesin n1-standard-4 default, seperti n1-highmem-8.

Pemecahan masalah

Bagian ini menunjukkan cara menyelesaikan masalah terkait pipeline data Dataflow.

Tugas pipeline data gagal diluncurkan

Saat Anda menggunakan pipeline data untuk membuat jadwal tugas berulang, tugas Dataflow mungkin tidak diluncurkan, dan error status 503 muncul di file log Cloud Scheduler.

Masalah ini terjadi saat Dataflow tidak dapat menjalankan tugas untuk sementara.

Untuk mengatasi masalah ini, konfigurasikan Cloud Scheduler untuk mencoba kembali tugas tersebut. Karena masalah ini bersifat sementara, saat pekerjaan dicoba lagi, pekerjaan tersebut mungkin berhasil. Untuk mengetahui informasi selengkapnya tentang cara menetapkan nilai coba lagi di Cloud Scheduler, lihat Membuat tugas.

Menyelidiki pelanggaran tujuan pipeline

Bagian berikut menjelaskan cara menyelidiki pipeline yang tidak memenuhi tujuan performa.

Pipeline batch berulang

Untuk analisis awal kondisi pipeline Anda, pada halaman Info pipeline di Konsol Google Cloud, gunakan grafik Status tugas individual dan Waktu thread per langkah. Grafik ini terletak di panel status pipeline.

Contoh investigasi:

  1. Anda memiliki pipeline batch berulang yang berjalan setiap jam pada 3 menit setelah satu jam. Setiap tugas biasanya berjalan sekitar 9 menit. Anda memiliki tujuan untuk menyelesaikan semua tugas dalam waktu kurang dari 10 menit.

  2. Grafik status tugas menunjukkan bahwa tugas berjalan selama lebih dari 10 menit.

  3. Di tabel histori Update/Execution, temukan tugas yang dijalankan selama jam yang diminati. Klik halaman detail tugas Dataflow. Di halaman tersebut, temukan tahap yang berjalan lebih lama, lalu lihat log untuk menemukan kemungkinan error guna menentukan penyebab penundaan.

Pipeline streaming

Untuk analisis awal kondisi pipeline Anda, gunakan grafik keaktualan data di halaman Pipeline Details, pada tab Pipeline info. Grafik ini terletak di panel status pipeline.

Contoh investigasi:

  1. Anda memiliki pipeline streaming yang biasanya menghasilkan output dengan keaktualan data selama 20 detik.

  2. Anda menetapkan tujuan untuk mendapatkan jaminan keaktualan data selama 30 detik. Saat meninjau grafik keaktualan data, Anda melihat bahwa antara pukul 09.00 hingga 10.00, keaktualan data melonjak menjadi hampir 40 detik.

    Grafik keaktualan data yang menampilkan peningkatan jumlah menit keaktualan data.

  3. Beralihlah ke tab Pipeline metrics, lalu lihat grafik Pemakaian CPU dan Pemakaian Memori untuk analisis lebih lanjut.