Acelere as cargas de trabalho em lote do Dataproc sem servidor e as sessões interativas com a execução de consulta nativa

Anunciamos a disponibilidade geral (GA, na sigla em inglês) da execução de consulta nativa, que oferece suporte a APIs do Spark Dataframe, consultas do Spark SQL que leem dados de arquivos Parquet e ORC e cargas de trabalho recomendadas pela ferramenta de qualificação de execução de consulta nativa. Se tiver dúvidas sobre outros casos de uso, entre em contato com dataproc-pms@google.com.

Neste documento, descrevemos como ativar os trabalhos em lote do Dataproc Serverless e as sessões interativas executadas no nível de preços Premium para usar a execução de consulta nativa.

Como usar a execução de consulta nativa com a precificação de nível premium

A execução de consulta nativa do Dataproc sem servidor está disponível apenas com cargas de trabalho em lote e sessões interativas executadas no nível de preços premium do Dataproc sem servidor. O preço do nível Premium é mais caro do que o do nível padrão, mas não há cobrança extra pela execução de consulta nativa. Para mais informações, consulte Preços do Dataproc sem servidor.

É possível ativar a alocação e o preço de recursos do nível premium para os recursos de lote e de sessão interativa definindo as seguintes propriedades de nível de alocação de recursos como premium ao enviar uma carga de trabalho em lote do Spark ou uma sessão interativa.

Para configurar a execução de consulta nativa, defina as propriedades de execução de consulta nativa em uma carga de trabalho em lote, sessão interativa ou modelo de sessão, e envie a carga de trabalho ou execute a sessão interativa em um notebook.

Console

  1. No console do Google Cloud:

    1. Acesse Lotes do Dataproc.
    2. Clique em Criar para abrir a página Criar lote.
  2. Selecione e preencha os seguintes campos para configurar o lote para a execução de consultas nativas:

  3. Preencha, selecione ou confirme outras configurações de cargas de trabalho em lote. Consulte Enviar uma carga de trabalho em lote do Spark.

  4. Clique em ENVIAR para executar a carga de trabalho em lote do Spark.

gcloud

Defina as seguintes flags de comando da CLI gcloud gcloud dataproc batches submit spark para configurar a carga de trabalho em lote para a execução de consulta nativa:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Observações:

API

Defina os seguintes campos da API Dataproc para configurar a carga de trabalho em lote para a execução de consulta nativa:

Como ajustar a carga de trabalho de execução de consultas nativas

A execução de consulta nativa do Dataproc sem servidor pode ser ajustada ainda mais usando as seguintes propriedades:

Propriedade de lote Quando usar
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

Propriedades de execução de consulta nativa

  • spark.dataproc.runtimeEngine=native (obrigatório): o mecanismo de execução da carga de trabalho precisa ser definido como native para substituir o mecanismo de execução spark padrão.

  • version (Obrigatório): a carga de trabalho precisa usar a versão do ambiente de execução do Spark 1.2.26+, 2.2.26+ ou uma versão principal mais recente do ambiente de execução.

  • Níveis de computação premium (obrigatório): as propriedades spark.dataproc.spark.driver.compute.tier e spark.dataproc.executor.compute.tier precisam ser definidas como premium.

  • Níveis de disco premium (opcional e recomendado): os níveis de disco premium usam a ordenação por colunas em vez de ordenação aleatória baseada em linhas para oferecer melhor desempenho. Para melhorar a taxa de transferência de E/S de shuffle, use as camadas de disco premium do driver e do executor com um tamanho de disco suficiente para acomodar arquivos de shuffle.

  • Memória (opcional): se você tiver configurado o mecanismo de execução de consulta nativa sem configurar a memória fora do heap (spark.memory.offHeap.size) e a memória no heap (spark.executor.memory), o mecanismo de execução de consulta nativa vai usar uma quantidade padrão de memória 4g e dividi-la em uma proporção 6:1 entre a memória fora do heap e a memória no heap.

    Se você decidir configurar a memória ao usar a execução de consulta nativa, faça isso de uma das seguintes maneiras:

    • Configure apenas a memória off-heap (spark.memory.offHeap.size) com um valor especificado. A execução de consulta nativa vai usar o valor especificado como memória fora do heap e alocar um 1/7th adicional do valor da memória fora do heap como memória no heap.

    • Configure a memória no heap (spark.executor.memory) e a memória fora do heap (spark.memory.offHeap.size). A quantidade alocada para a memória fora do heap precisa ser maior do que a alocada para a memória no heap. Recomendação: aloque a memória off-heap na memória on-heap em uma proporção de 6:1.

    Exemplo de valores:

    Configurações de memória sem execução de consulta nativa Configurações de memória recomendadas com a execução de consulta nativa
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4g
    56g 48g 8g

