E-Commerce-Streamingpipeline erstellen


In dieser Anleitung erstellen Sie eine Dataflow-Streamingpipeline, die E-Commerce-Daten aus Pub/Sub-Themen und -Abos transformiert und die Daten in BigQuery und Bigtable ausgibt. Für diese Anleitung ist Gradle erforderlich.

Die Anleitung bietet eine durchgängige E-Commerce-Beispielanwendung, die Daten aus einem Webstore an BigQuery und Bigtable streamt. Die Beispielanwendung veranschaulicht gängige Anwendungsfälle und Best Practices für die Implementierung von Streamingdatenanalysen und KI in Echtzeit. In dieser Anleitung erfahren Sie, wie Sie dynamisch auf Kundenaktionen reagieren, um Ereignisse in Echtzeit zu analysieren und auf sie zu reagieren. In dieser Anleitung erfahren Sie, wie Sie Ereignisdaten speichern, analysieren und visualisieren, um mehr über das Kundenverhalten zu erfahren.

Die Beispielanwendung steht auf GitHub zur Verfügung. Zum Ausführen dieser Anleitung mit Terraform folgen Sie den Schritten in der Beispielanwendung auf GitHub.

Ziele

  • Die Validierung eingehender Daten und Korrekturen, falls sie möglich sind.
  • Die Analyse von Clickstream-Daten, um die Anzahl von Aufrufen pro Produkt in einem bestimmten Zeitraum festzustellen. Speichern Sie diese Informationen in einem Speicher mit niedriger Latenz. Die Anwendung kann dann die Daten verwenden, um Kunden auf der Website Nachrichten vom Typ Anzahl von Nutzern, dieses Produkt angesehen haben zu präsentieren.
  • Verwenden Sie Transaktionsdaten, um die Inventarreihenfolge festzulegen:

    • Analysieren Sie Transaktionsdaten, um die Gesamtzahl von Verkäufen für jeden Artikel sowohl im Geschäft als auch weltweit für einen bestimmten Zeitraum zu berechnen.
    • Analysieren Sie Inventardaten, um das eingehende Inventar für jeden Artikel zu berechnen.
    • Übergeben Sie diese Daten kontinuierlich an Inventarsysteme, damit sie für Entscheidungen zum Inventarkauf verwendet werden können.
  • Die Validierung eingehender Daten und Korrekturen, falls sie möglich sind. Schreiben Sie nicht korrigierbare Daten zur weiteren Analyse und Verarbeitung in eine Dead-Letter-Warteschlange. Erstellen Sie einen Messwert, der den Prozentsatz der eingehenden Daten darstellt, die an die Dead-Letter-Warteschlange für Monitoring und Benachrichtigungen gesendet werden.

  • Verarbeiten Sie alle eingehenden Daten in einem Standardformat und speichern Sie sie in einem Data Warehouse, um sie für Analysen und Visualisierungen zu verwenden.

  • Denormalisieren Sie Transaktionsdaten für Ladenverkäufe, sodass sie Informationen wie Breiten- und Längengrad des Geschäftsstandorts enthalten können. Stellen Sie die Speicherinformationen über eine sich langsam ändernde Tabelle in BigQuery bereit, wobei Sie die Geschäfts-ID als Schlüssel verwenden.

Daten

Die Anwendung verarbeitet die folgenden Datentypen:

  • Clickstream-Daten, die von Onlinesystemen an Pub/Sub gesendet werden
  • Transaktionsdaten, die von lokalen oder SaaS-Systemen (Software as a Service) an Pub/Sub gesendet werden
  • Lagerdaten, die von lokalen oder SaaS-Systemen an Pub/Sub gesendet werden

Aufgabenmuster

Die Anwendung enthält die folgenden Aufgabenmuster, die häufig mit Pipelines verwendet werden, die mit dem Apache Beam SDK für Java erstellt wurden:

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Erstellen Sie ein nutzerverwaltetes Worker-Dienstkonto für Ihre neue Pipeline und weisen Sie dem Dienstkonto die erforderlichen Rollen zu.

    1. Führen Sie den Befehl gcloud iam service-accounts create aus, um das Dienstkonto zu erstellen.

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      Ersetzen Sie SERVICE_ACCOUNT_ROLE durch jede einzelne Rolle.

    3. Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie Zugriffstokens für das Dienstkonto erstellen können:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  17. Laden Sie bei Bedarf Gradle herunter und installieren Sie es.

Beispielquellen und -senken erstellen

In diesem Abschnitt wird Folgendes erstellt:

  • Ein Cloud Storage-Bucket, der als temporärer Speicherort verwendet werden soll
  • Datenquellen mit Pub/Sub streamen
  • Datasets zum Laden der Daten in BigQuery
  • Eine Bigtable-Instanz

Cloud Storage-Bucket erstellen

Erstellen Sie zuerst einen Cloud Storage-Bucket. Dieser Bucket wird von der Dataflow-Pipeline als temporärer Speicherort verwendet.

Führen Sie den Befehl gcloud storage buckets create aus:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Ersetzen Sie Folgendes:

Pub/Sub-Themen und -Abos erstellen

Erstellen Sie vier Pub/Sub-Themen und erstellen Sie dann drei Abos.

Führen Sie zum Erstellen der Themen den Befehl gcloud pubsub topics create für jedes Thema einmal aus. Informationen zum Benennen eines Abos finden Sie unter Richtlinien für die Benennung eines Themas oder Abos.

gcloud pubsub topics create TOPIC_NAME

Ersetzen Sie TOPIC_NAME durch die folgenden Werte und führen Sie den Befehl viermal einmal für jedes Thema aus:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

