Pemecahan masalah dan proses debug pipeline

Halaman ini memberikan tips pemecahan masalah dan strategi proses debug yang mungkin berguna jika Anda mengalami masalah saat mem-build atau menjalankan pipeline Dataflow. Informasi ini dapat membantu Anda mendeteksi kegagalan pipeline, menentukan alasan di balik kegagalan pengoperasian pipeline, dan menyarankan beberapa tindakan untuk memperbaiki masalah.

Diagram berikut menunjukkan alur kerja pemecahan masalah Dataflow yang dijelaskan di halaman ini.

Diagram yang menunjukkan alur kerja pemecahan masalah.

Dataflow memberikan masukan real-time tentang tugas Anda, dan ada serangkaian langkah dasar yang dapat Anda gunakan untuk memeriksa pesan error, log, dan kondisi seperti progres tugas Anda yang terhenti.

Untuk mendapatkan panduan tentang error umum yang mungkin Anda temui saat menjalankan tugas Dataflow, lihat Memecahkan masalah error Dataflow. Untuk memantau dan memecahkan masalah performa pipeline, lihat Memantau performa pipeline.

Praktik terbaik untuk pipeline

Berikut adalah praktik terbaik untuk pipeline Java, Python, dan Go.

Java

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi sementara.

  • Sebelum menyiapkan TTL dan sebagai praktik terbaik umum, pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jangan hapus objek di lokasi staging karena objek ini akan digunakan kembali.

  • Jika tugas selesai atau dihentikan dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

Python

Lokasi sementara dan staging memiliki awalan <job_name>.<time>.

  • Pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jika diperlukan, hapus objek di lokasi staging setelah tugas selesai atau berhenti. Selain itu, objek yang di-stage tidak digunakan kembali di pipeline Python.

  • Jika tugas berakhir dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi sementara dan staging.

Go

  • Lokasi sementara dan staging memiliki awalan <job_name>.<time>.

  • Pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jika diperlukan, hapus objek di lokasi staging setelah tugas selesai atau berhenti. Selain itu, objek yang di-stage tidak digunakan kembali di pipeline Go.

  • Jika tugas berakhir dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi sementara dan staging.

Memeriksa status pipeline

Anda dapat mendeteksi error dalam operasi pipeline menggunakan antarmuka pemantauan Dataflow.

  1. Buka Konsol Google Cloud.
  2. Pilih project Google Cloud Anda dari daftar project.
  3. Di menu navigasi, pada bagian Big Data, klik Dataflow. Daftar tugas yang sedang berjalan akan muncul di panel sebelah kanan.
  4. Pilih tugas pipeline yang ingin Anda lihat. Anda dapat melihat status tugas secara sekilas di kolom Status: "Berjalan", "Berhasil", atau "Gagal".
Daftar tugas Dataflow di Konsol Play dengan tugas dalam status berjalan, berhasil, dan gagal.
Gambar 1: Daftar tugas Dataflow di Konsol Developer dengan tugas dalam status berjalan, berhasil, dan gagal.

Menemukan informasi tentang kegagalan pipeline

Jika salah satu tugas pipeline gagal, Anda dapat memilih tugas untuk melihat informasi yang lebih mendetail tentang error dan hasil yang dijalankan. Saat memilih tugas, Anda dapat melihat diagram utama untuk pipeline, grafik eksekusi, panel Info tugas, dan panel Log dengan tab Log tugas, Log pekerja, Diagnostik, dan Rekomendasi.

Memeriksa pesan error tugas

Untuk melihat Log Tugas yang dihasilkan oleh kode pipeline dan layanan Dataflow, di panel Log, klik Tampilkan.

Anda dapat memfilter pesan yang muncul di Log tugas dengan mengklik Info dan Filter. Untuk hanya menampilkan pesan error, klik Info, lalu pilih Error.

Untuk meluaskan pesan error, klik bagian yang dapat diluaskan .

Panel log yang menampilkan log tugas dengan perluasan pesan error yang ditandai.

Atau, Anda dapat mengklik tab Diagnostik. Tab ini menunjukkan tempat terjadinya error di sepanjang linimasa yang dipilih, jumlah semua error yang dicatat ke dalam log, dan kemungkinan rekomendasi untuk pipeline Anda.

Tab diagnostik dengan dua error yang dilaporkan.

Melihat log langkah untuk tugas Anda

Saat Anda memilih langkah dalam grafik pipeline, panel log akan beralih dari menampilkan Log Tugas yang dihasilkan oleh layanan Dataflow ke menampilkan log dari instance Compute Engine yang menjalankan langkah pipeline Anda.

Langkah pipeline yang dipilih dengan log pekerja langkah ditandai.

