Pemecahan masalah dan proses debug pipeline

Halaman ini memberikan tips pemecahan masalah dan strategi proses debug yang mungkin berguna jika Anda mengalami masalah dalam membangun atau menjalankan pipeline Dataflow. Informasi ini dapat membantu Anda mendeteksi kegagalan pipeline, menentukan alasan kegagalan pipeline dijalankan, dan menyarankan beberapa tindakan untuk memperbaiki masalah tersebut.

Diagram berikut menunjukkan alur kerja pemecahan masalah Dataflow yang dijelaskan di halaman ini.

Diagram yang menunjukkan alur kerja pemecahan masalah.

Dataflow memberikan masukan real-time tentang tugas Anda, dan ada serangkaian langkah dasar yang dapat Anda gunakan untuk memeriksa pesan error, log, serta kondisi seperti progres tugas yang terhenti.

Untuk mendapatkan panduan tentang error umum yang mungkin Anda alami saat menjalankan tugas Dataflow, lihat Memecahkan masalah error Dataflow. Untuk memantau dan memecahkan masalah performa pipeline, lihat Memantau performa pipeline.

Praktik terbaik untuk pipeline

Berikut adalah praktik terbaik untuk pipeline Java, Python, dan Go.

Java

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi sementara.

  • Sebelum menyiapkan TTL dan sebagai praktik terbaik umum, pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jangan menghapus objek di lokasi staging karena objek ini digunakan kembali.

  • Jika tugas telah selesai atau dihentikan dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

Python

Lokasi sementara dan staging memiliki awalan <job_name>.<time>.

  • Pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jika diperlukan, hapus objek di lokasi staging setelah tugas selesai atau dihentikan. Selain itu, objek yang di-stage tidak digunakan kembali di pipeline Python.

  • Jika tugas berakhir dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi staging dan sementara.

Go

  • Lokasi sementara dan staging memiliki awalan <job_name>.<time>.

  • Pastikan Anda menetapkan lokasi staging dan lokasi sementara ke lokasi yang berbeda.

  • Jika diperlukan, hapus objek di lokasi staging setelah tugas selesai atau dihentikan. Selain itu, objek yang di-stage tidak digunakan kembali di pipeline Go.

  • Jika tugas berakhir dan objek sementara tidak dibersihkan, hapus file ini secara manual dari bucket Cloud Storage yang digunakan sebagai lokasi sementara.

  • Untuk tugas batch, sebaiknya tetapkan time to live (TTL) untuk lokasi staging dan sementara.

Memeriksa status pipeline

Anda dapat mendeteksi error apa pun dalam pipeline yang berjalan menggunakan Antarmuka pemantauan Dataflow.

  1. Buka Konsol Google Cloud.
  2. Pilih project Google Cloud Anda dari daftar project.
  3. Di menu navigasi, di bagian Big Data, klik Dataflow. Daftar tugas yang sedang berjalan akan muncul di panel sebelah kanan.
  4. Pilih tugas pipeline yang ingin Anda lihat. Anda dapat melihat status tugas secara sekilas di kolom Status: "Berjalan", "Berhasil", atau "Gagal".
Daftar tugas Dataflow di Developers Console dengan tugas dalam status berjalan, berhasil, dan gagal.
Gambar 1: Daftar tugas Dataflow di Developers Console dengan tugas dalam status berjalan, berhasil, dan gagal.

Menemukan informasi tentang kegagalan pipeline

Jika salah satu tugas pipeline gagal, Anda dapat memilih tugas tersebut untuk melihat informasi yang lebih mendetail tentang error dan menjalankan hasilnya. Saat memilih tugas, Anda dapat melihat diagram utama untuk pipeline, grafik eksekusi, panel Info tugas, dan panel Logs dengan tab Log tugas, Log Pekerja, Diagnostik, dan Rekomendasi.

Memeriksa pesan error tugas

Untuk melihat Job Logs yang dibuat oleh kode pipeline dan layanan Dataflow, di panel Logs, klik Show.

Anda dapat memfilter pesan yang muncul di Log lowongan dengan mengklik Info dan Filter. Untuk menampilkan pesan error saja, klik Info dan pilih Error.

Untuk meluaskan pesan error, klik bagian yang dapat diperluas .

Panel log yang menampilkan log tugas dengan perluasan pesan error yang ditandai.

Atau, Anda dapat mengklik tab Diagnostik. Tab ini menunjukkan lokasi terjadinya error di sepanjang linimasa yang dipilih, jumlah semua error yang dicatat, dan rekomendasi yang memungkinkan untuk pipeline Anda.

Tab diagnostik dengan dua error yang dilaporkan.

Melihat log langkah untuk tugas Anda

