Empfangene Ereignisse transformieren

Sie können Ihre Ereignisdaten transformieren, indem Sie Transformationsausdrücke mit CEL schreiben. Beispielsweise lassen sich Ereignisnutzlasten ändern, um dem API-Vertrag eines bestimmten Ziels zu entsprechen.

Ereignisse werden immer im CloudEvents-Format mit einer HTTP-Anfrage im Binärinhaltsmodus gesendet, es sei denn, Sie geben eine Nachrichtenbindung an.

Eingabe- und Ausgabedatenformate festlegen

Sie können nicht nur einen Transformationsausdruck in CEL schreiben, sondern auch optional das Datenformat der eingehenden Ereignisdaten angeben. So weiß Eventarc Advanced, wie die Nutzlast des Ereignisses geparst werden muss. Sie können die Daten auch von einem Format in ein anderes konvertieren.

Die folgenden Formate werden unterstützt: Avro, JSON und Protobuf. Weitere Informationen finden Sie unter Empfangene Ereignisse formatieren.

Transformationsausdrücke

Beim Transformieren von Ereignissen können alle Ereignisattribute in einem CEL-Ausdruck über ein vordefiniertes message-Objekt als Variablen abgerufen werden. Diese Variablen werden zur Laufzeit anhand der Ereignisdaten mit Werten gefüllt. Beispiel:

  • message.id gibt das id-Attribut des Ereignisses zurück
  • message.data gibt eine CEL-Wertdarstellung der Ereignisnutzlast zurück.
  • message.data.some-key gibt den Inhalt eines Felds namens some-key aus der Ereignisnutzlast zurück.

Felder in message.data werden immer als String-Typen dargestellt und Werte werden anhand des Schemas, das beim Festlegen des Eingabedatenformats angegeben wurde, aus dem ursprünglichen Ereignis zugeordnet.

Der Transformationsausdruck sollte ein vollständiges Ereignis ausdrücken, das die Ereigniskontextattribute und die Ereignisdatennutzlast enthält. Ausdrücke werden in JSON geschrieben, aber vordefinierte CEL-Funktionen, ‑Makros und ‑Operatoren sowie reguläre Ausdrücke mit RE2 werden unterstützt. Eventarc Advanced unterstützt auch bestimmte Erweiterungsfunktionen, mit denen sich die Ereignisdaten transformieren lassen.

Im Folgenden finden Sie zwei Beispiele für die Verwendung von CEL-Ausdrücken zum Transformieren von Ereignisdaten. Weitere Anwendungsfälle und Beispiele finden Sie unter Transformationsbeispiele.

Beispiel: Attributwerte formatieren

Im folgenden Beispiel werden phone_number-Attributwerte mithilfe von Funktionen für reguläre Ausdrücke formatiert. (Andere Attribute wurden weggelassen.)

  // Input:
  // {
  //   "data":
  //   {
  //     "email_address": "charlie@altostrat.com",
  //     "phone_number": "8005550100",
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //      "email_domain": "altostrat.com",
  //      "phone_number": "(800) 555-0100",
  //      "area_code": "800",
  //      "local_number": "5550100",
  //    }
  // }

  {
    "data":
    {
      "email_domain": re.capture(
                        message.data.email_address,
                        "\\S+@(\\S+)"),

      "phone_number": re.extract(
                        message.data.phone_number,
                        "^(\\d{3})(\\d{3})(\\d{4})", "(\\1) \\2-\\3"
                      ),

    }.merge ( re.captureN(message.data.phone_number,
                        "^(?P\d{3})[\w\-)(]*(?P\d{7})"
                      )
    )
  }

Im vorherigen Beispiel wurden die folgenden Funktionen für reguläre Ausdrücke verwendet:

  • re.capture: Der erste Wert einer benannten oder unbenannten Gruppe. Argumente:
    • target: String, der geparst werden soll
    • regex: Regulärer Ausdruck zum Erfassen von Werten

    Gibt einen String des ersten erfassten Gruppenwerts zurück.

  • re.captureN: Führt eine vollständige Übereinstimmung zwischen dem angegebenen String und dem regulären Ausdruck durch. Argumente:
    • target: String, der geparst werden soll
    • regex: Regulärer Ausdruck, mit dem Werte erfasst werden

    Gibt eine Map mit Schlüssel/Wert-Paaren für eine benannte Gruppe (Gruppenname, erfasster String) oder eine unbenannte Gruppe (Gruppenindex, erfasster String) zurück.

  • re.extract: Damit werden Gruppenwerte aus dem angegebenen Zielstring abgeglichen und der String neu geschrieben. Argumente:
    • target: String, der geparst werden soll
    • regex: Regulärer Ausdruck, mit dem Werte extrahiert werden
    • rewrite: Regulärer Ausdruck für die Formatierung des Ergebnisses

    Gibt einen String mit den extrahierten Werten zurück, der anhand des Arguments rewrite formatiert wird.