Ferramenta de qualificação de execução de consulta nativa

É possível executar a ferramenta de qualificação de execução de consulta nativa do Dataproc, run_qualification_tool.sh, para identificar cargas de trabalho que podem ter um tempo de execução mais rápido com a execução de consulta nativa. A ferramenta analisa os arquivos de evento do Spark gerados por aplicativos de carga de trabalho em lote e estima os possíveis ganhos de tempo de execução que cada aplicativo de carga de trabalho pode ter com a execução de consulta nativa.

Executar a ferramenta de qualificação

Siga as etapas abaixo para executar a ferramenta em arquivos de evento de carga de trabalho em lote do Dataproc sem servidor.

1.Copie o run_qualification_tool.sh para um diretório local que contenha os arquivos de evento do Spark a serem analisados.

  1. Execute a ferramenta de qualificação para analisar um arquivo de evento ou um conjunto de arquivos de evento contidos no diretório de script.

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    Flags e valores:

    -f (obrigatório): consulte Locais de arquivos de eventos do Spark para localizar arquivos de eventos de carga de trabalho do Spark.

    • EVENT_FILE_PATH (obrigatório, a menos que EVENT_FILE_NAME seja especificado): caminho do arquivo de evento a ser analisado. Se não for fornecido, o caminho do arquivo de evento será considerado o diretório atual.

    • EVENT_FILE_NAME (obrigatório, a menos que EVENT_FILE_PATH seja especificado): nome do arquivo de evento a ser analisado. Se não forem fornecidos, os arquivos de evento encontrados de forma recursiva no EVENT_FILE_PATH serão analisados.

    -o(opcional): se não for fornecido, a ferramenta criará ou usará um diretório output existente no diretório atual para colocar arquivos de saída.

    • CUSTOM_OUTPUT_DIRECTORY_PATH: caminho do diretório de saída para arquivos de saída.

    -k (opcional):

    -x (opcional):

    • MEMORY_ALLOCATED: a memória em gigabytes a ser alocada para a ferramenta. Por padrão, a ferramenta usa 80% da memória livre disponível no sistema e todos os núcleos de máquina disponíveis.

    -t(opcional):

    • PARALLEL_THREADS_TO_RUN: o N=número de linhas de execução paralelas para a ferramenta executar. Por padrão, a ferramenta executa todas as cores.

    Exemplo de uso do comando:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    Neste exemplo, a ferramenta de qualificação percorre o diretório gs://dataproc-temp-us-east1-9779/spark-job-history e analisa os arquivos de eventos do Spark contidos nesses diretórios e nos subdiretórios. O acesso ao diretório é fornecido pelo /keys/event-file-key. A ferramenta usa 34 GB memory para execução e executa 5 linhas de execução paralelas.

Arquivos de saída da ferramenta de qualificação

Quando a análise for concluída, a ferramenta de qualificação vai colocar os seguintes arquivos de saída em um diretório perfboost-output no diretório atual:

  • AppsRecommendedForBoost.tsv: uma lista separada por tabulação de aplicativos recomendados para uso com a execução de consulta nativa.

  • UnsupportedOperators.tsv: uma lista separada por tabulação de aplicativos não recomendados para uso com a execução de consulta nativa.

Arquivo de saída AppsRecommendedForBoost.tsv

A tabela a seguir mostra o conteúdo de um arquivo de saída AppsRecommendedForBoost.tsv de exemplo. Ela contém uma linha para cada aplicativo analisado.

Exemplo de arquivo de saída AppsRecommendedForBoost.tsv:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0% 0% 548924253 548924253 100% TRUE 30,00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0% 0% 514401703 514401703 100% TRUE 30,00%

Descrições das colunas:

  • applicationId: o ApplicationID do aplicativo Spark. Use isso para identificar a carga de trabalho de lote correspondente.

  • applicationName: o nome do aplicativo Spark.

  • rddPercentage: a porcentagem de operações de RDD no aplicativo. A execução de consultas nativas não oferece suporte a operações de RDD.

  • unsupportedSqlPercentage: Porcentagem de operações SQL sem suporte à execução de consulta nativa.

  • totalTaskTime: tempo de tarefa cumulativo de todas as tarefas executadas durante a execução do aplicativo.

  • supportedTaskTime: o tempo total da tarefa compatível com a execução de consulta nativa.

