Vorlage „Cloud Pub/Sub für Cloud Storage“
Verwenden Sie die Dataproc-Vorlage "Server Serverless Cloud Pub/Sub to Cloud Storage", um Daten aus Pub/Sub in Cloud Storage zu extrahieren.
Vorlage verwenden
Führen Sie die Vorlage mithilfe der gcloud CLI oder Dataproc API aus.
gcloud
Bevor Sie die folgenden Befehlsdaten verwenden, ersetzen Sie die folgenden Werte:
- PROJECT_ID: erforderlich. Ihre Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region.
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk
default
ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegsutil ls gs://dataproc-templates-binaries
aus, um die verfügbaren Vorlagenversionen aufzulisten. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist, die das zu lesende Pub/Sub-Eingabeabo enthält.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis: Die Ausgabedateien werden im Ordner
output/
des Buckets gespeichert. - FORMAT: erforderlich. Format der Ausgabedaten. Optionen:
avro
oderjson
.Hinweis: Wenn
avro
, müssen Sie „file:///usr/lib/spark/external/spark-avro.jar
“ zum gcloud CLI-Flag oder API-Feldjars
hinzufügen.Beispiel (das Präfix
file://
verweist auf eine JAR-Datei von Dataproc Serverless):--jars=file:///usr/lib/spark/external/spark-avro.jar,
[ ... weitere Gläser] - TIMEOUT: Optional. Zeit in Millisekunden bis zum Beenden des Streams. Die Standardeinstellung ist 60000.
- DURATION: Optional. Häufigkeit in Sekunden für Schreibvorgänge in Cloud Storage. Die Standardeinstellung ist 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Die Standardeinstellung ist 5.
- BATCHSIZE: Optional. Anzahl der Datensätze, die bei einem Umlauf in Cloud Storage eingefügt werden sollen. Die Standardeinstellung ist 1000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark property=
value
-Paaren. - LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Protokollebene. Kann
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
sein. Standardeinstellung:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden ruhende Daten mit einem Schlüssel verschlüsselt, der Google gehört und von Google verwaltet wird.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Führen Sie den folgenden Befehl aus:
Linux, macOS oder Cloud Shell
gcloud dataproc batches submit spark \ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \ --version="1.1" \ --project="PROJECT_ID" \ --region="REGION" \ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \ --subnet="SUBNET" \ --kms-key="KMS_KEY" \ --service-account="SERVICE_ACCOUNT" \ --properties="PROPERTY=PROPERTY_VALUE" \ --labels="LABEL=LABEL_VALUE" \ -- --template=PUBSUBTOGCS \ --templateProperty log.level="LOG_LEVEL" \ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark ` --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ` --version="1.1" ` --project="PROJECT_ID" ` --region="REGION" ` --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ` --subnet="SUBNET" ` --kms-key="KMS_KEY" ` --service-account="SERVICE_ACCOUNT" ` --properties="PROPERTY=PROPERTY_VALUE" ` --labels="LABEL=LABEL_VALUE" ` -- --template=PUBSUBTOGCS ` --templateProperty log.level="LOG_LEVEL" ` --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ` --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ` --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ` --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ` --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ` --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ` --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ` --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^ --version="1.1" ^ --project="PROJECT_ID" ^ --region="REGION" ^ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^ --subnet="SUBNET" ^ --kms-key="KMS_KEY" ^ --service-account="SERVICE_ACCOUNT" ^ --properties="PROPERTY=PROPERTY_VALUE" ^ --labels="LABEL=LABEL_VALUE" ^ -- --template=PUBSUBTOGCS ^ --templateProperty log.level="LOG_LEVEL" ^ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
REST
Bevor Sie die Anfragedaten verwenden, ersetzen Sie die folgenden Werte:
- PROJECT_ID: erforderlich. Ihre Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region.
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk
default
ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegsutil ls gs://dataproc-templates-binaries
aus, um die verfügbaren Vorlagenversionen aufzulisten. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist, die das zu lesende Pub/Sub-Eingabeabo enthält.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis: Die Ausgabedateien werden im Ordner
output/
des Buckets gespeichert. - FORMAT: erforderlich. Format der Ausgabedaten. Optionen:
avro
oderjson
.Hinweis: Wenn
avro
, müssen Sie „file:///usr/lib/spark/external/spark-avro.jar
“ zum gcloud CLI-Flag oder API-Feldjars
hinzufügen.Beispiel (das Präfix
file://
verweist auf eine JAR-Datei von Dataproc Serverless):--jars=file:///usr/lib/spark/external/spark-avro.jar,
[ ... weitere Gläser] - TIMEOUT: Optional. Zeit in Millisekunden bis zum Beenden des Streams. Die Standardeinstellung ist 60000.
- DURATION: Optional. Häufigkeit in Sekunden für Schreibvorgänge in Cloud Storage. Die Standardeinstellung ist 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Die Standardeinstellung ist 5.
- BATCHSIZE: Optional. Anzahl der Datensätze, die bei einem Umlauf in Cloud Storage eingefügt werden sollen. Die Standardeinstellung ist 1000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark property=
value
-Paaren. - LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Protokollebene. Kann
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
sein. Standardeinstellung:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden ruhende Daten mit einem Schlüssel verschlüsselt, der Google gehört und von Google verwaltet wird.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP-Methode und URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
JSON-Text der Anfrage:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.1", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/external/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } }
Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:
Sie sollten in etwa folgende JSON-Antwort erhalten:
{ "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }