Modelli di utilità forniti da Google

Google mette a disposizione una serie di modelli Dataflow open source. Per informazioni generali sui modelli, consulta Modelli di Dataflow. Per un elenco di tutti i modelli forniti da Google, consulta la Guida introduttiva ai modelli forniti da Google.

Questa guida documenta i modelli di utilità.

Conversione del formato file (Avro, Parquet, CSV)

Il modello di conversione del formato file è una pipeline batch che converte i file archiviati in Cloud Storage da un formato supportato a un altro.

Sono supportate le seguenti conversioni di formato:

  • CSV ad Avro
  • Da CSV a Parquet
  • Da Avro a Parquet
  • Da Parquet ad Avro

Requisiti per questa pipeline:

  • Il bucket Cloud Storage di output deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametro Descrizione
inputFileFormat Il formato del file di input. Il valore deve essere [csv, avro, parquet].
outputFileFormat Il formato file di output. Il valore deve essere [avro, parquet].
inputFileSpec Il pattern del percorso di Cloud Storage per i file di input. Ad esempio, gs://bucket-name/path/*.csv
outputBucket La cartella Cloud Storage per scrivere i file di output. Questo percorso deve terminare con una barra. Ad esempio, gs://bucket-name/output/
schema Percorso Cloud Storage del file di schema Avro. Ad esempio, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Facoltativo) I file CSV di input contengono un record di intestazione (true/false). Il valore predefinito è false. Obbligatorio solo durante la lettura dei file CSV.
csvFormat (Facoltativo) La specifica del formato CSV da utilizzare per analizzare i record. Il valore predefinito è Default. Per saperne di più, consulta la pagina Formato CSV Apache.
delimiter (Facoltativo) Il delimitatore di campo utilizzato dai file CSV di input.
outputFilePrefix (Facoltativo) Il prefisso del file di output. Il valore predefinito è output.
numShards (Facoltativo) Il numero di shard del file di output.

Esecuzione del modello di conversione Formato file

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Convert file formats template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • INPUT_FORMAT: il formato del file di input; deve essere [csv, avro, parquet]
  • OUTPUT_FORMAT: il formato file dei file di output; deve essere [avro, parquet]
  • INPUT_FILES: pattern del percorso per i file di input
  • OUTPUT_FOLDER: la tua cartella Cloud Storage per i file di output
  • SCHEMA: percorso al file di schema Avro

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • INPUT_FORMAT: il formato del file di input; deve essere [csv, avro, parquet]
  • OUTPUT_FORMAT: il formato file dei file di output; deve essere [avro, parquet]
  • INPUT_FILES: pattern del percorso per i file di input
  • OUTPUT_FOLDER: la tua cartella Cloud Storage per i file di output
  • SCHEMA: percorso al file di schema Avro

Comprimi in blocco i file di Cloud Storage

Il modello di compressione collettiva dei file Cloud Storage è una pipeline batch che comprime i file in Cloud Storage in una posizione specificata. Questo modello può essere utile quando devi comprimere gruppi di file di grandi dimensioni come parte di un processo di archiviazione periodico. Le modalità di compressione supportate sono: BZIP2, DEFLATE e GZIP. I file in uscita nella posizione di destinazione seguiranno uno schema di denominazione del nome file originale aggiunto con l'estensione della modalità di compressione. Le estensioni aggiunte saranno una delle seguenti: .bzip2, .deflate, .gz.

Eventuali errori che si verificano durante il processo di compressione verranno restituiti al file di errore nel formato CSV nome file, messaggio di errore. Se non si verificano errori durante l'esecuzione della pipeline, il file di errore viene comunque creato, ma non contiene record di errori.

Requisiti per questa pipeline:

  • La compressione deve essere in uno dei seguenti formati: BZIP2, DEFLATE e GZIP.
  • La directory di output deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametro Descrizione
