Menggunakan pipeline data Dataflow

Ringkasan

Anda dapat menggunakan pipeline data Dataflow untuk tugas berikut:

  • Membuat jadwal tugas berulang.
  • Pahami tempat resource dibelanjakan selama beberapa eksekusi tugas.
  • Menentukan dan mengelola tujuan keaktualan data.
  • Periksa setiap tahap pipeline untuk memperbaiki dan mengoptimalkan pipeline Anda.

Untuk dokumentasi API, lihat referensi Data Pipeline.

Fitur

  • Buat pipeline batch berulang untuk menjalankan tugas batch sesuai jadwal.
  • Buat pipeline batch inkremental berulang untuk menjalankan tugas batch terhadap versi terbaru data input.
  • Gunakan kartu skor ringkasan pipeline untuk melihat penggunaan kapasitas agregat dan penggunaan resource pipeline.
  • Melihat keaktualan data pipeline streaming. Metrik ini, yang berkembang seiring waktu, dapat dikaitkan dengan pemberitahuan yang memberi tahu Anda saat keaktualan turun lebih rendah dari tujuan yang ditentukan.
  • Gunakan grafik metrik pipeline untuk membandingkan tugas pipeline batch dan menemukan anomali.

Batasan

  • Ketersediaan regional: Anda dapat membuat pipeline data di region Cloud Scheduler yang tersedia.

  • Kuota:

    • Jumlah default pipeline per project: 500
    • Jumlah default pipeline per organisasi: 2.500

      Kuota tingkat organisasi dinonaktifkan secara default. Anda dapat memilih untuk menggunakan kuota tingkat organisasi, dan jika melakukannya, setiap organisasi dapat memiliki maksimal 2.500 pipeline secara default.

  • Label: Anda tidak dapat menggunakan label yang ditentukan pengguna untuk memberi label pada pipeline data Dataflow. Namun, saat Anda menggunakan kolom additionalUserLabels, nilai tersebut akan diteruskan ke tugas Dataflow Anda. Untuk informasi selengkapnya tentang cara label diterapkan ke setiap tugas Dataflow, lihat Opsi pipeline.

Jenis pipeline data

Dataflow memiliki dua jenis pipeline data, streaming dan batch. Kedua jenis pipeline menjalankan tugas yang ditentukan dalam template Dataflow.

Streaming data pipeline
Streaming data pipeline menjalankan tugas streaming Dataflow segera setelah dibuat.
Pipeline data batch

Pipeline data batch menjalankan tugas batch Dataflow sesuai jadwal yang ditentukan pengguna. Nama file input pipeline batch dapat diparameterkan untuk memungkinkan pemrosesan pipeline batch inkremental.

Pipeline batch inkremental

Anda dapat menggunakan placeholder tanggal/waktu untuk menentukan format file input tambahan untuk pipeline batch.

  • Placeholder untuk tahun, bulan, tanggal, jam, menit, dan detik dapat digunakan, dan harus mengikuti format strftime(). Placeholder didahului dengan simbol persen (%).
  • Pemformatan parameter tidak diverifikasi selama pembuatan pipeline.
    • Contoh: Jika Anda menentukan "gs://bucket/Y" sebagai jalur input berparameter, jalur tersebut akan dievaluasi sebagai "gs://bucket/Y", karena "Y" tanpa "%" sebelumnya tidak dipetakan ke format strftime().

Pada setiap waktu eksekusi pipeline batch terjadwal, bagian placeholder jalur input dievaluasi ke tanggal dan waktu saat ini (atau diubah waktunya). Nilai tanggal dievaluasi menggunakan tanggal saat ini di zona waktu tugas terjadwal. Jika jalur yang dievaluasi cocok dengan jalur file input, file tersebut akan diambil untuk diproses oleh pipeline batch pada waktu yang dijadwalkan.

  • Contoh: Pipeline batch dijadwalkan untuk diulang pada awal setiap jam PST. Jika Anda membuat parameter jalur input sebagai gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadi gs://bucket-name/2021-04-15/prefix-18_00.csv.

Menggunakan parameter pergeseran waktu

Anda dapat menggunakan parameter pergeseran waktu + atau - menit atau jam. Untuk mendukung pencocokan jalur input dengan tanggal waktu yang dievaluasi yang digeser sebelum atau setelah tanggal waktu jadwal pipeline saat ini, masukkan parameter ini dalam tanda kurung kurawal. Gunakan format {[+|-][0-9]+[m|h]}. Pipeline batch terus diulang pada waktu yang dijadwalkan, tetapi jalur input dievaluasi dengan offset waktu yang ditentukan.

  • Contoh: Pipeline batch dijadwalkan untuk diulang pada awal setiap jam PST. Jika Anda membuat parameter jalur input sebagai gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadi gs://bucket-name/2021-04-15/prefix-16_00.csv.

