Mengembangkan dan menguji pipeline Dataflow

Halaman ini memberikan praktik terbaik untuk mengembangkan dan menguji pipeline Dataflow Anda.

Ringkasan

Cara penerapan kode untuk pipeline Anda memiliki pengaruh yang signifikan terhadap seberapa baik performa pipeline dalam produksi. Untuk membantu Anda membuat kode pipeline yang berfungsi dengan benar dan efisien, dokumen ini menjelaskan hal berikut:

  • Runner pipeline untuk mendukung eksekusi kode di berbagai tahap pengembangan dan deployment.
  • Lingkungan deployment yang memungkinkan Anda menjalankan pipeline selama pengembangan, pengujian, praproduksi, dan produksi.
  • Template dan kode pipeline open source yang dapat Anda gunakan sebagaimana adanya, atau sebagai dasar bagi pipeline baru untuk mempercepat pengembangan kode.
  • Praktik terbaik coding untuk mengembangkan pipeline Anda guna meningkatkan kemampuan observasi dan performa pipeline. Banyak dari praktik berikut yang berlaku untuk pemrograman menggunakan Apache Beam SDK (contoh menggunakan Java) dan tidak khusus untuk Dataflow. Namun, dalam banyak kasus, Dataflow menyediakan fitur yang melengkapi praktik coding ini untuk meningkatkan kesiapan produksi.
  • Pendekatan praktik terbaik untuk menguji kode pipeline. Pertama, dokumen ini memberikan ringkasan yang menyertakan cakupan dan hubungan berbagai jenis pengujian, seperti pengujian unit, pengujian integrasi, dan pengujian menyeluruh. Kedua, setiap jenis pengujian dibahas secara mendetail, termasuk metode untuk membuat dan berintegrasi dengan data pengujian, serta runner pipeline mana yang akan digunakan untuk setiap pengujian.

Runner pipeline

Selama pengembangan dan pengujian, Anda menggunakan runner Apache Beam yang berbeda untuk menjalankan kode pipeline. Apache Beam SDK menyediakan Direct Runner untuk pengembangan dan pengujian lokal. Alat otomatisasi rilis Anda juga dapat menggunakan Direct Runner untuk pengujian unit dan pengujian integrasi. Misalnya, Anda dapat menggunakan Direct Runner dalam pipeline continuous integration (CI).

Pipeline yang di-deploy ke Dataflow menggunakan Dataflow Runner, yang menjalankan pipeline Anda di lingkungan seperti produksi. Selain itu, Anda dapat menggunakan Dataflow Runner untuk pengujian pengembangan ad hoc dan untuk pengujian pipeline end-to-end.

Meskipun halaman ini berfokus pada menjalankan pipeline yang dibangun menggunakan Apache Beam Java SDK, Dataflow juga mendukung pipeline Apache Beam yang dikembangkan menggunakan Python dan Go. Apache Beam Java, Python, dan Go SDK tersedia secara umum untuk Dataflow. Developer SQL juga dapat menggunakan Apache Beam SQL untuk membuat pipeline yang menggunakan dialek SQL yang sudah dikenal.

Menyiapkan lingkungan deployment

Untuk memisahkan pengguna, data, kode, dan resource lain di berbagai tahap pengembangan, buat lingkungan deployment. Jika memungkinkan, gunakan project Google Cloud yang terpisah untuk menyediakan lingkungan yang terisolasi untuk berbagai tahap pengembangan pipeline.

Bagian berikut ini menjelaskan kumpulan lingkungan deployment yang umum.

Lingkungan lokal

Lingkungan lokal adalah workstation developer. Untuk pengembangan dan pengujian cepat, gunakan Direct Runner untuk menjalankan kode pipeline secara lokal.

Pipeline yang dijalankan secara lokal menggunakan Direct Runner dapat berinteraksi dengan resource Google Cloud jarak jauh, seperti topik Pub/Sub atau tabel BigQuery. Berikan project Google Cloud terpisah kepada masing-masing developer sehingga mereka memiliki sandbox untuk pengujian ad hoc dengan layanan Google Cloud.

Beberapa layanan Google Cloud, seperti Pub/Sub dan Bigtable, menyediakan emulator untuk pengembangan lokal. Anda dapat menggunakan emulator ini dengan Direct Runner untuk mengaktifkan pengembangan dan pengujian lokal end-to-end.

Lingkungan sandbox

Lingkungan sandbox adalah project Google Cloud yang memberi developer akses ke layanan Google Cloud selama pengembangan kode. Developer pipe dapat membagikan project Google Cloud kepada developer lain, atau menggunakan project individual mereka sendiri. Menggunakan project individual akan mengurangi kompleksitas perencanaan yang terkait dengan penggunaan resource bersama dan pengelolaan kuota.

Developer menggunakan lingkungan sandbox untuk menjalankan eksekusi pipeline ad hoc dengan Dataflow Runner. Lingkungan sandbox berguna untuk proses debug dan pengujian kode terhadap runner produksi selama fase pengembangan kode. Misalnya, eksekusi pipeline ad hoc memungkinkan developer melakukan hal berikut:

  • Amati efek perubahan kode terhadap perilaku penskalaan.
  • Memahami potensi perbedaan antara perilaku Direct Runner dan Dataflow Runner.
  • Pahami cara Dataflow menerapkan pengoptimalan grafik.

