Esegui un job Spark su Dataproc su Google Kubernetes Engine

Prima di iniziare

  1. Devi aver creato un cluster zonale o regionale Google Kubernetes Engine (GKE) standard (non Autopilot) con Workload Identity attivato.

Crea un cluster virtuale Dataproc su GKE

Viene creato un cluster virtuale Dataproc su GKE come piattaforma di deployment per i componenti Dataproc. Si tratta di una risorsa virtuale e, a differenza di un cluster Dataproc su Compute Engine, non include VM master e worker Dataproc separate.

  • Dataproc on GKE crea pool di nodi all'interno di un cluster GKE quando crei un cluster virtuale Dataproc on GKE.

  • I job Dataproc su GKE vengono eseguiti come pod su questi pool di nodi. I pool di nodi e la pianificazione dei pod nei pool di nodi sono gestiti da GKE.

  • Crea più cluster virtuali. Puoi creare ed eseguire più cluster virtuali su un cluster GKE per migliorare l'utilizzo delle risorse condividendo i pool di nodi tra i cluster virtuali.

    • Ogni cluster virtuale:
      • viene creato con proprietà separate, tra cui la versione del motore Spark e l'identità del carico di lavoro
      • sia isolato in uno spazio dei nomi GKE distinto nel cluster GKE

Console

  1. Nella console Google Cloud, vai alla pagina Cluster di Dataproc.

    Vai a Cluster

  2. Fai clic su Crea cluster.

  3. Nella finestra di dialogo Crea cluster Dataproc, fai clic su Crea nella riga Cluster su GKE.

  4. Nel riquadro Configura cluster:

    1. Nel campo Nome cluster, inserisci un nome per il cluster.
    2. Nell'elenco Regione, seleziona una regione per il cluster virtuale Dataproc su GKE. Questa regione deve essere la stessa in cui si trova il tuo cluster GKE esistente (che selezioni nell'elemento successivo).
    3. Nel campo Cluster Kubernetes, fai clic su Sfoglia per selezionare la regione in cui si trova il tuo cluster GKE esistente.
    4. (Facoltativo) Nel campo Bucket di staging Cloud Storage, puoi fare clic su Sfoglia per selezionare un bucket Cloud Storage esistente. Dataproc su GKE eseguirà lo staging degli elementi nel bucket. Ignora questo campo per consentire a Dataproc su GKE di creare un bucket di staging.
  5. Nel riquadro a sinistra, fai clic su Configura pool di nodi e poi nel riquadro Pool di nodi fai clic su Aggiungi un pool.

    1. Per riutilizzare un pool di nodi Dataproc su GKE esistente:
      1. Fai clic su Riutilizza un node pool esistente.
      2. Inserisci il nome del pool di nodi esistente e seleziona il relativo ruolo. Almeno un pool di nodi deve avere il ruolo DEFAULT.
      3. Fai clic su Fine.
    2. Per creare un nuovo pool di nodi Dataproc su GKE:
      1. Fai clic su Crea un nuovo node pool.
      2. Inserisci i seguenti valori per il pool di nodi:
        • Nome node pool
        • Ruolo: almeno un pool di nodi deve avere il ruolo DEFAULT.
        • Posizione: specifica una zona all'interno della regione del cluster Dataproc su GKE.
        • Tipo di macchina del pool di nodi
        • Piattaforma CPU
        • Precedenza
        • Min: numero minimo di nodi.
        • Max: numero massimo di nodi. Il numero massimo di nodi deve essere maggiore di 0.
    3. Fai clic su Aggiungi un pool per aggiungere altri pool di nodi. Tutti i pool di nodi devono avere la posizione. Puoi aggiungere un totale di quattro pool di nodi.
  6. (Facoltativo) Se hai configurato un server di cronologia persistente (PHS) Dataproc da utilizzare per visualizzare la cronologia dei job Spark, fai clic su Personalizza cluster nei cluster Dataproc su GKE attivi ed eliminati. Poi, nel campo Cluster di server di cronologia, cerca e scegli il cluster PHS. Il cluster PHS deve trovarsi nella stessa regione del cluster virtuale Dataproc su GKE.

  7. Fai clic su Crea per creare il cluster Dataproc. Il cluster Dataproc on GKE viene visualizzato in un elenco nella pagina Cluster. Lo stato è Provisioning in corso fino a quando il cluster non è pronto per essere utilizzato, quindi diventa In esecuzione.

