Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 Cloud Composer 2를 사용해서 Google Cloud에서 Dataproc Serverless 워크로드를 실행하는 방법을 설명합니다.
다음 섹션의 예시에서는 Dataproc 서버리스 일괄 워크로드를 관리하는 연산자를 사용하는 방법을 보여줍니다. Dataproc 서버리스 Spark 일괄 워크로드를 만들고, 삭제하고, 나열하고, 가져오는 DAG에서 다음 연산자를 사용합니다.
Dataproc 서버리스 일괄 워크로드에 사용되는 연산자에 대해 DAG를 만듭니다.
커스텀 컨테이너 및 Dataproc Metastore를 사용하는 DAG를 만듭니다.
이러한 DAG에 대해 영구 기록 서버를 구성합니다.
시작하기 전에
Dataproc API 사용 설정:
콘솔
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
일괄 워크로드 파일의 위치를 선택합니다. 다음 옵션 중에서 사용할 수 있습니다.
- 이 파일을 저장하는 Cloud Storage 버킷을 만듭니다.
- 사용자 환경의 버킷을 사용합니다. 이 파일을 Airflow와 동기화할 필요가 없으므로
/dags
또는/data
폴더 외부에 별개의 하위 폴더를 만들 수 있습니다. 예를 들면/batches
입니다. - 기존 버킷을 사용합니다.
파일 및 Airflow 변수 설정
이 섹션에서는 이 튜토리얼에서 파일을 설정하고 Airflow 변수를 구성하는 방법을 보여줍니다.
버킷에 Dataproc Serverless Spark ML 워크로드 파일 업로드
이 튜토리얼의 워크로드는 pyspark 스크립트를 실행합니다.
모든 pyspark 스크립트를
spark-job.py
라는 로컬 파일에 저장합니다. 예를 들어 샘플 pyspark 스크립트를 사용할 수 있습니다.
Airflow 변수 설정
다음 섹션의 예시에는 Airflow 변수가 사용됩니다. Airflow에서 이러한 변수의 값을 설정하면 DAG 코드에서 이러한 값에 액세스할 수 있습니다.
이 튜토리얼의 예시에서는 다음 Airflow 변수를 사용합니다. 사용하는 예시에 따라 필요한 대로 설정할 수 있습니다.
DAG 코드에서 사용할 다음 Airflow 변수를 설정합니다.
project_id
: 프로젝트 ID.bucket_name
: 워크로드의 기본 Python 파일(spark-job.py
)이 있는 버킷의 URI입니다. 시작하기 전에에서 이 위치를 선택했습니다.phs_cluster
: 영구 기록 서버 클러스터 이름입니다. 영구 기록 서버 만들기를 수행할 때 이 변수를 설정합니다.image_name
: 커스텀 컨테이너 이미지(image:tag
)의 이름과 태그입니다. DataprocCreateBatchOperator와 함께 커스텀 컨테이너 이미지를 사용할 때 이 변수를 설정합니다.metastore_cluster
: Dataproc Metastore 서비스 이름입니다. DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용 시 이 변수를 설정합니다.region_name
: Dataproc Metastore 서비스가 있는 리전입니다. DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용 시 이 변수를 설정합니다.
Google Cloud 콘솔 및 Airflow UI를 사용하여 각 Airflow 변수 설정
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
환경 목록에서 해당 환경의 Airflow 링크를 클릭합니다. Airflow UI가 열립니다.
Airflow UI에서 관리 > 변수를 선택합니다.
새 레코드 추가를 클릭합니다.
키 필드에 변수 이름을 지정하고 값 필드에 해당 값을 설정합니다.
저장을 클릭합니다.
영구 기록 서버 만들기
영구 기록 서버(PHS)를 사용하여 일괄 워크로드의 Spark 기록 파일을 확인합니다.
- 영구 기록 서버 만들기
phs_cluster
Airflow 변수에 PHS 클러스터의 이름을 지정했는지 확인합니다.
DataprocCreateBatchOperator
다음 DAG는 Dataproc 서버리스 일괄 워크로드를 시작합니다.
DataprocCreateBatchOperator
인수에 대한 자세한 내용은 연산자의 소스 코드를 참조하세요.
DataprocCreateBatchOperator
의 batch
매개변수에 전달할 수 있는 속성에 대한 자세한 내용은 Batch 클래스 설명을 참조하세요.
DataprocCreateBatchOperator와 함께 커스텀 컨테이너 이미지 사용
다음 예시에서는 커스텀 컨테이너 이미지를 사용하여 워크로드를 실행하는 방법을 보여줍니다. 예를 들어 커스텀 컨테이너를 사용하면 기본 컨테이너 이미지로 제공되지 않는 Python 종속 항목을 추가할 수 있습니다.
커스텀 컨테이너 이미지를 사용하려면 다음 안내를 따르세요.
image_name
Airflow 변수에 이미지를 지정합니다.커스텀 이미지에서 DataprocCreateBatchOperator를 사용합니다.
DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용
DAG에서 Dataproc Metastore 서비스를 사용하려면 다음 안내를 따르세요.
메타스토어 서비스가 이미 시작되었는지 확인합니다.
메타스토어 서비스 시작에 대한 자세한 내용은 Dataproc Metastore 사용 설정 및 사용 중지를 참조하세요.
구성을 만드는 일괄 연산자에 대한 자세한 내용은 PeripheralsConfig를 참조하세요.
메타스토어 서비스가 작동되어 실행되면
metastore_cluster
변수에 이름을 지정하고region_name
Airflow 변수에 리전을 지정합니다.DataprocCreateBatchOperator에서 메타스토어 서비스를 사용합니다.
DataprocDeleteBatchOperator
DataprocDeleteBatchOperator를 사용하여 워크로드의 배치 ID를 기반으로 배치를 삭제할 수 있습니다.
DataprocListBatchesOperator
DataprocDeleteBatchOperator는 지정된 project_id 및 리전 내에 존재하는 배치를 나열합니다.
DataprocGetBatchOperator
DataprocGetBatchOperator는 특정 일괄 워크로드 하나를 가져옵니다.