As colunas a seguir fornecem informações importantes para determinar se a execução de consulta nativa pode beneficiar sua carga de trabalho em lote:

  • supportedSqlPercentage:a porcentagem de operações SQL aceitas pela execução de consulta nativa. Quanto maior a porcentagem, maior será a redução do tempo de execução que pode ser alcançada executando o aplicativo com a execução de consulta nativa.

  • recommendedForBoost:se TRUE, é recomendável executar o app com a execução de consulta nativa. Se recommendedForBoost for FALSE, não use a execução de consulta nativa na carga de trabalho em lote.

  • expectedRuntimeReduction:a redução percentual esperada no tempo de execução do aplicativo quando você executa o aplicativo com a execução de consulta nativa.

UnsupportedOperators.tsv arquivo de saída.

O arquivo de saída UnsupportedOperators.tsv contém uma lista de operadores usados em aplicativos de carga de trabalho que não têm suporte à execução de consulta nativa. Cada linha no arquivo de saída lista um operador sem suporte.

Descrições das colunas:

  • unsupportedOperator: o nome do operador que não é aceito pela execução de consulta nativa.

  • cumulativeCpuMs: o número de milissegundos de CPU consumidos durante a execução do operador. Esse valor reflete a importância relativa do operador no aplicativo.

  • count: o número de vezes que o operador é usado no aplicativo.

Executar a ferramenta de qualificação em projetos

Esta seção fornece instruções para executar um script para executar a ferramenta de qualificação e analisar arquivos de eventos do Spark de vários projetos.

Requisitos e limitações do script:

  • Execute o script em máquinas Linux:
    • A versão >=11 do Java precisa ser instalada como a versão padrão.
  • Como os registros no Cloud Logging têm um TTL de 30 dias, os arquivos de eventos do Spark de trabalhos em lote executados há mais de 30 dias não podem ser analisados.

Para executar a ferramenta de qualificação em projetos, siga estas etapas.

  1. Faça o download do script list-batches-and-run-qt.sh e copie-o para a máquina local.

  2. Mude as permissões do script.

    chmod +x list-batches-and-run-qt.sh
    
  3. Prepare uma lista de arquivos de entrada do projeto para transmitir ao script para análise. Crie o arquivo de texto adicionando uma linha no formato abaixo para cada projeto e região com arquivos de evento do Spark de carga de trabalho em lote para analisar.

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    Observações:

    -r (obrigatório):

    • REGION: região em que os lotes do projeto são enviados.

    -s (obrigatório): Formatar: yyyy-mm-dd. É possível adicionar um segmento de tempo 00:00:00 opcional.

    • START_DATE: apenas as cargas de trabalho em lote criadas após a data de início são analisadas. Os lotes são analisados em ordem decrescente de criação de lote. Os lotes mais recentes são analisados primeiro.

    -e (opcional): formato: yyyy-mm-dd. É possível adicionar um segmento de tempo 00:00:00 opcional.

    • END_DATE: se você especificar isso, apenas as cargas de trabalho em lote criadas antes ou na data de término serão analisadas. Se não for especificado, todos os lotes criados após o START_DATE serão analisados. Os lotes são analisados em ordem decrescente de criação. Os mais recentes são analisados primeiro.

    -l (opcional):

    • LIMIT_MAX_BATCHES: o número máximo de lotes a serem analisados. Você pode usar essa opção em combinação com START-DATE e END-DATE para analisar um número limitado de lotes criados entre as datas especificadas.

      Se -l não for especificado, o número padrão de até 100 lotes será analisado.

    -k (opcional):

    • KEY_PATH: um caminho local que contém chaves de acesso do Cloud Storage para os arquivos de evento do Spark da carga de trabalho.

    Exemplo de arquivo de entrada:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    Observações:

    • Linha 1:até os 100 arquivos de evento do Spark mais recentes (padrão) em project1 na região us-central1 com um tempo de criação após 2024-08-21 00:00:00 AM serão analisados. key1 permite o acesso aos arquivos no Cloud Storage.

    • Linha 2:até 50 arquivos de eventos do Spark mais recentes em project2 na região us-eastl1 com um horário de criação após 2024-08-21 00:00:00 AM e antes ou em 2024-08-23 11:59:59 PM serão analisados. key2 permite o acesso aos arquivos de evento no Cloud Storage.

  4. Execute o script list-batches-and-run-qt.sh: A saída da análise é gerada nos arquivos AppsRecommendedForBoost.tsv e UnsupportedOperators.tsv.

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    Observações:

