Vorlage „Cloud Pub/Sub für Cloud Storage“

Verwenden Sie die Dataproc-Serverless-Vorlage „Cloud Pub/Sub to Cloud Storage“, um Daten aus Pub/Sub in Cloud Storage zu extrahieren.

Vorlage verwenden

Führen Sie die Vorlage mit der gcloud CLI oder der Dataproc API aus.

gcloud

Ersetzen Sie folgende Werte, bevor sie einen der Befehlsdaten verwenden:

  • PROJECT_ID: erforderlich. Ihre Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im default-Netzwerk ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gcloud storage ls gs://dataproc-templates-binaries aus, um eine Liste der verfügbaren Vorlagenversionen aufzurufen.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das zu lesende Pub/Sub-Abo enthält.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis:Die Ausgabedateien werden im Ordner output/ im Bucket gespeichert.

  • FORMAT: erforderlich. Ausgabedatenformat. Optionen: avro oder json

    Hinweis:Wenn avro, müssen Sie dem jars-Flag oder API-Feld der gcloud CLI „file:///usr/lib/spark/connector/spark-avro.jar“ hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine Dataproc Serverless-JAR-Datei):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Zeit in Millisekunden vor Beendigung des Streams. Der Standardwert ist 60000.
  • DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
  • BATCHSIZE: Optional. Anzahl der Einträge, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1.000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark-Property=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Protokollierungsebene. Kann ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN sein. Standard: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden die Daten inaktiv verschlüsselt. Dazu wird ein Google-owned and Google-managed encryption keyverwendet.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Führen Sie folgenden Befehl aus:

Linux, macOS oder Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: erforderlich. Ihre Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im default-Netzwerk ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gcloud storage ls gs://dataproc-templates-binaries aus, um eine Liste der verfügbaren Vorlagenversionen aufzurufen.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das zu lesende Pub/Sub-Abo enthält.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis:Die Ausgabedateien werden im Ordner output/ im Bucket gespeichert.

  • FORMAT: erforderlich. Ausgabedatenformat. Optionen: avro oder json

    Hinweis:Wenn avro, müssen Sie dem jars-Flag oder API-Feld der gcloud CLI „file:///usr/lib/spark/connector/spark-avro.jar“ hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine Dataproc Serverless-JAR-Datei):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Zeit in Millisekunden vor Beendigung des Streams. Der Standardwert ist 60000.
  • DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
  • BATCHSIZE: Optional. Anzahl der Einträge, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1.000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark-Property=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Protokollierungsebene. Kann ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN sein. Standard: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden die Daten inaktiv verschlüsselt. Dazu wird ein Google-owned and Google-managed encryption keyverwendet.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

HTTP-Methode und URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

JSON-Text anfordern:


{
  "environmentConfig":{
    "executionConfig":{
      "subnetworkUri":"SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.2",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch":{
    "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args":[
      "--template","PUBSUBTOGCS",
      "--templateProperty","log.level=LOG_LEVEL",
      "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris":[
      "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:


{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}