Saat Anda memilih langkah di grafik pipeline, panel log akan beralih dari menampilkan Log Tugas yang dibuat oleh layanan Dataflow ke menampilkan log dari instance Compute Engine yang menjalankan langkah pipeline Anda.

Langkah pipeline yang dipilih dengan log pekerja langkah ditandai.

Cloud Logging menggabungkan semua log yang dikumpulkan dari instance Compute Engine project Anda di satu lokasi. Lihat Pesan pipeline logging untuk mengetahui informasi lebih lanjut tentang penggunaan berbagai kemampuan logging Dataflow.

Menangani penolakan pipeline otomatis

Dalam beberapa kasus, layanan Dataflow mengidentifikasi bahwa pipeline Anda mungkin memicu masalah SDK umum. Untuk mencegah pipeline yang mungkin mengalami masalah dikirim, Dataflow secara otomatis menolak pipeline Anda dan menampilkan pesan berikut:

The workflow was automatically rejected by the service because it might trigger an
identified bug in the SDK (details below). If you think this identification is
in error, and would like to override this automated rejection, please re-submit
this workflow with the following override flag: [OVERRIDE FLAG].
Bug details: [BUG DETAILS].
Contact Google Cloud Support for further help.
Please use this identifier in your communication: [BUG ID].

Setelah membaca peringatan dalam detail bug yang ditautkan, jika ingin tetap mencoba menjalankan pipeline, Anda dapat mengganti penolakan otomatis. Tambahkan tanda --experiments=<override-flag> dan kirim ulang pipeline Anda.

Menentukan penyebab kegagalan pipeline

Biasanya, kegagalan operasi pipeline Apache Beam dapat disebabkan oleh salah satu penyebab berikut:

  • Error konstruksi grafik atau pipeline. Error ini terjadi saat Dataflow mengalami masalah saat membuat grafik langkah-langkah yang menyusun pipeline, seperti yang dijelaskan oleh pipeline Apache Beam Anda.
  • Error dalam validasi tugas. Layanan Dataflow memvalidasi tugas pipeline apa pun yang Anda luncurkan. Error dalam proses validasi dapat mencegah tugas Anda berhasil dibuat atau dieksekusi. Error validasi dapat mencakup masalah pada bucket Cloud Storage project Google Cloud, atau pada izin project Anda.
  • Pengecualian dalam kode pekerja. Error ini terjadi saat ada error atau bug dalam kode yang diberikan pengguna yang didistribusikan Dataflow ke pekerja paralel, seperti instance DoFn transformasi ParDo.
  • Error yang disebabkan oleh kegagalan sementara di layanan Google Cloud lainnya. Pipeline Anda mungkin gagal karena terjadi pemadaman sementara atau masalah lain di layanan Google Cloud yang menjadi dependensi Dataflow, seperti Compute Engine atau Cloud Storage.

Mendeteksi kesalahan konstruksi grafik atau pipeline

Error konstruksi grafik dapat terjadi saat Dataflow membuat grafik eksekusi untuk pipeline dari kode dalam program Dataflow Anda. Selama waktu pembuatan grafik, Dataflow akan memeriksa operasi ilegal.

Jika Dataflow mendeteksi error dalam pembuatan grafik, ingatlah bahwa tidak ada tugas yang dibuat di layanan Dataflow. Oleh karena itu, Anda tidak melihat masukan apa pun di antarmuka pemantauan Dataflow. Namun, pesan error yang mirip dengan berikut ini akan muncul di jendela konsol atau terminal tempat Anda menjalankan pipeline Apache Beam:

Java

Misalnya, jika pipeline Anda mencoba melakukan agregasi seperti GroupByKey pada PCollection yang memiliki jendela global, tidak dipicu, dan tidak terikat, pesan error yang serupa dengan berikut akan muncul:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

Misalnya, jika pipeline Anda menggunakan petunjuk jenis dan jenis argumen dalam salah satu transformasi tidak seperti yang diharapkan, pesan error yang mirip dengan berikut ini akan muncul:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Go

Misalnya, jika pipeline Anda menggunakan `DoFn` yang tidak menerima input apa pun, akan muncul pesan error yang mirip dengan berikut ini:

... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required.
... Full error:
...     inserting ParDo in scope root/CountWords
...     graph.AsDoFn: for Fn named main.extractFn
... ProcessElement method has no main inputs

... goroutine 1 [running]:
... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...)
... (more stacktrace)

Jika mengalami error tersebut, periksa kode pipeline untuk memastikan bahwa operasi pipeline Anda sah.

Mendeteksi error dalam validasi tugas Dataflow

Setelah layanan Dataflow menerima grafik pipeline, layanan ini mencoba memvalidasi tugas Anda. Validasi ini mencakup hal berikut:

  • Memastikan layanan dapat mengakses bucket Cloud Storage terkait tugas Anda untuk staging file dan output sementara.
  • Memeriksa izin yang diperlukan di project Google Cloud Anda.
  • Memastikan layanan dapat mengakses sumber input dan output, seperti file.

