本頁面提供的疑難排解提示和偵錯策略,可協助解決您在建構和執行 Dataflow 管道時遇到的問題。這些資訊可協助您偵測管道失敗、判定管道執行作業失敗背後的原因,以及提供一些可修正問題的建議做法。
下圖顯示本頁所述的 Dataflow 疑難排解工作流程。
Dataflow 提供與工作有關的即時回饋資訊和一套基本步驟,可用來檢查錯誤訊息、記錄及工作進度停滯等情況。
如需執行 Dataflow 工作時可能遇到之常見錯誤的指南,請參閱「排解 Dataflow 錯誤」。如要監控及排解管道效能問題,請參閱「監控管道效能」。
管道的最佳做法
以下是 Java、Python 和 Go 管道的最佳做法。
Java
如果是批次工作,建議您為臨時位置設定存留時間 (TTL)。
設定 TTL 前,請先將暫存位置和臨時位置設為不同的位置,這是最佳做法。
請勿刪除暫存位置中的物件,因為這些物件會重複使用。
如果工作完成或停止,但系統未清除暫時物件,請從做為暫時位置的 Cloud Storage 值區中手動移除這些檔案。
Python
臨時位置和暫存位置的前置字元都是 <job_name>.<time>
。
請務必將暫存位置和臨時位置設為不同的位置。
如有需要,請在作業完成或停止後,刪除暫存位置中的物件。此外,Python 管道不會重複使用暫存物件。
如果作業結束,但系統未清除暫存物件,請從做為暫存位置的 Cloud Storage bucket 中手動移除這些檔案。
如果是批次工作,建議您為臨時和暫存位置設定存留時間 (TTL)。
Go
臨時位置和暫存位置的前置字元都是
<job_name>.<time>
。請務必將暫存位置和臨時位置設為不同的位置。
如有需要,請在作業完成或停止後,刪除暫存位置中的物件。此外,Go 管道不會重複使用暫存物件。
如果作業結束,但系統未清除暫存物件,請從做為暫存位置的 Cloud Storage bucket 中手動移除這些檔案。
如果是批次工作,建議您為臨時和暫存位置設定存留時間 (TTL)。
檢查管道狀態
您可以使用 Dataflow 監控介面來偵測管道執行作業中的任何錯誤。
- 前往Google Cloud 控制台。
- 從專案清單中選取 Google Cloud 專案。
- 在導覽選單中,按一下「Big Data」(大數據) 下方的「Dataflow」(資料流)。 右側窗格會顯示正在執行的工作清單。
- 選取您要查看的管道工作。您可以在「Status」(狀態) 欄位中快速查看工作的狀態:「Running」(執行中)、「Succeeded」(成功) 或「Failed」(失敗)。