Führen Sie den Befehl gcloud pubsub subscriptions create für jedes Abo einmal aus, um ein Abo für Ihr Thema zu erstellen:

  1. Ein Clickstream-inbound-sub-Abo erstellen:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. Ein Transactions-inbound-sub-Abo erstellen:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. Ein Inventory-inbound-sub-Abo erstellen:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

BigQuery-Datasets und -Tabelle erstellen

Erstellen Sie ein BigQuery-Dataset und eine partitionierte Tabelle mit dem entsprechenden Schema für Ihr Pub/Sub-Thema.

  1. Verwenden Sie den Befehl bq mk, um das erste Dataset zu erstellen.

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. Erstellen Sie das zweite Dataset.

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. Verwenden Sie die SQL-Anweisung CREATE TABLE, um eine Tabelle mit einem Schema und Testdaten zu erstellen. Die Testdaten haben einen Speicher mit dem ID-Wert 1. Das Nebeneingabemuster für langsame Aktualisierungen verwendet diese Tabelle.

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

Bigtable-Instanz und -Tabelle erstellen

Bigtable-Instanz und -Tabelle erstellen Weitere Informationen zum Erstellen von Bigtable-Instanzen finden Sie unter Instanz erstellen.

  1. Führen Sie bei Bedarf den folgenden Befehl aus, um die cbt-Befehlszeile zu installieren:

    gcloud components install cbt
    
  2. Verwenden Sie den Befehl bigtable instances create, um eine Instanz zu erstellen:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    Ersetzen Sie CLUSTER_ZONE durch die Zone, in der der Cluster ausgeführt wird.

  3. Zum Erstellen einer permanente Tabelle verwenden Sie den Befehl cbt createtable.

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. Verwenden Sie den folgenden Befehl, um eine Spaltenfamilie der Tabelle hinzuzufügen:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

Pipeline ausführen

Führen Sie mit Gradle eine Streaming-Pipeline aus. Informationen zum von der Pipeline verwendeten Java-Code finden Sie unter RetailDataProcessingPipeline.java.

  1. Klonen Sie das GitHub-Repository mit dem Befehl git clone:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. Wechseln Sie in das Anwendungsverzeichnis:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. Führen Sie zum Testen der Pipeline in der Shell oder dem Terminal den folgenden Befehl mit Gradle aus:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. Führen Sie mit Gradle den folgenden Befehl aus, um die Pipeline auszuführen:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID"
    

Sehen Sie sich den Pipeline-Quellcode auf GitHub an.

Cloud Scheduler-Jobs erstellen und ausführen

Erstellen und führen Sie drei Cloud Scheduler-Jobs aus, einen, der Clickstream-Daten veröffentlicht, einen für Inventardaten und einen für Transaktionsdaten. Mit diesem Schritt werden Beispieldaten für die Pipeline generiert.

  1. Verwenden Sie den Befehl gcloud scheduler jobs create, um einen Cloud Scheduler-Job für diese Anleitung zu erstellen. Mit diesem Schritt wird ein Publisher für Clickstream-Daten erstellt, der eine Nachricht pro Minute veröffentlicht.

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. Verwenden Sie den Befehl gcloud scheduler jobs run, um den Cloud Scheduler-Job zu starten.

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. Einen weiteren ähnlichen Publisher für Inventardaten erstellen und ausführen, die alle zwei Minuten eine Nachricht veröffentlicht.

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. Starten Sie den zweiten Cloud Scheduler-Job.

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. Erstellen und führen Sie einen dritten Publisher für Transaktionsdaten aus, die alle zwei Minuten eine Nachricht veröffentlichen.

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. Starten Sie den dritten Cloud Scheduler-Job.

    gcloud scheduler jobs run --location=LOCATION transactions
    

Ergebnisse ansehen

Sehen Sie sich die Daten an, die in Ihren BigQuery-Tabellen geschrieben wurden. Prüfen Sie die Ergebnisse in BigQuery mit den folgenden Abfragen: Während diese Pipeline ausgeführt wird, können Sie jede Minute neue Zeilen an die BigQuery-Tabellen anhängen.

Möglicherweise müssen Sie warten, bis die Tabellen mit Daten gefüllt sind.

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

Projekt löschen

Am einfachsten können Sie weitere Kosten vermeiden, wenn Sie das Google Cloud-Projekt löschen, das Sie für die Anleitung erstellt haben.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Einzelne Ressourcen löschen

Wenn Sie das Projekt wiederverwenden möchten, löschen Sie die für die Anleitung erstellten Ressourcen.

Google Cloud-Projektressourcen bereinigen

  1. Verwenden Sie den Befehl gcloud scheduler jobs delete, um die Cloud Scheduler-Jobs zu löschen.

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. Verwenden Sie zum Löschen der Pub/Sub-Abos und -Themen die Befehle gcloud pubsub subscriptions delete und gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. Verwenden Sie zum Löschen der BigQuery-Tabelle den Befehl bq rm.

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. BigQuery-Datasets löschen. Das Dataset allein verursacht keine Gebühren.

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. Verwenden Sie zum Löschen der Bigtable-Instanz den Befehl cbt deleteinstance. Für den Bucket fallen keine Gebühren an.

    cbt deleteinstance aggregate-tables
    
  6. Verwenden Sie den Befehl gcloud storage rm, um den Cloud Storage-Bucket zu löschen. Für den Bucket fallen keine Gebühren an.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Anmeldedaten entfernen

  1. Widerrufen Sie die Rollen, die Sie dem nutzerverwalteten Worker-Dienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Nächste Schritte