Locais de arquivos de eventos do Spark

Siga uma destas etapas para encontrar os arquivos de evento do Spark para cargas de trabalho em lote do Dataproc sem servidor:

  1. No Cloud Storage, encontre o spark.eventLog.dir da carga de trabalho e faça o download.

    1. Se você não encontrar o spark.eventLog.dir, defina o spark.eventLog.dir para um local do Cloud Storage, reexecute a carga de trabalho e faça o download do spark.eventLog.dir.
  2. Se você tiver configurado o Servidor de histórico do Spark para o job em lote:

    1. Acesse o servidor de histórico do Spark e selecione a carga de trabalho.
    2. Clique em Fazer o download na coluna Registro de eventos.

Quando usar a execução de consulta nativa

Use a execução de consulta nativa nos seguintes cenários:

APIs Dataframe do Spark e consultas do Spark SQL que leem dados de arquivos Parquet e ORC.
Cargas de trabalho recomendadas pela ferramenta de qualificação de execução de consulta nativa.

Quando não usar a execução de consulta nativa

Não use a execução de consulta nativa nos seguintes cenários, porque isso pode não reduzir o tempo de execução da carga de trabalho e causar falhas ou regressões:

  • Cargas de trabalho não recomendadas pela ferramenta de qualificação de execução de consulta nativa.
  • Cargas de trabalho de ML, UDF e RDD do Spark
  • Cargas de trabalho de gravação
  • Formatos de arquivo diferentes de Parquet e ORC
  • Entradas dos seguintes tipos de dados:
    • Timestamp, TinyInt, Byte, Struct, Array, Map: ORC e Parquet
    • VarChar, Char: ORC
  • Consultas que contêm expressões regulares
  • Cargas de trabalho que usam um bucket de pagamentos do solicitante
  • Configurações do Cloud Storage que não são padrão. A execução de consulta nativa usa muitos padrões, mesmo quando eles são substituídos.

Limitações

A ativação da execução de consulta nativa nos cenários a seguir pode gerar exceções, criar incompatibilidades do Spark ou fazer com que a carga de trabalho volte para o motor padrão do Spark.

Substitutos

A execução de consulta nativa pode resultar em substituição de carga de trabalho pelo mecanismo de execução do Spark, resultando em regressão ou falha.

  • ANSI:se o modo ANSI estiver ativado, a execução será substituída pelo Spark.

  • Modo com distinção entre maiúsculas e minúsculas:a execução de consulta nativa oferece suporte apenas ao modo padrão do Spark, sem distinção entre maiúsculas e minúsculas. Se o modo de diferenciação entre maiúsculas e minúsculas estiver ativado, podem ocorrer resultados incorretos.

  • Verificação de tabelas particionadas:a execução de consulta nativa oferece suporte à verificação de tabela particionada somente quando o caminho contém as informações de partição. Caso contrário, a carga de trabalho volta para o mecanismo de execução do Spark.

Comportamento incompatível

Comportamento incompatível ou resultados incorretos podem ocorrer ao usar a execução de consulta nativa nos seguintes casos:

  • Funções JSON:a execução de consulta nativa aceita strings cercadas por aspas duplas, não simples. Os resultados incorretos ocorrem com aspas simples. O uso de "*" no caminho com a função get_json_object retorna NULL.

  • Configuração de leitura do Parquet:

    • A execução de consulta nativa trata spark.files.ignoreCorruptFiles como definido para o valor padrão false, mesmo quando definido como true.
    • A execução de consulta nativa ignora spark.sql.parquet.datetimeRebaseModeInRead e retorna apenas o conteúdo do arquivo Parquet. As diferenças entre o calendário híbrido legado (juliano gregoriano) e o calendário gregoriano proleptário não são consideradas. Os resultados do Spark podem ser diferentes.
  • NaN:não tem suporte. Resultados inesperados podem ocorrer, por exemplo, ao usar NaN em uma comparação numérica.

  • Leitura colunar do Spark:um erro fatal pode ocorrer porque o vetor colunar do Spark é incompatível com a execução de consulta nativa.

  • Derrame:quando as partições de shuffle são definidas como um número grande, o recurso de derramamento para disco pode acionar um OutOfMemoryException. Se isso acontecer, reduzir o número de partições pode eliminar essa exceção.