Modello da Apache Kafka a Kafka

Il modello da Apache Kafka ad Apache Kafka crea una pipeline di inserimento flussi che importa i dati come byte da un'origine Apache Kafka, quindi li scrive in un sink Apache Kafka.

Requisiti della pipeline

  • Deve esistere l'argomento di origine Apache Kafka.
  • I server dei broker di origine e sink di Apache Kafka devono essere in esecuzione ed essere raggiungibili dalle macchine worker Dataflow.
  • Se utilizzi Apache Kafka per BigQuery come origine o sink, l'argomento deve esistere prima di avviare il modello.

Formato dei messaggi Kafka

I messaggi di origine Apache Kafka vengono letti come byte, mentre i byte vengono scritti nel sink di Apache Kafka.

Autenticazione

Il modello da Apache Kafka ad Apache Kafka supporta SASL/PLAIN e l'autenticazione TLS per i broker Kafka.

Parametri del modello

Parametri obbligatori

  • readBootstrapServerAndTopic : argomento Kafka da cui leggere l'input.
  • kafkaReadAuthenticationMode : la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per nessuna autenticazione, SASL_PLAIN per nome utente e password SASL/PLAIN e TLS per l'autenticazione basata su certificati. Apache Kafka per BigQuery supporta solo la modalità di autenticazione SASL_PLAIN. Il valore predefinito è: SASL_PLAIN.
  • writeBootstrapServerAndTopic : argomento Kafka in cui scrivere l'output.
  • kafkaWriteAuthenticationMethod : la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per nessuna autenticazione, SASL_PLAIN per nome utente e password SASL/PLAIN e TLS per l'autenticazione basata su certificati. Il valore predefinito è: NESSUNO.

Parametri facoltativi

  • enableCommitOffsets : esegue il commit degli offset dei messaggi elaborati in Kafka. Se abilitata, questa opzione ridurrà al minimo le lacune o l'elaborazione duplicata dei messaggi quando riavvii la pipeline. Richiede la specifica dell'ID gruppo di consumatori. Il valore predefinito è false.
  • consumerGroupId : l'identificatore univoco del gruppo di consumatori a cui appartiene questa pipeline. Obbligatorio se è abilitato il Commit Offsets in Kafka. Il campo predefinito è vuoto.
  • kafkaReadOffset : il punto di partenza per la lettura dei messaggi quando non esistono offset di commit. La prima parte dell'email parte dall'inizio, la più recente dall'ultimo messaggio. Il valore predefinito è: più recente.
  • kafkaReadUsernameSecretId : l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
  • kafkaReadPasswordSecretId : l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_PLAIN. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
  • kafkaReadKeystoreLocation : il percorso di Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Esempio: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation : il percorso di Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadTruststorePasswordSecretId : l'ID secret di Google Cloud Secret Manager che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka (ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId : l'ID secret di Google Cloud Secret Manager contenente la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeyPasswordSecretId : l'ID secret di Google Cloud Secret Manager contenente la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS Kafka. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteUsernameSecretId : l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
  • kafkaWritePasswordSecretId : l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
  • kafkaWriteKeystoreLocation : il percorso di Google Cloud Storage del file Java KeyStore (JKS) contenente il certificato TLS e la chiave privata per l'autenticazione con il cluster Kafka di destinazione. Esempio: gs://
  • kafkaWriteTruststoreLocation : il percorso di Google Cloud Storage del file Java TrustStore (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka di destinazione.
  • kafkaWriteTruststorePasswordSecretId : l'ID secret di Google Cloud Secret Manager contenente la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteKeystorePasswordSecretId : l'ID secret di Google Cloud Secret Manager contenente la password per accedere al file Java KeyStore (JKS) da utilizzare per l'autenticazione TLS con il cluster Kafka di destinazione. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaWriteKeyPasswordSecretId : l'ID secret di Google Cloud Secret Manager contenente la password da utilizzare per accedere alla chiave privata all'interno del file Java KeyStore (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Kafka to Cloud Storage template.
  6. Inserisci i valori parametro negli appositi campi.
  7. (Facoltativo) Per passare dall'elaborazione "exactly-once" all'impostazione modalità flusso di dati almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tua tabella Cloud Storage
  • KAFKA_TOPICS: elenco degli argomenti di Apache Kakfa. Se vengono forniti più argomenti, le virgole devono essere precedute dal carattere di escape. Vedi gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: L'URI Cloud Storage del file .js che definisce il codice JavaScript definito dall'utente che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve contenere il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi utilizzare il carattere di escape per le virgole. Vedi gcloud topic escaping.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tua tabella Cloud Storage
  • KAFKA_TOPICS: elenco degli argomenti di Apache Kakfa. Se vengono forniti più argomenti, le virgole devono essere precedute dal carattere di escape. Vedi gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: L'URI Cloud Storage del file .js che definisce il codice JavaScript definito dall'utente che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve contenere il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi utilizzare il carattere di escape per le virgole. Vedi gcloud topic escaping.

Per ulteriori informazioni, vedi Scrivere dati da Kafka a Cloud Storage con Dataflow.

Passaggi successivi