Untuk pengujian ad hoc, developer dapat men-deploy kode dari lingkungan lokal mereka agar dapat menjalankan Dataflow dalam lingkungan sandbox mereka.

Lingkungan praproduksi

Lingkungan praproduksi ditujukan untuk fase pengembangan yang perlu dijalankan dalam kondisi seperti produksi, seperti pengujian menyeluruh. Gunakan project terpisah untuk lingkungan praproduksi dan konfigurasikan agar mirip dengan lingkungan production. Demikian pula, untuk memungkinkan pengujian menyeluruh dengan skala seperti produksi, buat kuota project Google Cloud untuk Dataflow dan layanan lainnya semirip mungkin dengan lingkungan produksi.

Bergantung pada persyaratan, Anda dapat memisahkan lebih lanjut praproduksi ke dalam beberapa lingkungan. Misalnya, lingkungan kendali mutu dapat mendukung pekerjaan analis kualitas untuk menguji tujuan tingkat layanan (SLO) seperti ketepatan data, keaktualan, dan performa dalam berbagai kondisi beban kerja.

Pengujian menyeluruh mencakup integrasi dengan sumber data dan sink dalam cakupan pengujian. Pertimbangkan cara menyediakan fitur ini dalam lingkungan praproduksi. Anda dapat menyimpan data pengujian di lingkungan praproduksi itu sendiri. Misalnya, data pengujian disimpan di bucket Cloud Storage bersama data input Anda. Dalam kasus lain, data pengujian mungkin berasal dari luar lingkungan praproduksi, seperti topik Pub/Sub melalui langganan terpisah yang ada di lingkungan produksi. Untuk pipeline streaming, Anda juga dapat menjalankan pengujian menyeluruh menggunakan data yang dihasilkan, misalnya, menggunakan Generator Data Streaming Dataflow untuk mengemulasikan volume dan karakteristik data seperti produksi.

Untuk pipeline streaming, gunakan lingkungan praproduksi untuk menguji update pipeline sebelum perubahan apa pun dilakukan pada produksi. Penting untuk menguji dan memverifikasi prosedur update untuk pipeline streaming, terutama jika Anda perlu mengoordinasikan beberapa langkah, seperti saat menjalankan pipeline paralel untuk menghindari periode nonaktif.

Lingkungan produksi

Lingkungan produksi adalah project Google Cloud khusus. Continuous delivery menyalin artefak deployment ke lingkungan produksi jika semua pengujian menyeluruh telah lulus.

Praktik terbaik pengembangan

Bagian ini membahas praktik terbaik coding dan pengembangan. Banyak dari praktik ini melengkapi dan meningkatkan aspek pengembangan dan operasionalisasi pipeline, seperti meningkatkan produktivitas developer, meningkatkan kemampuan pengujian pipeline, meningkatkan performa, dan memungkinkan insight yang lebih mendalam dengan pemantauan.

Sebelum memulai pengembangan, siapkan lingkungan deployment yang mendukung siklus proses pengembangan, pengujian, dan pengiriman Anda.

Gunakan template yang disediakan Google

Untuk mempercepat pengembangan pipeline, periksa apakah Google menyediakan template Dataflow yang ada. Beberapa template memungkinkan Anda menambahkan logika kustom sebagai langkah pipeline. Misalnya, template Pub/Sub Subscription to BigQuery menyediakan parameter untuk menjalankan fungsi yang ditentukan pengguna (UDF) JavaScript yang tersimpan di Cloud Storage. Template yang disediakan Google bersifat open source dengan Lisensi Apache 2.0, sehingga Anda dapat menggunakannya sebagai dasar untuk pipeline baru. {i>Template<i} juga berguna sebagai contoh kode untuk referensi.

Membuat library transformasi yang dapat digunakan kembali

Model pemrograman Apache Beam menggabungkan pemrosesan data batch dan streaming, sehingga memungkinkan penggunaan ulang transformasi. Membuat library bersama yang berisi transformasi umum akan mendorong penggunaan kembali, pengujian, dan kepemilikan kode oleh tim yang berbeda.

Pertimbangkan dua contoh kode Java berikut, yang membaca peristiwa pembayaran. Yang pertama berasal dari sumber Pub/Sub tak terbatas:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

Yang kedua adalah dari sumber database relasional terbatas:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

Dengan asumsi bahwa kedua pipeline melakukan pemrosesan yang sama, keduanya dapat menggunakan transformasi yang sama melalui library bersama untuk langkah pemrosesan yang tersisa. Cara menerapkan praktik terbaik penggunaan kembali kode bervariasi menurut bahasa pemrograman dan alat build. Misalnya, jika menggunakan Maven, Anda dapat memisahkan kode transformasi ke dalam modulnya sendiri. Kemudian, Anda dapat menyertakan modul sebagai submodul dalam project multi-modul yang lebih besar untuk pipeline yang berbeda, seperti ditunjukkan dalam contoh kode berikut:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Untuk mengetahui informasi selengkapnya, lihat dokumentasi Apache Beam untuk praktik terbaik tentang menulis kode pengguna untuk transformasi Apache Beam dan untuk panduan gaya yang direkomendasikan untuk PTransform.

