Plantilla de Cloud Pub/Sub a Cloud Storage
Usa la plantilla de Dataproc sin servidor de Cloud Pub/Sub a Cloud Storage para extraer data de Pub/Sub a Cloud Storage.
Usa la plantilla
Ejecuta la plantilla con gcloud CLI o la API de Dataproc.
gcloud
Antes de usar cualquiera de los datos de comando a continuación, realiza los siguientes reemplazos:
- PROJECT_ID: Obligatorio. El ID de tu proyecto de Google Cloud que aparece en la Configuración de IAM
- REGION: Obligatorio. Región de Compute Engine.
- SUBNET: Opcional Si no se especifica una subred, se selecciona la subred en la REGION especificada en la red
default
.Ejemplo:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Obligatorio. Especifica
latest
para la versión más reciente de la plantilla o la fecha de una versión específica, por ejemplo,2023-03-17_v0.1.0-beta
(visita gs://dataproc-templates-binaries o ejecutagcloud storage ls gs://dataproc-templates-binaries
para ver una lista de las versiones de plantilla disponibles). - PUBSUB_SUBSCRIPTION_PROJECT_ID: Obligatorio. El ID del proyecto de Google Cloud que aparece en la Configuración de IAM y que contiene la suscripción de Pub/Sub de entrada que se leerá.
- SUBSCRIPTION: Obligatorio. Es el nombre de la suscripción a Pub/Sub.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Obligatorio. Es el nombre del bucket de Cloud Storage en el que se almacenará el resultado.
Nota: Los archivos de salida se almacenarán en la carpeta
output/
dentro del bucket. - FORMAT: Obligatorio. Formato de datos de salida. Opciones:
avro
ojson
.Nota: Si es
avro
, debes agregar "file:///usr/lib/spark/connector/spark-avro.jar
" al campo de API o a la marcajars
de gcloud CLI.Ejemplo (el prefijo
file://
hace referencia a un archivo jar de Dataproc Serverless):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Opcional Es el tiempo en milisegundos antes de que finalice la transmisión. El valor predeterminado es 60000.
- DURATION: Opcional Frecuencia en segundos de las operaciones de escritura en Cloud Storage. El valor predeterminado es de 15 segundos.
- NUM_RECEIVERS: Opcional Cantidad de transmisiones leídas de una suscripción a Pub/Sub en paralelo. El valor predeterminado es 5.
- BATCHSIZE: Opcional Es la cantidad de registros que se insertarán en un viaje de ida y vuelta a Cloud Storage. El valor predeterminado es 1,000.
- SERVICE_ACCOUNT: Opcional Si no se proporciona, se usa la cuenta de servicio de Compute Engine predeterminada.
- PROPERTY y PROPERTY_VALUE: Opcional. Es una lista separada por comas de pares propiedad de Spark=
value
. - LABEL y LABEL_VALUE: Opcional. Es una lista separada por comas de pares
label
=value
. - LOG_LEVEL: Opcional Nivel de registro. Puede ser una de las siguientes opciones:
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oWARN
. El valor predeterminado esINFO
. -
KMS_KEY: Opcional La clave de Cloud Key Management Service que se usará para la encriptación. Si no se especifica una clave, los datos se encriptan en reposo con una clave que es propiedad de Google y está administrada por Google.
Ejemplo:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Ejecuta el siguiente comando:
Linux, macOS o Cloud Shell
gcloud dataproc batches submit spark \ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \ --version="1.2" \ --project="PROJECT_ID" \ --region="REGION" \ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \ --subnet="SUBNET" \ --kms-key="KMS_KEY" \ --service-account="SERVICE_ACCOUNT" \ --properties="PROPERTY=PROPERTY_VALUE" \ --labels="LABEL=LABEL_VALUE" \ -- --template=PUBSUBTOGCS \ --templateProperty log.level="LOG_LEVEL" \ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark ` --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ` --version="1.2" ` --project="PROJECT_ID" ` --region="REGION" ` --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ` --subnet="SUBNET" ` --kms-key="KMS_KEY" ` --service-account="SERVICE_ACCOUNT" ` --properties="PROPERTY=PROPERTY_VALUE" ` --labels="LABEL=LABEL_VALUE" ` -- --template=PUBSUBTOGCS ` --templateProperty log.level="LOG_LEVEL" ` --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ` --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ` --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ` --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ` --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ` --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ` --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ` --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^ --version="1.2" ^ --project="PROJECT_ID" ^ --region="REGION" ^ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^ --subnet="SUBNET" ^ --kms-key="KMS_KEY" ^ --service-account="SERVICE_ACCOUNT" ^ --properties="PROPERTY=PROPERTY_VALUE" ^ --labels="LABEL=LABEL_VALUE" ^ -- --template=PUBSUBTOGCS ^ --templateProperty log.level="LOG_LEVEL" ^ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
REST
Antes de usar cualquiera de los datos de solicitud a continuación, realiza los siguientes reemplazos:
- PROJECT_ID: Obligatorio. El ID de tu proyecto de Google Cloud que aparece en la Configuración de IAM
- REGION: Obligatorio. Región de Compute Engine.
- SUBNET: Opcional Si no se especifica una subred, se selecciona la subred en la REGION especificada en la red
default
.Ejemplo:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: Obligatorio. Especifica
latest
para la versión más reciente de la plantilla o la fecha de una versión específica, por ejemplo,2023-03-17_v0.1.0-beta
(visita gs://dataproc-templates-binaries o ejecutagcloud storage ls gs://dataproc-templates-binaries
para ver una lista de las versiones de plantilla disponibles). - PUBSUB_SUBSCRIPTION_PROJECT_ID: Obligatorio. El ID del proyecto de Google Cloud que aparece en la Configuración de IAM y que contiene la suscripción de Pub/Sub de entrada que se leerá.
- SUBSCRIPTION: Obligatorio. Es el nombre de la suscripción a Pub/Sub.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: Obligatorio. Es el nombre del bucket de Cloud Storage en el que se almacenará el resultado.
Nota: Los archivos de salida se almacenarán en la carpeta
output/
dentro del bucket. - FORMAT: Obligatorio. Formato de datos de salida. Opciones:
avro
ojson
.Nota: Si es
avro
, debes agregar "file:///usr/lib/spark/connector/spark-avro.jar
" al campo de API o a la marcajars
de gcloud CLI.Ejemplo (el prefijo
file://
hace referencia a un archivo jar de Dataproc Serverless):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Opcional Es el tiempo en milisegundos antes de que finalice la transmisión. El valor predeterminado es 60000.
- DURATION: Opcional Frecuencia en segundos de las operaciones de escritura en Cloud Storage. El valor predeterminado es de 15 segundos.
- NUM_RECEIVERS: Opcional Cantidad de transmisiones leídas de una suscripción a Pub/Sub en paralelo. El valor predeterminado es 5.
- BATCHSIZE: Opcional Es la cantidad de registros que se insertarán en un viaje de ida y vuelta a Cloud Storage. El valor predeterminado es 1,000.
- SERVICE_ACCOUNT: Opcional Si no se proporciona, se usa la cuenta de servicio de Compute Engine predeterminada.
- PROPERTY y PROPERTY_VALUE: Opcional. Es una lista separada por comas de pares propiedad de Spark=
value
. - LABEL y LABEL_VALUE: Opcional. Es una lista separada por comas de pares
label
=value
. - LOG_LEVEL: Opcional Nivel de registro. Puede ser una de las siguientes opciones:
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oWARN
. El valor predeterminado esINFO
. -
KMS_KEY: Opcional La clave de Cloud Key Management Service que se usará para la encriptación. Si no se especifica una clave, los datos se encriptan en reposo con una clave que es propiedad de Google y está administrada por Google.
Ejemplo:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Método HTTP y URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
Cuerpo JSON de la solicitud:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.2", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } }
Para enviar tu solicitud, expande una de estas opciones:
Deberías recibir una respuesta JSON similar a la que se muestra a continuación:
{ "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }