Python 用の Cloud クライアント ライブラリの使用

このチュートリアルに含まれる Cloud Shell チュートリアルでは、Python 用の Google Cloud クライアント ライブラリを使用して Cloud Dataproc gRPC API をプログラムで呼び出し、クラスタを作成してクラスタにジョブを送信します。

以下のセクションでは、GitHub の GoogleCloudPlatform/python-docs-samples/dataproc リポジトリに含まれるチュートリアル コードの操作について説明します。

Cloud Shell チュートリアルを実行する

[Google Cloud Shell で開く] をクリックしてチュートリアルを実行します。

Cloud Shell でこのプロジェクトを開く

Python サンプルコードを理解する

アプリケーションのデフォルト認証情報

このチュートリアルの Cloud Shell チュートリアルでは、GCP プロジェクトの認証情報を使用して認証されます。コードをローカルで実行する場合は、サービス アカウント認証情報を使用してコードを認証することをおすすめします。

クラスタとジョブのクライアントを取得する

チュートリアル コードを実行するには、2 つの Google Cloud Dataproc API クライアントclusters gRPC API を呼び出す ClusterControllerClientjobs gRPC API を呼び出す JobControllerClient)が必要です。ジョブを作成して実行するクラスタが Cloud Dataproc グローバル リージョンにある場合、コードではデフォルトの gRPC エンドポイントが使用されます。クラスタ リージョンが非グローバルの場合、リージョンの gRPC エンドポイントが使用されます。

if global_region:
    region = 'global'
    # Use the default gRPC global endpoints.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
    dataproc_job_client = dataproc_v1.JobControllerClient()
else:
    region = get_region_from_zone(zone)
    # Use a regional gRPC endpoint. See:
    # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
    client_transport = (
        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    job_transport = (
        job_controller_grpc_transport.JobControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_transport)
    dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)

Cloud Dataproc クラスタの一覧表示

プロジェクト内のクラスタを一覧表示するには、ListClusters API を呼び出します。出力でクラスタを一覧表示する JSON オブジェクトが返されます。JSON レスポンスをトランスバースして、クラスタの詳細を出力できます。

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(project, region):
        print(('{} - {}'.format(cluster.cluster_name,
                                cluster.status.State.Name(
                                    cluster.status.state))))

Cloud Dataproc クラスタの作成

新しい Cloud Dataproc クラスタを作成するには、CreateCluster API を使用します。

クラスタの作成時に次の値を指定する必要があります。

  1. クラスタを作成するプロジェクト
  2. クラスタの名前
  3. 使用するリージョン。global リージョンを指定する場合(グローバル リージョンを選択するには、チュートリアル コードで --global_region フラグを使用します)、ゾーンも指定する必要があります(zone_uri をご覧ください)。非グローバル リージョンを指定して zone_uri フィールドを空白のままにすると、Cloud Dataproc の自動ゾーン プレースメントによりクラスタのゾーンが選択されます。
def create_cluster(dataproc, project, zone, region, cluster_name):
    """Create the cluster."""
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'project_id': project,
        'cluster_name': cluster_name,
        'config': {
            'gce_cluster_config': {
                'zone_uri': zone_uri
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }

    cluster = dataproc.create_cluster(project, region, cluster_data)
    cluster.add_done_callback(callback)
    global waiting_callback
    waiting_callback = True

デフォルトのクラスタ構成設定を上書きすることもできます。たとえば、ワーカーの数(デフォルト = 2)、プリエンプティブ VM を使用するかどうか(デフォルト = 0)、ネットワーク設定(デフォルト = default network)などを指定できます。詳細については、CreateClusterRequest をご覧ください。

Cloud Dataproc クラスタへのジョブの送信

既存のクラスタにジョブを送信するには、SubmitJob API を使用します。

ジョブを送信するには、次の情報を指定する必要があります。

  1. ジョブを送信するクラスタの名前
  2. 使用する地域
  3. 送信するジョブのタイプ(HadoopSpark、PySpark など)
  4. ジョブの詳細。送信されるジョブの種類については、詳しくは SubmitJobRequest をご覧ください。

次のコードは、PySpark ジョブをクラスタに送信します。

def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name,
                       filename):
    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
    to `bucket_name."""
    job_details = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': 'gs://{}/{}'.format(bucket_name, filename)
        }
    }

    result = dataproc.submit_job(
        project_id=project, region=region, job=job_details)
    job_id = result.reference.job_id
    print('Submitted job ID {}.'.format(job_id))
    return job_id

デフォルトでは、チュートリアル コードは次の小さな PySpark ジョブを実行します。

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

ジョブは非同期で実行されるため、出力が表示される前にジョブが完了する必要があります。ジョブが実行されている間に GetJob を呼び出すと、ジョブの完了後に JobStatusジョブの詳細を取得できます。

ジョブ ステータスとジョブの詳細の取得

次の必須情報を使用して GetJobRequest を作成します。

  1. ジョブが送信されるクラスタのプロジェクト
  2. クラスタ リージョン
  3. ジョブ ID(UUID)

次のコードはジョブのステータスを確認し、ジョブの完了時にジョブの詳細を返します。

def wait_for_job(dataproc, project, region, job_id):
    """Wait for job to complete or error out."""
    print('Waiting for job to finish...')
    while True:
        job = dataproc.get_job(project, region, job_id)
        # Handle exceptions
        if job.status.State.Name(job.status.state) == 'ERROR':
            raise Exception(job.status.details)
        elif job.status.State.Name(job.status.state) == 'DONE':
            print('Job finished.')
            return job

Cloud Dataproc クラスタを削除する

クラスタを削除するには、DeleteCluster API を呼び出します。

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print('Tearing down cluster.')
    result = dataproc.delete_cluster(
        project_id=project, region=region, cluster_name=cluster)
    return result

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Dataproc ドキュメント
ご不明な点がありましたら、Google のサポートページをご覧ください。