Menggunakan antrean yang dihentikan pengirimannya untuk penanganan error

Pipeline Anda mungkin mengalami situasi ketika elemen tidak dapat diproses. Situasi ini dapat terjadi karena berbagai alasan, tetapi penyebab umumnya adalah masalah data. Misalnya, elemen yang berisi JSON yang diformat dengan buruk dapat menyebabkan kegagalan penguraian.

Dalam situasi ini, salah satu pendekatan adalah menangkap pengecualian dalam metode DoFn.ProcessElement. Dalam blok pengecualian, Anda mungkin mencatat error dan melepaskan elemen. Namun, hal ini menyebabkan data hilang dan mencegah data tersebut diperiksa nantinya untuk penanganan atau pemecahan masalah manual.

Pendekatan yang lebih baik adalah menggunakan pola yang disebut antrean yang dihentikan pengirimannya (atau file yang dihentikan pengirimannya). Tangkap pengecualian dalam metode DoFn.ProcessElement dan catat error seperti yang biasa Anda lakukan. Daripada menghapus elemen yang gagal, gunakan output bercabang untuk menulis elemen yang gagal ke objek PCollection yang terpisah. Elemen-elemen ini kemudian ditulis ke sink data untuk diperiksa dan ditangani nanti menggunakan transformasi terpisah.

Contoh kode Java berikut menunjukkan cara menerapkan pola antrean mati:

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

Anda dapat menggunakan Cloud Monitoring untuk menerapkan kebijakan pemantauan dan pemberitahuan yang berbeda untuk antrean yang dihentikan pengirimannya di pipeline. Misalnya, Anda dapat memvisualisasikan jumlah dan ukuran elemen yang diproses oleh transformasi yang dihentikan pengirimannya dan mengonfigurasi pemberitahuan untuk dipicu jika kondisi nilai minimum tertentu terpenuhi.

Menangani mutasi skema

Anda dapat menangani data yang memiliki skema yang tidak terduga tetapi valid menggunakan pola dead-huruf, yang menulis elemen yang gagal ke objek PCollection yang terpisah. Namun, dalam beberapa kasus, Anda mungkin ingin otomatis menangani elemen yang mencerminkan skema yang berubah sebagai elemen yang valid. Misalnya, jika skema elemen mencerminkan mutasi seperti penambahan kolom baru, Anda dapat menyesuaikan skema sink data ini untuk mengakomodasi mutasi.

Mutasi skema otomatis bergantung pada pendekatan percabangan-output yang digunakan oleh pola yang dihentikan. Namun, dalam hal ini, pemicu akan memicu transformasi yang mengubah skema tujuan setiap kali skema tambahan ditemukan. Untuk contoh pendekatan ini, lihat Cara menangani perubahan skema JSON di pipeline streaming, dengan Square Enix di blog Google Cloud.

Pilih dengan benar antara input samping atau CoGroupByKey untuk gabungan

Menggabungkan set data adalah kasus penggunaan umum untuk pipeline data. Input samping menyediakan cara fleksibel untuk menyelesaikan masalah pemrosesan data umum, seperti pengayaan data dan pencarian dengan kunci. Tidak seperti objek PCollection, input samping juga dapat diubah, dan dapat ditentukan saat runtime. Misalnya, nilai dalam input samping mungkin dihitung oleh cabang lain dalam pipeline Anda atau ditentukan dengan memanggil layanan jarak jauh.

Dataflow mendukung input samping dengan menyimpan data ke dalam penyimpanan persisten (mirip disk bersama), yang membuat input samping lengkap tersedia untuk semua pekerja. Ukuran input samping bisa sangat besar dan mungkin tidak masuk ke dalam memori pekerja. Membaca dari input sisi besar dapat menyebabkan masalah performa jika pekerja harus terus-menerus membaca dari penyimpanan persisten.

Transformasi CoGroupByKey adalah transformasi Apache Beam inti yang menggabungkan (meratakan) beberapa objek PCollection dan mengelompokkan elemen yang memiliki kunci yang sama. Tidak seperti input samping, yang membuat seluruh data input samping tersedia untuk setiap pekerja, CoGroupByKey melakukan operasi acak (pengelompokan) untuk mendistribusikan data ke seluruh pekerja. Oleh karena itu, CoGroupByKey ideal jika objek PCollection yang ingin Anda gabungkan sangat besar dan tidak sesuai dengan memori pekerja.