Beispiel: Array einem Array von Objekten zuordnen

Im folgenden Beispiel wird ein Array von Ganzzahlen einem Array von Objekten zugeordnet. (Andere Attribute wurden weggelassen.)

  // Input:
  // {
  //   "data":
  //   {
  //        "product_ids": [1, 2, 3]
  //   }
  // }
  // Output:
  // {
  //    "data":
  //    {
  //             "products": [
  //                {
  //                   "name": "apple",
  //                   "price": 70
  //                },
  //                {
  //                    "name": "orange",
  //                    "price":  80
  //                },
  //                {
  //                    "name": "Product(3)",
  //                    "price": 0
  //                },
  //                {
  //                     "name": "apple",
  //                     "price": 70
  //                }
  //            ]
  //    }
  // }

  {
    "data":
    {
      "products":  message.data.product_ids.map(product_id,
              product_id == 1?
              {
                "name": "apple",
                "price": 70
              } :
              product_id == 2?
              {
                "name": "orange",
                "price":  80
              } :
              // Default:
              {
                "name": "Product(" + string(product_id) + ")",
                "price": 0
              }
          )
    }
  }

Pipeline zum Transformieren von Ereignissen konfigurieren

Sie können eine Pipeline zum Transformieren von Ereignisdaten in der Google Cloud Console oder mit der gcloud CLI konfigurieren.

Es wird nur eine Vermittlung pro Pipeline unterstützt.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Eventarc > Pipelines auf.

    Zu Pipelines

  2. Sie können eine Pipeline erstellen oder, wenn Sie eine Pipeline aktualisieren möchten, auf den Namen der Pipeline klicken.

    Das Aktualisieren einer Pipeline kann länger als 10 Minuten dauern.

  3. Klicken Sie auf der Seite Pipelinedetails auf Bearbeiten.

  4. Führen Sie im Bereich Ereignisvermittlung die folgenden Schritte aus:

    1. Klicken Sie das Kästchen Transformation anwenden an.
    2. Wählen Sie in der Liste Eingehendes Format das entsprechende Format aus.

      Weitere Informationen finden Sie unter Empfangene Ereignisse formatieren.

    3. Geben Sie im Feld CEL-Ausdruck einen Transformationsausdruck in JSON ein. Vordefinierte CEL-Funktionen, ‑Makros und ‑Operatoren sowie reguläre Ausdrücke werden unterstützt. Beispiel:

      {
      "id": message.id,
      "datacontenttype": "application/json",
      "data": "{ \"scrubbed\": \"true\" }"
      }

      Das vorherige Beispiel führt Folgendes aus:

      • Alle Attribute werden aus dem ursprünglichen Ereignis entfernt, mit Ausnahme der id
      • Legen Sie das datacontenttype-Attribut auf application/json fest.
      • Ersetzt die Ereignisnutzlast durch einen statischen JSON-String.
    4. Klicken Sie auf Weiter.

  5. Führen Sie im Bereich Ziel die folgenden Schritte aus:

    1. Wählen Sie gegebenenfalls in der Liste Ausgabeformat ein Format aus.

      Weitere Informationen finden Sie unter Empfangene Ereignisse formatieren.

    2. Optional: Wenden Sie eine Nachrichtenbindung an. Weitere Informationen finden Sie in diesem Dokument im Abschnitt Nachrichtenbindung definieren.

  6. Klicken Sie auf Speichern.

