O pipeline do Cloud Storage Text para BigQuery é um pipeline em lote que lê arquivos de texto armazenados no Cloud Storage, os transforma usando uma função definida pelo usuário (UDF) em JavaScript e anexa o resultado a uma tabela do BigQuery.
Requisitos de pipeline
- Crie um arquivo JSON que descreva seu esquema do BigQuery.
Verifique se há uma matriz JSON de nível superior intitulada
BigQuery Schema
e se o conteúdo dela segue o padrão{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
.O modelo de lote do Cloud Storage Text para BigQuery não é compatível com a importação de dados para os campos
STRUCT
(Registro) na tabela de destino do BigQuery.Veja no JSON a seguir um exemplo de esquema do BigQuery:
{ "BigQuery Schema": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" }, ] }
- Crie um arquivo Python (
.py
) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.Por exemplo, esta função divide cada linha de um arquivo CSV e retorna uma string JSON depois de transformar os valores.
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Parâmetros do modelo
Parâmetro | Descrição |
---|---|
javascriptTextTransformFunctionName |
o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar.
Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ } , o nome da função será
myTransform . Para amostras de UDFs do JavaScript, consulte os
exemplos de UDF.
|
JSONPath |
O caminho gs:// para o arquivo JSON que define o esquema do BigQuery, armazenado no Cloud Storage. Por exemplo, gs://path/to/my/schema.json . |
javascriptTextTransformGcsPath |
O URI do Cloud Storage do arquivo .js que define a função definida
pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar. Por exemplo, gs://my-bucket/my-udfs/my_file.js .
|
inputFilePattern |
O caminho gs:// do texto no Cloud Storage que você quer processar. Por exemplo, gs://path/to/my/text/data.txt . |
outputTable |
O nome da tabela do BigQuery que você quer criar para armazenar seus dados processados.
Se você reutilizar uma tabela atual do BigQuery, os dados serão anexados à tabela de destino.
Por exemplo, my-project-name:my-dataset.my-table |
bigQueryLoadingTemporaryDirectory |
O diretório temporário do processo de carregamento do BigQuery.
Por exemplo, gs://my-bucket/my-files/temp_dir |
useStorageWriteApi |
Opcional:
Se true , o pipeline usa a
API BigQuery Storage Write. O valor padrão é false . Para mais informações, consulte
Como usar a API Storage Write.
|
useStorageWriteApiAtLeastOnce |
Opcional:
Ao usar a API Storage Write, especifica a semântica de gravação. Para usar
semântica pelo menos uma vez, defina esse parâmetro como true . Para usar semântica exatamente uma vez,
defina o parâmetro como false . Esse parâmetro se aplica apenas quando
useStorageWriteApi é true . O valor padrão é false .
|
Função definida pelo usuário
Também é possível estender esse modelo escrevendo uma função definida pelo usuário (UDF). O modelo chama a UDF para cada elemento de entrada. Os payloads dos elementos são serializados como strings JSON. Para mais informações, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Especificação da função
A UDF tem a seguinte especificação:
- Entrada: uma linha de texto de um arquivo de entrada do Cloud Storage.
- Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to BigQuery (Batch) template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --parameters \ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: o URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Flex", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: o URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF, na sigla em inglês) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.