Ikuti panduan ini untuk membantu memutuskan apakah akan menggunakan input samping atau CoGroupByKey:

  • Gunakan input samping saat salah satu objek PCollection yang Anda gabungkan berukuran jauh lebih kecil dari yang lain, dan saat objek PCollection yang lebih kecil cocok dengan memori pekerja. Menyimpan input samping ke dalam cache sepenuhnya ke dalam memori membuat pengambilan elemen dapat dilakukan dengan cepat dan efisien.
  • Gunakan CoGroupByKey jika Anda perlu mengambil sebagian besar objek PCollection yang secara signifikan melebihi memori pekerja.
  • Gunakan input samping jika Anda memiliki objek PCollection yang harus digabungkan beberapa kali di pipeline Anda. Daripada menggunakan beberapa transformasi CoGroupByKey, Anda dapat membuat input satu sisi yang dapat digunakan kembali oleh beberapa transformasi ParDo.

Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah Dataflow kehabisan memori.

Meminimalkan operasi per elemen yang mahal

Instance DoFn memproses batch elemen yang disebut paket, yang merupakan unit atom pekerjaan yang terdiri dari nol atau beberapa elemen. Setiap elemen kemudian diproses oleh metode DoFn.ProcessElement, yang berjalan untuk setiap elemen. Karena metode DoFn.ProcessElement dipanggil untuk setiap elemen, operasi apa pun yang memakan waktu atau mahal secara komputasi, yang dipanggil oleh metode tersebut akan menyebabkan operasi ini dijalankan untuk setiap elemen yang diproses oleh metode tersebut.

Jika Anda hanya perlu menjalankan operasi yang mahal satu kali untuk sekumpulan elemen, sertakan operasi tersebut dalam metode DoFn.Setup dan DoFn.StartBundle, bukan dalam DoFn.ProcessElement. Contohnya mencakup:

  • Mengurai file konfigurasi yang mengontrol beberapa aspek perilaku instance DoFn. Hanya panggil tindakan ini satu kali, saat instance DoFn diinisialisasi dengan menggunakan metode DoFn.Setup.

  • Membuat instance klien berumur pendek yang digunakan kembali di semua elemen dalam paket, seperti saat semua elemen dalam paket dikirim melalui satu koneksi jaringan. Panggil tindakan ini satu kali per paket dengan menggunakan metode DoFn.StartBundle.

Batasi ukuran tumpukan dan panggilan serentak ke layanan eksternal

Saat memanggil layanan eksternal, Anda dapat mengurangi overhead per panggilan menggunakan transformasi GroupIntoBatches untuk membuat batch elemen dengan ukuran tertentu. Pengelompokan mengirim elemen ke layanan eksternal sebagai satu payload, bukan satu per satu.

Jika digabungkan dengan pengelompokan, Anda dapat membatasi jumlah maksimum panggilan paralel (serentak) ke layanan eksternal dengan memilih kunci yang sesuai untuk mempartisi data yang masuk. Jumlah partisi menentukan paralelisasi maksimum. Misalnya, jika setiap elemen diberi kunci yang sama, transformasi downstream untuk memanggil layanan eksternal tidak akan berjalan secara paralel.

Pertimbangkan salah satu pendekatan berikut untuk menghasilkan kunci elemen:

  • Pilih atribut set data untuk digunakan sebagai kunci data, seperti ID pengguna.
  • Membuat kunci data untuk memisahkan elemen secara acak pada sejumlah partisi tetap, dengan jumlah kemungkinan nilai kunci menentukan jumlah partisi. Anda harus membuat partisi yang cukup untuk paralelisme, dan setiap partisi harus memiliki elemen yang cukup agar GroupIntoBatches dapat berguna.

Contoh kode Java berikut menunjukkan cara membagi elemen secara acak dalam 10 partisi:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Mengidentifikasi masalah performa yang disebabkan oleh langkah-langkah yang digabungkan secara tidak tepat

Dataflow membuat grafik langkah-langkah yang merepresentasikan pipeline Anda, berdasarkan transformasi dan data yang Anda gunakan untuk membuatnya. Grafik ini disebut grafik eksekusi pipeline.

Saat Anda men-deploy pipeline, Dataflow mungkin mengubah grafik eksekusi pipeline untuk meningkatkan performa. Misalnya, Dataflow mungkin menggabungkan beberapa operasi, sebuah proses yang dikenal sebagai pengoptimalan fusi, untuk menghindari dampak performa dan biaya saat menulis setiap objek PCollection perantara dalam pipeline Anda.

Dalam beberapa kasus, Dataflow mungkin salah menentukan cara optimal untuk menggabungkan operasi di pipeline, yang dapat membatasi kemampuan layanan Dataflow untuk memanfaatkan semua pekerja yang tersedia. Dalam kasus tersebut, Anda mungkin ingin mencegah beberapa operasi digabungkan.

Perhatikan contoh kode Apache Beam berikut. Transformasi GenerateSequence membuat objek PCollection kecil yang dibatasi, yang kemudian diproses lebih lanjut oleh dua transformasi ParDo downstream.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

Transformasi Find Primes Less-than-N mungkin mahal secara komputasi dan mungkin berjalan lambat untuk jumlah besar. Sebaliknya, Anda akan mengharapkan transformasi Increment Number selesai dengan cepat.

Diagram berikut menunjukkan representasi grafis pipeline di antarmuka pemantauan Dataflow.

Representasi alur pipeline di antarmuka Dataflow.