Peran pipeline data

Agar operasi pipeline data Dataflow berhasil, Anda memerlukan peran IAM yang diperlukan, sebagai berikut:

  1. Anda memerlukan peran yang sesuai untuk melakukan operasi:

  2. Akun layanan yang digunakan oleh Cloud Scheduler harus memiliki peran roles/iam.serviceAccountUser, baik akun layanan yang ditentukan pengguna maupun akun layanan Compute Engine default. Untuk mengetahui informasi selengkapnya, lihat Peran pipeline data.

  3. Anda harus dapat bertindak sebagai akun layanan yang digunakan oleh Cloud Scheduler dan Dataflow dengan diberi peran roles/iam.serviceAccountUser di akun tersebut. Jika Anda tidak memilih akun layanan untuk Cloud Scheduler dan Dataflow, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data

Anda dapat membuat pipeline data Dataflow dengan dua cara:

  1. Mengimpor tugas, atau
  2. Membuat pipeline data

Halaman penyiapan pipeline data: Saat Anda pertama kali mengakses fitur pipeline Dataflow di konsol Google Cloud, halaman penyiapan akan terbuka. Aktifkan API yang tercantum untuk membuat pipeline data.

Mengimpor tugas

Anda dapat mengimpor tugas streaming atau batch Dataflow yang didasarkan pada template klasik atau fleksibel dan menjadikannya pipeline data.

  1. Di konsol Google Cloud, buka halaman Jobs Dataflow.

    Buka Tugas

  2. Pilih tugas yang telah selesai, lalu di halaman Job Details, pilih +Import as a pipeline.

  3. Di halaman Create pipeline from template, parameter diisi dengan opsi tugas yang diimpor.

  4. Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data

  1. Di konsol Google Cloud, buka halaman Pipeline data Dataflow.

    Buka Data pipelines

  2. Pilih +Create data pipeline.

  3. Di halaman Create pipeline from template, berikan nama pipeline, dan isi kolom parameter dan pemilihan template lainnya.

  4. Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika nilai tidak ditentukan, akun layanan Compute Engine default akan digunakan.

Membuat pipeline data batch

Untuk membuat contoh pipeline data batch ini, Anda harus memiliki akses ke resource berikut dalam project:

Contoh pipeline ini menggunakan template pipeline batch Teks Cloud Storage ke BigQuery. Template ini membaca file dalam format CSV dari Cloud Storage, menjalankan transformasi, lalu menyisipkan nilai ke dalam tabel BigQuery dengan tiga kolom.

  1. Buat file berikut di drive lokal Anda:

    1. File bq_three_column_table.json yang berisi skema tabel BigQuery tujuan berikut.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. File JavaScript split_csv_3cols.js, yang menerapkan transformasi sederhana pada data input sebelum disisipkan ke BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. File CSV file01.csv dengan beberapa kumpulan data yang disisipkan ke dalam tabel BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Gunakan perintah gcloud storage cp untuk menyalin file ke folder di bucket Cloud Storage dalam project Anda, sebagai berikut:

    1. Salin bq_three_column_table.json dan split_csv_3cols.js ke gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Salin file01.csv ke gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. Di konsol Google Cloud, buka halaman Bucket Cloud Storage.

    Buka Bucket

  4. Untuk membuat folder tmp di bucket Cloud Storage, pilih nama folder untuk membuka Halaman detail bucket, lalu klik Buat folder.

    Tombol Buat folder di halaman Detail bucket.

  5. Di konsol Google Cloud, buka halaman Pipeline data Dataflow.

    Buka Data pipelines

  6. Pilih Create data pipeline. Masukkan atau pilih item berikut di halaman Create pipeline from template:

    1. Untuk Pipeline name, masukkan text_to_bq_batch_data_pipeline.
    2. Untuk Regional endpoint, pilih region Compute Engine. Region sumber dan tujuan harus cocok. Oleh karena itu, bucket Cloud Storage dan tabel BigQuery Anda harus berada di region yang sama.
    3. Untuk Dataflow template, di Process Data in Bulk (batch), pilih Text Files on Cloud Storage to BigQuery.

    4. Untuk Jadwalkan pipeline Anda, pilih jadwal, seperti Per jam pada menit 25, di zona waktu Anda. Anda dapat mengedit jadwal setelah mengirimkan pipeline. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.

    5. Di Required parameters, masukkan hal berikut:

      1. Untuk JavaScript UDF path in Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. Untuk Jalur JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. Untuk JavaScript UDF name: transform
      4. Untuk BigQuery output table:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. Untuk Jalur input Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. Untuk Temporary BigQuery directory:
        BUCKET_ID/tmp
        
      7. Untuk Temporary location:
        BUCKET_ID/tmp
        
    6. Klik Create pipeline.

  7. Konfirmasi informasi template dan pipeline, serta lihat histori saat ini dan sebelumnya dari halaman Detail pipeline.

    Halaman detail pipeline.

