Python 用の Cloud クライアント ライブラリの使用

このチュートリアルに含まれる Cloud Shell チュートリアルでは、Python 用の Google Cloud クライアント ライブラリを使用して Dataproc gRPC API をプログラムで呼び出し、クラスタを作成してクラスタにジョブを送信します。

以下のセクションでは、GitHub の GoogleCloudPlatform/python-dataproc リポジトリに含まれるチュートリアル コードの操作について説明します。

Cloud Shell チュートリアルを実行する

[Cloud Shell で開く] をクリックしてチュートリアルを実行します。

Cloud Shell で開く

Python サンプルコードを理解する

アプリケーションのデフォルト認証情報

このチュートリアルの Cloud Shell チュートリアルで Google Cloud プロジェクトの認証情報を使用すると、認証ができます。コードをローカルで実行する場合は、サービス アカウント認証情報を使用してコードを認証することをおすすめします。

Dataproc クラスタを作成する

CreateCluster API を使用して新しい Dataproc クラスタを作成できます。

クラスタの作成時に次の値を指定する必要があります。

  1. クラスタを作成するプロジェクト
  2. クラスタの名前
  3. 使用するリージョン。global リージョンを指定する場合(グローバル リージョンを選択するには、チュートリアル コードで --global_region フラグを使用します)ゾーンも指定する必要があります(zone_uri をご覧ください)。非グローバル リージョンを指定して zone_uri フィールドを空白のままにすると、Dataproc の自動ゾーン プレースメントによりクラスタのゾーンが選択されます。
from google.cloud import dataproc_v1 as dataproc

def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
    using the Python client library.

    Args:
        project_id (string): Project to use for creating resources.
        region (string): Region where the resources should live.
        cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    # Output a success message.
    print(f"Cluster created successfully: {result.cluster_name}")

デフォルトのクラスタ構成設定を上書きすることもできます。たとえば、ワーカーの数(デフォルト = 2)、プリエンプティブル VM(デフォルト = 0)を使用するかどうか、ネットワーク設定(デフォルト = default network)を指定できます。詳細については、CreateClusterRequest をご覧ください。

Dataproc クラスタの一覧表示

ListClusters API を呼び出して、プロジェクト内のクラスタを一覧表示できます。出力では、クラスタを一覧表示する JSON オブジェクトが返されます。JSON レスポンスをトランスバースして、クラスタの詳細を出力できます。

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(
        request={"project_id": project, "region": region}
    ):
        print(
            (
                "{} - {}".format(
                    cluster.cluster_name,
                    cluster.status.state.name,
                )
            )
        )

Dataproc クラスタへのジョブの送信

既存のクラスタにジョブを送信するには、SubmitJob API を使用します。

ジョブを送信するには、次の情報を指定する必要があります。

  1. ジョブを送信するクラスタの名前
  2. 使用するリージョン
  3. 送信されるジョブの種類(HadoopSpark、PySpark など)
  4. ジョブの詳細。送信されるジョブの種類については、詳しくは SubmitJobRequest をご覧ください。

次のコードは、Spark ジョブをクラスタに送信します。

import re

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

def submit_job(project_id, region, cluster_name):
    # Create the job client.
    job_client = dataproc.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )

    # Create the job config. 'main_jar_file_uri' can also be a
    # Google Cloud Storage URL.
    job = {
        "placement": {"cluster_name": cluster_name},
        "spark_job": {
            "main_class": "org.apache.spark.examples.SparkPi",
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "args": ["1000"],
        },
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output gets saved to the Google Cloud Storage bucket
    # allocated to the job. Use a regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )

    print(f"Job finished successfully: {output}")

デフォルトでは、チュートリアル コードにより、Spark に含まれる SparkPi ジョブの例が実行されます。

Dataproc クラスタの削除

クラスタを削除するには、DeleteCluster API を呼び出します。

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print("Tearing down cluster.")
    result = dataproc.delete_cluster(
        request={"project_id": project, "region": region, "cluster_name": cluster}
    )
    return result