Memantau tugas menggunakan Dataflow Monitoring Interface menunjukkan tingkat pemrosesan lambat yang sama untuk kedua transformasi, yaitu 13 elemen per detik. Anda mungkin mengharapkan transformasi Increment Number memproses elemen dengan cepat, tetapi transformasi tersebut tampak terikat dengan kecepatan pemrosesan yang sama dengan Find Primes Less-than-N.

Alasannya adalah Dataflow menggabungkan langkah-langkah tersebut menjadi satu tahap, sehingga mencegahnya berjalan secara independen. Anda dapat menggunakan perintah gcloud berikut:

gcloud dataflow jobs describe --full job-id --format json

Dalam output yang dihasilkan, langkah-langkah gabungan tersebut dijelaskan dalam objek ExecutionStageSummary di array ComponentTransform:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

Dalam skenario ini, transformasi Find Primes Less-than-N adalah langkah yang lambat, sehingga memecah fusi sebelum langkah tersebut adalah strategi yang tepat. Salah satu metode untuk memisahkan langkah-langkah adalah dengan menyisipkan transformasi GroupByKey dan membatalkan pengelompokan sebelum langkah, seperti yang ditunjukkan dalam contoh kode Java berikut:

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Anda juga dapat menggabungkan langkah-langkah yang tidak menyatu ini menjadi transformasi gabungan yang dapat digunakan kembali.

Setelah Anda membatalkan penggabungan langkah-langkah, saat menjalankan pipeline, Anda akan melihat bahwa Increment Number selesai dalam hitungan detik, dan transformasi Find Primes Less-than-N yang berjalan lebih lama dijalankan dalam tahap terpisah.

Contoh ini menerapkan operasi grup dan pemisahan untuk membatalkan penggabungan langkah. Anda dapat menggunakan pendekatan lain untuk keadaan lain. Dalam hal ini, menangani output duplikat tidak menjadi masalah, mengingat output berturut-turut dari transformasi GenerateSequence. Objek KV dengan kunci duplikat dihapus duplikatnya menjadi satu kunci dalam transformasi grup (GroupByKey) dan pisahkan (Keys). Untuk mempertahankan duplikat setelah operasi kelompok dan pemisahan, buat pasangan KV menggunakan kunci acak dan input asli sebagai nilai, kelompokkan menggunakan kunci acak, lalu tampilkan nilai untuk setiap kunci sebagai output.

Menggunakan metrik Apache Beam untuk mengumpulkan insight pipeline

Metrik Apache Beam adalah class utilitas untuk menghasilkan berbagai metrik guna melaporkan properti pipeline yang berjalan. Saat Anda menggunakan Cloud Monitoring, metrik Apache Beam tersedia sebagai metrik kustom Cloud Monitoring.

Cuplikan Java berikut adalah contoh metrik Counter yang digunakan di subclass DoFn.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

Kode contoh menggunakan dua penghitung. Satu penghitung melacak kegagalan penguraian JSON (malformedCounter), dan penghitung lainnya melacak apakah pesan JSON valid tetapi berisi payload kosong (emptyCounter). Di Cloud Monitoring, nama metrik kustom adalah custom.googleapis.com/dataflow/malformedJson dan custom.googleapis.com/dataflow/emptyPayload. Anda dapat menggunakan metrik kustom untuk membuat visualisasi dan kebijakan pemberitahuan di Cloud Monitoring.

Menguji pipeline Anda

Dalam pengembangan software, pengujian unit, pengujian integrasi, dan pengujian menyeluruh adalah jenis pengujian software yang umum. Jenis pengujian ini juga berlaku untuk pipeline data.

Apache Beam SDK menyediakan fungsi untuk mengaktifkan pengujian ini. Idealnya, setiap jenis pengujian menargetkan lingkungan deployment yang berbeda. Diagram berikut menggambarkan bagaimana pengujian unit, pengujian integrasi, dan pengujian menyeluruh berlaku untuk berbagai bagian pipeline dan data Anda.

Menguji jenis dan kaitannya dengan transformasi, pipeline, sumber data, dan sink data.

Diagram menunjukkan cakupan berbagai pengujian dan kaitannya dengan transformasi (subclass DoFn dan PTransform), pipeline, sumber data, dan sink data.

Bagian berikut menjelaskan bagaimana berbagai pengujian software formal diterapkan pada pipeline data menggunakan Dataflow. Saat Anda membaca bagian ini, lihat kembali diagram untuk memahami keterkaitan berbagai jenis pengujian.

Sampling data

Untuk mengamati data di setiap langkah pipeline Dataflow, aktifkan sampling data selama pengujian. Hal ini memungkinkan Anda melihat output transformasi, untuk memastikan outputnya sudah benar.

Pengujian unit

Pengujian unit menilai fungsi yang benar dari subclass DoFn dan transformasi gabungan (subclass PTransform) dengan membandingkan output transformasi tersebut dengan kumpulan input dan output data yang terverifikasi. Biasanya, developer dapat menjalankan pengujian ini di lingkungan lokal. Pengujian juga dapat berjalan secara otomatis melalui otomatisasi pengujian unit menggunakan continuous integration (CI) di lingkungan build.

