# Parallel processing

*Last updated: 2025-09-04 (UTC).*

Pipelines are executed on clusters of machines. They achieve high throughput by splitting up the work that needs to be done, and then running the work in parallel on the multiple executors spread out across the cluster. In general, the greater the number of splits (also called partitions), the faster the pipeline can be run.
The level of parallelism in your pipeline is determined by the sources and shuffle stages in the pipeline.

Sources
-------

At the start of each pipeline run, every source in your pipeline calculates what data needs to be read, and how that data can be divided into splits. For example, consider a basic pipeline that reads from Cloud Storage, performs some Wrangler transformations, and then writes back to Cloud Storage.

When the pipeline starts, the Cloud Storage source examines the input files and breaks them up into splits based on the file sizes. For example, a single gigabyte file can be broken up into 100 splits, each 10 MB in size. Each executor reads the data for one split, runs the Wrangler transformations, and then writes the output to a *part* file.

If your pipeline is running slowly, one of the first things to check is whether your sources are creating enough splits to take full advantage of parallelism. For example, some types of compression make plaintext files unsplittable. If you are reading files that have been gzipped, your pipeline runs much slower than if you were reading uncompressed files, or files compressed with bzip2 (which is splittable). Similarly, if you are using the database source and have configured it to use just a single split, it runs much slower than if you configure it to use more splits.

Shuffles
--------

Certain types of plugins cause data to be shuffled across the cluster. This happens when records being processed by one executor need to be sent to another executor to perform the computation. Shuffles are expensive operations because they involve a lot of I/O. Plugins that cause data to be shuffled all show up in the **Analytics** section of the Pipeline Studio. These include plugins such as Group By, Deduplicate, Distinct, and Joiner.
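Conceptually, a shuffle routes every record with the same key to the same partition, and therefore to the same executor. A minimal sketch of that routing in plain Python (an illustration only, not the actual Spark or Data Fusion implementation):

```python
# Illustration of shuffle routing: each record is assigned a partition
# based on a hash of its key, so identical keys always land together.
# Plain Python sketch; not the actual Spark/Data Fusion implementation.
import zlib

def shuffle_partition(key: str, num_partitions: int) -> int:
    """Deterministically map a key to one of num_partitions partitions."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

records = ["apple", "banana", "apple", "carrot", "apple", "banana"]
partitions: dict[int, list[str]] = {}
for item in records:
    partitions.setdefault(shuffle_partition(item, 4), []).append(item)

# Every "apple" record ends up in the same partition, so a single
# executor can compute the apple aggregate without further data movement.
```

Because the mapping depends only on the key, records for the same item never straddle two partitions, which is what makes a keyed aggregation possible.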
For example, suppose a **Group By** stage is added to the pipeline in the preceding example.

Also suppose the data being read represents purchases made at a grocery store. Each record contains an `item` field and a `num_purchased` field. In the **Group By** stage, we configure the pipeline to group records on the `item` field and calculate the sum of the `num_purchased` field.

When the pipeline runs, the input files are split up as described earlier. After that, each record is shuffled across the cluster such that every record with the same item belongs to the same executor.

As illustrated in the preceding example, records for apple purchases were originally spread out across several executors. To perform the aggregation, all of those records needed to be sent across the cluster to the same executor.

Most plugins that require a shuffle let you specify the number of partitions to use when shuffling the data. This controls how many executors are used to process the shuffled data.

In the preceding example, if the number of partitions is set to `2`, each executor calculates aggregates for two items instead of one.

Note that it is possible to decrease the parallelism of your pipeline after that stage. For example, consider the logical view of the preceding pipeline.

If the source divides data across 500 partitions but the Group By shuffles using 200 partitions, the maximum level of parallelism after the Group By drops from 500 to 200. Instead of 500 different part files written to Cloud Storage, you only have 200.

Choosing partitions
-------------------

If the number of partitions is too low, you won't be using the full capacity of your cluster to parallelize as much work as you can. Setting the partitions too high increases the amount of unnecessary overhead. In general, it is better to use too many partitions than too few.
Extra overhead is only worth worrying about if your pipeline takes a few minutes to run and you are trying to shave off a couple of minutes. If your pipeline takes hours to run, overhead is generally not something you need to worry about.

A useful, but overly simplistic, way to determine the number of partitions to use is to set it to `max(cluster CPUs, input records / 500,000)`. In other words, take the number of input records and divide by 500,000. If that number is greater than the number of cluster CPUs, use it as the number of partitions. Otherwise, use the number of cluster CPUs. For example, if your cluster has 100 CPUs and the shuffle stage is expected to have 100 million input records, use 200 partitions.

A more complete answer is that shuffles perform best when the intermediate shuffle data for each partition can fit completely in an executor's memory, so that nothing needs to be spilled to disk. Spark reserves just under 30% of an executor's memory for holding shuffle data. The exact number is (total memory - 300 MB) \* 30%. If we assume each executor is set to use 2 GB of memory, each partition should hold no more than (2 GB - 300 MB) \* 30% = approximately 500 MB of records. If we assume each record compresses down to 1 KB in size, then that means (500 MB / partition) / (1 KB / record) = 500,000 records per partition. If your executors are using more memory, or your records are smaller, you can adjust this number accordingly.

Data skew
---------

Note that in the preceding example, purchases for various items were evenly distributed. That is, there were three purchases each for apples, bananas, carrots, and eggs. Shuffling on an evenly distributed key is the most performant type of shuffle, but many datasets don't have this property. Continuing the grocery store example, you would expect to have many more purchases for eggs than for wedding cards.
When there are a few shuffle keys that are much more common than other keys, you are dealing with skewed data. Skewed data can perform significantly worse than unskewed data, because a disproportionate amount of work is performed by a small handful of executors. It causes a small subset of partitions to be much larger than all the others.

In this example, there are five times as many egg purchases as card purchases, which means the egg aggregate takes roughly five times longer to compute. It doesn't matter much when dealing with just 10 records instead of two, but it makes a big difference when dealing with five billion records instead of one billion. When you have data skew, the number of partitions used in a shuffle doesn't have a large impact on pipeline performance.

You can recognize data skew by examining the graph of output records over time. If the stage outputs records at a much higher pace at the start of the pipeline run and then suddenly slows down, this might mean you have skewed data.

You can also recognize data skew by examining cluster memory usage over time. If your cluster is at capacity for some time, but suddenly has low memory usage for a period of time, this is also a sign that you are dealing with data skew.

Skewed data most significantly impacts performance when a join is being performed. There are a few techniques that can be used to improve performance for skewed joins. For more information, see [Parallel processing for `JOIN` operations](/data-fusion/docs/concepts/parallel-processing-join-operations).

Adaptive tuning for execution
-----------------------------

To adaptively tune execution, specify the range of partitions to use, not the exact partition number.
The exact partition number, even if set in the pipeline configuration, is ignored when adaptive execution is enabled.

If you are using an ephemeral Dataproc cluster, Cloud Data Fusion sets the proper configuration automatically, but for static Dataproc or Hadoop clusters, the following configuration parameters can be set:

- `spark.default.parallelism`: set it to the total number of vCores available in the cluster. This ensures your cluster isn't underloaded and defines the lower bound for the number of partitions.
- `spark.sql.adaptive.coalescePartitions.initialPartitionNum`: set it to 32 times the number of vCores available in the cluster. This defines the upper bound for the number of partitions.
- `spark.sql.adaptive.enabled`: to enable the optimizations, set this value to `true`. Dataproc sets it automatically, but if you are using generic Hadoop clusters, you must ensure it's enabled.

These parameters can be set in the [engine configuration](/data-fusion/docs/concepts/manage-pipeline-configurations) of a specific pipeline or in the [cluster properties](/dataproc/docs/concepts/configuring-clusters/cluster-properties) of a static Dataproc cluster.

What's next
-----------

- Learn about [parallel processing for `JOIN` operations](/data-fusion/docs/concepts/parallel-processing-join-operations).