Python 用の Cloud クライアント ライブラリの使用

このチュートリアルに含まれる Cloud Shell チュートリアルでは、Python 用の Google Cloud クライアント ライブラリを使用して Dataproc gRPC API をプログラムで呼び出し、クラスタを作成してクラスタにジョブを送信します。

以下のセクションでは、GitHub の GoogleCloudPlatform/python-dataproc リポジトリに含まれるチュートリアル コードの操作について説明します。

Cloud Shell チュートリアルを実行する

[Cloud Shell で開く] をクリックしてチュートリアルを実行します。

Cloud Shell で開く

コードを理解する

アプリケーションのデフォルト認証情報

このチュートリアルの Cloud Shell チュートリアルでは、Google Cloud プロジェクトの認証情報を使用して認証が行われます。コードをローカルで実行する場合は、サービス アカウント認証情報を使用してコードを認証することをおすすめします。

Dataproc クラスタを作成する

クラスタを作成するために、次の値が設定されています。

  • クラスタを作成するプロジェクト
  • クラスタが作成されるリージョン
  • クラスタの名前
  • クラスタ構成ファイル。これは、1 つのマスター ワーカーと 2 つのプライマリ ワーカーを指定します。

残りのクラスタ設定には、デフォルトの構成設定が使用されます。デフォルトのクラスタ構成設定を上書きすることができます。たとえば、セカンダリ VM(デフォルト = 0)を追加したり、クラスタにデフォルト以外の VPC ネットワークを指定したりできます。詳細については、CreateCluster をご覧ください。

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

ジョブの送信

ジョブを送信するために、次の値が設定されています。

  • クラスタを作成するプロジェクト
  • クラスタが作成されるリージョン
  • ジョブ構成ファイル。これは、クラスタ名と PySpark ジョブの Cloud Storage ファイルパス(URI)を指定します。

詳細については、SubmitJob をご覧ください。

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

クラスタの削除

クラスタを削除するために、次の値が設定されています。

  • クラスタを作成するプロジェクト
  • クラスタが作成されるリージョン
  • クラスタの名前

詳細については、DeleteCluster をご覧ください。

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")