Plantilla de Cloud Pub/Sub a Cloud Storage

Usa la plantilla de Dataproc Serverless Cloud Pub/Sub a Cloud Storage para extraer datos de Pub/Sub a Cloud Storage.

Usa la plantilla

Ejecuta la plantilla con la CLI de gcloud o la API de Dataproc.

gcloud

Antes de usar cualquiera de los datos de comando a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: Obligatorio. El ID de tu proyecto de Google Cloud que aparece en la Configuración de IAM
  • REGION: Obligatorio. Compute Engine región.
  • SUBNET: Opcional Si no se especifica una subred, esta en la REGION especificada en la red default.

    Ejemplo: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Obligatorio. Especifica latest para la versión más reciente de la plantilla o la fecha de una versión específica, por ejemplo, 2023-03-17_v0.1.0-beta (visita gs://dataproc-templates-binaries o ejecuta gcloud storage ls gs://dataproc-templates-binaries para ver una lista de las versiones de plantilla disponibles).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Obligatorio. El ID del proyecto de Google Cloud que aparece en La Configuración de IAM que contiene la suscripción de Pub/Sub de entrada que se leerá.
  • SUBSCRIPTION: Obligatorio. Es el nombre de la suscripción a Pub/Sub.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Obligatorio. Es el nombre del bucket de Cloud Storage en el que se almacenará el resultado.

    Nota: Los archivos de salida se almacenarán en la carpeta output/ dentro del bucket.

  • FORMAT: Obligatorio. Formato de datos de salida. Opciones: avro o json.

    Nota: Si es avro, debes agregar "file:///usr/lib/spark/external/spark-avro.jar" al campo de la API o la marca de gcloud CLI de jars.

    Ejemplo (el prefijo file:// hace referencia a un archivo jar de Dataproc Serverless):

    --jars=file:///usr/lib/spark/external/spark-avro.jar, [ ... otros frascos]
  • TIMEOUT: Opcional Es el tiempo en milisegundos antes de que finalice la transmisión. El valor predeterminado es 60000.
  • DURATION: Opcional Frecuencia en segundos de las operaciones de escritura en Cloud Storage. El valor predeterminado es de 15 segundos.
  • NUM_RECEIVERS: Opcional Cantidad de transmisiones leídas de una suscripción a Pub/Sub en paralelo. La configuración predeterminada es 5.
  • BATCHSIZE: Opcional Cantidad de registros para insertar en un recorrido de ida y vuelta en Cloud Storage. La configuración predeterminada es 1,000.
  • SERVICE_ACCOUNT: Opcional Si no se proporciona, se usa la cuenta de servicio de Compute Engine predeterminada.
  • PROPERTY y PROPERTY_VALUE: Opcional. Lista de elementos separados por comas Propiedad de Spark=value pares.
  • LABEL y LABEL_VALUE: Opcional. Lista separada por comas de pares label=value.
  • LOG_LEVEL: Opcional Nivel de registro. Puede ser uno de ALL, DEBUG, ERROR, FATAL y INFO, OFF, TRACE o WARN. Valor predeterminado: INFO.
  • KMS_KEY: Opcional La clave de Cloud Key Management Service que se usará en la encriptación. Si no se especifica una clave, los datos se encriptan en reposo con una clave de propiedad de Google y una administrada por Google.

    Ejemplo: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Ejecuta el siguiente comando:

Linux, macOS o Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.1" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.1" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.1" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

REST

Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:

  • PROJECT_ID: Obligatorio. El ID del proyecto de Google Cloud que aparece en la Configuración de IAM.
  • REGION: Obligatorio. Región de Compute Engine.
  • SUBNET: Opcional Si no se especifica una subred, se selecciona la subred en la REGION especificada en la red default.

    Ejemplo: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: Obligatorio. Especifica latest para la versión más reciente versión de la plantilla o la fecha de una versión específica, por ejemplo, 2023-03-17_v0.1.0-beta (visita gs://dataproc-templates-binaries o ejecuta gcloud storage ls gs://dataproc-templates-binaries para enumerar las versiones de plantilla disponibles).
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: Obligatorio. El ID del proyecto de Google Cloud que aparece en La Configuración de IAM que contiene la suscripción de Pub/Sub de entrada que se leerá.
  • SUBSCRIPTION: Obligatorio. Nombre de la suscripción de Pub/Sub.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Obligatorio. Es el nombre del bucket de Cloud Storage en el que se almacenará el resultado.

    Nota: Los archivos de salida se almacenarán en la carpeta output/ dentro del bucket.

  • FORMAT: Obligatorio. Formato de datos de salida. Opciones: avro o json.

    Nota: Si es avro, debes agregar "file:///usr/lib/spark/external/spark-avro.jar" al campo de la API o la marca de gcloud CLI de jars.

    Ejemplo (el prefijo file:// hace referencia a un archivo jar de Dataproc Serverless):

    --jars=file:///usr/lib/spark/external/spark-avro.jar, [ ... otros frascos]
  • TIMEOUT: Opcional Es el tiempo en milisegundos antes de que finalice la transmisión. El valor predeterminado es 60000.
  • DURATION: Opcional Frecuencia en segundos de las operaciones de escritura en Cloud Storage. El valor predeterminado es de 15 segundos.
  • NUM_RECEIVERS: Opcional Cantidad de transmisiones leídas de una suscripción a Pub/Sub en paralelo. La configuración predeterminada es 5.
  • BATCHSIZE: Opcional Cantidad de registros para insertar en un recorrido de ida y vuelta en Cloud Storage. La configuración predeterminada es 1,000.
  • SERVICE_ACCOUNT: Opcional Si no se proporciona, el cuenta de servicio predeterminada de Compute Engine y control sobre el uso de sus datos.
  • PROPERTY y PROPERTY_VALUE: Opcional. Lista de elementos separados por comas Propiedad de Spark=value pares.
  • LABEL y LABEL_VALUE: Opcional. Es una lista separada por comas de pares label=value.
  • LOG_LEVEL: Opcional Nivel de registro. Puede ser uno de ALL, DEBUG, ERROR, FATAL y INFO, OFF, TRACE o WARN. Valor predeterminado: INFO.
  • KMS_KEY: Opcional La clave de Cloud Key Management Service que se usará en la encriptación. Si no se especifica una clave, los datos se encriptan en reposo con una clave de propiedad de Google y una administrada por Google.

    Ejemplo: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Método HTTP y URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

Cuerpo JSON de la solicitud:


{
  "environmentConfig":{
    "executionConfig":{
      "subnetworkUri":"SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.1",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch":{
    "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args":[
      "--template","PUBSUBTOGCS",
      "--templateProperty","log.level=LOG_LEVEL",
      "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris":[
      "file:///usr/lib/spark/external/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}

Para enviar tu solicitud, expande una de estas opciones:

Deberías recibir una respuesta JSON similar a la que se muestra a continuación:


{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}