Vorlage „Cloud Pub/Sub für Cloud Storage“
Verwenden Sie die Dataproc-Serverless-Vorlage „Cloud Pub/Sub to Cloud Storage“, um Daten aus Pub/Sub in Cloud Storage zu extrahieren.
Vorlage verwenden
Führen Sie die Vorlage mit der gcloud CLI oder der Dataproc API aus.
gcloud
Ersetzen Sie folgende Werte, bevor sie einen der Befehlsdaten verwenden:
- PROJECT_ID: erforderlich. Ihre Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im
default
-Netzwerk ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegcloud storage ls gs://dataproc-templates-binaries
aus, um eine Liste der verfügbaren Vorlagenversionen aufzurufen. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das zu lesende Pub/Sub-Abo enthält.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis:Die Ausgabedateien werden im Ordner
output/
im Bucket gespeichert. - FORMAT: erforderlich. Ausgabedatenformat. Optionen:
avro
oderjson
Hinweis:Wenn
avro
, müssen Sie demjars
-Flag oder API-Feld der gcloud CLI „file:///usr/lib/spark/connector/spark-avro.jar
“ hinzufügen.Beispiel (das Präfix
file://
verweist auf eine Dataproc Serverless-JAR-Datei):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Optional. Zeit in Millisekunden vor Beendigung des Streams. Der Standardwert ist 60000.
- DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
- BATCHSIZE: Optional. Anzahl der Einträge, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1.000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE:
Optional. Durch Kommas getrennte Liste von Spark-Property=
value
-Paaren. - LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Protokollierungsebene. Kann
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
sein. Standard:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden die Daten inaktiv verschlüsselt. Dazu wird ein Google-owned and Google-managed encryption keyverwendet.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Führen Sie folgenden Befehl aus:
Linux, macOS oder Cloud Shell
gcloud dataproc batches submit spark \ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \ --version="1.2" \ --project="PROJECT_ID" \ --region="REGION" \ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \ --subnet="SUBNET" \ --kms-key="KMS_KEY" \ --service-account="SERVICE_ACCOUNT" \ --properties="PROPERTY=PROPERTY_VALUE" \ --labels="LABEL=LABEL_VALUE" \ -- --template=PUBSUBTOGCS \ --templateProperty log.level="LOG_LEVEL" \ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark ` --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ` --version="1.2" ` --project="PROJECT_ID" ` --region="REGION" ` --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ` --subnet="SUBNET" ` --kms-key="KMS_KEY" ` --service-account="SERVICE_ACCOUNT" ` --properties="PROPERTY=PROPERTY_VALUE" ` --labels="LABEL=LABEL_VALUE" ` -- --template=PUBSUBTOGCS ` --templateProperty log.level="LOG_LEVEL" ` --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ` --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ` --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ` --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ` --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ` --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ` --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ` --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^ --version="1.2" ^ --project="PROJECT_ID" ^ --region="REGION" ^ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^ --subnet="SUBNET" ^ --kms-key="KMS_KEY" ^ --service-account="SERVICE_ACCOUNT" ^ --properties="PROPERTY=PROPERTY_VALUE" ^ --labels="LABEL=LABEL_VALUE" ^ -- --template=PUBSUBTOGCS ^ --templateProperty log.level="LOG_LEVEL" ^ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
REST
Ersetzen Sie diese Werte in den folgenden Anfragedaten:
- PROJECT_ID: erforderlich. Ihre Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im
default
-Netzwerk ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegcloud storage ls gs://dataproc-templates-binaries
aus, um eine Liste der verfügbaren Vorlagenversionen aufzurufen. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das zu lesende Pub/Sub-Abo enthält.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis:Die Ausgabedateien werden im Ordner
output/
im Bucket gespeichert. - FORMAT: erforderlich. Ausgabedatenformat. Optionen:
avro
oderjson
Hinweis:Wenn
avro
, müssen Sie demjars
-Flag oder API-Feld der gcloud CLI „file:///usr/lib/spark/connector/spark-avro.jar
“ hinzufügen.Beispiel (das Präfix
file://
verweist auf eine Dataproc Serverless-JAR-Datei):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Optional. Zeit in Millisekunden vor Beendigung des Streams. Der Standardwert ist 60000.
- DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
- BATCHSIZE: Optional. Anzahl der Einträge, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1.000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE:
Optional. Durch Kommas getrennte Liste von Spark-Property=
value
-Paaren. - LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Protokollierungsebene. Kann
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
sein. Standard:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden die Daten inaktiv verschlüsselt. Dazu wird ein Google-owned and Google-managed encryption keyverwendet.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP-Methode und URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
JSON-Text anfordern:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.2", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } }
Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:
Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:
{ "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }