Il modello da Apache Kafka a Cloud Storage è una pipeline di inserimento flussi che importa dati di testo da Google Cloud Managed Service per Apache Kafka e invia i record a Cloud Storage.
Puoi utilizzare il modello Apache Kafka to BigQuery anche con Kafka autogestito o esterno.
Requisiti della pipeline
- Deve esistere il bucket Cloud Storage di output.
- Il server del broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker Dataflow.
- Gli argomenti Apache Kafka devono esistere.
Formato dei messaggi Kafka
Il modello Apache Kafka to Cloud Storage supporta la lettura dei messaggi da Kafka nei seguenti formati: CONFLUENT_AVRO_WIRE_FORMAT
e JSON
.
Formato file di output
Il formato del file di output è lo stesso del messaggio Kafka di input. Ad esempio, se selezioni JSON per il formato dei messaggi Kafka, i file JSON vengono scritti nel bucket Cloud Storage di output.
Autenticazione
Il modello da Apache Kafka a Cloud Storage supporta l'autenticazione SASL/PLAIN per i broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic : argomento Kafka da cui leggere l'input.
- kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per l'autenticazione e SASL_PLAIN per il nome utente e la password SASL/PLAIN. Apache Kafka per BigQuery supporta solo la modalità di autenticazione SASL_PLAIN. Il valore predefinito è: SASL_PLAIN.
- outputDirectory : il percorso e il prefisso del nome file per la scrittura dei file di output. Deve terminare con una barra. (ad es. gs://your-bucket/your-path/).
- messageFormat : il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro codificato da Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binario semplice) e JSON. Il valore predefinito è: AVRO_CONFLUENT_WIRE_FORMAT.
Parametri facoltativi
- windowDuration : durata/dimensione della finestra in cui i dati verranno scritti in Cloud Storage. I formati consentiti sono: Ns (per secondi, esempio: 5s), Nm (per minuti, esempio: 12m), Nh (per ore, esempio: 2h). (ad es. 5 m). Il valore predefinito è 5 min.
- outputFilenamePrefix: il prefisso da inserire in ogni file a finestra. (Esempio: output-). Il valore predefinito è: output.
- numShards: il numero massimo di shard di output prodotti durante la scrittura. Un numero più elevato di shard significa una maggiore velocità effettiva per la scrittura in Cloud Storage, ma un costo di aggregazione dei dati potenzialmente maggiore negli shard durante l'elaborazione dei file Cloud Storage di output. Il valore predefinito è deciso da Dataflow.
- enableCommitOffsets : esegue il commit degli offset dei messaggi elaborati in Kafka. Se abilitata, questa opzione ridurrà al minimo le lacune o l'elaborazione duplicata dei messaggi quando riavvii la pipeline. È necessario specificare l'ID gruppo di consumatori. Il valore predefinito è false.
- consumerGroupId: l'identificatore univoco del gruppo di consumatori a cui appartiene questa pipeline. Obbligatorio se è attivata l'opzione Commit Offsets to Kafka. Il valore predefinito è vuoto.
- kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset committati. La prima inizia dall'inizio, l'ultima dal messaggio più recente. Il valore predefinito è latest.
- kafkaReadUsernameSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
- kafkaReadPasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password di Kafka da utilizzare con l'autenticazione SASL_PLAIN. (ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il valore predefinito è vuoto.
- schemaFormat : il formato dello schema Kafka. Può essere fornito come SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se è specificato SINGLE_SCHEMA_FILE, tutti i messaggi devono avere lo schema menzionato nel file dello schema avro. Se viene specificato SCHEMA_REGISTRY, i messaggi possono avere un solo schema o più schemi. Il valore predefinito è: SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath: il percorso di Google Cloud Storage del singolo file dello schema Avro utilizzato per decodificare tutti i messaggi di un argomento. Il campo predefinito è vuoto.
- schemaRegistryConnectionUrl : l'URL dell'istanza del Confluent Schema Registry utilizzato per gestire gli schemi Avro per la decodifica dei messaggi. Il campo predefinito è vuoto.
- binaryAvroSchemaPath : il percorso di Google Cloud Storage del file di schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Kafka to Cloud Storage template.
- Inserisci i valori parametro negli appositi campi.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella Cloud StorageKAFKA_TOPICS
: elenco degli argomenti di Apache Kakfa. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: l'URI Cloud Storage del file.js
che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve avere il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella Cloud StorageKAFKA_TOPICS
: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Vedigcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: L'URI Cloud Storage del file.js
che definisce il codice JavaScript definito dall'utente che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve contenere il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Vedigcloud topic escaping
.
Per ulteriori informazioni, consulta Scrivere dati da Kafka a Cloud Storage con Dataflow.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.