Vorlage „Cloud Pub/Sub für Cloud Storage“

Verwenden Sie die Dataproc-Vorlage "Server Serverless Cloud Pub/Sub to Cloud Storage", um Daten aus Pub/Sub in Cloud Storage zu extrahieren.

Vorlage verwenden

Führen Sie die Vorlage mithilfe der gcloud CLI oder Dataproc API aus.

gcloud

Bevor Sie die folgenden Befehlsdaten verwenden, ersetzen Sie die folgenden Werte:

  • PROJECT_ID: erforderlich. Ihre Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region.
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk default ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gsutil ls gs://dataproc-templates-binaries aus, um die verfügbaren Vorlagenversionen aufzulisten.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist, die das zu lesende Pub/Sub-Eingabeabo enthält.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis: Die Ausgabedateien werden im Ordner output/ des Buckets gespeichert.

  • FORMAT: erforderlich. Format der Ausgabedaten. Optionen: avro oder json.

    Hinweis: Wenn avro, müssen Sie „file:///usr/lib/spark/external/spark-avro.jar“ zum gcloud CLI-Flag oder API-Feld jars hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine JAR-Datei von Dataproc Serverless):

    --jars=file:///usr/lib/spark/external/spark-avro.jar, [ ... weitere Gläser]
  • TIMEOUT: Optional. Zeit in Millisekunden bis zum Beenden des Streams. Die Standardeinstellung ist 60000.
  • DURATION: Optional. Häufigkeit in Sekunden für Schreibvorgänge in Cloud Storage. Die Standardeinstellung ist 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Die Standardeinstellung ist 5.
  • BATCHSIZE: Optional. Anzahl der Datensätze, die bei einem Umlauf in Cloud Storage eingefügt werden sollen. Die Standardeinstellung ist 1000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark property=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Protokollebene. Kann ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN sein. Standardeinstellung: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden ruhende Daten mit einem Schlüssel verschlüsselt, der Google gehört und von Google verwaltet wird.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Führen Sie den folgenden Befehl aus:

Linux, macOS oder Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.1" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.1" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.1" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

REST

Bevor Sie die Anfragedaten verwenden, ersetzen Sie die folgenden Werte:

  • PROJECT_ID: erforderlich. Ihre Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region.
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk default ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gsutil ls gs://dataproc-templates-binaries aus, um die verfügbaren Vorlagenversionen aufzulisten.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud-Projekt-ID, die in den IAM-Einstellungen aufgeführt ist, die das zu lesende Pub/Sub-Eingabeabo enthält.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis: Die Ausgabedateien werden im Ordner output/ des Buckets gespeichert.

  • FORMAT: erforderlich. Format der Ausgabedaten. Optionen: avro oder json.

    Hinweis: Wenn avro, müssen Sie „file:///usr/lib/spark/external/spark-avro.jar“ zum gcloud CLI-Flag oder API-Feld jars hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine JAR-Datei von Dataproc Serverless):

    --jars=file:///usr/lib/spark/external/spark-avro.jar, [ ... weitere Gläser]
  • TIMEOUT: Optional. Zeit in Millisekunden bis zum Beenden des Streams. Die Standardeinstellung ist 60000.
  • DURATION: Optional. Häufigkeit in Sekunden für Schreibvorgänge in Cloud Storage. Die Standardeinstellung ist 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Die Standardeinstellung ist 5.
  • BATCHSIZE: Optional. Anzahl der Datensätze, die bei einem Umlauf in Cloud Storage eingefügt werden sollen. Die Standardeinstellung ist 1000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark property=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Protokollebene. Kann ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN sein. Standardeinstellung: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden ruhende Daten mit einem Schlüssel verschlüsselt, der Google gehört und von Google verwaltet wird.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

HTTP-Methode und URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

JSON-Text der Anfrage:


{
  "environmentConfig":{
    "executionConfig":{
      "subnetworkUri":"SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.1",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch":{
    "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args":[
      "--template","PUBSUBTOGCS",
      "--templateProperty","log.level=LOG_LEVEL",
      "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris":[
      "file:///usr/lib/spark/external/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten in etwa folgende JSON-Antwort erhalten:


{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}