Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
本頁說明如何使用 Cloud Composer 2 在Google Cloud上執行 Serverless for Apache Spark 工作負載。
以下各節的範例說明如何使用運算子管理 Serverless for Apache Spark 批次工作負載。您可以在 DAG 中使用這些運算子,建立、刪除、列出及取得 Serverless for Apache Spark 批次工作負載:
- 為與 Serverless for Apache Spark 批次工作負載搭配使用的運算子建立 DAG: 
- 建立使用自訂容器和 Dataproc Metastore 的 DAG。 
- 為這些 DAG 設定永久記錄伺服器。 
事前準備
- 啟用 Dataproc API: - 主控台- Enable the Dataproc API. - Roles required to enable APIs - To enable APIs, you need the Service Usage Admin IAM role ( - roles/serviceusage.serviceUsageAdmin), which contains the- serviceusage.services.enablepermission. Learn how to grant roles.- gcloud- Enable the Dataproc API: - Roles required to enable APIs - To enable APIs, you need the Service Usage Admin IAM role ( - roles/serviceusage.serviceUsageAdmin), which contains the- serviceusage.services.enablepermission. Learn how to grant roles.- gcloud services enable dataproc.googleapis.com 
- 選取批次工作負載檔案的位置。你可以使用下列任一選項: - 建立 Cloud Storage bucket,用來儲存這個檔案。
- 使用環境的值區。由於您不需要將這個檔案與 Airflow 同步處理,因此可以在 /dags或/data資料夾外建立個別的子資料夾。例如:/batches。
- 使用現有 bucket。
 
設定檔案和 Airflow 變數
本節說明如何設定檔案,以及設定本教學課程的 Airflow 變數。
將 Serverless for Apache Spark ML 工作負載檔案上傳至 bucket
本教學課程中的工作負載會執行 pyspark 指令碼:
- 將任何 pyspark 指令碼儲存至名為 - spark-job.py的本機檔案。舉例來說,您可以使用範例 pyspark 指令碼。
- 將檔案上傳至「開始前」選取的儲存位置。 
設定 Airflow 變數
下列各節的範例會使用 Airflow 變數。您可以在 Airflow 中設定這些變數的值,然後 DAG 程式碼就能存取這些值。
本教學課程中的範例使用下列 Airflow 變數。您可以視需要設定這些值,具體取決於您使用的範例。
設定下列 Airflow 變數,供 DAG 程式碼使用:
- project_id:專案 ID。
- bucket_name:bucket 的 URI,其中包含工作負載的主要 Python 檔案 (- spark-job.py)。您在「事前準備」中選取了這個位置。
- phs_cluster:永久記錄伺服器叢集名稱。您會在建立永久記錄伺服器時設定這個變數。
- image_name:自訂容器映像檔的名稱和標記 (- image:tag)。使用 DataprocCreateBatchOperator 自訂容器映像檔時,您會設定這個變數。
- metastore_cluster:Dataproc Metastore 服務名稱。使用 DataprocCreateBatchOperator 時,您會使用 Dataproc Metastore 服務,並設定這個變數。
- region_name:Dataproc Metastore 服務所在的區域。使用 DataprocCreateBatchOperator 時,您會使用 Dataproc Metastore 服務,並設定這個變數。
使用 Google Cloud 控制台和 Airflow 使用者介面設定每個 Airflow 變數
- 前往 Google Cloud 控制台的「Environments」頁面。 
- 在環境清單中,按一下環境的「Airflow」Airflow連結。Airflow UI 隨即開啟。 
- 在 Airflow UI 中,依序選取「Admin」>「Variables」。 
- 按一下「新增記錄」。 
- 在「鍵」欄位中指定變數名稱,並在「值」欄位中設定變數值。 
- 按一下 [儲存]。 
建立永久記錄伺服器
使用永久記錄伺服器 (PHS) 查看批次工作負載的 Spark 記錄檔:
- 建立永久記錄伺服器。
- 請確認您已在 phs_clusterAirflow 變數中指定 PHS 叢集的名稱。
DataprocCreateBatchOperator
下列 DAG 會啟動 Serverless for Apache Spark 批次工作負載。
如要進一步瞭解 DataprocCreateBatchOperator 引數,請參閱運算子的原始碼。
如要進一步瞭解可在 DataprocCreateBatchOperator 的 batch 參數中傳遞的屬性,請參閱 Batch 類別的說明。
使用 DataprocCreateBatchOperator 搭配自訂容器映像檔
以下範例說明如何使用自訂容器映像檔執行工作負載。舉例來說,您可以使用自訂容器,新增預設容器映像檔未提供的 Python 依附元件。
如要使用自訂容器映像檔,請按照下列步驟操作:
- 在 - image_nameAirflow 變數中指定映像檔。
- 使用 DataprocCreateBatchOperator 搭配自訂映像檔: 
使用 DataprocCreateBatchOperator 搭配 Dataproc Metastore 服務
如要從 DAG 使用 Dataproc Metastore 服務,請按照下列步驟操作:
- 確認 Metastore 服務是否已啟動。 - 如要瞭解如何啟動 Metastore 服務,請參閱「啟用及停用 Dataproc Metastore」。 - 如要進一步瞭解如何使用 Batch 運算子建立設定,請參閱「PeripheralsConfig」。 
- Metastore 服務啟動並執行後,請在 - metastore_cluster變數中指定服務名稱,並在- region_nameAirflow 變數中指定服務區域。
- 在 DataprocCreateBatchOperator 中使用中繼存放區服務: 
DataprocDeleteBatchOperator
您可以使用 DataprocDeleteBatchOperator,根據工作負載的批次 ID 刪除批次。
DataprocListBatchesOperator
DataprocDeleteBatchOperator 會列出特定 project_id 和區域中的批次。
DataprocGetBatchOperator
DataprocGetBatchOperator 會擷取特定批次工作負載。