Cloud Logging menggabungkan semua log yang dikumpulkan dari instance Compute Engine project Anda di satu lokasi. Lihat Mencatat pesan pipeline ke dalam log untuk mengetahui informasi selengkapnya tentang cara menggunakan berbagai kemampuan logging Dataflow.

Menangani penolakan pipeline otomatis

Dalam beberapa kasus, layanan Dataflow mengidentifikasi bahwa pipeline Anda mungkin memicu masalah SDK yang diketahui. Untuk mencegah pengiriman pipeline yang kemungkinan akan mengalami masalah, Dataflow akan otomatis menolak pipeline Anda dan menampilkan pesan berikut:

The workflow was automatically rejected by the service because it might trigger an
identified bug in the SDK (details below). If you think this identification is
in error, and would like to override this automated rejection, please re-submit
this workflow with the following override flag: [OVERRIDE FLAG].
Bug details: [BUG DETAILS].
Contact Google Cloud Support for further help.
Please use this identifier in your communication: [BUG ID].

Setelah membaca peringatan dalam detail bug tertaut, jika Anda ingin mencoba menjalankan pipeline, Anda dapat mengganti penolakan otomatis. Tambahkan tanda --experiments=<override-flag> dan kirim ulang pipeline Anda.

Menentukan penyebab kegagalan pipeline

Biasanya, kegagalan pengoperasian pipeline Apache Beam dapat diatribusikan ke salah satu penyebab berikut:

  • Error konstruksi grafik atau pipeline. Error ini terjadi saat Dataflow mengalami masalah saat mem-build grafik langkah yang membuat pipeline, seperti yang dijelaskan oleh pipeline Apache Beam Anda.
  • Error dalam validasi tugas. Layanan Dataflow memvalidasi setiap tugas pipeline yang Anda luncurkan. Error dalam proses validasi dapat mencegah tugas Anda berhasil dibuat atau dijalankan. Error validasi dapat mencakup masalah pada bucket Cloud Storage project Google Cloud Anda, atau dengan izin project Anda.
  • Pengecualian dalam kode pekerja. Error ini terjadi saat ada error atau bug dalam kode yang disediakan pengguna yang didistribusikan Dataflow ke pekerja paralel, seperti instance DoFn dari transformasi ParDo.
  • Error yang disebabkan oleh kegagalan sementara di layanan Google Cloud lainnya. Pipeline Anda mungkin gagal karena pemadaman layanan sementara atau masalah lain di layanan Google Cloud yang menjadi dependensi Dataflow, seperti Compute Engine atau Cloud Storage.

Mendeteksi error konstruksi grafik atau pipeline

Error konstruksi grafik dapat terjadi saat Dataflow mem-build grafik eksekusi untuk pipeline Anda dari kode dalam program Dataflow. Selama waktu konstruksi grafik, Dataflow akan memeriksa operasi ilegal.

Jika Dataflow mendeteksi error dalam konstruksi grafik, perlu diingat bahwa tidak ada tugas yang dibuat di layanan Dataflow. Dengan demikian, Anda tidak akan melihat masukan apa pun di antarmuka pemantauan Dataflow. Sebagai gantinya, pesan error yang mirip dengan berikut ini akan muncul di konsol atau jendela terminal tempat Anda menjalankan pipeline Apache Beam:

Java

Misalnya, jika pipeline Anda mencoba melakukan agregasi seperti GroupByKey pada PCollection yang tidak terpicu, tanpa batas, dan berbingkai global, pesan error yang mirip dengan berikut akan muncul:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

Misalnya, jika pipeline Anda menggunakan petunjuk jenis dan jenis argumen di salah satu transformasi tidak seperti yang diharapkan, pesan error yang mirip dengan berikut akan terjadi:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Go

Misalnya, jika pipeline Anda menggunakan `DoFn` yang tidak menerima input apa pun, pesan error yang mirip dengan berikut akan muncul:

... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required.
... Full error:
...     inserting ParDo in scope root/CountWords
...     graph.AsDoFn: for Fn named main.extractFn
... ProcessElement method has no main inputs

... goroutine 1 [running]:
... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...)
... (more stacktrace)

Jika Anda mengalami error tersebut, periksa kode pipeline untuk memastikan bahwa operasi pipeline Anda sah.

Mendeteksi error dalam validasi tugas Dataflow

Setelah layanan Dataflow menerima grafik pipeline, layanan tersebut akan mencoba memvalidasi tugas Anda. Validasi ini mencakup hal-hal berikut:

  • Memastikan layanan dapat mengakses bucket Cloud Storage terkait tugas Anda untuk staging file dan output sementara.
  • Memeriksa izin yang diperlukan di project Google Cloud Anda.
  • Memastikan layanan dapat mengakses sumber input dan output, seperti file.