Direct Runner menjalankan pengujian unit menggunakan subkumpulan data pengujian referensi yang lebih kecil dan berfokus pada pengujian logika bisnis transformasi Anda. Data pengujian harus cukup kecil agar masuk ke dalam memori lokal pada mesin yang menjalankan pengujian.

Apache Beam SDK menyediakan aturan JUnit yang disebut TestPipeline untuk pengujian unit masing-masing transformasi (subclass DoFn), transformasi gabungan (subclass PTransform), dan seluruh pipeline. Anda dapat menggunakan TestPipeline pada runner pipeline Apache Beam seperti Direct Runner atau Dataflow Runner untuk menerapkan pernyataan pada konten objek PCollection menggunakan PAssert, seperti yang ditunjukkan dalam cuplikan kode class pengujian JUnit berikut:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

Pengujian unit untuk transformasi individual

Dengan memperhitungkan kode ke dalam transformasi yang dapat digunakan kembali, misalnya, sebagai class level atas atau bertingkat statis, Anda dapat membuat pengujian yang ditargetkan untuk berbagai bagian pipeline Anda. Selain manfaat pengujian, transformasi yang dapat digunakan kembali meningkatkan pengelolaan dan penggunaan kembali kode dengan secara alami mengenkapsulasi logika bisnis pipeline Anda menjadi bagian-bagian komponen. Sebaliknya, menguji setiap bagian pipeline mungkin akan sulit jika pipeline menggunakan class dalam anonim untuk mengimplementasikan transformasi.

Cuplikan Java berikut menunjukkan implementasi transformasi sebagai class dalam anonim, yang tidak memungkinkan pengujian dengan mudah.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Bandingkan contoh sebelumnya dengan contoh berikut, saat class dalam anonim difaktorkan ulang menjadi subclass DoFn konkret bernama. Anda dapat membuat pengujian unit individual untuk setiap subclass DoFn konkret yang membentuk pipeline end-to-end.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

Menguji setiap subclass DoFn mirip dengan pengujian unit pipeline batch yang berisi satu transformasi. Gunakan transformasi Create untuk membuat objek PCollection dari data pengujian, lalu teruskan ke objek DoFn. Gunakan PAssert untuk menyatakan bahwa konten objek PCollection sudah benar. Contoh kode Java berikut menggunakan class PAssert untuk memeriksa bentuk output yang benar.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Pengujian integrasi

Pengujian integrasi memverifikasi fungsi yang benar dari seluruh pipeline Anda. Pertimbangkan jenis pengujian integrasi berikut:

  • Pengujian integrasi transformasi yang menilai fungsi terintegrasi dari semua transformasi individual yang membentuk pipeline data Anda. Anggap pengujian integrasi transformasi sebagai pengujian unit untuk seluruh pipeline Anda, kecuali integrasi dengan sumber data eksternal dan sink. Apache Beam SDK menyediakan metode untuk menyediakan data pengujian ke pipeline data Anda dan memverifikasi hasil pemrosesan. Direct Runner digunakan untuk menjalankan pengujian integrasi transformasi.
  • Pengujian integrasi sistem yang menilai integrasi pipeline data Anda dengan sumber data live dan sink. Agar pipeline dapat berkomunikasi dengan sistem eksternal, Anda harus mengonfigurasi pengujian dengan kredensial yang sesuai untuk mengakses layanan eksternal. Pipeline streaming berjalan tanpa batas waktu, sehingga Anda perlu memutuskan waktu dan cara menghentikan pipeline yang sedang berjalan. Dengan menggunakan Direct Runner untuk menjalankan pengujian integrasi sistem, Anda dengan cepat memverifikasi integrasi antara pipeline dan sistem lain tanpa perlu mengirimkan tugas Dataflow dan menunggu hingga selesai.

Desain transformasi dan pengujian integrasi sistem untuk memberikan deteksi kerusakan dan masukan yang cepat tanpa memperlambat produktivitas developer. Untuk pengujian yang berjalan lebih lama, seperti yang berjalan sebagai tugas Dataflow, Anda dapat menggunakan pengujian menyeluruh yang berjalan lebih jarang.

Bayangkan pipeline data sebagai satu atau beberapa transformasi terkait. Anda dapat membuat transformasi komposit enkapsulasi untuk pipeline dan menggunakan TestPipeline untuk melakukan pengujian integrasi seluruh pipeline Anda. Bergantung pada apakah Anda ingin menguji pipeline dalam mode batch atau streaming, berikan data pengujian menggunakan transformasi Create atau TestStream.

Menggunakan data pengujian untuk pengujian integrasi

Di lingkungan produksi, pipeline Anda kemungkinan besar terintegrasi dengan berbagai sumber data dan sink. Namun, untuk pengujian unit dan pengujian integrasi transformasi, fokuslah untuk memverifikasi logika bisnis kode pipeline dengan memberikan input pengujian dan memverifikasi output secara langsung. Selain menyederhanakan pengujian, pendekatan ini memungkinkan Anda mengisolasi masalah khusus pipeline dari masalah yang mungkin disebabkan oleh sumber data dan sink.

