Il modello Apache Kafka to Cloud Storage è una pipeline di streaming che importa dati di testo da Google Cloud Managed Service per Apache Kafka e restituisce i record in Cloud Storage.
Puoi utilizzare il modello Apache Kafka to BigQuery anche con Kafka autogestito o esterno.
Requisiti della pipeline
- Il bucket Cloud Storage di output deve esistere.
- Il server broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker di Dataflow.
- Gli argomenti Apache Kafka devono esistere.
Formato dei messaggi Kafka
Il modello Apache Kafka to Cloud Storage supporta la lettura dei messaggi da Kafka nei seguenti formati: CONFLUENT_AVRO_WIRE_FORMAT
e JSON
.
Formato file di output
Il formato del file di output è lo stesso del messaggio Kafka di input. Ad esempio, se selezioni JSON per il formato dei messaggi Kafka, i file JSON vengono scritti nel bucket Cloud Storage di output.
Autenticazione
Il modello Apache Kafka to Cloud Storage supporta l'autenticazione SASL/PLAIN ai broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic: l'argomento Kafka da cui leggere l'input.
- outputDirectory: il percorso e il prefisso del nome file per la scrittura dei file di output. Deve terminare con una barra. Ad esempio,
gs://your-bucket/your-path/
. - kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza
NONE
per nessuna autenticazione,SASL_PLAIN
per nome utente e password SASL/PLAIN eTLS
per l'autenticazione basata su certificato. Apache Kafka for BigQuery supporta solo la modalità di autenticazioneSASL_PLAIN
. Il valore predefinito è SASL_PLAIN. - messageFormat: il formato dei messaggi Kafka da leggere. I valori supportati sono
AVRO_CONFLUENT_WIRE_FORMAT
(Avro con codifica Confluent Schema Registry),AVRO_BINARY_ENCODING
(Avro binario non codificato) eJSON
. Il valore predefinito è: AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: se true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.
Parametri facoltativi
- windowDuration: la durata/la dimensione della finestra in cui i dati verranno scritti in Cloud Storage. I formati consentiti sono: Ns (per secondi, ad esempio 5s), Nm (per minuti, ad esempio 12m), Nh (per ore, ad esempio 2h). Ad esempio,
5m
. Il valore predefinito è 5 m. - outputFilenamePrefix: il prefisso da inserire in ogni file analizzato. Ad esempio,
output-
. Il valore predefinito è output. - numShards: il numero massimo di shard di output prodotti durante la scrittura. Un numero maggiore di shard comporta una maggiore velocità effettiva per la scrittura in Cloud Storage, ma un costo potenzialmente più elevato per l'aggregazione dei dati tra gli shard durante l'elaborazione dei file di output di Cloud Storage. Il valore predefinito è deciso da Dataflow.
- enableCommitOffsets: esegui il commit degli offset dei messaggi elaborati in Kafka. Se questa opzione è attivata, le lacune o l'elaborazione duplicata dei messaggi vengono ridotte al minimo al riavvio della pipeline. È necessario specificare l'ID gruppo di consumatori. Il valore predefinito è false.
- consumerGroupId: l'identificatore univoco del gruppo di consumatori a cui appartiene questa pipeline. Obbligatorio se è attivata l'opzione Commit Offsets to Kafka. Il valore predefinito è vuoto.
- kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset committati. La prima inizia dall'inizio, l'ultima dal messaggio più recente. Il valore predefinito è latest.
- kafkaReadUsernameSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadPasswordSecretId: l'ID segreto di Google Cloud Secret Manager contenente la password di Kafka da utilizzare con l'autenticazione
SASL_PLAIN
. Ad esempio:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Il valore predefinito è vuoto. - kafkaReadKeystoreLocation: il percorso di Google Cloud Storage del file dell'archivio chiavi Java (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: il percorso di Google Cloud Storage del file dell'archivio attendibilità Java (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadTruststorePasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere alla chiave privata all'interno del file dell'archivio chiavi Java (JKS) per l'autenticazione TLS di Kafka. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat: il formato dello schema Kafka. Può essere fornito come
SINGLE_SCHEMA_FILE
oSCHEMA_REGISTRY
. Se è specificatoSINGLE_SCHEMA_FILE
, utilizza lo schema indicato nel file dello schema avro per tutti i messaggi. Se viene specificatoSCHEMA_REGISTRY
, i messaggi possono avere un singolo schema o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: il percorso di Google Cloud Storage al singolo file dello schema Avro utilizzato per decodificare tutti i messaggi di un argomento. Il valore predefinito è vuoto.
- schemaRegistryConnectionUrl: l'URL dell'istanza Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
- binaryAvroSchemaPath: il percorso di Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
- schemaRegistryAuthenticationMode: modalità di autenticazione di Schema Registry. Può essere NONE, TLS o OAUTH. Il valore predefinito è NESSUNO.
- schemaRegistryTruststoreLocation: posizione del certificato SSL in cui sono archiviati l'archivio di attendibilità per l'autenticazione a Schema Registry. Ad esempio,
/your-bucket/truststore.jks
. - schemaRegistryTruststorePasswordSecretId: SecretId in Secret Manager in cui è archiviata la password per accedere al secret nel truststore. Ad esempio,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeystoreLocation: posizione dell'archivio chiavi contenente il certificato SSL e la chiave privata. Ad esempio,
/your-bucket/keystore.jks
. - schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager dove la password per accedere al file del keystore, ad esempio
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId: SecretId della password richiesta per accedere alla chiave privata del client archiviata nel keystore. Ad esempio,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId: l'ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: l'ID segreto di Google Cloud Secret Manager contenente il segreto client da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. Ad esempio,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaRegistryOauthScope: l'ambito del token di accesso utilizzato per autenticare il client Schema Registry in modalità OAUTH. Questo campo è facoltativo, poiché la richiesta può essere effettuata senza che venga passato un parametro di ambito. Ad esempio,
openid
. - schemaRegistryOauthTokenEndpointUrl: l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: nome completo della tabella BigQuery per i messaggi non riusciti. I messaggi che non sono riusciti a raggiungere la tabella di output per diversi motivi (ad es. schema non corrispondente, JSON in formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. Ad esempio,
your-project-id:your-dataset.your-table-name
.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Kafka to Cloud Storage template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella Cloud StorageKAFKA_TOPICS
: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: l'URI Cloud Storage del file.js
che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve avere il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella Cloud StorageKAFKA_TOPICS
: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: l'URI Cloud Storage del file.js
che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve avere il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.
Per ulteriori informazioni, consulta Scrivere dati da Kafka in Cloud Storage con Dataflow.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.