O pipeline de texto do Cloud Storage para o BigQuery é um pipeline de streaming que faz stream de ficheiros de texto armazenados no Cloud Storage, transforma-os através de uma função definida pelo utilizador (UDF) em JavaScript que fornece e anexa o resultado ao BigQuery.
O pipeline é executado indefinidamente e tem de ser terminado manualmente através de um
cancelamento e não de uma
drenagem, devido à sua utilização da transformação
Watch
, que é uma DoFn
divisível que não suporta
drenagem.
Requisitos do pipeline
- Crie um ficheiro JSON que descreva o esquema da tabela de resultados no BigQuery.
Certifique-se de que existe uma matriz JSON de nível superior com o título
fields
e que o respetivo conteúdo segue o padrão{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Por exemplo:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Crie um ficheiro JavaScript (
.js
) com a função FDU que fornece a lógica para transformar as linhas de texto. A sua função tem de devolver uma string JSON.O exemplo seguinte divide cada linha de um ficheiro CSV, cria um objeto JSON com os valores e devolve uma string JSON:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Parâmetros de modelos
Parâmetros obrigatórios
- inputFilePattern: o caminho gs:// para o texto no Cloud Storage que quer processar. Por exemplo,
gs://your-bucket/your-file.txt
. - JSONPath: o caminho gs:// para o ficheiro JSON que define o seu esquema do BigQuery, armazenado no Cloud Storage. Por exemplo,
gs://your-bucket/your-schema.json
. - outputTable: a localização da tabela do BigQuery a usar para armazenar os dados processados. Se reutilizar uma tabela existente, esta é substituída. Por exemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - javascriptTextTransformGcsPath: o URI do Cloud Storage do ficheiro
.js
que define a função definida pelo utilizador (FDU) JavaScript que quer usar. Por exemplo,gs://your-bucket/your-transforms/*.js
. - javascriptTextTransformFunctionName: o nome da função definida pelo utilizador (FDU) JavaScript que quer usar. Por exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função émyTransform
. Para ver exemplos de UDFs JavaScript, consulte Exemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo,transform_udf1
. - bigQueryLoadingTemporaryDirectory: diretório temporário para o processo de carregamento do BigQuery. Por exemplo,
gs://your-bucket/your-files/temp-dir
.
Parâmetros opcionais
- outputDeadletterTable: tabela para mensagens que não conseguiram alcançar a tabela de resultados. Se uma tabela não existir, é criada durante a execução do pipeline. Se não for especificado, é usado
<outputTableSpec>_error_records
. Por exemplo,<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - useStorageWriteApiAtLeastOnce: este parâmetro só tem efeito se
Use BigQuery Storage Write API
estiver ativado. Se estiver ativada, a semântica de pelo menos uma vez é usada para a API Storage Write. Caso contrário, é usada a semântica de exatamente uma vez. A predefinição é: false. - useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é
false
. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro. A predefinição é: 0. - storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro. - pythonExternalTextTransformGcsPath: o padrão do caminho do Cloud Storage para o código Python que contém as suas funções definidas pelo utilizador. Por exemplo,
gs://your-bucket/your-function.py
. - javascriptTextTransformReloadIntervalMinutes: especifica a frequência com que o UDF é recarregado, em minutos. Se o valor for superior a 0, o Dataflow verifica periodicamente o ficheiro UDF no Cloud Storage e recarrega a UDF se o ficheiro for modificado. Este parâmetro permite-lhe atualizar a UDF enquanto o pipeline está em execução, sem ter de reiniciar a tarefa. Se o valor for
0
, o recarregamento das FDU está desativado. O valor predefinido é0
.
Função definida pelo utilizador
Este modelo requer uma FDU que analise os ficheiros de entrada, conforme descrito nos requisitos do pipeline. O modelo chama a UDF para cada linha de texto em cada ficheiro de entrada. Para mais informações sobre a criação de UDFs, consulte o artigo Crie funções definidas pelo utilizador para modelos do Dataflow.
Especificação da função
A FDU tem a seguinte especificação:
- Entrada: uma única linha de texto de um ficheiro de entrada.
- Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Cloud Storage Text to BigQuery (Stream) template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: a localização para organizar ficheiros locais (por exemplo,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: o nome da função definida pelo utilizador (FDU) JavaScript que quer usarPor exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função émyTransform
. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: o URI do Cloud Storage do ficheiro.js
que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o seu conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryBIGQUERY_UNPROCESSED_TABLE
: o nome da sua tabela do BigQuery para mensagens não processadasPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: a localização para organizar ficheiros locais (por exemplo,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: o nome da função definida pelo utilizador (FDU) JavaScript que quer usarPor exemplo, se o código da função JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função émyTransform
. Para ver exemplos de UDFs JavaScript, consulte os exemplos de UDFs.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Google Cloud Storage para o ficheiro JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: o URI do Cloud Storage do ficheiro.js
que define a função definida pelo utilizador (FDU) JavaScript que quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o seu conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryBIGQUERY_UNPROCESSED_TABLE
: o nome da sua tabela do BigQuery para mensagens não processadasPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.