Python 用の Cloud クライアント ライブラリの使用

このチュートリアルに含まれる Cloud Shell チュートリアルでは、Python 用の Google Cloud クライアント ライブラリを使用して Dataproc gRPC API をプログラムで呼び出し、クラスタを作成してクラスタにジョブを送信します。

以下のセクションでは、GitHub の GoogleCloudPlatform/python-docs-samples/dataproc リポジトリに含まれるチュートリアル コードの操作について説明します。

Cloud Shell チュートリアルを実行する

[Google Cloud Shell で開く] をクリックしてチュートリアルを実行します。

Cloud Shell でこのプロジェクトを開く

Python サンプルコードを理解する

アプリケーションのデフォルト認証情報

このチュートリアルの Cloud Shell チュートリアルで Google Cloud プロジェクトの認証情報を使用すると、認証ができます。コードをローカルで実行する場合は、サービス アカウント認証情報を使用してコードを認証することをおすすめします。

クラスタとジョブのクライアントを取得する

チュートリアル コードを実行するには、2 つのGoogle Cloud Dataproc API クライアントclusters gRPC API を呼び出す ClusterControllerClientjobs gRPC API を呼び出す JobControllerClient)が必要です。ジョブを作成して実行するクラスタが Dataproc グローバル リージョンにある場合、コードではデフォルトの gRPC エンドポイントが使用されます。クラスタ リージョンが非グローバルの場合、リージョンの gRPC エンドポイントが使用されます。

if global_region:
    region = 'global'
    # Use the default gRPC global endpoints.
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
    dataproc_job_client = dataproc_v1.JobControllerClient()
else:
    region = get_region_from_zone(zone)
    # Use a regional gRPC endpoint. See:
    # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
    client_transport = (
        cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    job_transport = (
        job_controller_grpc_transport.JobControllerGrpcTransport(
            address='{}-dataproc.googleapis.com:443'.format(region)))
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_transport)
    dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)

Dataproc クラスタの一覧表示

ListClusters API を呼び出して、プロジェクト内のクラスタを一覧表示できます。出力では、クラスタを一覧表示する JSON オブジェクトが返されます。JSON レスポンスをトランスバースして、クラスタの詳細を出力できます。

def list_clusters_with_details(dataproc, project, region):
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(project, region):
        print(('{} - {}'.format(cluster.cluster_name,
                                cluster.status.State.Name(
                                    cluster.status.state))))

Dataproc クラスタを作成する

CreateCluster API を使用して新しい Dataproc クラスタを作成できます。

クラスタの作成時に次の値を指定する必要があります。

  1. クラスタを作成するプロジェクト
  2. クラスタの名前
  3. 使用するリージョン。global リージョンを指定する場合(グローバル リージョンを選択するには、チュートリアル コードで --global_region フラグを使用します)ゾーンも指定する必要があります(zone_uri をご覧ください)。非グローバル リージョンを指定して zone_uri フィールドを空白のままにすると、Dataproc の自動ゾーン プレースメントによりクラスタのゾーンが選択されます。
def create_cluster(dataproc, project, zone, region, cluster_name):
    """Create the cluster."""
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project, zone)
    cluster_data = {
        'project_id': project,
        'cluster_name': cluster_name,
        'config': {
            'gce_cluster_config': {
                'zone_uri': zone_uri
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }

    cluster = dataproc.create_cluster(project, region, cluster_data)
    cluster.add_done_callback(callback)
    global waiting_callback
    waiting_callback = True

デフォルトのクラスタ構成設定を上書きすることもできます。たとえば、ワーカーの数(デフォルト = 2)、プリエンプティブル VM(デフォルト = 0)を使用するかどうか、ネットワーク設定(デフォルト = default network)を指定できます。詳細については、CreateClusterRequest をご覧ください。

Dataproc クラスタへのジョブの送信

既存のクラスタにジョブを送信するには、SubmitJob API を使用します。ジョブを送信した場合、非同期で実行されます。

ジョブを送信するには、次の情報を指定する必要があります。

  1. ジョブを送信するクラスタの名前
  2. 使用するリージョン
  3. 送信されるジョブの種類(HadoopSpark、PySpark など)
  4. ジョブの詳細。送信されるジョブの種類については、詳しくは SubmitJobRequest をご覧ください。

次のコードは、PySpark ジョブ をクラスタに送信します。

def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name,
                       filename):
    """Submit the Pyspark job to the cluster (assumes `filename` was uploaded
    to `bucket_name."""
    job_details = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': 'gs://{}/{}'.format(bucket_name, filename)
        }
    }

    result = dataproc.submit_job(
        project_id=project, region=region, job=job_details)
    job_id = result.reference.job_id
    print('Submitted job ID {}.'.format(job_id))
    return job_id

デフォルトでは、チュートリアル コードは次の小さな PySpark ジョブを実行します。

import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)

ジョブは非同期で実行されるため、出力が表示される前にジョブが完了する必要があります。ジョブが実行されている間に GetJob を呼び出すと、ジョブの完了後に JobStatusジョブの詳細を取得できます。

ジョブ ステータスとジョブの詳細の取得

次の必須情報を使用して GetJobRequest を作成します。

  1. ジョブが送信されるクラスタのプロジェクト
  2. クラスタ リージョン
  3. ジョブ ID(UUID)

次のコードはジョブのステータスを確認し、ジョブの完了時にジョブの詳細を返します。

def wait_for_job(dataproc, project, region, job_id):
    """Wait for job to complete or error out."""
    print('Waiting for job to finish...')
    while True:
        job = dataproc.get_job(project, region, job_id)
        # Handle exceptions
        if job.status.State.Name(job.status.state) == 'ERROR':
            raise Exception(job.status.details)
        elif job.status.State.Name(job.status.state) == 'DONE':
            print('Job finished.')
            return job

Dataproc クラスタの削除

クラスタを削除するには、DeleteCluster API を呼び出します。

def delete_cluster(dataproc, project, region, cluster):
    """Delete the cluster."""
    print('Tearing down cluster.')
    result = dataproc.delete_cluster(
        project_id=project, region=region, cluster_name=cluster)
    return result