Modèle Cloud Pub/Sub vers Cloud Storage
Utilisez le modèle Dataproc Cloud Pub/Sub vers Cloud Storage sans serveur pour extraire depuis Pub/Sub vers Cloud Storage.
Utiliser le modèle
Exécutez le modèle à l'aide de la CLI gcloud ou de l'API Dataproc.
gcloud
Avant d'utiliser les données de la commande ci-dessous, effectuez les remplacements suivants :
- PROJECT_ID : valeur obligatoire. L'ID de votre projet Google Cloud indiqué dans les paramètres IAM.
- REGION : valeur obligatoire. Région Compute Engine
- SUBNET : facultatif. Si aucun sous-réseau n'est spécifié, le sous-réseau
dans la région spécifiée dans le réseau
default
est sélectionné.Exemple
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION : valeur obligatoire. Spécifiez
latest
pour la dernière version du modèle ou la date d'une version spécifique, par exemple2023-03-17_v0.1.0-beta
(accédez à gs://dataproc-templates-binaries ou exécutezgcloud storage ls gs://dataproc-templates-binaries
pour afficher les versions de modèle disponibles). - PUBSUB_SUBSCRIPTION_PROJECT_ID : valeur obligatoire. L'ID du projet Google Cloud indiqué dans les paramètres IAM qui contiennent l'abonnement Pub/Sub d'entrée à lire.
- SUBSCRIPTION : valeur obligatoire. Nom de l'abonnement Pub/Sub.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME : valeur obligatoire. Nom du bucket Cloud Storage dans lequel le résultat sera stocké.
Remarque : Les fichiers de sortie seront stockés dans le dossier
output/
du bucket. - FORMAT : valeur obligatoire. Format des données de sortie. Options:
avro
oujson
.Remarque : Si la valeur est
avro
, vous devez ajouter "file:///usr/lib/spark/external/spark-avro.jar
" au champ de l'indicateur ou de l'API de la CLI gcloudjars
.Exemple (le préfixe
file://
fait référence à un fichier JAR Dataproc sans serveur):--jars=file:///usr/lib/spark/external/spark-avro.jar,
[ ... other jars] - TIMEOUT : facultatif. Durée en millisecondes avant l'arrêt du flux. La valeur par défaut est 60000.
- DURATION : facultatif. Fréquence en secondes des écritures dans Cloud Storage. La valeur par défaut est 15 secondes.
- NUM_RECEIVERS : facultatif. Nombre de flux lus en parallèle à partir d'un abonnement Pub/Sub. La valeur par défaut est 5.
- BATCHSIZE : facultatif. Nombre d'enregistrements à insérer en un aller-retour dans Cloud Storage. La valeur par défaut est 1000.
- SERVICE_ACCOUNT : facultatif. Si aucune valeur n'est fournie, le compte de service Compute Engine par défaut est utilisé.
- PROPERTY et PROPERTY_VALUE : facultatives. Liste de paires propriété Spark=
value
séparées par une virgule. - LABEL et LABEL_VALUE:
Facultatif. Liste de paires
label
=value
séparées par une virgule. - LOG_LEVEL : facultatif. Niveau de journalisation Il peut s'agir de
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
ouWARN
. Valeur par défaut :INFO
. -
KMS_KEY : facultatif. Clé Cloud Key Management Service à utiliser pour le chiffrement. Si aucune clé n'est spécifiée, les données sont chiffrées au repos à l'aide d'une clé appartenant à Google et gérée par Google.
Exemple
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Exécutez la commande suivante :
Linux, macOS ou Cloud Shell
gcloud dataproc batches submit spark \ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \ --version="1.1" \ --project="PROJECT_ID" \ --region="REGION" \ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \ --subnet="SUBNET" \ --kms-key="KMS_KEY" \ --service-account="SERVICE_ACCOUNT" \ --properties="PROPERTY=PROPERTY_VALUE" \ --labels="LABEL=LABEL_VALUE" \ -- --template=PUBSUBTOGCS \ --templateProperty log.level="LOG_LEVEL" \ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark ` --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ` --version="1.1" ` --project="PROJECT_ID" ` --region="REGION" ` --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ` --subnet="SUBNET" ` --kms-key="KMS_KEY" ` --service-account="SERVICE_ACCOUNT" ` --properties="PROPERTY=PROPERTY_VALUE" ` --labels="LABEL=LABEL_VALUE" ` -- --template=PUBSUBTOGCS ` --templateProperty log.level="LOG_LEVEL" ` --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ` --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ` --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ` --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ` --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ` --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ` --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ` --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^ --version="1.1" ^ --project="PROJECT_ID" ^ --region="REGION" ^ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^ --subnet="SUBNET" ^ --kms-key="KMS_KEY" ^ --service-account="SERVICE_ACCOUNT" ^ --properties="PROPERTY=PROPERTY_VALUE" ^ --labels="LABEL=LABEL_VALUE" ^ -- --template=PUBSUBTOGCS ^ --templateProperty log.level="LOG_LEVEL" ^ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
REST
Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :
- PROJECT_ID : valeur obligatoire. ID de votre projet Google Cloud répertorié dans les paramètres IAM.
- REGION : valeur obligatoire. Compute Engine région.
- SUBNET : facultatif. Si aucun sous-réseau n'est spécifié, le sous-réseau
dans la région spécifiée dans le réseau
default
est sélectionné.Exemple
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION : valeur obligatoire. Spécifiez
latest
pour la dernière version du modèle ou la date d'une version spécifique, par exemple2023-03-17_v0.1.0-beta
(accédez à gs://dataproc-templates-binaries ou exécutezgcloud storage ls gs://dataproc-templates-binaries
pour afficher les versions de modèle disponibles). - PUBSUB_SUBSCRIPTION_PROJECT_ID : valeur obligatoire. ID de projet Google Cloud indiqué dans les paramètres IAM contenant l'abonnement Pub/Sub d'entrée à lire.
- SUBSCRIPTION : valeur obligatoire. Nom de l'abonnement Pub/Sub.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME : valeur obligatoire. Nom du bucket Cloud Storage dans lequel le résultat sera stocké.
Remarque : Les fichiers de sortie seront stockés dans le dossier
output/
du bucket. - FORMAT : valeur obligatoire. Format des données de sortie. Options:
avro
oujson
.Remarque : Si la valeur est
avro
, vous devez ajouter "file:///usr/lib/spark/external/spark-avro.jar
" au champ de l'indicateur ou de l'API de la CLI gcloudjars
.Exemple (le préfixe
file://
fait référence à un fichier JAR Dataproc sans serveur):--jars=file:///usr/lib/spark/external/spark-avro.jar,
[ ... other jars] - TIMEOUT : facultatif. Durée en millisecondes avant l'arrêt du flux. La valeur par défaut est 60000.
- DURATION : facultatif. Fréquence en secondes des écritures dans Cloud Storage. La valeur par défaut est 15 secondes.
- NUM_RECEIVERS : facultatif. Nombre de flux lus en parallèle à partir d'un abonnement Pub/Sub. La valeur par défaut est 5.
- BATCHSIZE : facultatif. Nombre d'enregistrements à insérer en un aller-retour dans Cloud Storage. La valeur par défaut est 1000.
- SERVICE_ACCOUNT : facultatif. Si aucune valeur n'est fournie, le compte de service Compute Engine par défaut est utilisé.
- PROPERTY et PROPERTY_VALUE : facultatives. Liste de valeurs séparées par une virgule
Propriété Spark=
value
. - LABEL et LABEL_VALUE : facultatives. Liste de paires
label
=value
séparées par une virgule. - LOG_LEVEL : facultatif. Niveau de journalisation Il peut s'agir de
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
ouWARN
. Valeur par défaut :INFO
. -
KMS_KEY : facultatif. Clé Cloud Key Management Service à utiliser pour le chiffrement. Si aucune clé n'est spécifiée, les données sont chiffrées au repos à l'aide d'une clé détenue et gérée par Google.
Exemple
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Méthode HTTP et URL :
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
Corps JSON de la requête :
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.1", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/external/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } }
Pour envoyer votre requête, développez l'une des options suivantes :
Vous devriez recevoir une réponse JSON de ce type :
{ "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }