Connector for Dataflow

Workflows connector that defines the built-in function used to access Dataflow within a workflow.

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

YAML

# This workflow demonstrates how to use the Cloud Dataflow connector.
# The workflow creates a word count job using a Dataflow public job template
# and uses a Cloud Storage bucket as temporary storage for temp files.
# The bucket resource is deleted after the job completes.
# Expected successful output: "SUCCESS"

- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "[fill in a bucket name]"
      - job_name: "[fill in a job name]"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- create_bucket:
    call: googleapis.storage.v1.buckets.insert
    args:
      project: ${project_id}
      body:
        name: ${bucket_name}
- create_job:
    call: googleapis.dataflow.v1b3.projects.locations.templates.create
    args:
      projectId: ${project_id}
      location: ${location}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
        gcsPath: ${template_path}
- delete_bucket_object1:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00000-of-00003"}
- delete_bucket_object2:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00001-of-00003"}
- delete_bucket_object3:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00002-of-00003"}
- delete_bucket:
    call: googleapis.storage.v1.buckets.delete
    args:
      bucket: ${bucket_name}
- the_end:
    return: "SUCCESS"
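
The three `delete_bucket_object` steps in the sample differ only in the shard name, so they could be collapsed into a single loop. The fragment below is a minimal sketch of that idea, not part of the original sample. It assumes the job still writes exactly three output shards with the names used above, and the step names `delete_bucket_objects` and `delete_object` and the loop variable `object_name` are hypothetical names chosen for illustration. It would take the place of the three delete steps, between `create_job` and `delete_bucket`.

```yaml
# Sketch only: replaces delete_bucket_object1..3 with one loop.
# Assumes the word count job wrote exactly these three shards.
- delete_bucket_objects:
    for:
      value: object_name          # hypothetical loop variable
      in:
        - "counts-00000-of-00003"
        - "counts-00001-of-00003"
        - "counts-00002-of-00003"
      steps:
        - delete_object:
            call: googleapis.storage.v1.objects.delete
            args:
              bucket: ${bucket_name}
              object: ${object_name}
```

If the number of shards changes, only the `in` list needs to be updated. The rest of the workflow stays as shown in the sample.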

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.