gcloud

  1. Öffnen Sie ein Terminalfenster.

  2. Sie können mit dem Befehl gcloud beta eventarc pipelines update eine Pipeline erstellen oder eine Pipeline aktualisieren:

    Das Aktualisieren einer Pipeline kann länger als 10 Minuten dauern.

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --mediations=transformation_template= \
    {
      TRANSFORMATION_EXPRESSION
    }

    Ersetzen Sie Folgendes:

    • PIPELINE_NAME: ID der Pipeline oder ein voll qualifizierter Name
    • REGION: ein unterstützter Standort für Eventarc Advanced

      Alternativ können Sie das Attribut „Speicherort“ der gcloud CLI festlegen:

      gcloud config set eventarc/location REGION
      
    • TRANSFORMATION_EXPRESSION: Ein Ausdruck, der in JSON geschrieben ist. Vordefinierte CEL-Funktionen, ‑Makros und ‑Operatoren sowie reguläre Ausdrücke werden unterstützt. Mit einem mediations-Flag wird ein transformation_template-Schlüssel angewendet.

    Beispiel:

    gcloud beta eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --mediations=transformation_template= \
    {
    "id": message.id,
    "datacontenttype": "application/json",
    "data": "{ \"scrubbed\": \"true\" }"
    }

    Das vorherige Beispiel führt Folgendes aus:

    • Alle Attribute werden aus dem ursprünglichen Ereignis entfernt, mit Ausnahme der id
    • Legen Sie das datacontenttype-Attribut auf application/json fest.
    • Ersetzt die Ereignisnutzlast durch einen statischen JSON-String.

Erweiterungsfunktionen

Eventarc Advanced unterstützt die folgenden Erweiterungsfunktionen, mit denen die über einen Bus empfangenen Ereignisdaten transformiert werden können.

Funktion Beschreibung
denormalize

Eine Karte oder Liste wird denormalisiert, indem redundante Daten hinzugefügt werden, um die Leseleistung zu verbessern. Feldnamen in der resultierenden Zuordnung werden durch einen Punkt (.) voneinander getrennt. Der Listenindex wird ab 0 in einen Stringschlüssel umgewandelt.

Da in Avro- und Protobuf-Feldnamen kein Punkt (.) verwendet werden kann, sollten Sie diese Funktion nur für JSON-Daten verwenden.

Beispiel: map.() -> map(string, dyn) oder list() -> map(string, dyn)

merge

Fügt zwei Felder zusammen und gibt das kombinierte Feld zurück. Felder mit identischen Namen werden zusammengeführt.

Beispiel: message.(message) -> message

removeFields

Entfernt bestimmte Felder aus einem Ereignis. Feldnamen werden als Pfade aufgelöst. Als Trennzeichen wird der Punkt (.) verwendet.

Es wird erwartet, dass es sich um reinen JSON-Code handelt. Wenn Sie das JSON-Objekt marshallen, wird die Transformation möglicherweise auf einen JSON-String angewendet und führt zu einem Fehler.

Beispiel: message.(list(string)) -> message

setField

Hiermit wird ein Feld des Ereignisses mit einem bestimmten Schlüssel hinzugefügt oder ersetzt. Der Feldname wird als Pfad aufgelöst. Der Punkt (.) wird als Trennzeichen verwendet.

Beispiel: message.(string, dyn) -> message

Beispiel: Ereignisnutzlast ein Attribut hinzufügen, ohne andere Daten zu ändern

// Input:
// {
//   "data": 
//   {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX"
//   }
// }
// Output:
// {
//    "data":
//    {
//        "credit_card_number": "XXXX-XXXX-XXXX-XXXX",
//        "card_type": "credit"
//    }
// }
{
  "data": message.data.merge(
    {
      "card_type": "credit"
    }
  )
}

Beispiel: Liste der Elemente aus der Ereignisnutzlast denormalisieren

// Input:
//{
//"data": 
//   {
//        "products": [
//          {
//            "number": 021774,
//            "type": "perishable",
//            "price": 2.00
//          },
//          {
//            "number": 95602,
//            "type": "diy",
//            "price": 120.00
//          },
//          {
//            "number": 568302,
//            "type": "toys",
//            "price": 12.00
//          }
//        ]
//   }
//}
//
// Output:
//{
//"data":
//    {
//        "products": {
//            "0.number": 021774,
//            "0.type": "perishable",
//            "0.price": 2.00,
//            "1.number": 95602,
//            "1.type": "diy",
//            "1.price": 120.00,
//            "2.number": 568302,
//            "2.type": "toys",
//            "2.price": 12.00
//          }
//   }
//}
//
//
message.setField("data.products", message.data.products.denormalize())

Beispiel: Feld aus der Ereignisnutzlast entfernen

