Vorlage „Cloud Pub/Sub für Cloud Storage“
Mit der Vorlage „Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage“ können Sie Daten aus Pub/Sub in Cloud Storage extrahieren.
Vorlage verwenden
Führen Sie die Vorlage mit der gcloud CLI oder der Dataproc API aus.
gcloud
Ersetzen Sie folgende Werte, bevor sie einen der Befehlsdaten verwenden:
- PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region.
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk
default
ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegcloud storage ls gs://dataproc-templates-binaries
aus, um die verfügbaren Vorlagenversionen aufzulisten. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das Pub/Sub-Eingabeabo enthält, das gelesen werden soll.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis:Die Ausgabedateien werden im Ordner
output/
im Bucket gespeichert. - FORMAT: erforderlich. Ausgabedatenformat. Optionen:
avro
oderjson
.Hinweis:Wenn
avro
, müssen Sie dem gcloud CLI-Flag oder API-Feldjars
„file:///usr/lib/spark/connector/spark-avro.jar
“ hinzufügen.Beispiel (das Präfix
file://
verweist auf eine Serverless for Apache Spark-JAR-Datei):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Optional. Zeit in Millisekunden vor dem Beenden des Streams. Der Standardwert ist 60000.
- DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
- BATCHSIZE: Optional. Anzahl der Datensätze, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE:
Optional. Durch Kommas getrennte Liste von Spark-Eigenschaft=
value
-Paaren. - LABEL und LABEL_VALUE:
Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Ebene der Protokollierung. Kann einer der folgenden Werte sein:
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
. Standard:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden Daten mit einem Google-owned and Google-managed encryption keyim Ruhezustand verschlüsselt.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
Führen Sie folgenden Befehl aus:
Linux, macOS oder Cloud Shell
gcloud dataproc batches submit spark \ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \ --version="1.2" \ --project="PROJECT_ID" \ --region="REGION" \ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \ --subnet="SUBNET" \ --kms-key="KMS_KEY" \ --service-account="SERVICE_ACCOUNT" \ --properties="PROPERTY=PROPERTY_VALUE" \ --labels="LABEL=LABEL_VALUE" \ -- --template=PUBSUBTOGCS \ --templateProperty log.level="LOG_LEVEL" \ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (PowerShell)
gcloud dataproc batches submit spark ` --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ` --version="1.2" ` --project="PROJECT_ID" ` --region="REGION" ` --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ` --subnet="SUBNET" ` --kms-key="KMS_KEY" ` --service-account="SERVICE_ACCOUNT" ` --properties="PROPERTY=PROPERTY_VALUE" ` --labels="LABEL=LABEL_VALUE" ` -- --template=PUBSUBTOGCS ` --templateProperty log.level="LOG_LEVEL" ` --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ` --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ` --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ` --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ` --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ` --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ` --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ` --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
Windows (cmd.exe)
gcloud dataproc batches submit spark ^ --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^ --version="1.2" ^ --project="PROJECT_ID" ^ --region="REGION" ^ --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^ --subnet="SUBNET" ^ --kms-key="KMS_KEY" ^ --service-account="SERVICE_ACCOUNT" ^ --properties="PROPERTY=PROPERTY_VALUE" ^ --labels="LABEL=LABEL_VALUE" ^ -- --template=PUBSUBTOGCS ^ --templateProperty log.level="LOG_LEVEL" ^ --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^ --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^ --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^ --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^ --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^ --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^ --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^ --templateProperty pubsubtogcs.batch.size="BATCHSIZE"
REST
Ersetzen Sie diese Werte in den folgenden Anfragedaten:
- PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
- REGION: erforderlich. Compute Engine-Region.
- SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk
default
ausgewählt.Beispiel:
projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
- TEMPLATE_VERSION: erforderlich. Geben Sie
latest
für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B.2023-03-17_v0.1.0-beta
. Rufen Sie gs://dataproc-templates-binaries auf oder führen Siegcloud storage ls gs://dataproc-templates-binaries
aus, um die verfügbaren Vorlagenversionen aufzulisten. - PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das Pub/Sub-Eingabeabo enthält, das gelesen werden soll.
- SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
- CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.
Hinweis:Die Ausgabedateien werden im Ordner
output/
im Bucket gespeichert. - FORMAT: erforderlich. Ausgabedatenformat. Optionen:
avro
oderjson
.Hinweis:Wenn
avro
, müssen Sie dem gcloud CLI-Flag oder API-Feldjars
„file:///usr/lib/spark/connector/spark-avro.jar
“ hinzufügen.Beispiel (das Präfix
file://
verweist auf eine Serverless for Apache Spark-JAR-Datei):--jars=file:///usr/lib/spark/connector/spark-avro.jar,
[ ... other jars] - TIMEOUT: Optional. Zeit in Millisekunden vor dem Beenden des Streams. Der Standardwert ist 60000.
- DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
- NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
- BATCHSIZE: Optional. Anzahl der Datensätze, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1000.
- SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
- PROPERTY und PROPERTY_VALUE:
Optional. Durch Kommas getrennte Liste von Spark-Eigenschaft=
value
-Paaren. - LABEL und LABEL_VALUE:
Optional. Durch Kommas getrennte Liste von
label
=value
-Paaren. - LOG_LEVEL: Optional. Ebene der Protokollierung. Kann einer der folgenden Werte sein:
ALL
,DEBUG
,ERROR
,FATAL
,INFO
,OFF
,TRACE
oderWARN
. Standard:INFO
. -
KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden Daten mit einem Google-owned and Google-managed encryption keyim Ruhezustand verschlüsselt.
Beispiel:
projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME
HTTP-Methode und URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches
JSON-Text anfordern:
{ "environmentConfig":{ "executionConfig":{ "subnetworkUri":"SUBNET", "kmsKey": "KMS_KEY", "serviceAccount": "SERVICE_ACCOUNT" } }, "labels": { "LABEL": "LABEL_VALUE" }, "runtimeConfig": { "version": "1.2", "properties": { "PROPERTY": "PROPERTY_VALUE" } }, "sparkBatch":{ "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate", "args":[ "--template","PUBSUBTOGCS", "--templateProperty","log.level=LOG_LEVEL", "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID", "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION", "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME", "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT", "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT", "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION", "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS", "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE" ], "jarFileUris":[ "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ] } }
Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:
Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:
{ "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata", "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID", "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583", "createTime": "2023-02-24T03:31:03.440329Z", "operationType": "BATCH", "description": "Batch" } }