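Workflows connector that defines the built-in function used to access Dataflow within a workflow.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
YAML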
# This workflow demonstrates how to use the Cloud Dataflow connector.
# The workflow creates a word count job using a Dataflow public job template
# and uses a Cloud Storage bucket as temporary storage for temp files.
# The bucket resource is deleted after the job completes.
# Expected successful output: "SUCCESS"
- init:
    assign:
      - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
      - location: "us-central1"
      - zone: "us-central1-a"
      - bucket_name: "[fill in a bucket name]"
      - job_name: "[fill in a job name]"
      - input_file: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - output_storage_file_prefix: ${"gs://" + bucket_name + "/counts"}
      - temp_location: ${"gs://" + bucket_name + "/counts/temp"}
      - template_path: "gs://dataflow-templates-us-central1/latest/Word_Count"
- create_bucket:
    call: googleapis.storage.v1.buckets.insert
    args:
      project: ${project_id}
      body:
        name: ${bucket_name}
- create_job:
    call: googleapis.dataflow.v1b3.projects.locations.templates.create
    args:
      projectId: ${project_id}
      location: ${location}
      body:
        jobName: ${job_name}
        parameters:
          inputFile: ${input_file}
          output: ${output_storage_file_prefix}
        environment:
          numWorkers: 1
          maxWorkers: 2
          zone: ${zone}
          tempLocation: ${temp_location}
        gcsPath: ${template_path}
- delete_bucket_object1:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00000-of-00003"}
- delete_bucket_object2:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00001-of-00003"}
- delete_bucket_object3:
    call: googleapis.storage.v1.objects.delete
    args:
      bucket: ${bucket_name}
      object: ${"counts-00002-of-00003"}
- delete_bucket:
    call: googleapis.storage.v1.buckets.delete
    args:
      bucket: ${bucket_name}
- the_end:
    return: "SUCCESS"
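The three object-deletion steps in the sample differ only in the object name, so they could plausibly be collapsed into a single loop. The fragment below is a minimal sketch of that idea, not part of the original sample: the step names `delete_bucket_objects` and `delete_object` and the loop variable `object_name` are illustrative, and it assumes the job still writes exactly the three `counts-*` shards named in the sample.

```yaml
# Sketch only: a drop-in replacement for delete_bucket_object1..3.
# Assumes bucket_name was assigned in the init step, as in the sample.
- delete_bucket_objects:
    for:
      value: object_name          # hypothetical loop variable name
      in:
        - "counts-00000-of-00003"
        - "counts-00001-of-00003"
        - "counts-00002-of-00003"
      steps:
        - delete_object:
            call: googleapis.storage.v1.objects.delete
            args:
              bucket: ${bucket_name}
              object: ${object_name}
```

If the number of output shards changes, only the list under `in` would need updating.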
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.