Jika tugas Anda menggagalkan proses validasi, pesan error akan muncul di antarmuka pemantauan Dataflow, serta di jendela konsol atau terminal jika Anda menggunakan eksekusi pemblokiran. Pesan error terlihat mirip dengan berikut ini:

Java

INFO: To access the Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Go

Validasi tugas yang dijelaskan di bagian ini saat ini tidak didukung untuk Go. Error akibat masalah ini muncul sebagai pengecualian pekerja.

Mendeteksi pengecualian dalam kode pekerja

Saat tugas berjalan, Anda mungkin menemukan error atau pengecualian dalam kode pekerja. Error ini umumnya berarti bahwa DoFn dalam kode pipeline Anda telah menghasilkan pengecualian yang tidak tertangani, yang mengakibatkan kegagalan tugas dalam tugas Dataflow Anda.

Pengecualian dalam kode pengguna (misalnya, instance DoFn) dilaporkan dalam Antarmuka pemantauan Dataflow. Jika Anda menjalankan pipeline dengan eksekusi pemblokiran, pesan error akan dicetak di jendela konsol atau terminal Anda, seperti berikut:

Java

INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

Go

... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction
...     process_bundle-4031463614776698457-2 using plan s02-6 : while executing
...     Process for Plan[s02-6] failed: Oh no! This is an error message!

Pertimbangkan untuk melindungi dari error dalam kode Anda dengan menambahkan pengendali pengecualian. Misalnya, jika Anda ingin menghapus elemen yang gagal saat melakukan beberapa validasi input kustom di ParDo, tangani pengecualian dalam DoFn dan hapus elemen tersebut.

Anda juga dapat melacak elemen yang gagal dengan beberapa cara:

  • Anda dapat membuat log elemen yang gagal dan memeriksa output menggunakan Cloud Logging.
  • Anda dapat memeriksa peringatan atau error pada log startup pekerja dan pekerja Dataflow dengan mengikuti petunjuk di bagian Melihat log.
  • Anda dapat meminta ParDo menulis elemen yang gagal ke output tambahan untuk diperiksa nanti.

Untuk melacak properti pipeline yang dieksekusi, Anda dapat menggunakan class Metrics, seperti ditunjukkan dalam contoh berikut:

Java

final Counter counter = Metrics.counter("stats", "even-items");
PCollection<Integer> input = pipeline.apply(...);
...
input.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element() % 2 == 0) {
      counter.inc();
    }
});

Python

class FilterTextFn(beam.DoFn):
      """A DoFn that filters for a specific key based on a regex."""

      def __init__(self, pattern):
        self.pattern = pattern
        # A custom metric can track values in your pipeline as it runs. Create
        # custom metrics to count unmatched words, and know the distribution of
        # word lengths in the input PCollection.
        self.word_len_dist = Metrics.distribution(self.__class__,
                                                  'word_len_dist')
        self.unmatched_words = Metrics.counter(self.__class__,
                                               'unmatched_words')

      def process(self, element):
        word = element
        self.word_len_dist.update(len(word))
        if re.match(self.pattern, word):
          yield element
        else:
          self.unmatched_words.inc()

    filtered_words = (
        words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))

Go

func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection {
    return beam.ParDo(s, &MyMetricsDoFn{}, input)
}

func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) {
    pr, err := beam.Run(ctx, runner, p)
    if err != nil {
        return metrics.QueryResults{}, err
    }

    // Request the metric called "counter1" in namespace called "namespace"
    ms := pr.Metrics().Query(func(r beam.MetricResult) bool {
        return r.Namespace() == "namespace" && r.Name() == "counter1"
    })

    // Print the metric value - there should be only one line because there is
    // only one metric called "counter1" in the namespace called "namespace"
    for _, c := range ms.Counters() {
        fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed)
    }
    return ms, nil
}

type MyMetricsDoFn struct {
    counter beam.Counter
}

func init() {
    beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil)))
}

func (fn *MyMetricsDoFn) Setup() {
    // While metrics can be defined in package scope or dynamically
    // it's most efficient to include them in the DoFn.
    fn.counter = beam.NewCounter("namespace", "counter1")
}

func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) {
    // count the elements
    fn.counter.Inc(ctx, 1)
    emit(v)
}

Memecahkan masalah pipeline yang berjalan lambat atau kurangnya output

Lihat Memecahkan masalah tugas yang lambat dan macet.

Kesalahan umum dan tindakan yang diambil

Setelah Anda mengetahui error yang menyebabkan kegagalan pipeline, lihat halaman Memecahkan masalah error Dataflow untuk panduan pemecahan masalah error.