Jika tugas Anda gagal dalam proses validasi, pesan error akan muncul di antarmuka pemantauan Dataflow, serta di konsol atau jendela terminal jika Anda menggunakan eksekusi pemblokiran. Pesan error terlihat mirip dengan yang berikut ini:

Java

INFO: To access the Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Go

Validasi tugas yang dijelaskan di bagian ini saat ini tidak didukung untuk Go. Error karena masalah ini muncul sebagai pengecualian pekerja.

Mendeteksi pengecualian dalam kode pekerja

Saat tugas berjalan, Anda mungkin mengalami error atau pengecualian dalam kode pekerja. Error ini biasanya berarti bahwa DoFn dalam kode pipeline Anda telah menghasilkan pengecualian yang tidak ditangani, yang mengakibatkan tugas gagal dalam tugas Dataflow Anda.

Pengecualian dalam kode pengguna (misalnya, instance DoFn Anda) dilaporkan di antarmuka pemantauan aliran data. Jika Anda menjalankan pipeline dengan eksekusi pemblokiran, pesan error akan dicetak di konsol atau jendela terminal, seperti berikut:

Java

INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

Go

... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction
...     process_bundle-4031463614776698457-2 using plan s02-6 : while executing
...     Process for Plan[s02-6] failed: Oh no! This is an error message!

Pertimbangkan untuk mencegah error dalam kode Anda dengan menambahkan pengendali pengecualian. Misalnya, jika Anda ingin menghapus elemen yang gagal dalam beberapa validasi input kustom yang dilakukan di ParDo, tangani pengecualian dalam DoFn dan hapus elemen tersebut.

Anda juga dapat melacak elemen yang gagal dengan beberapa cara:

  • Anda dapat mencatat ke dalam log elemen yang gagal dan memeriksa output menggunakan Cloud Logging.
  • Anda dapat memeriksa pekerja Dataflow dan log startup pekerja untuk menemukan peringatan atau error dengan mengikuti petunjuk di Melihat log.
  • Anda dapat meminta ParDo menulis elemen yang gagal ke output tambahan untuk pemeriksaan nanti.

Untuk melacak properti pipeline yang sedang berjalan, Anda dapat menggunakan class Metrics, seperti yang ditunjukkan dalam contoh berikut:

Java

final Counter counter = Metrics.counter("stats", "even-items");
PCollection<Integer> input = pipeline.apply(...);
...
input.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element() % 2 == 0) {
      counter.inc();
    }
});

Python

class FilterTextFn(beam.DoFn):
      """A DoFn that filters for a specific key based on a regex."""

      def __init__(self, pattern):
        self.pattern = pattern
        # A custom metric can track values in your pipeline as it runs. Create
        # custom metrics to count unmatched words, and know the distribution of
        # word lengths in the input PCollection.
        self.word_len_dist = Metrics.distribution(self.__class__,
                                                  'word_len_dist')
        self.unmatched_words = Metrics.counter(self.__class__,
                                               'unmatched_words')

      def process(self, element):
        word = element
        self.word_len_dist.update(len(word))
        if re.match(self.pattern, word):
          yield element
        else:
          self.unmatched_words.inc()

    filtered_words = (
        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))

Go

func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection {
    return beam.ParDo(s, &MyMetricsDoFn{}, input)
}

func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) {
    pr, err := beam.Run(ctx, runner, p)
    if err != nil {
        return metrics.QueryResults{}, err
    }

    // Request the metric called "counter1" in namespace called "namespace"
    ms := pr.Metrics().Query(func(r beam.MetricResult) bool {
        return r.Namespace() == "namespace" && r.Name() == "counter1"
    })

    // Print the metric value - there should be only one line because there is
    // only one metric called "counter1" in the namespace called "namespace"
    for _, c := range ms.Counters() {
        fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed)
    }
    return ms, nil
}

type MyMetricsDoFn struct {
    counter beam.Counter
}

func init() {
    beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil)))
}

func (fn *MyMetricsDoFn) Setup() {
    // While metrics can be defined in package scope or dynamically
    // it's most efficient to include them in the DoFn.
    fn.counter = beam.NewCounter("namespace", "counter1")
}

func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) {
    // count the elements
    fn.counter.Inc(ctx, 1)
    emit(v)
}

Memecahkan masalah pipeline yang berjalan lambat atau tidak ada output

Lihat Memecahkan masalah tugas yang lambat dan macet.

Error umum dan tindakan yang dapat dilakukan

Jika Anda mengetahui error yang menyebabkan kegagalan pipeline, lihat halaman Memecahkan masalah error Dataflow untuk mendapatkan panduan pemecahan masalah error.