Using Flexible Resource Scheduling in Cloud Dataflow

This page explains how to enable Flexible Resource Scheduling (FlexRS) for autoscaled batch pipelines in Cloud Dataflow.

FlexRS

FlexRS reduces batch processing costs by using advanced scheduling techniques, the Cloud Dataflow Shuffle service, and a combination of preemptible virtual machine (VM) instances and regular VMs. By running preemptible VMs and regular VMs in parallel, Cloud Dataflow improves the user experience if Compute Engine stops preemptible VM instances during a system event. FlexRS helps to ensure that the pipeline continues to make progress and that you do not lose previous work when Compute Engine preempts your preemptible VMs.

Jobs with FlexRS use service-based Cloud Dataflow Shuffle for joining and grouping. As a result, FlexRS jobs do not use Persistent Disk resources for storing temporary calculation results. Using Cloud Dataflow Shuffle allows FlexRS to handle the preemption of a worker VM better because the Cloud Dataflow service does not have to redistribute data to the remaining workers. Each Cloud Dataflow worker still needs a small 25 GB Persistent Disk volume to store the machine image and temporary logs.

FlexRS jobs have a scheduling delay. Therefore, FlexRS is most suitable for workloads that are not time-critical, such as daily or weekly jobs that can complete within a certain time window.

Delayed scheduling

When you submit a FlexRS job, the Cloud Dataflow service places the job into a queue and submits it for execution within 6 hours of job creation. Cloud Dataflow finds the best time to start the job within that time window, based on the available capacity and other factors.

When you submit a FlexRS job, the Cloud Dataflow service executes the following steps:

  1. Returns a job ID immediately after job submission.
  2. Performs an early validation run.
  3. Uses the early validation result to determine the next step:

    1. On success, queues the job to wait for its delayed launch.
    2. In all other cases, the job fails and the Cloud Dataflow service reports the errors.

Therefore, after you submit a FlexRS job, the Cloud Dataflow monitoring interface displays the job ID and a status of Queued on success, or a status of Failed otherwise.

Early validation

FlexRS jobs do not immediately launch upon submission. During early validation, the Cloud Dataflow service verifies the execution parameters and Google Cloud Platform (GCP) environment settings, such as Cloud IAM roles and network configurations. Cloud Dataflow validates the job as much as possible at job submission time and reports potential errors. You are not billed for this early validation process.

The early validation step does not execute user code. To check your code for issues, run it with the Apache Beam Direct Runner or as a non-FlexRS job. If the GCP environment changes between job creation and the job's delayed launch, the job might succeed during early validation but still fail at launch time.
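
For example, the following minimal sketch runs a pipeline locally with the Direct Runner to surface user-code errors before you submit the job with FlexRS. The Create/Map pipeline here is a placeholder; substitute your own transforms.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Run the pipeline locally first; FlexRS early validation does not
# execute user code, so local runs are where code bugs surface.
local_options = PipelineOptions(['--runner=DirectRunner'])

with beam.Pipeline(options=local_options) as p:
    (p
     | 'Create' >> beam.Create(['a', 'b', 'c'])
     | 'Upper' >> beam.Map(str.upper)
     | 'Print' >> beam.Map(print))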

Enabling FlexRS

When you create a FlexRS job, it consumes a job quota even while the job is in the Queued status. Therefore, before you enable FlexRS, verify that you have enough GCP project resource quotas to launch your job. This includes additional quota for preemptible CPUs, regular CPUs, and, unless you turn off the Public IP parameter, IP addresses.

If you do not have enough quota, your project might lack the resources it needs when your FlexRS job deploys. By default, Cloud Dataflow selects preemptible VMs for 90% of the workers in the worker pool. When planning for CPU quota, make sure that you have sufficient preemptible VM quota. You do not receive separate preemptible VM quota by default; you must explicitly request it, or your FlexRS job will lack the resources to execute in a timely manner.
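
As a rough, unofficial illustration of that planning, the following sketch estimates the preemptible and regular CPU quota a job needs from its maximum worker count, the machine type's vCPU count, and the default 90% preemptible split. The numbers are examples, not an official formula.

# Illustrative quota estimate only; assumes the default 90% preemptible split.
max_num_workers = 10
vcpus_per_worker = 2  # n1-standard-2 workers have 2 vCPUs each

preemptible_workers = int(max_num_workers * 0.9)          # 9
regular_workers = max_num_workers - preemptible_workers   # 1

print('Preemptible CPU quota needed:', preemptible_workers * vcpus_per_worker)  # 18
print('Regular CPU quota needed:', regular_workers * vcpus_per_worker)          # 2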

Pricing

FlexRS jobs are billed for the following resources:

  • Regular and preemptible CPUs
  • Memory resources
  • Cloud Dataflow Shuffle resources
  • 25 GB per worker of Persistent Disk resources

While Cloud Dataflow uses both preemptible and regular workers to execute your FlexRS job, you are billed at a uniform discounted rate compared with regular Cloud Dataflow prices, regardless of the worker type. Cloud Dataflow Shuffle and Persistent Disk resources are not discounted.

For more information, read the Cloud Dataflow pricing details page.

FlexRS requirements

FlexRS requires the following features:

  • The Apache Beam SDK for Java >= 2.12.0 or the Apache Beam SDK for Python >= 2.12.0.
  • Cloud Dataflow Shuffle. Turning on FlexRS also automatically enables Cloud Dataflow Shuffle.

Note: For a limited time, you can enable FlexRS with Apache Beam SDK for Java >= 2.5.0, Apache Beam SDK for Python >= 2.5.0, or Cloud Dataflow SDK 2.5.0. FlexRS support for these SDKs will be deprecated on August 1, 2019.

Pipeline options

Apache Beam SDK >= 2.12.0

Java

To enable a FlexRS job, use the following pipeline option: --flexRSGoal=COST_OPTIMIZED, where the cost-optimized goal means that the Cloud Dataflow service chooses any available discounted resources.

FlexRS jobs affect the following execution parameters:

  • numWorkers only sets the initial number of workers. However, you can set maxNumWorkers for cost control reasons.
  • You cannot set autoscalingAlgorithm=NONE.
  • You cannot specify the zone flag for FlexRS jobs. The Cloud Dataflow service selects the zone for all FlexRS jobs in the region that you specified with the region parameter.
  • During the beta release, you must use the default us-central1 or specify the europe-west1 regional endpoint as your region.
  • During the beta release, you must use the default n1-standard-2 or select n1-highmem-16 for your workerMachineType.

The following example shows how to add these parameters to your regular pipeline parameters to use FlexRS:

--flexRSGoal=COST_OPTIMIZED \
--region=europe-west1 \
--maxNumWorkers=10 \
--workerMachineType=n1-highmem-16

If you omit region, maxNumWorkers, and workerMachineType, the Cloud Dataflow service determines their default values.

Python

To enable a FlexRS job, use the following pipeline option: --flexrs_goal=COST_OPTIMIZED, where the cost-optimized goal means that the Cloud Dataflow service chooses any available discounted resources.

FlexRS jobs affect the following execution parameters:

  • num_workers only sets the initial number of workers. However, you can set max_num_workers for cost control reasons.
  • You cannot set autoscaling_algorithm=NONE.
  • You cannot specify the zone flag for FlexRS jobs. The Cloud Dataflow service selects the zone for all FlexRS jobs in the region that you specified with the region parameter.
  • During the beta release, you must use the default us-central1 or specify the europe-west1 regional endpoint as your region.
  • During the beta release, you must use the default n1-standard-2 or select n1-highmem-16 for your machine_type.

The following example shows how to add these parameters to your regular pipeline parameters to use FlexRS:

--flexrs_goal=COST_OPTIMIZED \
--region=europe-west1 \
--max_num_workers=10 \
--machine_type=n1-highmem-16

If you omit region, max_num_workers, and machine_type, the Cloud Dataflow service determines their default values.
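
The same flags can also be set programmatically when you construct the pipeline. The following is a minimal sketch; the project ID, bucket, and Create/Map pipeline are placeholders to replace with your own values and transforms.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# FlexRS flags passed as constructor arguments instead of the command line.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=your-project-id',          # placeholder
    '--temp_location=gs://your-bucket/temp',  # placeholder
    '--flexrs_goal=COST_OPTIMIZED',
    '--region=europe-west1',
    '--max_num_workers=10',
    '--machine_type=n1-highmem-16',
])

p = beam.Pipeline(options=options)
(p
 | 'Create' >> beam.Create([1, 2, 3])
 | 'Double' >> beam.Map(lambda x: x * 2))

result = p.run()
print('Initial state:', result.state)  # FlexRS jobs queue before running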

Apache Beam SDK >= 2.5.0 or Cloud Dataflow SDK 2.5.0

Java

Caution: For a limited time, you can use FlexRS with Apache Beam SDK for Java >= 2.5.0 or Cloud Dataflow SDK 2.5.0. FlexRS support for these SDKs will be deprecated on August 1, 2019.

Do not specify the flexRSGoal parameter. To temporarily enable a FlexRS job with Apache Beam SDK for Java >= 2.5.0 or Cloud Dataflow SDK 2.5.0, use the following pipeline options:

--experiments=flexible_resource_scheduling \
--experiments=shuffle_mode=service

If you misspell the flexible_resource_scheduling or shuffle_mode=service values, the Cloud Dataflow service does not report an error message.

FlexRS jobs affect the following execution parameters:

  • numWorkers only sets the initial number of workers. However, you can set maxNumWorkers for cost control reasons.
  • You cannot set autoscalingAlgorithm=NONE.
  • You cannot specify the zone flag for FlexRS jobs. The Cloud Dataflow service selects the zone for all FlexRS jobs in the region that you specified with the region parameter.
  • During the beta release, you must use the default us-central1 or specify the europe-west1 regional endpoint as your region.
  • During the beta release, you must use the default n1-standard-2 or select n1-highmem-16 for your workerMachineType.

The following example shows how to add these parameters to your regular pipeline parameters to use FlexRS:

--experiments=flexible_resource_scheduling \
--experiments=shuffle_mode=service \
--region=europe-west1 \
--maxNumWorkers=10 \
--workerMachineType=n1-highmem-16

If you omit region, maxNumWorkers, and workerMachineType, the Cloud Dataflow service determines their default values.

Python

Caution: For a limited time, you can use FlexRS with Apache Beam SDK for Python >= 2.5.0 or Cloud Dataflow SDK 2.5.0. FlexRS support for these SDKs will be deprecated on August 1, 2019.

Do not specify the flexrs_goal parameter. To temporarily enable a FlexRS job with Apache Beam SDK for Python >= 2.5.0 or Cloud Dataflow SDK 2.5.0, use the following pipeline options:

--experiments=flexible_resource_scheduling \
--experiments=shuffle_mode=service

If you misspell the flexible_resource_scheduling or shuffle_mode=service values, the Cloud Dataflow service does not report an error message.
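
Because a misspelled experiment fails silently, you might add a small guard before launch. The following is a sketch; the helper name is hypothetical, not part of any SDK.

# Hypothetical guard against silently ignored typos in the FlexRS flags.
REQUIRED_EXPERIMENTS = {'flexible_resource_scheduling', 'shuffle_mode=service'}

def check_flexrs_experiments(pipeline_args):
    # Collect every value passed as --experiments=<value>.
    experiments = {arg.split('=', 1)[1] for arg in pipeline_args
                   if arg.startswith('--experiments=')}
    missing = REQUIRED_EXPERIMENTS - experiments
    if missing:
        raise ValueError('Missing or misspelled experiments: %s' % sorted(missing))

check_flexrs_experiments([
    '--experiments=flexible_resource_scheduling',
    '--experiments=shuffle_mode=service',
])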

FlexRS jobs affect the following execution parameters:

  • num_workers only sets the initial number of workers. However, you can set max_num_workers for cost control reasons.
  • You cannot set autoscaling_algorithm=NONE.
  • You cannot specify the zone flag for FlexRS jobs. The Cloud Dataflow service selects the zone for all FlexRS jobs in the region that you specified with the region parameter.
  • During the beta release, you must use the default us-central1 or specify the europe-west1 regional endpoint as your region.
  • During the beta release, you must use the default n1-standard-2 or select n1-highmem-16 for your machine_type.

The following example shows how to add these parameters to your regular pipeline parameters to use FlexRS:

--experiments=flexible_resource_scheduling \
--experiments=shuffle_mode=service \
--region=europe-west1 \
--max_num_workers=10 \
--machine_type=n1-highmem-16

If you omit region, max_num_workers, and machine_type, the Cloud Dataflow service determines their default values.

Monitoring FlexRS jobs

You can monitor the status of your FlexRS job in two places in the Google Cloud Platform Console:

  1. The Jobs page that shows all your jobs.
  2. The Job summary page of the job you submitted.

On the Jobs page, jobs that have not started show the status Queued.

Figure 1: A list of Cloud Dataflow jobs in the GCP Console with statuses of Running, Queued, Failed, and Succeeded.

On the Job summary page, jobs that are waiting in the queue display the message "This job is queued and will run soon."

Figure 2: A queued individual pipeline job shown in the Cloud Dataflow monitoring interface.
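
You can also check job status outside the console. The following sketch uses the Dataflow REST API through the google-api-python-client library; it assumes application default credentials are configured, and the project ID and region are placeholders.

from googleapiclient.discovery import build

# List Dataflow jobs in one region and print each job's current state.
# Queued FlexRS jobs report the state JOB_STATE_QUEUED.
dataflow = build('dataflow', 'v1b3')
response = dataflow.projects().locations().jobs().list(
    projectId='your-project-id',  # placeholder
    location='europe-west1',      # placeholder
).execute()

for job in response.get('jobs', []):
    print(job['id'], job['name'], job['currentState'])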