Menguji pipeline batch

Untuk pipeline batch, gunakan transformasi Create untuk membuat objek PCollection data pengujian input Anda dari koleksi dalam memori standar, seperti objek List Java. Transformasi Create adalah pilihan tepat jika data pengujian Anda cukup kecil untuk disertakan dalam kode. Anda kemudian dapat menggunakan PAssert pada objek PCollection output untuk menentukan ketepatan kode pipeline Anda. Pendekatan ini didukung oleh Direct Runner dan oleh Dataflow Runner.

Cuplikan kode Java berikut menampilkan pernyataan terhadap objek PCollection output dari transformasi gabungan yang menyertakan beberapa atau semua transformasi individual yang membentuk pipeline (WeatherStatsPipeline). Pendekatan ini mirip dengan pengujian unit setiap transformasi dalam pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

Untuk menguji perilaku windowing, Anda juga dapat menggunakan transformasi Create untuk membuat elemen dengan stempel waktu, seperti yang ditunjukkan dalam cuplikan kode berikut:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Menguji pipeline streaming

Pipeline streaming berisi asumsi yang menentukan cara menangani data tidak terbatas. Asumsi ini sering kali berkaitan dengan ketepatan waktu data dalam kondisi dunia nyata, sehingga berdampak pada kebenaran bergantung pada apakah asumsi tersebut terbukti benar atau salah. Pengujian integrasi untuk pipeline streaming idealnya mencakup pengujian yang menyimulasikan sifat kedatangan data streaming yang tidak deterministik.

Untuk mengaktifkan pengujian tersebut, Apache Beam SDK menyediakan class TestStream untuk membuat model efek pengaturan waktu elemen (data awal, tepat waktu, atau terlambat) pada hasil pipeline data Anda. Gunakan pengujian ini bersama dengan class PAssert untuk memverifikasi hasil yang diharapkan.

TestStream didukung oleh Direct Runner dan Runner Dataflow. Contoh kode berikut akan membuat transformasi TestStream:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

Untuk informasi selengkapnya tentang TestStream, lihat Menguji Pipeline Tanpa Batas di Apache Beam. Untuk informasi selengkapnya tentang cara menggunakan Apache Beam SDK untuk pengujian unit, lihat dokumentasi Apache Beam.

Menggunakan layanan Google Cloud dalam pengujian integrasi

Direct Runner dapat terintegrasi dengan layanan Google Cloud sehingga pengujian ad hoc di lingkungan lokal dan pengujian integrasi sistem dapat menggunakan Pub/Sub, BigQuery, dan layanan lainnya sesuai kebutuhan. Saat Anda menggunakan Direct Runner, pipeline Anda akan berjalan sebagai akun pengguna yang dikonfigurasi menggunakan alat command line gcloud atau sebagai akun layanan yang Anda tentukan menggunakan variabel lingkungan GOOGLE_APPLICATION_CREDENTIALS. Oleh karena itu, Anda harus memberikan izin yang memadai ke akun ini untuk resource yang diperlukan sebelum menjalankan pipeline Anda. Untuk mengetahui detail selengkapnya, lihat Keamanan dan izin Dataflow.

Untuk pengujian integrasi lokal sepenuhnya, Anda dapat menggunakan emulator lokal untuk beberapa layanan Google Cloud. Emulator lokal tersedia untuk Pub/Sub dan Bigtable.

Untuk pengujian integrasi sistem pipeline streaming, Anda dapat menggunakan metode setBlockOnRun (ditentukan dalam antarmuka DirectOptions) agar Direct Runner menjalankan pipeline Anda secara asinkron. Jika tidak, eksekusi pipeline akan memblokir proses induk panggilan (misalnya, skrip di pipeline build Anda) hingga pipeline dihentikan secara manual. Jika menjalankan pipeline secara asinkron, Anda dapat menggunakan instance PipelineResult yang ditampilkan untuk membatalkan eksekusi pipeline, seperti yang ditunjukkan dalam contoh kode berikut:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

Pengujian menyeluruh

Pengujian end-to-end memverifikasi operasi pipeline end-to-end yang benar dengan menjalankannya di Dataflow Runner dalam kondisi yang sangat mirip dengan produksi. Pengujian ini memverifikasi bahwa logika bisnis berfungsi dengan benar menggunakan Dataflow Runner dan menguji apakah pipeline berfungsi seperti yang diharapkan pada pemuatan seperti produksi. Anda biasanya menjalankan pengujian menyeluruh dalam project Google Cloud khusus yang ditetapkan sebagai lingkungan praproduksi.

Untuk menguji pipeline pada skala yang berbeda, gunakan berbagai jenis pengujian end-to-end, misalnya:

  • Jalankan pengujian menyeluruh berskala kecil menggunakan sebagian kecil (seperti satu persen) set data pengujian untuk memvalidasi fungsi pipeline dengan cepat di lingkungan praproduksi.
  • Jalankan pengujian menyeluruh berskala besar menggunakan set data pengujian lengkap untuk memvalidasi fungsi pipeline berdasarkan volume dan kondisi data seperti produksi.

