Menggunakan Library Klien Cloud untuk Python

Tutorial ini mencakup panduan Cloud Shell yang menggunakan library klien Google Cloud untuk Python untuk memanggil Dataproc gRPC API secara terprogram untuk membuat cluster dan mengirimkan tugas ke cluster.

Bagian berikut menjelaskan operasi kode panduan yang terdapat di repositori GitHub GoogleCloudPlatform/python-dataproc.

Menjalankan panduan Cloud Shell

Klik Open in Cloud Shell untuk menjalankan panduan.

Buka di Cloud Shell

Memahami kode

Kredensial Default Aplikasi

Panduan Cloud Shell dalam tutorial ini memberikan autentikasi menggunakan kredensial project Google Cloud Anda. Saat Anda menjalankan kode secara lokal, praktik yang direkomendasikan adalah menggunakan kredensial akun layanan untuk mengautentikasi kode Anda.

Membuat cluster Dataproc

Nilai berikut ditetapkan untuk membuat cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster
  • Konfigurasi cluster, yang menentukan satu master dan dua pekerja utama

Setelan konfigurasi default digunakan untuk setelan cluster lainnya. Anda dapat mengganti setelan konfigurasi cluster default. Misalnya, Anda dapat menambahkan VM sekunder (default = 0) atau menentukan jaringan VPC non-default untuk cluster. Untuk mengetahui informasi selengkapnya, lihat CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Mengirim tugas

Nilai berikut ditetapkan untuk mengirimkan tugas:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Konfigurasi tugas, yang menentukan nama cluster dan jalur file (URI) Cloud Storage dari tugas PySpark

Lihat SubmitJob untuk mengetahui informasi selengkapnya.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Menghapus cluster

Nilai berikut ditetapkan untuk menghapus cluster:

  • Project tempat cluster akan dibuat
  • Region tempat cluster akan dibuat
  • Nama cluster

Untuk mengetahui informasi selengkapnya, lihat DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")