// Input:
// {
//   "data": 
//   {
//     "payment": {
//       "card_number": "XXXX-XXXX-XXXX-XXXX",
//       "card_type": "credit",
//     }
//   }
// }
// Output:
// {
//   "data":
//   {
//     "payment": {
//       "card_type": "credit"
//     }
//   }
// }
message.removeFields(["data.payment.card_number"])

Nachrichtenbindung definieren

Standardmäßig werden Ereignisse immer im CloudEvents-Format mit einer HTTP-Anfrage im Binärinhaltsmodus an ein Ziel gesendet. Sie können dieses Verhalten überschreiben, indem Sie eine Nachrichtenbindung definieren und eine neue HTTP-Anfrage erstellen.

Alle HTTP-Header, die durch andere Richtlinien oder Steuerelemente eingeführt werden (z. B. OAuth- oder OIDC-Tokens), werden beibehalten und mit den Headern aus dem Bindungsausdruck zusammengeführt.

Sie können eine Nachrichtenbindung beim Konfigurieren einer Pipeline in der Google Cloud Console oder mit der gcloud CLI definieren.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Eventarc > Pipelines auf.

    Zu Pipelines

  2. Sie können eine Pipeline erstellen oder, wenn Sie eine Pipeline aktualisieren möchten, auf den Namen der Pipeline klicken.

    Das Aktualisieren einer Pipeline kann länger als 10 Minuten dauern.

  3. Klicken Sie auf der Seite Pipelinedetails auf Bearbeiten.

  4. Wenden Sie im Bereich Ziel eine Nachrichtenbindung an, die ein in JSON geschriebener CEL-Ausdruck ist. Dies führt zu einer neu erstellten HTTP-Anfrage, die dann an das Ziel der Pipeline gesendet wird.

    Weitere Informationen finden Sie in den Abschnitten Auf eingehende Nachrichten zugreifen und HTTP-Anfragen erstellen in diesem Dokument.

  5. Klicken Sie auf Speichern.

gcloud

  1. Öffnen Sie ein Terminalfenster.

  2. Sie können mit dem Befehl gcloud beta eventarc pipelines update eine Pipeline erstellen oder eine Pipeline aktualisieren:

    gcloud beta eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --destinations=http_endpoint_message_binding_template='MESSAGE_BINDING'

    Ersetzen Sie Folgendes:

    • PIPELINE_NAME: ID der Pipeline oder ein voll qualifizierter Name
    • REGION: ein unterstützter Standort für Eventarc Advanced

      Alternativ können Sie das Attribut „Speicherort“ der gcloud CLI festlegen:

      gcloud config set eventarc/location REGION
      
    • MESSAGE_BINDING: Ein in JSON geschriebener CEL-Ausdruck, der zu einer neu erstellten HTTP-Anfrage führt, die dann an das Ziel der Pipeline gesendet wird.

      Weitere Informationen finden Sie in den Abschnitten Auf eingehende Nachrichten zugreifen und HTTP-Anfragen erstellen in diesem Dokument.

    Beispiel:

    gcloud beta eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment, \
    http_endpoint_message_binding_template='{"headers":{"new-header-key": "new-header-value"}}'

    Wenn Sie einen http_endpoint_message_binding_template-Schlüssel verwenden, müssen Sie auch die Schlüssel http_endpoint_uri und network_attachment festlegen.

Auf eingehende Nachrichten zugreifen

So greifen Sie mit einem CEL-Ausdruck auf eine eingehende CloudEvents-Nachricht zu:

  • Verwenden Sie den Wert message.data, um auf das Feld data der eingehenden Nachricht zuzugreifen.
  • Verwenden Sie die message.key-Werte (key ist der Name des Attributs), um auf die Attribute der eingehenden Nachricht zuzugreifen.
  • Verwenden Sie eine headers-Variable, um auf Header zuzugreifen, die der HTTP-Anfrage durch vorherige Vermittlungen in der Verarbeitungskette hinzugefügt wurden. Diese Variable definiert eine Zuordnung von Schlüssel/Wert-Paaren, die den zusätzlichen HTTP-Headern und nicht den ursprünglichen Headern der ursprünglichen eingehenden Anfrage entsprechen.

    Mit dem folgenden CEL-Ausdruck kann beispielsweise eine HTTP-Anfrage nur mit Headern erstellt werden, indem den in vorherigen Pipeline-Vermittlungen hinzugefügten Headern ein zusätzlicher Header hinzugefügt wird:

    {"headers": headers.merge({"new-header-key": "new-header-value"})}