Untuk pipeline streaming, sebaiknya jalankan pipeline pengujian secara paralel dengan pipeline produksi jika keduanya dapat menggunakan data yang sama. Proses ini memungkinkan Anda membandingkan hasil dan perilaku operasional, seperti penskalaan otomatis dan performa.

Pengujian menyeluruh membantu memprediksi seberapa baik pipeline Anda akan memenuhi SLO produksi. Lingkungan praproduksi menguji pipeline Anda dalam kondisi seperti produksi. Dalam pengujian menyeluruh, pipeline dijalankan menggunakan Dataflow Runner untuk memproses set data referensi lengkap yang cocok atau sangat mirip dengan set data dalam produksi.

Anda mungkin tidak dapat menghasilkan data sintetis untuk pengujian yang menyimulasikan data nyata secara akurat. Untuk mengatasi masalah ini, salah satu pendekatan adalah menggunakan ekstrak yang telah dibersihkan dari sumber data produksi untuk membuat set data referensi, dengan de-identifikasi setiap data sensitif melalui transformasi yang sesuai. Sebaiknya gunakan Perlindungan Data Sensitif untuk tujuan ini. Perlindungan Data Sensitif dapat mendeteksi data sensitif dari berbagai jenis konten dan sumber data serta menerapkan berbagai teknik de-identifikasi, termasuk penyamaran, penyamaran, enkripsi dengan mempertahankan format, dan perubahan tanggal.

Perbedaan pengujian end-to-end untuk pipeline batch dan streaming

Sebelum menjalankan pengujian lengkap secara menyeluruh terhadap set data pengujian besar, sebaiknya jalankan pengujian dengan persentase data pengujian yang lebih kecil (misalnya satu persen) dan verifikasi perilaku yang diharapkan dalam waktu yang lebih singkat. Seperti pada pengujian integrasi menggunakan Direct Runner, Anda dapat menggunakan PAssert pada objek PCollection saat menjalankan pipeline menggunakan Dataflow Runner. Untuk mengetahui informasi selengkapnya tentang PAssert, lihat bagian Pengujian unit di halaman ini.

Bergantung pada kasus penggunaan Anda, memverifikasi output yang sangat besar dari pengujian menyeluruh mungkin tidak praktis, mahal, atau sulit. Dalam hal ini, Anda dapat memverifikasi sampel representatif dari kumpulan hasil output. Misalnya, Anda dapat menggunakan BigQuery untuk mengambil sampel dan membandingkan baris output dengan set data referensi hasil yang diharapkan.

Untuk pipeline streaming, menyimulasikan kondisi streaming yang realistis dengan data sintetis mungkin sulit dilakukan. Cara umum untuk menyediakan data streaming untuk pengujian menyeluruh adalah dengan mengintegrasikan pengujian dengan sumber data produksi. Jika menggunakan Pub/Sub sebagai sumber data, Anda dapat mengaktifkan aliran data terpisah untuk pengujian menyeluruh melalui langganan tambahan ke topik yang ada. Kemudian, Anda dapat membandingkan hasil berbagai pipeline yang memakai data yang sama, yang berguna untuk memverifikasi pipeline kandidat dengan pipeline praproduksi dan produksi lainnya.

Diagram berikut menunjukkan cara metode ini memungkinkan pipeline produksi dan pipeline pengujian berjalan secara paralel di lingkungan deployment yang berbeda.

Menjalankan pipeline pengujian secara paralel dengan pipeline produksi menggunakan satu sumber streaming Pub/Sub.

Dalam diagram, kedua pipeline membaca dari topik Pub/Sub yang sama, tetapi menggunakan langganan terpisah. Dengan penyiapan ini, kedua pipeline dapat memproses data yang sama secara independen dan Anda dapat membandingkan hasilnya. Pipeline pengujian menggunakan akun layanan yang terpisah dari project produksi, sehingga menghindari penggunaan kuota pelanggan Pub/Sub untuk project produksi.

Tidak seperti pipeline batch, pipeline streaming terus berjalan hingga dibatalkan secara eksplisit. Dalam pengujian menyeluruh, Anda harus memutuskan apakah akan membiarkan pipeline berjalan, mungkin hingga pengujian menyeluruh berikutnya dijalankan, atau membatalkan pipeline pada titik yang menunjukkan penyelesaian pengujian sehingga Anda dapat memeriksa hasilnya.

Jenis data pengujian yang Anda gunakan memengaruhi keputusan ini. Misalnya, jika Anda menggunakan kumpulan data pengujian terbatas yang disediakan ke pipeline streaming, Anda dapat membatalkan pipeline saat semua elemen telah menyelesaikan pemrosesan. Atau, jika Anda menggunakan sumber data sebenarnya, seperti topik Pub/Sub yang ada yang digunakan dalam produksi, atau jika Anda menghasilkan data pengujian secara terus-menerus, sebaiknya pipeline pengujian tetap berjalan dalam periode yang lebih lama. Yang terakhir memungkinkan Anda membandingkan perilaku dengan lingkungan produksi, atau bahkan dengan pipeline pengujian lainnya.