O pipeline do Cloud Storage Text para o BigQuery é um pipeline de streaming que transmite arquivos de texto armazenados no Cloud Storage, os transforma usando uma função definida pelo usuário (UDF) do JavaScript fornecida por você e anexa o resultado ao BigQuery.
O pipeline é executado de modo indefinido e precisa ser encerrado manualmente pelo comando
cancelar e não
drenar, porque ele usa a
transformação
Watch
, que é um DoFn
divisível sem suporte para
drenagem.
Requisitos de pipeline
- Crie um arquivo JSON que descreva o esquema da tabela de saída no BigQuery.
Verifique se há uma matriz JSON de nível superior intitulada
fields
e se o conteúdo dela segue o padrão{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
. Por exemplo:{ "fields": [ { "name": "name", "type": "STRING" }, { "name": "age", "type": "INTEGER" } ] }
- Crie um arquivo JavaScript (
.js
) com a função UDF que fornece a lógica para transformar as linhas de texto. A função precisa retornar uma string JSON.O exemplo a seguir divide cada linha de um arquivo CSV, cria um objeto JSON com os valores e retorna uma string JSON:
function process(inJson) { val = inJson.split(","); const obj = { "name": val[0], "age": parseInt(val[1]) }; return JSON.stringify(obj); }
Parâmetros do modelo
Parâmetros obrigatórios
- inputFilePattern: o caminho gs:// do texto no Cloud Storage que você quer processar. Por exemplo,
gs://your-bucket/your-file.txt
. - JSONPath: o caminho gs:// para o arquivo JSON que define o esquema do BigQuery, armazenado no Cloud Storage. Por exemplo,
gs://your-bucket/your-schema.json
. - outputTable: o local da tabela do BigQuery a ser usada para armazenar os dados processados. Se você reutilizar uma tabela, ela será substituída. Por exemplo,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - javascriptTextTransformGcsPath: o URI do Cloud Storage do arquivo
.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo,gs://your-bucket/your-transforms/*.js
. - javascriptTextTransformFunctionName: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usar. Por exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para conferir exemplos de UDFs em JavaScript, consulte os exemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por exemplo,transform_udf1
. - bigQueryLoadingTemporaryDirectory: diretório temporário para o processo de carregamento do BigQuery. Por exemplo,
gs://your-bucket/your-files/temp-dir
.
Parâmetros opcionais
- outputDeadletterTable: tabela de mensagens que não alcançaram a tabela de saída. Se uma tabela não existir, ela será criada durante a execução do pipeline. Se não for especificado,
<outputTableSpec>_error_records
será usado. Por exemplo,<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. - useStorageWriteApiAtLeastOnce: esse parâmetro só entra em vigor se
Use BigQuery Storage Write API
estiver ativado. Se ativada, a semântica do tipo "pelo menos uma vez" será usada para a API Storage Write. Caso contrário, a semântica "exatamente uma" será usada. O padrão é: falso. - useStorageWriteApi: se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é
false
. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: ao usar a API Storage Write, especifica o número de fluxos de gravação. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. Padrão: 0. - storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, será necessário definir esse parâmetro. - pythonExternalTextTransformGcsPath: o padrão de caminho do Cloud Storage para o código Python que contém as funções definidas pelo usuário. Por exemplo,
gs://your-bucket/your-function.py
. - javascriptTextTransformReloadIntervalMinutes: especifica a frequência de recarregamento da UDF em minutos. Se o valor for maior que 0, o Dataflow vai verificar periodicamente o arquivo da UDF no Cloud Storage e vai atualizar a UDF se o arquivo for modificado. Com esse parâmetro, é possível atualizar a UDF enquanto o pipeline está em execução, sem precisar reiniciar o job. Se o valor for
0
, o recarregamento da UDF será desativado. O valor padrão é0
.
Função definida pelo usuário
Esse modelo requer uma UDF que analisa os arquivos de entrada, conforme descrito em Requisitos do pipeline. O modelo chama a UDF para cada linha de texto em cada arquivo de entrada. Para mais informações sobre como criar UDFs, consulte Criar funções definidas pelo usuário para modelos do Dataflow.
Especificação da função
A UDF tem a seguinte especificação:
- Entrada: uma única linha de texto de um arquivo de entrada.
- Saída: uma string JSON que corresponde ao esquema da tabela de destino do BigQuery.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Cloud Storage Text to BigQuery (Stream) template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
Substitua:
JOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: o local para fase de testes de arquivos locais (por exemplo,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryBIGQUERY_UNPROCESSED_TABLE
: o nome da tabela do BigQuery para mensagens não processadasPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION
: o local para fase de testes de arquivos locais (por exemplo,gs://your-bucket/staging
)JAVASCRIPT_FUNCTION
: o nome da função definida pelo usuário (UDF) do JavaScript que você quer usarPor exemplo, se o código de função do JavaScript for
myTransform(inJson) { /*...do stuff...*/ }
, o nome da função serámyTransform
. Para amostras de UDFs do JavaScript, consulte os exemplos de UDF.PATH_TO_BIGQUERY_SCHEMA_JSON
: o caminho do Cloud Storage para o arquivo JSON que contém a definição do esquemaPATH_TO_JAVASCRIPT_UDF_FILE
: O URI do Cloud Storage do arquivo.js
que define a função definida pelo usuário (UDF) do JavaScript que você quer usar, por exemplo,gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
: o caminho do Cloud Storage para o conjunto de dados de textoBIGQUERY_TABLE
: o nome da tabela do BigQueryBIGQUERY_UNPROCESSED_TABLE
: o nome da tabela do BigQuery para mensagens não processadasPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporário
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.