Connector for Dataflow

A Workflows connector that defines the built-in function used to access Dataflow within a workflow.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

YAML

# This workflow demonstrates how to use the Cloud Dataflow connector.
# The workflow creates a word count job using a Dataflow public job template
# and uses a Cloud Storage bucket as temporary storage for temp files.
# The bucket resource is deleted after the job completes.
# Expected successful output: "SUCCESS"

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "[fill in a bucket name]"
      - job_name: "[fill in a job name]"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- create_bucket:
    call: googleapis.storage.v1.buckets.insert
    args:
      project: ${project_id}
      body:
        name: ${bucket_name}
- create_job:
    call: googleapis.dataflow.v1b3.projects.locations.templates.create
    args:
      projectId: ${project_id}
      location: ${location}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
        gcsPath: ${template_path}
- delete_bucket_object1:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00000-of-00003"}
- delete_bucket_object2:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00001-of-00003"}
- delete_bucket_object3:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00002-of-00003"}
- delete_bucket:
    call: googleapis.storage.v1.buckets.delete
    args:
      bucket: ${bucket_name}
- the_end:
    return: "SUCCESS"
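
The sample above hard-codes three output shard names and relies on the connector's default wait behavior. As a minimal sketch that is not part of the original sample, the two steps below show one possible variation: `create_job` sets an explicit timeout and polling policy through `connector_params`, and a single `for` loop replaces the three `delete_bucket_object` steps. The timeout and polling values are illustrative assumptions. The sketch also assumes that the connector call blocks until the job finishes, as the cleanup steps in the sample imply. The shard names are copied from the sample; the actual number of output shards may differ for other inputs.

```yaml
# Hypothetical variation of two steps from the sample above.
# Variables such as project_id, bucket_name, and job_name come from the
# sample's init step.
- create_job:
    call: googleapis.dataflow.v1b3.projects.locations.templates.create
    args:
      projectId: ${project_id}
      location: ${location}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
        gcsPath: ${template_path}
      # Assumed values: wait up to one hour, polling with exponential backoff.
      connector_params:
        timeout: 3600        # seconds
        polling_policy:
          initial_delay: 5   # seconds before the first poll
          multiplier: 1.5    # backoff factor between polls
          max_delay: 60      # upper bound on the delay between polls
# Delete every output shard in one loop instead of one step per object.
- delete_bucket_objects:
    for:
      value: object_name
      in:
        - "counts-00000-of-00003"
        - "counts-00001-of-00003"
        - "counts-00002-of-00003"
      steps:
        - delete_object:
            call: googleapis.storage.v1.objects.delete
            args:
              bucket: ${bucket_name}
              object: ${object_name}
```

If used, these two steps would take the place of `create_job` and `delete_bucket_object1` through `delete_bucket_object3`; the `init`, `create_bucket`, `delete_bucket`, and `the_end` steps stay unchanged.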

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.