HTTP-Anfragen erstellen

Das Ergebnis des CEL-Ausdrucks muss eine Zuordnung von Schlüssel/Wert-Paaren sein, deren Felder headers und body verwendet werden, um die HTTP-Anfrage wie unten beschrieben zu erstellen.

Für headers-Felder:

  • Wenn durch den CEL-Ausdruck eine headers-Zuordnung vorhanden ist, werden ihre Schlüssel/Wert-Paare direkt den HTTP-Anfrageheadern zugeordnet und ihre Werte werden mit der kanonischen Stringcodierung des entsprechenden Datentyps erstellt.
  • Wenn kein headers-Feld vorhanden ist, enthält die resultierende HTTP-Anfrage keine Header.

Für body-Felder:

  • Wenn als Ergebnis des CEL-Ausdrucks ein body-Feld vorhanden ist, wird sein Wert direkt dem HTTP-Hauptteil zugeordnet.
  • Wenn der Wert des Felds body vom Typ bytes oder string ist, wird er unverändert als HTTP-Anfragetext verwendet. Andernfalls wird er in einen JSON-String konvertiert.
  • Wenn das Feld body nicht vorhanden ist, ist der resultierende HTTP-Anfragetext der Text der endgültigen CloudEvents-HTTP-Nachrichtenbindung im Binärinhaltsmodus.

Alle anderen Felder, die durch den CEL-Ausdruck entstehen, werden ignoriert.

Erweiterungsfunktionen

Eventarc Advanced unterstützt die folgenden Erweiterungsfunktionen, mit denen die Ereignisdaten bei der Angabe einer Nachrichtenbindung transformiert werden können.

Funktion Beschreibung
merge

Eine übergebene CEL-Zuordnung wird mit der CEL-Zuordnung zusammengeführt, auf die die Funktion angewendet wird. Wenn derselbe Schlüssel in beiden Maps vorhanden ist oder der Wert des Schlüssels vom Typ map ist, werden beide Maps zusammengeführt. Andernfalls wird der Wert aus der übergebenen Map verwendet.

Beispiel: map1.merge(map2) -> map3

toBase64

Wandelt einen CEL-Wert in einen base64-URL-codierten String um.

Beispiel: map.toBase64() -> string

toCloudEventJsonWithPayloadFormat

Konvertiert eine Nachricht in eine CEL-Map, die einer JSON-Darstellung einer CloudEvents-Nachricht entspricht, und wendet toDestinationPayloadFormat auf die Nachrichtendaten an. Außerdem wird das datacontenttype des Ereignisses auf das angegebene Ausgangsformat (output_payload_format_*) festgelegt. Wenn kein Ausgangsformat festgelegt ist, wird ein vorhandenes datacontenttype verwendet. Andernfalls wird das datacontenttype nicht festgelegt. Wenn die Nachricht nicht der CloudEvents-Spezifikation entspricht, schlägt die Funktion fehl. Sie können toJsonString verwenden, um die Daten in einen JSON-String umzuwandeln.

Beispiel: message.toCloudEventJsonWithPayloadFormat() -> map.toJsonString() -> string

toDestinationPayloadFormat

Konvertiert message.data in das angegebene Ausgangsformat (output_payload_format_*). Wenn kein Ausgangsformat festgelegt ist, wird message.data unverändert zurückgegeben.

Beispiel: message.data.toDestinationPayloadFormat() -> string or bytes

toJsonString

Konvertiert einen CEL-Wert in einen JSON-String.

Beispiel: map.toJsonString() -> string

toMap

Konvertiert eine CEL-Liste von CEL-Maps in eine einzelne CEL-Map.

Beispiel: list(map).toMap() -> map

Beispiel: Header beibehalten, neuen Header hinzufügen, Textkörper auf Zielformat festlegen

gcloud beta eventarc pipelines create my-pipeline \
    --location=us-central1 \
    --input-payload-format-json='{}' \
    --destinations=http_endpoint_uri='https://example-endpoint.com',network_attachment=my-network-attachment,http_endpoint_message_binding_template='{"headers": headers.merge({"content-type":"application/avro"}), "body": message.data.toDestinationPayloadFormat()"}',output_payload_format_avro_schema_definition='{"schema_definition": "{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},{"name":"account_late","type":"boolean"}]}"}'

Nächste Schritte