Gerenciar streams

Visão geral

Nesta seção, você aprenderá a usar a API Datastream para:

  • Validar e criar streams
  • Receber informações sobre streams e objetos de streams
  • Atualize os streams iniciando, pausa, retomando e modificando-os, bem como iniciando e interrompendo o preenchimento de objetos dos streams
  • Excluir streams

Há duas maneiras de usar a API Datastream. É possível fazer chamadas da API REST ou usar a ferramenta de linha de comando (CLI) gcloud.

Para ver informações gerais sobre como usar o gcloud para gerenciar streams do Datastream, clique aqui.

Validar um stream

Antes de criar um fluxo, é possível validá-lo. Dessa forma, é possível garantir que o stream seja executado com êxito e que todas as verificações de validação sejam aprovadas.

Como validar verificações de stream:

  • Indica se a fonte está configurada corretamente para permitir que o Datastream faça streaming de dados.
  • Se a transmissão pode se conectar à origem e ao destino.
  • A configuração completa do stream.

O código a seguir mostra uma solicitação para validar um stream usado para transferir dados de um banco de dados Oracle de origem para um bucket de destino no Cloud Storage.

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

O valor &validate_only=true indica que você está validando apenas o stream. você não está criando. Além disso, para esta solicitação, coloque todo o URL entre aspas. Isso garante que o Datastream identifique o valor de &validate_only=true para validar o stream.

Depois de fazer essa solicitação, você verá as verificações de validação que o Datastream executa para sua origem e seu destino e se as verificações serão aprovadas ou falharão. Para qualquer verificação de validação que não seja aprovada, serão exibidas informações sobre o motivo da falha e o que fazer para corrigir o problema.

Depois de fazer as correções apropriadas, faça a solicitação novamente para garantir que todas as verificações de validação sejam aprovadas.

Criar um stream

O código a seguir mostra uma solicitação para criar um stream usado para transferir dados de um banco de dados Oracle de origem para um bucket de destino no Cloud Storage.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

Por exemplo, veja uma solicitação para extrair todas as tabelas do schema1 e duas tabelas específicas do schema3: tableA e tableC.

Os eventos são gravados em um bucket no formato Cloud Storage no Avro, e um novo arquivo é criado a cada 100 MB ou 30 segundos (substituindo os valores padrão de 50 MB e 60 segundos).

O parâmetro backfillAll está associado ao preenchimento histórico. Ao definir esse parâmetro como um dicionário vazio ({}), o Datastream preencherá:

  • Dados históricos, além de alterações em andamento nos dados, desde o banco de dados de origem até o destino.
  • Esquemas e tabelas, da origem ao destino.
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}

Receber informações sobre um stream

O código a seguir mostra uma solicitação para recuperar informações sobre um stream. Essas informações incluem:

  • É o nome reconhecido pelo Datastream.
  • Um nome fácil de usar para a transmissão (o nome de exibição)
  • Carimbos de data e hora de quando o stream foi criado e atualizado
  • Informações sobre os perfis de conexão de origem e de destino associados ao stream
  • O estado do stream

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemplo:

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

A resposta é exibida da seguinte maneira:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "backfillAll": {}
}

gcloud

Para saber como usar gcloud para recuperar informações sobre sua transmissão, clique aqui.

Listar streams

O código a seguir mostra uma solicitação para recuperar informações sobre todos os seus streams.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

Para saber como usar o gcloud para recuperar informações sobre todos os seus streams, clique aqui.

Listar objetos de um stream

O código a seguir mostra uma solicitação para recuperar informações sobre todos os objetos de um stream.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

Exemplo:

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

A lista de objetos retornada pode ser semelhante a esta:

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

Atualizar um stream

Iniciar um stream

O código a seguir mostra uma solicitação para iniciar um fluxo.

Usando o parâmetro updateMask na solicitação, apenas os campos que você especificar precisam ser incluídos no corpo da solicitação.

Neste exemplo, o campo especificado é state, que representa o status (ou estado) do stream. Ao iniciar o stream, você muda o estado dele de CREATED para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Pausar um stream

O código a seguir mostra uma solicitação para pausar um fluxo em execução.

Neste exemplo, o campo especificado no parâmetro updateMask é state. Ao pausar o stream, você muda o estado dele de RUNNING para PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