尋找管道失敗的相關資訊
如果其中一個管道工作失敗,您可以選取該工作,查看更多有關錯誤和執行結果的詳細資訊。選取工作後,您可以查看管道的主要圖表、執行圖、工作資訊面板,以及包含「工作記錄」、「工作人員記錄」、「診斷」和「建議」分頁的「記錄」面板。
檢查工作錯誤訊息
如要查看管道程式碼和 Dataflow 服務產生的「Job Logs」(工作記錄),請在「Logs」(記錄) 面板中按一下「Show」(顯示)segment。
如要篩選「工作記錄」中顯示的訊息,請按一下「資訊」arrow_drop_down和「篩選器」filter_list。如要只顯示錯誤訊息,請按一下「資訊」arrow_drop_down,然後選取「錯誤」。
如要展開錯誤訊息,請按一下可展開的部分 arrow_right。
或者,也可以按一下「診斷」分頁標籤。這個分頁會顯示所選時間軸上發生錯誤的位置、所有記錄錯誤的計數,以及管道的可能建議。
查看工作的步驟記錄
選取管道圖中的步驟時,記錄面板會從顯示 Dataflow 服務產生的工作記錄,切換為顯示執行管道步驟的 Compute Engine 執行個體記錄。
Cloud Logging 會將從專案 Compute Engine 執行個體收集的所有記錄集中在同一個位置。如要進一步瞭解如何使用 Dataflow 的各種記錄功能,請參閱「記錄管道訊息」。
處理自動拒絕管道動作
在某些情況下,Dataflow 服務會發現管道可能觸發已知的 SDK 問題。為避免提交可能會遇到問題的管道,Dataflow 會自動拒絕管道,並顯示下列訊息:
The workflow was automatically rejected by the service because it might trigger an identified bug in the SDK (details below). If you think this identification is in error, and would like to override this automated rejection, please re-submit this workflow with the following override flag: [OVERRIDE FLAG]. Bug details: [BUG DETAILS]. Contact Google Cloud Support for further help. Please use this identifier in your communication: [BUG ID].
閱讀連結之錯誤詳細資料中的警訊後,如果您仍要嘗試執行管道,則可以覆寫自動拒絕動作。新增 --experiments=<override-flag>
旗標,然後重新提交管道。
判斷管道失敗的原因
失敗的 Apache Beam 管道執行作業通常可歸因於下列其中一個原因:
- 圖形或管道建構錯誤。如果 Dataflow 在建構組成管道的步驟圖時發生問題 (如 Apache Beam 管道所述),就會發生這類錯誤。
- 工作驗證中的錯誤。Dataflow 服務會驗證您啟動的任何管道工作。驗證程序中的錯誤可能會使您無法成功建立或執行工作。驗證錯誤可能包含 Google Cloud 專案 Cloud Storage 值區方面的問題,或是專案權限方面的問題。
- 工作站程式碼中的例外狀況。當使用者提供的程式碼中有錯誤,且 Dataflow 將該程式碼發布到平行工作站時 (例如
ParDo
轉換的DoFn
執行個體),就會發生這類錯誤。 - 其他 Google Cloud 服務中暫時性失敗導致的錯誤。 Dataflow 仰賴的Google Cloud 服務 (例如 Compute Engine 或 Cloud Storage) 中發生暫時服務中斷或其他問題,而管道可能因此失敗。
偵測圖形或管道建構錯誤
當 Dataflow 從 Dataflow 程式中的程式碼建構管道的執行圖時,可能會發生圖形建構錯誤。在圖形建構期間,Dataflow 會檢查是否有非法作業。
如果 Dataflow 在圖形建構期間偵測到錯誤,請留意,系統將不會在 Dataflow 服務上建立任何工作。因此您不會在 Dataflow 監控介面中看到任何意見回饋。而是會在執行 Apache Beam 管道的主控台或終端機視窗中,看到與以下內容類似的錯誤訊息:
Java
舉例來說,如果管道嘗試在已設定全域時間區間、未觸發且不受限的 PCollection
上執行如 GroupByKey
的匯總,您會看到與以下內容類似的錯誤訊息:
... ... Exception in thread "main" java.lang.IllegalStateException: ... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. ... Use a Window.into or Window.triggering transform prior to GroupByKey ...
Python
舉例來說,如果管道使用類型提示,且其中一個轉換的引數類型未如預期,就會發生類似以下的錯誤訊息:
... in <module> run() ... in run | beam.Map('count', lambda (word, ones): (word, sum(ones)))) ... in __or__ return self.pipeline.apply(ptransform, self) ... in apply transform.type_check_inputs(pvalueish) ... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input') ... in type_check_inputs_or_outputs pvalue_.element_type)) google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>
Go
舉例來說,如果管道使用的 `DoFn` 未接收任何輸入內容,就會發生類似下列內容的錯誤訊息:
... panic: Method ProcessElement in DoFn main.extractFn is missing all inputs. A main input is required. ... Full error: ... inserting ParDo in scope root/CountWords ... graph.AsDoFn: for Fn named main.extractFn ... ProcessElement method has no main inputs ... goroutine 1 [running]: ... github.com/apache/beam/sdks/v2/go/pkg/beam.MustN(...) ... (more stacktrace)
萬一遇到這樣的錯誤,請檢查管道程式碼,確保管道的作業符合規定。
偵測 Dataflow 工作驗證中的錯誤
Dataflow 服務收到管道的圖形後,會嘗試驗證工作。此驗證包含以下項目:
- 確認服務可存取與工作相關聯 Cloud Storage 值區,以進行檔案暫存和暫時輸出。
- 檢查 Google Cloud 專案中的必要權限。
- 確認服務可存取輸入和輸出來源,例如檔案。
如果工作驗證程序失敗,Dataflow 監控介面會顯示錯誤訊息,而如果您是使用封鎖執行作業,也會在主控台或終端機視窗中看到錯誤訊息。錯誤訊息看起來與以下內容類似:
Java
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798 Submitted job: 2016-03-08_18_59_25-16868399470801620798 ... ... Starting 3 workers... ... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write ... Executing BigQuery import job "dataflow_job_16868399470801619475". ... Stopping worker pool... ... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed. Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed. Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404 ... Worker pool stopped. ... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run INFO: Job finished with status FAILED Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException: Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155) at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56) at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)
Python
INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477] ... Checking required Cloud APIs are enabled. ... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING. ... Combiner lifting skipped for step group: GroupByKey not followed by a combiner. ... Expanding GroupByKey operations into optimizable parts. ... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns ... Annotating graph with Autotuner information. ... Fusing adjacent ParDo, Read, Write, and Flatten operations ... Fusing consumer split into read ... ... Starting 1 workers... ... ... Executing operation read+split+pair_with_one+group/Reify+group/Write ... Executing failure step failure14 ... Workflow failed. Causes: ... read+split+pair_with_one+group/Reify+group/Write failed. Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt. ... Cleaning up. ... Tearing down pending resources... INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.
Go
Go 目前不支援本節所述的工作驗證。因這些問題而發生的錯誤會顯示為工作站例外狀況。
偵測工作站程式碼中的例外狀況
執行工作時,您可能會在工作站程式碼中遇到錯誤或例外狀況。這些錯誤通常表示管道程式碼中的 DoFn
已產生未處理的例外狀況,而導致 Dataflow 工作中的工作失敗。
Dataflow 監控介面會回報使用者程式碼 (例如 DoFn
執行個體) 中的例外狀況。如果您採用封鎖執行作業來執行管道,主控台或終端機視窗會顯示錯誤訊息,例如以下內容:
Java
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461 Submitted job: 2017-05-23_14_02_46-1117850763061203461 ... ... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461 ... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461. ... The number of workers will be between 1 and 15. ... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461. ... ... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write ... Workers have started successfully. ... ... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146) at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104) at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191) ... ... Cleaning up. ... Stopping worker pool... ... Worker pool stopped.
Python
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. ... INFO:root:... Expanding GroupByKey operations into optimizable parts. INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns INFO:root:... Annotating graph with Autotuner information. INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations ... INFO:root:...: Starting 1 workers... INFO:root:...: Executing operation group/Create INFO:root:...: Value "group/Session" materialized. INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: ...: Workers have started successfully. INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING. INFO:root:...: Traceback (most recent call last): File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task) ... File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda> ValueError: invalid literal for int() with base 10: 'www'
Go
... 2022-05-26T18:32:52.752315397Zprocess bundle failed for instruction ... process_bundle-4031463614776698457-2 using plan s02-6 : while executing ... Process for Plan[s02-6] failed: Oh no! This is an error message!
建議您新增例外狀況處理常式以防範程式碼中的錯誤。舉例來說,如果您想要捨棄使 ParDo
中執行的某些自訂輸入驗證作業失敗的元素,請在 DoFn
中處理例外狀況並捨棄元素。
您也可以透過幾個不同的方式來追蹤失敗的元素:
- 您可以記錄失敗的元素,並使用 Cloud Logging 檢查輸出。
- 按照「查看記錄檔」一文中的操作說明,檢查 Dataflow 工作站和工作站啟動記錄檔是否有警告或錯誤。
- 您可以讓
ParDo
將失敗的元素寫入到額外的輸出,以供日後檢查。
如要追蹤執行管道的屬性,可以使用 Metrics
類別,如下列範例所示:
Java
final Counter counter = Metrics.counter("stats", "even-items"); PCollection<Integer> input = pipeline.apply(...); ... input.apply(ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element() % 2 == 0) { counter.inc(); } });
Python
class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regex.""" def __init__(self, pattern): self.pattern = pattern # A custom metric can track values in your pipeline as it runs. Create # custom metrics to count unmatched words, and know the distribution of # word lengths in the input PCollection. self.word_len_dist = Metrics.distribution(self.__class__, 'word_len_dist') self.unmatched_words = Metrics.counter(self.__class__, 'unmatched_words') def process(self, element): word = element self.word_len_dist.update(len(word)) if re.match(self.pattern, word): yield element else: self.unmatched_words.inc() filtered_words = ( words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))
Go
func addMetricDoFnToPipeline(s beam.Scope, input beam.PCollection) beam.PCollection { return beam.ParDo(s, &MyMetricsDoFn{}, input) } func executePipelineAndGetMetrics(ctx context.Context, p *beam.Pipeline) (metrics.QueryResults, error) { pr, err := beam.Run(ctx, runner, p) if err != nil { return metrics.QueryResults{}, err } // Request the metric called "counter1" in namespace called "namespace" ms := pr.Metrics().Query(func(r beam.MetricResult) bool { return r.Namespace() == "namespace" && r.Name() == "counter1" }) // Print the metric value - there should be only one line because there is // only one metric called "counter1" in the namespace called "namespace" for _, c := range ms.Counters() { fmt.Println(c.Namespace(), "-", c.Name(), ":", c.Committed) } return ms, nil } type MyMetricsDoFn struct { counter beam.Counter } func init() { beam.RegisterType(reflect.TypeOf((*MyMetricsDoFn)(nil))) } func (fn *MyMetricsDoFn) Setup() { // While metrics can be defined in package scope or dynamically // it's most efficient to include them in the DoFn. fn.counter = beam.NewCounter("namespace", "counter1") } func (fn *MyMetricsDoFn) ProcessElement(ctx context.Context, v beam.V, emit func(beam.V)) { // count the elements fn.counter.Inc(ctx, 1) emit(v) }
排解管道執行速度緩慢或缺少輸出的問題
請參閱「排解工作執行緩慢或停滯的問題」。
常見錯誤與可採取的動作
如果知道導致管道失敗的錯誤,請參閱「排解 Dataflow 錯誤」頁面,瞭解如何排解錯誤。