gcloud

Imposta le variabili di ambiente, quindi esegui il comando gcloud dataproc clusters gke create localmente o in Cloud Shell per creare un cluster Dataproc su GKE.

  1. Imposta le variabili di ambiente:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    Note:

    • DP_CLUSTER: imposta il nome del cluster virtuale Dataproc, che deve iniziare con una lettera minuscola seguita da un massimo di 54 lettere minuscole, numeri o trattini. e non può terminare con un trattino.
    • REGION: region deve essere uguale alla regione in cui si trova il cluster GKE.
    • GKE_CLUSTER: il nome del cluster GKE esistente.
    • BUCKET: (Facoltativo) puoi specificare il nome di un bucket Cloud Storage, che verrà utilizzato da Dataproc per eseguire lo staging degli elementi. Se non specifichi un bucket, Dataproc su GKE creerà un bucket di staging.
    • DP_POOLNAME: il nome di un node pool da creare nel cluster GKE.
    • PHS_CLUSTER: (Facoltativo) Server PHS Dataproc da utilizzare per visualizzare la cronologia dei job Spark nei cluster Dataproc su GKE attivi ed eliminati. Il cluster PHS deve trovarsi nella stessa regione del cluster virtuale Dataproc su GKE.
  2. Esegui il comando:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    Note:

    • --spark-engine-version: la versione dell'immagine Spark utilizzata nel cluster Dataproc. Puoi utilizzare un identificatore, ad esempio 3, 3.1 o latest, oppure puoi specificare la versione subminor completa, ad esempio 3.1-dataproc-5.
    • --staging-bucket: elimina questo flag per consentire a Dataproc su GKE di creare un bucket di staging.
    • --pools: questo flag viene utilizzato per specificare un pool di nodi nuovo o esistente che Dataproc creerà o utilizzerà per eseguire il carico di lavoro. Elenca le impostazioni del pool di nodi Dataproc su GKE, separate da virgole, ad esempio:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      Devi specificare il pool di nodi name e role. Le altre impostazioni del pool di nodi sono facoltative. Puoi utilizzare più flag --pools per specificare più pool di nodi. Almeno un pool di nodi deve avere il ruolo default. Tutti i pool di nodi devono avere la stessa posizione.
    • --setup-workload-identity: questo flag attiva le associazioni di Workload Identity. Queste associazioni consentono agli account di servizio Kubernetes (KSA) di agire come account di servizio VM Dataproc (identità del piano dati) predefinito del cluster virtuale.

REST

Completa un virtualClusterConfig come parte di una richiesta API Dataproc cluster.create.

Prima di utilizzare i dati della richiesta, apporta le seguenti sostituzioni:

  • PROJECT: ID progetto Google Cloud
  • REGION: regione del cluster virtuale Dataproc (la stessa regione del cluster GKE esistente)
  • DP_CLUSTER: nome del cluster Dataproc
  • GKE_CLUSTER: nome del cluster GKE
  • NODE_POOL: nome del pool di nodi
  • PHS_CLUSTER: nome del cluster del server di cronologia permanente (PHS)
  • BUCKET: (facoltativo) nome del bucket di staging. Lascia vuoto questo campo per consentire a Dataproc su GKE di creare un bucket di staging.

Metodo HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

Corpo JSON della richiesta:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

Per inviare la richiesta, espandi una di queste opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

Invia un job Spark

Dopo aver eseguito il cluster virtuale Dataproc su GKE, invia un job Spark utilizzando la console Google Cloud, gcloud CLI o l'API Dataproc jobs.submit (utilizzando richieste HTTP dirette o le librerie client Cloud).

Esempio di job Spark dell'interfaccia a riga di comando gcloud:

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

Esempio di job PySpark dell'interfaccia a riga di comando gcloud:

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

Esempio di job SparkR dell'interfaccia a riga di comando gcloud:

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

Esegui la pulizia