Retomar um stream

O código a seguir mostra uma solicitação para retomar um stream pausado.

Neste exemplo, o campo especificado no parâmetro updateMask é state. Ao retomar o stream, você muda o estado dele de PAUSED para RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

Modificar um stream

O código a seguir mostra uma solicitação para atualizar a configuração de rotação de arquivos de um fluxo para alterná-lo a cada 75 MB ou 45 segundos.

Neste exemplo, os campos especificados para o parâmetro updateMask incluem os campos fileRotationMb e fileRotationInterval, representados pelas sinalizações destinationConfig.gcsDestinationConfig.fileRotationMb e destinationConfig.gcsDestinationConfig.fileRotationInterval, respectivamente.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

O código a seguir mostra uma solicitação para incluir um arquivo de esquema de tipos unificados no caminho dos arquivos que o Datastream grava no Cloud Storage. Como resultado, o Datastream grava dois arquivos: um arquivo de dados JSON e um arquivo de esquema Avro.

Neste exemplo, o campo especificado é jsonFileFormat, representado pela sinalização destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

O código a seguir mostra uma solicitação para que o Datastream replique dados existentes, além de alterações contínuas nos dados, do banco de dados de origem para o destino.

A seção oracleExcludedObjects do código mostra as tabelas e esquemas que não podem ser preenchidos no destino.

Neste exemplo, todas as tabelas e esquemas serão preenchidos, exceto a tableA no schema3.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Exemplo:

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

Iniciar preenchimento para um objeto de um stream

Um stream no Datastream pode preencher dados históricos e fazer streaming de alterações em andamento para um destino. As alterações em andamento serão sempre transmitidas de uma origem para um destino. No entanto, é possível especificar se você quer que os dados históricos sejam transmitidos.

Se você quiser que os dados históricos sejam transmitidos da origem para o destino, use o parâmetro backfillAll.

O Datastream também permite transmitir dados históricos apenas para tabelas de banco de dados específicas. Para fazer isso, use o parâmetro backfillAll e exclua as tabelas para as quais você não quer dados históricos.

Se você quiser que apenas as alterações em andamento sejam transmitidas no destino, use o parâmetro backfillNone. Se você quiser que o Datastream transmita um snapshot de todos os dados existentes da origem para o destino, inicie o preenchimento manualmente para os objetos que contêm esses dados.

Outro motivo para iniciar o preenchimento de um objeto é se os dados não estiverem sincronizados entre a origem e o destino. Por exemplo, um usuário pode excluir dados no destino acidentalmente, e os dados serão perdidos. Nesse caso, o preenchimento do objeto será iniciado como um "mecanismo de redefinição", porque todos os dados são transmitidos para o destino em uma única captura. Como resultado, os dados são sincronizados entre a origem e o destino.

Antes de iniciar o preenchimento de um objeto de um stream, você precisa recuperar informações sobre o objeto.

Cada objeto tem um [object-id], que identifica o objeto de maneira exclusiva. Use o [object-id] para iniciar o preenchimento do stream.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

Interromper o preenchimento de um objeto de um stream

Depois de iniciar o preenchimento de um objeto de um stream, você pode interromper o preenchimento do objeto. Por exemplo, se um usuário modificar um esquema de banco de dados, o esquema ou os dados poderão estar corrompidos. Você não quer que o esquema ou os dados sejam transmitidos para o destino, então o preenchimento do objeto é interrompido.

Também é possível interromper o preenchimento de um objeto para fins de balanceamento de carga. O Datastream pode executar vários preenchimentos em paralelo. Isso pode sobrecarregar a origem. Se a carga for significativa, interrompa o preenchimento de cada objeto e, em seguida, inicie o preenchimento para os objetos, um por um.

Antes de interromper o preenchimento de um objeto de um stream, é preciso fazer uma solicitação para recuperar informações sobre todos os objetos de um stream. Cada objeto retornado tem um [object-id], que identifica o objeto de maneira exclusiva. Use o [object-id] para interromper o preenchimento do stream.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

Excluir um stream

O código a seguir mostra uma solicitação para excluir um stream.

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

Exemplo:

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

Para mais informações sobre como usar o gcloud para excluir sua transmissão, clique aqui.