inputFilePattern Il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/uncompressed/*.txt.
outputDirectory La posizione di output in cui scrivere. Ad esempio, gs://bucket-name/compressed/.
outputFailureFile Il file di output del log degli errori da utilizzare per gli errori di scrittura che si verificano durante il processo di compressione. Ad esempio, gs://bucket-name/compressed/failed.csv. Se non sono presenti errori, il file viene comunque creato, ma sarà vuoto. I contenuti del file sono in formato CSV (nome file, errore) e sono composti da una riga per ogni file che non viene compresso.
compression L'algoritmo di compressione utilizzato per comprimere i file abbinati. Il valore deve essere uno dei seguenti: BZIP2, DEFLATE, GZIP

Esecuzione del modello di compressione collettiva dei file Cloud Storage

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bulk Compress Files on Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • BUCKET_NAME: nome del bucket Cloud Storage
  • COMPRESSION: la tua scelta di algoritmo di compressione

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • BUCKET_NAME: nome del bucket Cloud Storage
  • COMPRESSION: la tua scelta di algoritmo di compressione

Decomprimi in blocco i file di Cloud Storage

Il modello di decompressione collettiva di Cloud Storage Files è una pipeline batch che decomprime i file su Cloud Storage in una posizione specificata. Questa funzionalità è utile quando vuoi utilizzare dati compressi per ridurre al minimo i costi della larghezza di banda di rete durante una migrazione, ma vuoi massimizzare la velocità di elaborazione analitica operando su dati non compressi dopo la migrazione. La pipeline gestisce automaticamente più modalità di compressione durante una singola esecuzione e determina la modalità di decompressione da utilizzare in base all'estensione del file (.bzip2, .deflate, .gz, .zip).

Requisiti per questa pipeline:

  • I file da decomprimere devono avere uno dei seguenti formati: Bzip2, Deflate, Gzip e Zip.
  • La directory di output deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametro Descrizione
inputFilePattern Il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/compressed/*.gz.
outputDirectory La posizione di output in cui scrivere. Ad esempio, gs://bucket-name/decompressed.
outputFailureFile Il file di output del log degli errori da utilizzare per gli errori di scrittura che si verificano durante il processo di decompressione. Ad esempio, gs://bucket-name/decompressed/failed.csv. Se non sono presenti errori, il file viene comunque creato, ma sarà vuoto. I contenuti del file sono in formato CSV (nome file, errore) e sono composti da una riga per ogni file che non viene decompresso.

Esecuzione del modello di file decompresso Cloud Storage in blocco

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bulk Decompress Files on Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • BUCKET_NAME: nome del bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH: la tua scelta di percorso nel file contenente le informazioni sull'errore

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • BUCKET_NAME: nome del bucket Cloud Storage
  • OUTPUT_FAILURE_FILE_PATH: la tua scelta di percorso nel file contenente le informazioni sull'errore

Eliminazione collettiva Datastore [deprecata]

Questo modello è deprecato e verrà rimosso nel primo trimestre del 2022. Esegui la migrazione al modello Eliminazione collettiva Firestore.

Il modello di eliminazione collettiva di Datastore è una pipeline che legge in Entità da Datastore con una determinata query GQL e poi elimina tutte le Entità corrispondenti nel progetto di destinazione selezionato. La pipeline può facoltativamente trasferire le entità Datastore codificate JSON all'UDF JavaScript, che puoi utilizzare per filtrare le entità restituendo valori nulli.

Requisiti per questa pipeline:

  • Datastore deve essere configurato nel progetto prima di eseguire il modello.
  • Se viene letta ed eliminata da istanze Datastore separate, l'account di servizio worker di Dataflow deve disporre dell'autorizzazione per leggere da un'istanza ed eliminare dall'altra.

Parametri del modello

Parametro Descrizione
datastoreReadGqlQuery Query GQL che specifica a quali entità corrispondere per l'eliminazione. L'utilizzo di una query di sole chiavi può migliorare le prestazioni. Ad esempio: "SELECT__key__ FROM MyKind".
datastoreReadProjectId ID progetto GCP dell'istanza Datastore da cui vuoi leggere le entità (utilizzando la query GQL) utilizzate per la corrispondenza.
datastoreDeleteProjectId ID progetto GCP dell'istanza di Datastore da cui eliminare le entità corrispondenti. Può essere uguale a datastoreReadProjectId se vuoi leggere ed eliminare all'interno della stessa istanza di Datastore.
datastoreReadNamespace (Facoltativo) Spazio dei nomi delle entità richieste. Imposta come "" per lo spazio dei nomi predefinito.
datastoreHintNumWorkers (Facoltativo) Suggerimento per il numero previsto di worker nel passaggio di limitazione del datastore. Il valore predefinito è 500.
javascriptTextTransformGcsPath (Facoltativo) L'URI Cloud Storage del file .js che definisce la funzione definita dall'utente JavaScript (UDF) che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facoltativo) Il nome della funzione definita dall'utente JavaScript che vuoi utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per un esempio di UDF JavaScript, consulta Esempi di funzioni UDF. Se questa funzione restituisce un valore non definito o nullo per una determinata entità Datastore, l'entità non viene eliminata.

Esecuzione del modello di eliminazione collettiva di Datastore

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bulk Delete Entities in Datastore template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • GQL_QUERY: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazione
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: l'ID progetto dell'istanza del datastore. Questo esempio legge ed elimina dalla stessa istanza Datastore.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • GQL_QUERY: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazione
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: l'ID progetto dell'istanza del datastore. Questo esempio legge ed elimina dalla stessa istanza Datastore.

Eliminazione collettiva Firestore

Il modello di eliminazione collettiva Firestore è una pipeline che legge in Entità da Firestore con una determinata query GQL e poi elimina tutte le Entità corrispondenti nel progetto di destinazione selezionato. La pipeline può facoltativamente trasferire le entità Firestore con codifica JSON all'UDF JavaScript, che puoi utilizzare per filtrare le entità restituendo valori nulli.

Requisiti per questa pipeline:

  • Firestore deve essere configurato nel progetto prima di eseguire il modello.
  • Se leggi ed elimina da istanze Firestore separate, l'account di servizio worker di Dataflow deve avere l'autorizzazione per leggere da un'istanza ed eliminare dall'altra.

Parametri del modello

Parametro Descrizione
firestoreReadGqlQuery Query GQL che specifica a quali entità corrispondere per l'eliminazione. L'utilizzo di una query di sole chiavi può migliorare le prestazioni. Ad esempio: "SELECT__key__ FROM MyKind".
firestoreReadProjectId ID progetto GCP dell'istanza Firestore da cui vuoi leggere le entità (utilizzando la query GQL) utilizzate per la corrispondenza.
firestoreDeleteProjectId ID progetto GCP dell'istanza Firestore da cui eliminare le entità corrispondenti. Può essere uguale a firestoreReadProjectId se vuoi leggere ed eliminare all'interno della stessa istanza di Firestore.
firestoreReadNamespace (Facoltativo) Spazio dei nomi delle entità richieste. Imposta come "" per lo spazio dei nomi predefinito.
firestoreHintNumWorkers (Facoltativo) Suggerimento per il numero previsto di worker nel passaggio di limitazione dell'applicazione graduale di Firestore. Il valore predefinito è 500.
javascriptTextTransformGcsPath (Facoltativo) L'URI Cloud Storage del file .js che definisce la funzione definita dall'utente JavaScript (UDF) che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facoltativo) Il nome della funzione definita dall'utente JavaScript che vuoi utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per un esempio di UDF JavaScript, consulta Esempi di funzioni UDF. Se questa funzione restituisce un valore non definito o nullo per una determinata entità Firestore, l'entità non viene eliminata.

Esecuzione del modello di eliminazione collettiva Firestore

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bulk Delete Entities in Firestore template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="GQL_QUERY",\
firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\
firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • GQL_QUERY: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazione
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: il tuo ID progetto dell'istanza di Firestore. Questo esempio legge ed elimina dalla stessa istanza Firestore.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "GQL_QUERY",
       "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID",
       "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • GQL_QUERY: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazione
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: il tuo ID progetto dell'istanza di Firestore. Questo esempio legge ed elimina dalla stessa istanza Firestore.

Streaming di flussi di dati su Pub/Sub, BigQuery e Cloud Storage

Il modello di generatore di dati in streaming viene utilizzato per generare un numero illimitato o fisso di record o messaggi sintetici in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni esempi di casi d'uso possibili:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumatori necessari per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark di prestazioni o servire come proof of concept.

Sink e formati di codifica supportati

La tabella seguente descrive i sink e i formati di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub Yes No
BigQuery No No
Cloud Storage

La libreria Generatore di dati JSON utilizzata dalla pipeline consente di utilizzare varie funzioni faker per ogni campo dello schema. Per ulteriori informazioni sulle funzioni faker e sul formato dello schema, consulta la documentazione relativa a json-data-generator.

Requisiti per questa pipeline:

  • Crea un file di schema dei messaggi e archivialo in una posizione di Cloud Storage.
  • Il target di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file di schema Avro e archivialo in un percorso Cloud Storage.

Parametri del modello

Parametro Descrizione
schemaLocation Posizione del file di schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY, GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica di output. I valori possibili sono JSON, AVRO, PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub in cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-ID/topics/my-topic-ID.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) BigQuery Write Disposition. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. L'impostazione predefinita è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui conservare i record non riusciti. Se non viene fornito, la pipeline crea una tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso della posizione di Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output--.
windowDuration (Facoltativo) Intervallo di finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 m (ovvero 1 minuto).
numShards (Facoltativo) Numero massimo di shard dell'output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o su un numero superiore.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0 che indica illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per abilitare la scalabilità automatica o NONE per disabilitare.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esecuzione del modello di generatore di dati in streaming

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • SCHEMA_LOCATION: percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-ID/topics/my-topic-ID.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui suoi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/
    • Il nome della versione, come 2021-09-20-00_RC00, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
  • SCHEMA_LOCATION: percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-ID/topics/my-topic-ID.