Use o conetor do Bigtable Spark
O conetor do Bigtable Spark permite-lhe ler e escrever dados no Bigtable. Pode ler dados da sua aplicação Spark através do Spark SQL e dos DataFrames. As seguintes operações do Bigtable são suportadas através do conetor do Bigtable Spark:
- Escrever dados
- Ler dados
- Criar uma nova tabela
Este documento mostra como converter uma tabela de DataFrames do Spark SQL numa tabela do Bigtable e, em seguida, compilar e criar um ficheiro JAR para enviar uma tarefa do Spark.
Estado de compatibilidade do Spark e do Scala
O conector do Bigtable Spark suporta as seguintes versões do Scala:
O conetor do Bigtable Spark suporta as seguintes versões do Spark:
O conetor do Bigtable Spark suporta as seguintes versões do Dataproc:
- 1.5 cluster de versões de imagens
- Cluster de versão de imagem 2.0
- 2.1 cluster de versões de imagens
- 2.2 cluster de versões de imagens
- Versão 1.0 do tempo de execução sem servidor do Dataproc
Calcule os custos
Se decidir usar qualquer um dos seguintes componentes faturáveis do Google Cloud, a faturação é feita pelos recursos que usar:
- Bigtable (não lhe é cobrado nenhum valor pela utilização do emulador do Bigtable)
- Dataproc
- Cloud Storage
Os preços do Dataproc aplicam-se à utilização de clusters do Dataproc no Compute Engine. A determinação de preços do Dataproc sem servidor aplica-se a cargas de trabalho e sessões executadas no Dataproc sem servidor para Spark.
Para gerar uma estimativa de custos com base na sua utilização projetada, use a calculadora de preços.
Antes de começar
Conclua os seguintes pré-requisitos antes de usar o conector do Bigtable Spark.
Funções necessárias
Para receber as autorizações de que precisa para usar o conector do Bigtable Spark, peça ao seu administrador para lhe conceder as seguintes funções IAM no seu projeto:
-
Administrador do Bigtable (
roles/bigtable.admin
)(opcional): permite-lhe ler ou escrever dados e criar uma nova tabela. -
Utilizador do Bigtable (
roles/bigtable.user
): permite-lhe ler ou escrever dados, mas não lhe permite criar uma nova tabela.
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
Se estiver a usar o Dataproc ou o Cloud Storage, podem ser necessárias autorizações adicionais. Para mais informações, consulte as autorizações do Dataproc e as autorizações do Cloud Storage.
Configure o Spark
Além de criar uma instância do Bigtable, também tem de configurar a instância do Spark. Pode fazê-lo localmente ou selecionar qualquer uma destas opções para usar o Spark com o Dataproc:
- Cluster do Dataproc
- Dataproc sem servidor
Para mais informações sobre a escolha entre um cluster do Dataproc ou uma opção sem servidor, consulte a documentação Dataproc sem servidor para Spark em comparação com o Dataproc no Compute Engine .
Transfira o ficheiro JAR do conetor
Pode encontrar o código fonte do conetor do Bigtable Spark com exemplos no repositório do GitHub do conetor do Bigtable Spark.
Com base na configuração do Spark, pode aceder ao ficheiro JAR da seguinte forma:
Se estiver a executar o PySpark localmente, deve transferir o ficheiro JAR do conetor a partir da
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
localização do Cloud Storage.Substitua
SCALA_VERSION
por2.12
ou2.13
, que são as únicas versões do Scala suportadas, e substituaCONNECTOR_VERSION
pela versão do conector que quer usar.Para o cluster do Dataproc ou a opção sem servidor, use o ficheiro JAR mais recente como um artefacto que pode ser adicionado nas suas aplicações Scala ou Java Spark. Para mais informações sobre a utilização do ficheiro JAR como um artefacto, consulte o artigo Faça a gestão das dependências.
Se estiver a enviar a sua tarefa do PySpark para o Dataproc, use a flag
gcloud dataproc jobs submit pyspark --jars
para definir o URI para a localização do ficheiro JAR no Cloud Storage, por exemplo,gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Determine o tipo de computação
Para tarefas apenas de leitura, pode usar a computação sem servidor do Data Boost, que lhe permite evitar o impacto nos clusters de fornecimento de aplicações. A sua aplicação Spark tem de usar a versão 1.1.0 ou posterior do conector do Spark para usar o aumento de dados.
Para usar o Data Boost, tem de criar um perfil da app do Data Boost e, em seguida, fornecer o ID do perfil da app para a opção spark.bigtable.app_profile.id
Spark quando adicionar a configuração do Bigtable à sua aplicação Spark. Se já criou um perfil de app para as suas tarefas de leitura do Spark e quer continuar a usá-lo sem alterar o código da aplicação, pode converter o perfil de app num perfil de app do Data Boost. Para mais informações, consulte o artigo Converta um perfil
de app.
Para mais informações, consulte a vista geral do Bigtable Data Boost.
Para tarefas que envolvam leituras e escritas, pode usar os nós do cluster da sua instância para computação especificando um perfil de app padrão com o seu pedido.
Identifique ou crie um perfil de app para usar
Se não especificar um ID do perfil da app, o conetor usa o perfil da app predefinido.
Recomendamos que use um perfil de app único para cada aplicação que executa, incluindo a sua aplicação Spark. Para mais informações sobre os tipos e as definições de perfis de apps, consulte a vista geral dos perfis de apps. Para obter instruções, consulte o artigo Crie e configure perfis de apps.
Adicione a configuração do Bigtable à sua aplicação Spark
Na sua aplicação Spark, adicione as opções do Spark que lhe permitem interagir com o Bigtable.
Opções do Spark suportadas
Use as opções do Spark disponíveis como parte do pacote com.google.cloud.spark.bigtable
.
Nome da opção | Obrigatória | Valor predefinido | Significado |
---|---|---|---|
spark.bigtable.project.id |
Sim | N/A | Defina o ID do projeto do Bigtable. |
spark.bigtable.instance.id |
Sim | N/A | Defina o ID da instância do Bigtable. |
catalog |
Sim | N/A | Defina o formato JSON que especifica o formato de conversão entre o esquema semelhante a SQL do DataFrame e o esquema da tabela do Bigtable. Consulte o artigo Crie metadados de tabelas no formato JSON para mais informações. |
spark.bigtable.app_profile.id |
Não | default |
Defina o ID do perfil da app do Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
Não | Hora atual do sistema | Defina a data/hora em milissegundos a usar ao escrever um DataFrame no Bigtable. Tenha em atenção que, uma vez que todas as linhas no DataFrame usam a mesma data/hora, as linhas com a mesma coluna de chave de linha no DataFrame persistem como uma única versão no Bigtable, uma vez que partilham a mesma data/hora. |
spark.bigtable.create.new.table |
Não | false |
Definido como true para criar uma nova tabela antes de escrever no Bigtable. |
spark.bigtable.read.timerange.start.milliseconds ou spark.bigtable.read.timerange.end.milliseconds |
Não | N/A | Defina as indicações de tempo (em milissegundos desde a hora da época) para filtrar as células com uma data de início e uma data de fim específicas, respetivamente. |
spark.bigtable.push.down.row.key.filters |
Não | true |
Definido como true para permitir a filtragem simples de chaves de linhas do lado do servidor. A filtragem em chaves de linhas compostas é implementada no lado do cliente.Consulte o artigo Leia uma linha específica do DataFrame usando um filtro para mais informações. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
Não | 30m | Defina a duração do tempo limite para uma tentativa de leitura de linhas correspondente a uma partição de DataFrame no cliente do Bigtable para Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
Não | 12h | Defina a duração do tempo limite total para uma tentativa de leitura de linhas correspondente a uma partição DataFrame no cliente Bigtable para Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
Não | 1 min | Defina a duração do tempo limite para uma tentativa de alteração de linhas correspondente a uma partição do DataFrame no cliente do Bigtable para Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
Não | 10m | Defina a duração do tempo limite total para uma tentativa de alteração de linhas correspondente a uma partição de DataFrame no cliente do Bigtable para Java. |
spark.bigtable.batch.mutate.size |
Não | 100 |
Definido como o número de mutações em cada lote. O valor máximo que pode definir é 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
Não | false |
Defina como true para ativar o controlo de fluxo para mutações em lote. |
Crie metadados de tabelas no formato JSON
O formato de tabela de DataFrames do Spark SQL tem de ser convertido numa tabela do Bigtable através de uma string com formato JSON. Este formato JSON de string torna o formato de dados compatível com o Bigtable. Pode transmitir o formato JSON no código da sua aplicação através da opção .option("catalog", catalog_json_string)
.
Como exemplo, considere a seguinte tabela DataFrame e a tabela Bigtable correspondente.
Neste exemplo, as colunas name
e birthYear
no DataFrame são agrupadas na família de colunas info
e mudadas para name
e birth_year
, respetivamente. Da mesma forma, a coluna address
é armazenada na família de colunas location
com o mesmo nome de coluna. A coluna id
do DataFrame é convertida na chave da linha do Bigtable.
As chaves de linhas não têm um nome de coluna dedicado no Bigtable e, neste exemplo, id_rowkey
é usado apenas para indicar ao conector que esta é a coluna de chaves de linhas. Pode usar qualquer nome para a coluna de chave de linha e certificar-se de que usa o mesmo nome quando declara o campo "rowkey":"column_name"
no formato JSON.
DataFrame | Tabela do Bigtable = t1 | |||||||
Colunas | Chave da linha | Famílias de colunas | ||||||
informações | localização | |||||||
Colunas | Colunas | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
O formato JSON do catálogo é o seguinte:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
As chaves e os valores usados no formato JSON são os seguintes:
Chave do catálogo | Valor do catálogo | Formato JSON |
---|---|---|
tabela | Nome da tabela do Bigtable. | "table":{"name":"t1"} Se a tabela não existir, use .option("spark.bigtable.create.new.table", "true") para criar uma tabela. |
rowkey | Nome da coluna que vai ser usada como a chave da linha do Bigtable. Certifique-se de que o nome da coluna do DataFrame é usado como a chave da linha, por exemplo, id_rowkey . As chaves compostas também são aceites como chaves de linhas. Por exemplo, "rowkey":"name:address" . Esta abordagem pode resultar em chaves de linhas que requerem uma análise completa da tabela para todos os pedidos de leitura. |
"rowkey":"id_rowkey" , |
colunas | Mapeamento de cada coluna do DataFrame na família de colunas ("cf" ) e no nome da coluna ("col" ) correspondentes do Bigtable. O nome da coluna pode ser diferente do nome da coluna na tabela do DataFrame. Os tipos de dados suportados incluem string , long e binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" Neste exemplo, id_rowkey é a chave da linha e info e location são as famílias de colunas. |
Tipos de dados suportados
O conetor suporta a utilização dos tipos string
, long
e binary
(matriz de bytes) no catálogo. Até ser adicionado suporte para outros tipos, como int
e float
, pode converter manualmente esses tipos de dados em matrizes de bytes (BinaryType
do Spark SQL) antes de usar o conector para os escrever no Bigtable.
Além disso, pode usar o Avro para serializar tipos complexos, como ArrayType
. Para mais informações, consulte o artigo Serializar tipos de dados complexos com o Apache Avro.
Escreva no Bigtable
Use a função .write()
e as opções suportadas para escrever os seus dados no Bigtable.
Java
O seguinte código do repositório do GitHub usa Java e Maven para escrever no Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
O código seguinte do repositório do GitHub usa Python para escrever no Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Ler a partir do Bigtable
Use a função .read()
para verificar se a tabela foi importada com êxito para o Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compile o projeto
Gere o ficheiro JAR que é usado para executar uma tarefa num cluster do Dataproc, no Dataproc sem servidor ou numa instância local do Spark. Pode compilar o ficheiro JAR localmente e, em seguida, usá-lo para enviar uma tarefa. O caminho para o JAR compilado é definido como a variável de ambiente PATH_TO_COMPILED_JAR
quando envia uma tarefa.
Este passo não se aplica a aplicações PySpark.
Faça a gestão das dependências
O conector do Bigtable Spark é compatível com as seguintes ferramentas de gestão de dependências:
Compile o ficheiro JAR
Maven
Adicione a dependência
spark-bigtable
ao ficheiro pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Adicione o plug-in Maven Shade ao ficheiro
pom.xml
para criar um JAR completo:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Execute o comando
mvn clean install
para gerar um ficheiro JAR.
sbt
Adicione a dependência
spark-bigtable
ao ficheirobuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Adicione o plugin
sbt-assembly
ao seu ficheiroproject/plugins.sbt
ouproject/assembly.sbt
para criar um ficheiro JAR Uber.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Execute o comando
sbt clean assembly
para gerar o ficheiro JAR.
Gradle
Adicione a dependência
spark-bigtable
ao ficheirobuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Adicione o plug-in Shadow no seu ficheiro
build.gradle
para criar um ficheiro JAR uber:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Consulte a documentação do plug-in Shadow para obter mais informações sobre a configuração e a compilação de JAR.
Envie um trabalho
Envie uma tarefa do Spark através do Dataproc, do Dataproc sem servidor ou de uma instância local do Spark para iniciar a sua aplicação.
Defina o ambiente de tempo de execução
Defina as seguintes variáveis de ambiente.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Substitua o seguinte:
- PROJECT_ID: o identificador permanente do projeto do Bigtable.
- INSTANCE_ID: o identificador permanente da instância do Bigtable.
- TABLE_NAME: o identificador permanente da tabela.
- DATAPROC_CLUSTER: o identificador permanente do cluster do Dataproc.
- DATAPROC_REGION: a região do Dataproc que contém um dos clusters na sua instância do Dataproc, por exemplo,
northamerica-northeast2
. - DATAPROC_ZONE: a zona onde o cluster do Dataproc é executado.
- SUBNET: O caminho completo do recurso da sub-rede.
- GCS_BUCKET_NAME: o contentor do Cloud Storage para carregar dependências da carga de trabalho do Spark.
- PATH_TO_COMPILED_JAR: o caminho completo ou relativo para o JAR compilado, por exemplo,
/path/to/project/root/target/<compiled_JAR_name>
para o Maven. - GCS_PATH_TO_CONNECTOR_JAR: o contentor do Cloud Storage, onde se encontra o ficheiro
spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.gs://spark-lib/bigtable
- PATH_TO_PYTHON_FILE: para aplicações PySpark, o caminho para o ficheiro Python que vai ser usado para escrever dados no Bigtable e ler dados do mesmo.
- LOCAL_PATH_TO_CONNECTOR_JAR: para aplicações PySpark, caminho para o ficheiro JAR do conetor do Bigtable para o Spark transferido.
Envie uma tarefa do Spark
Para instâncias do Dataproc ou a sua configuração local do Spark, execute uma tarefa do Spark para carregar dados para o Bigtable.
Cluster do Dataproc
Use o ficheiro JAR compilado e crie uma tarefa de cluster do Dataproc que leia e escreva dados do e para o Bigtable.
Crie um cluster do Dataproc. O exemplo seguinte mostra um comando de exemplo para criar um cluster do Dataproc v2.0 com o Debian 10, dois nós de trabalho e configurações predefinidas.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Envie um trabalho.
Scala/Java
O exemplo seguinte mostra a classe
spark.bigtable.example.WordCount
que inclui a lógica para criar uma tabela de teste no DataFrame, escrever a tabela no Bigtable e, em seguida, contar o número de palavras na tabela.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc sem servidor
Use o ficheiro JAR compilado e crie uma tarefa do Dataproc que leia e escreva dados de e para o Bigtable com uma instância do Dataproc sem servidor.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Local Spark
Use o ficheiro JAR transferido e crie uma tarefa do Spark que leia e escreva dados de e para o Bigtable com uma instância do Spark local. Também pode usar o emulador do Bigtable para enviar a tarefa do Spark.
Use o emulador do Bigtable
Se decidir usar o emulador do Bigtable, siga estes passos:
Execute o seguinte comando para iniciar o emulador:
gcloud beta emulators bigtable start
Por predefinição, o emulador escolhe
localhost:8086
.Defina a variável de ambiente
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Para mais informações sobre a utilização do emulador do Bigtable, consulte o artigo Teste com o emulador.
Envie uma tarefa do Spark
Use o comando spark-submit
para enviar uma tarefa do Spark, independentemente de estar a usar um emulador do Bigtable local.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Valide os dados da tabela
Execute o seguinte comando da
CLI cbt
para verificar se os dados são escritos no Bigtable. A CLI
cbt
é um componente da CLI do Google Cloud. Para mais informações, consulte a
cbt
vista geral da CLI.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Soluções adicionais
Use o conetor Bigtable Spark para soluções específicas, como serializar tipos complexos de Spark SQL, ler linhas específicas e gerar métricas do lado do cliente.
Ler uma linha específica de um DataFrame através de um filtro
Quando usa DataFrames para ler a partir do Bigtable, pode especificar um filtro para ler apenas linhas específicas. Os filtros simples, como ==
, <=
e startsWith
na coluna da chave da linha, são aplicados no lado do servidor para evitar uma análise completa da tabela. Os filtros em chaves de linhas compostas ou filtros complexos, como o filtro LIKE
na coluna da chave de linha, são aplicados no lado do cliente.
Se estiver a ler tabelas grandes, recomendamos que use filtros de chave de linha simples para evitar a execução de uma análise completa da tabela. A declaração de exemplo seguinte mostra como ler usando um filtro simples. Certifique-se de que, no filtro do Spark, usa o nome da coluna do DataFrame que é convertida na chave da linha:
dataframe.filter("id == 'some_id'").show()
Quando aplicar um filtro, use o nome da coluna do DataFrame em vez do nome da coluna da tabela do Bigtable.
Serializar tipos de dados complexos através do Apache Avro
O conetor do Bigtable Spark oferece suporte para a utilização do Apache Avro para serializar tipos de SQL do Spark complexos, como ArrayType
, MapType
ou StructType
. O Apache Avro fornece serialização de dados para dados de registo que são usados frequentemente para processar e armazenar estruturas de dados complexas.
Use uma sintaxe como "avro":"avroSchema"
para especificar que uma coluna no Bigtable deve ser codificada através do Avro. Em seguida, pode usar .option("avroSchema", avroSchemaString)
ao ler ou escrever no Bigtable para especificar o esquema Avro correspondente a essa coluna no formato de string. Pode usar nomes de opções diferentes, por exemplo, "anotherAvroSchema"
para colunas diferentes e transmitir esquemas Avro para várias colunas.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Use métricas do lado do cliente
Uma vez que o conector do Bigtable Spark se baseia no cliente do Bigtable para Java, as métricas do lado do cliente são ativadas no conector por predefinição. Pode consultar a documentação sobre as métricas do lado do cliente para encontrar mais detalhes sobre o acesso e a interpretação destas métricas.
Use o cliente do Bigtable para Java com funções RDD de baixo nível
Uma vez que o conector do Bigtable Spark se baseia no cliente do Bigtable para Java, pode usar diretamente o cliente nas suas aplicações Spark e realizar pedidos de leitura ou escrita distribuídos nas funções RDD de baixo nível, como mapPartitions
e foreachPartition
.
Para usar o cliente Bigtable para classes Java, acrescente o prefixo com.google.cloud.spark.bigtable.repackaged
aos nomes dos pacotes. Por exemplo, em vez de usar o nome da classe como com.google.cloud.bigtable.data.v2.BigtableDataClient
, use com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Para mais informações sobre o cliente do Bigtable para Java, consulte o artigo Cliente do Bigtable para Java.
O que se segue?
- Saiba como otimizar a tarefa do Spark no Dataproc.
- Use classes do cliente do Bigtable para Java com o conetor do Bigtable Spark.