Anda dapat mengedit jadwal pipeline data dari panel Info pipeline di halaman Detail pipeline.

Tombol Edit di samping jadwal pipeline.

Anda juga dapat menjalankan pipeline batch sesuai permintaan menggunakan tombol Run di konsol Dataflow Pipelines.

Membuat contoh pipeline data streaming

Anda dapat membuat contoh pipeline data streaming dengan mengikuti contoh petunjuk pipeline batch, dengan perbedaan berikut:

  • Untuk Jadwal pipeline, jangan tentukan jadwal untuk pipeline data streaming. Tugas streaming Dataflow akan segera dimulai.
  • Untuk Dataflow template, di Process Data Continuously (stream), pilih Text Files on Cloud Storage to BigQuery.
  • Untuk Jenis mesin pekerja, pipeline memproses kumpulan awal file yang cocok dengan pola gs://BUCKET_ID/inputs/file01.csv dan file tambahan apa pun yang cocok dengan pola ini yang Anda upload ke folder inputs/. Jika ukuran file CSV melebihi beberapa GB, untuk menghindari kemungkinan error kehabisan memori, pilih jenis mesin dengan memori yang lebih tinggi daripada jenis mesin n1-standard-4 default, seperti n1-highmem-8.

Pemecahan masalah

Bagian ini menunjukkan cara menyelesaikan masalah pada pipeline data Dataflow.

Tugas pipeline data gagal diluncurkan

Saat Anda menggunakan pipeline data untuk membuat jadwal tugas berulang, tugas Dataflow Anda mungkin tidak diluncurkan, dan error status 503 akan muncul di file log Cloud Scheduler.

Masalah ini terjadi saat Dataflow untuk sementara tidak dapat menjalankan tugas.

Untuk mengatasi masalah ini, konfigurasikan Cloud Scheduler untuk mencoba kembali tugas. Karena masalah tersebut bersifat sementara, saat dicoba ulang, tugas mungkin berhasil. Untuk mengetahui informasi selengkapnya tentang cara menetapkan nilai percobaan ulang di Cloud Scheduler, lihat Membuat tugas.

Menyelidiki pelanggaran tujuan pipeline

Bagian berikut menjelaskan cara menyelidiki pipeline yang tidak memenuhi tujuan performa.

Pipeline batch berulang

Untuk analisis awal kondisi pipeline Anda, di halaman Info pipeline di konsol Google Cloud, gunakan grafik Status tugas individual dan Waktu thread per langkah. Grafik ini terletak di panel status pipeline.

Contoh investigasi:

  1. Anda memiliki pipeline batch berulang yang berjalan setiap jam pada pukul 3 menit setelah jam. Setiap tugas biasanya berjalan selama sekitar 9 menit. Anda memiliki tujuan agar semua tugas dapat diselesaikan dalam waktu kurang dari 10 menit.

  2. Grafik status tugas menunjukkan bahwa tugas berjalan selama lebih dari 10 menit.

  3. Di tabel histori Pembaruan/Eksekusi, temukan tugas yang berjalan selama jam yang diinginkan. Klik halaman detail tugas Dataflow. Di halaman tersebut, temukan tahap yang berjalan lebih lama, lalu cari kemungkinan error dalam log untuk menentukan penyebab penundaan.

Pipeline streaming

Untuk analisis awal kondisi pipeline Anda, di halaman Detail Pipeline, di tab Info pipeline, gunakan grafik keaktualan data. Grafik ini terletak di panel status pipeline.

Contoh investigasi:

  1. Anda memiliki pipeline streaming yang biasanya menghasilkan output dengan keaktualan data 20 detik.

  2. Anda menetapkan tujuan untuk memiliki jaminan keaktualan data 30 detik. Saat meninjau grafik keaktualan data, Anda melihat bahwa antara pukul 09.00 dan 10.00, keaktualan data melonjak menjadi hampir 40 detik.

    Grafik keaktualan data yang menunjukkan peningkatan jumlah menit keaktualan data.

  3. Beralihlah ke tab Pipeline metrics, lalu lihat grafik CPU Utilization dan Memory Utilization untuk analisis lebih lanjut.

Error: ID pipeline sudah ada dalam project

Jika Anda mencoba membuat pipeline baru dengan nama yang sudah ada dalam project, Anda akan menerima pesan error ini: Pipeline Id already exist within the project. Untuk menghindari masalah ini, selalu pilih nama unik untuk pipeline Anda.