Python용 Cloud 클라이언트 라이브러리 사용

이 가이드에는 Python용 Google Cloud 클라이언트 라이브러리를 사용해서 Dataproc gRPC API를 프로그래매틱 방식으로 호출하여 클러스터를 만들고 클러스터에 작업을 제출하는 Cloud Shell 둘러보기가 포함되어 있습니다.

다음 섹션에서는 GitHub GoogleCloudPlatform/python-dataproc 저장소에 포함된 둘러보기 코드의 작동을 설명합니다.

Cloud Shell 둘러보기 실행

Cloud Shell에서 열기를 클릭하여 둘러보기를 실행합니다.

Cloud Shell에서 열기

코드 이해하기

애플리케이션 기본 사용자 인증 정보

이 튜토리얼의 Cloud Shell 둘러보기는 Google Cloud 프로젝트 사용자 인증 정보를 사용하여 인증을 제공합니다. 코드를 로컬로 실행할 때는 서비스 계정 사용자 인증 정보를 사용하여 코드를 인증하는 것이 좋습니다.

Dataproc 클러스터 만들기

클러스터를 만들도록 다음 값이 설정됩니다.

  • 클러스터를 만들 프로젝트
  • 클러스터가 생성될 리전
  • 클러스터 이름
  • 마스터 구성 1개와 기본 작업자 2개를 지정하는 클러스터 구성

기본 구성 설정은 나머지 클러스터 설정에 사용됩니다. 사용자는 기본 클러스터 구성 설정을 재정의할 수 있습니다. 예를 들어 보조 VM(기본값 = 0)을 추가하거나 클러스터에 기본이 아닌 VPC 네트워크를 지정할 수 있습니다. 자세한 내용은 CreateCluster를 참조하세요.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

작업 제출

작업을 제출하도록 설정된 값은 다음과 같습니다.

  • 클러스터를 만들 프로젝트
  • 클러스터가 생성될 리전
  • PySpark 작업의 클러스터 이름과 Cloud Storage 파일 경로(URI)를 지정하는 작업 구성

자세한 내용은 SubmitJob을 참조하세요.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

클러스터 삭제

클러스터를 삭제하도록 다음 값이 설정됩니다.

  • 클러스터를 만들 프로젝트
  • 클러스터가 생성될 리전
  • 클러스터 이름

자세한 내용은 DeleteCluster를 참조하세요.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")