Clientseitige Verschlüsselung

Auf dieser Seite wird beschrieben, wie die clientseitige Verschlüsselung in Cloud SQL implementiert wird.


Bei der clientseitigen Verschlüsselung werden Daten vor dem Schreiben in Cloud SQL verschlüsselt. Sie können Cloud SQL-Daten so verschlüsseln, dass nur Ihre Anwendung sie entschlüsseln kann.

Zum Aktivieren der clientseitigen Verschlüsselung haben Sie folgende Möglichkeiten:

  1. Mit einem im Cloud Key Management Service (Cloud KMS) gespeicherten Verschlüsselungsschlüssel.
  2. Mit einem lokal in Ihrer Anwendung gespeicherten Verschlüsselungsschlüssel.

In diesem Thema wird beschrieben, wie Sie die erste Option verwenden, die die einfachste Möglichkeit zur Schlüsselverwaltung bietet. Wir erstellen einen Verschlüsselungsschlüssel in Cloud KMS und implementieren die Umschlagverschlüsselung mithilfe von Tink, der Open-Source-Kryptografiebibliothek von Google.

Warum ist eine clientseitige Verschlüsselung erforderlich?

Sie benötigen clientseitige Verschlüsselung, wenn Sie die Cloud SQL-Daten auf Spaltenebene 1 schützen möchten. Angenommen, Sie haben eine Tabelle mit Namen und Kreditkartennummern. Sie möchten einem Nutzer Zugriff auf diese Tabelle gewähren, er soll jedoch nicht die Kreditkartennummern sehen. Sie können die Nummern mit der clientseitigen Verschlüsselung verschlüsseln. Solange dem Nutzer kein Zugriff auf den Verschlüsselungsschlüssel in Cloud KMS gewährt wurde, kann er die Kreditkartendaten nicht lesen.

Schlüssel mit Cloud KMS erstellen

Mit Cloud KMS können Sie Schlüssel auf der Google Cloud Platform erstellen und verwalten.

Cloud KMS unterstützt viele verschiedene Schlüsseltypen. Für die clientseitige Verschlüsselung müssen Sie einen symmetrischen Schlüssel erstellen.

Um Ihrer Anwendung Zugriff auf den Schlüssel in Cloud KMS zu gewähren, müssen Sie dem Dienstkonto, das Ihre Anwendung verwendet, die Rolle cloudkms.cryptoKeyEncrypterDecrypter zuweisen. In gcloud verwenden Sie den folgenden Befehl:

gcloud kms keys add-iam-policy-binding key \
--keyring=key-ring \
--location=location \ \

Sie können den KMS-Schlüssel zwar verwenden, um Daten direkt zu verschlüsseln, hier verwenden wir aber eine flexiblere Lösung namens Umschlagverschlüsselung. Auf diese Weise können Nachrichten, die länger als 64 KB sind, verschlüsselt werden. Dies ist die maximale Nachrichtengröße, die von der Cloud Key Management Service API unterstützt wird.

Cloud KMS-Umschlagverschlüsselung

Bei der Umschlagverschlüsselung fungiert der KMS-Schlüssel als KEK. Das heißt, er wird zum Verschlüsseln von Datenverschlüsselungsschlüsseln (Data Encryption Keys, DEKs) verwendet, die wiederum zum Verschlüsseln tatsächlicher Daten verwendet werden.

Nachdem Sie einen KEK in Cloud KMS erstellt haben, müssen Sie zum Verschlüsseln jeder Nachricht Folgendes tun:

  • Generieren Sie lokal einen Datenverschlüsselungsschlüssel (Data Encryption Key, DEK).
  • Verwenden Sie diesen DEK lokal zum Verschlüsseln der Nachricht.
  • Rufen Sie Cloud KMS auf, um den DEK mit dem KEK zu verschlüsseln (zusammenzufassen).
  • Speichern Sie die verschlüsselten Daten und den zusammengefassten DEK.

In diesem Thema verwenden wir Tink, statt eine völlig neue Umschlag-Verschlüsselung zu implementieren.


Tink ist eine plattformübergreifende mehrsprachige Bibliothek, die hochwertige kryptografische APIs bereitstellt. Um Daten mit der Umschlagverschlüsselung von Tink zu verschlüsseln, stellen Sie Tink einen Schlüssel-URI bereit, der auf Ihren KEK in Cloud KMS verweist, und Anmeldedaten, mit denen Tink den KEK verwenden kann. Tink generiert den DEK, verschlüsselt die Daten, verpackt den DEK und gibt einen einzelnen Geheimtext mit den verschlüsselten Daten und dem verpackten DEK zurück.

Tink unterstützt die Umschlagverschlüsselung in C++, Java, Go und Python mithilfe der AEAD API:

public interface Aead{
  byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
  byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)

Neben dem normalen Argument „message/ciphertext” unterstützen die Methoden zum Verschlüsseln und Entschlüsseln optional verknüpfte Daten. Dieses Argument kann verwendet werden, um den Geheimtext mit einem Datenelement zu verknüpfen. Angenommen, Sie haben eine Datenbank mit dem Feld user-id und dem Feld encrypted-medical-history. In diesem Fall sollte das Feld user-id beim Verschlüsseln des medizinischen Verlaufs als verknüpfte Daten verwendet werden. Dadurch wird verhindert, dass ein Angreifer einen medizinischen Verlauf von einem Nutzer zu einem anderen verschieben kann. Außerdem wird bei der Ausführung einer Abfrage überprüft, ob Sie die richtige Datenzeile haben.


In diesem Abschnitt wird anhand eines Beispielcodes eine Wählerdatenbank mit clientseitiger Verschlüsselung verwendet. Der Beispielcode zeigt, wie Pub/Sub-Ereignisse in einem Dienst gelesen werden, der in Cloud Run (vollständig verwaltet) bereitgestellt wird.

  • Datenbanktabelle und Verbindungspool erstellen
  • Tink für die Umschlagverschlüsselung einrichten
  • Daten mithilfe der Umschlagverschlüsselung von Tink mit einem KEK in Cloud KMS verschlüsseln und entschlüsseln


  1. Erstellen Sie mithilfe dieser Anleitung eine Cloud SQL-Instanz. Notieren Sie den Verbindungsstring, den Datenbanknutzer und das Datenbankpasswort, das Sie erstellen.

  2. Erstellen Sie eine Datenbank für Ihre Anwendung. Folgen Sie dazu dieser Anleitung. Notieren Sie sich den Namen der Datenbank.

  3. Erstellen Sie entsprechend dieser Anleitung einen KMS-Schlüssel für Ihre Anwendung. Kopieren Sie den Ressourcennamen des erstellten Schlüssels.

  4. Erstellen Sie ein Dienstkonto mit den „Cloud SQL-Client”-Berechtigungen, indem Sie dieser Anleitung folgen.

  5. Fügen Sie Ihrem Dienstkonto die Berechtigung "Cloud KMS CryptoKey-Verschlüsseler/Entschlüsseler" für den Schlüssel hinzu. Folgen Sie dazu dieser Anleitung.

Erstellen Sie einen Verbindungspool und eine neue Tabelle in der Datenbank.

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class CloudSqlConnectionPool {

  public static DataSource createConnectionPool(String dbUser, String dbPass, String dbName,
      String instanceConnectionName) {
    HikariConfig config = new HikariConfig();
    config.setUsername(dbUser); // e.g. "root", "sqlserver"
    config.setPassword(dbPass); // e.g. "my-password"
    config.addDataSourceProperty("databaseName", dbName);

    // The Cloud SQL Java Connector provides SSL encryption so 
    // it should be disabled at the driver level
    config.addDataSourceProperty("encrypt", "false");

    config.addDataSourceProperty("socketFactoryConstructorArg", instanceConnectionName);
    DataSource pool = new HikariDataSource(config);
    return pool;

  public static void createTable(DataSource pool, String tableName) throws SQLException {
    // Safely attempt to create the table schema.
    try (Connection conn = pool.getConnection()) {

      String stmt = String.format("IF NOT EXISTS("
          + "SELECT * FROM sysobjects WHERE name='%s' and xtype='U')"
          + "CREATE TABLE %s ("
          + "vote_id INT NOT NULL IDENTITY,"
          + "time_cast DATETIME NOT NULL,"
          + "team VARCHAR(6) NOT NULL,"
          + "voter_email VARBINARY(255)"
          + "PRIMARY KEY (vote_id));", tableName, tableName);
      try (PreparedStatement createTableStatement = conn.prepareStatement(stmt);) {
import pytds
import sqlalchemy
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import Table

def init_tcp_connection_engine(
    db_user: str, db_pass: str, db_name: str, db_host: str
) -> sqlalchemy.engine.base.Engine:
    # Remember - storing secrets in plaintext is potentially unsafe. Consider using
    # something like to help keep
    # secrets secret.

    # Extract host and port from db_host
    host_args = db_host.split(":")
    db_hostname, db_port = host_args[0], int(host_args[1])

    def connect_with_pytds() -> pytds.Connection:
        return pytds.connect(
            db_hostname,  # e.g. ""
            user=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            database=db_name,  # e.g. "my-database-name"
            port=db_port,  # e.g. 1433
            bytes_to_unicode=False,  # disables automatic decoding of bytes

    pool = sqlalchemy.create_engine(
        # This allows us to use the pytds sqlalchemy dialect, but also set the
        # bytes_to_unicode flag to False, which is not supported by the dialect

    print("Created TCP connection pool")
    return pool

def init_db(
    db_user: str,
    db_pass: str,
    db_name: str,
    db_host: str,
    table_name: str,
) -> sqlalchemy.engine.base.Engine:
    db = init_tcp_connection_engine(db_user, db_pass, db_name, db_host)

    # Create tables (if they don't already exist)
    if not db.has_table(table_name):
        metadata = sqlalchemy.MetaData(db)
            Column("vote_id", Integer, primary_key=True, nullable=False),
            Column("voter_email", sqlalchemy.types.VARBINARY, nullable=False),
            Column("time_cast", DateTime, nullable=False),
            Column("team", sqlalchemy.types.VARCHAR(6), nullable=False),

    print(f"Created table {table_name} in db {db_name}")
    return db

Initialisieren Sie eine Umschlag-AEAD-Primitive mit Tink.


public class CloudKmsEnvelopeAead {

  public static Aead get(String kmsUri) throws GeneralSecurityException {

    // Create a new KMS Client
    KmsClient client = new GcpKmsClient().withDefaultCredentials();

    // Create an AEAD primitive using the Cloud KMS key
    Aead gcpAead = client.getAead(kmsUri);

    // Create an envelope AEAD primitive.
    // This key should only be used for client-side encryption to ensure authenticity and integrity
    // of data.
    return new KmsEnvelopeAead(AeadKeyTemplates.AES128_GCM, gcpAead);
import logging

import tink
from tink import aead
from tink.integration import gcpkms

logger = logging.getLogger(__name__)

def init_tink_env_aead(key_uri: str, credentials: str) -> tink.aead.KmsEnvelopeAead:

        gcp_client = gcpkms.GcpKmsClient(key_uri, credentials)
        gcp_aead = gcp_client.get_aead(key_uri)
    except tink.TinkError as e:
        logger.error("Error initializing GCP client: %s", e)
        raise e

    # Create envelope AEAD primitive using AES256 GCM for encrypting the data
    # This key should only be used for client-side encryption to ensure authenticity and integrity
    # of data.
    key_template = aead.aead_key_templates.AES256_GCM
    env_aead = aead.KmsEnvelopeAead(key_template, gcp_aead)

    print(f"Created envelope AEAD Primitive using KMS URI: {key_uri}")

    return env_aead

Verschlüsseln Sie Daten und fügen Sie sie in die Datenbank ein.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import javax.sql.DataSource;

public class EncryptAndInsertData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see

    String team = "TABS";
    String tableName = "votes";
    String email = "";

    // Initialize database connection pool and create table if it does not exist
    // See for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    encryptAndInsertData(pool, envAead, tableName, team, email);

  public static void encryptAndInsertData(
      DataSource pool, Aead envAead, String tableName, String team, String email)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
              "INSERT INTO %s (team, time_cast, voter_email) VALUES (?, ?, ?);", tableName);
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        voteStmt.setString(1, team);
        voteStmt.setTimestamp(2, new Timestamp(new Date().getTime()));

        // Use the envelope AEAD primitive to encrypt the email, using the team name as
        // associated data. This binds the encryption of the email to the team name, preventing
        // associating an encrypted email in one row with a team name in another row.
        byte[] encryptedEmail = envAead.encrypt(email.getBytes(), team.getBytes());
        voteStmt.setBytes(3, encryptedEmail);

        // Finally, execute the statement. If it fails, an error will be thrown.
        System.out.println(String.format("Successfully inserted row into table %s", tableName));
import datetime
import logging
import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db

logger = logging.getLogger(__name__)

def main() -> None:
    db_user = os.environ["DB_USER"]  # e.g. "root", "sqlserver"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. ""

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see

    table_name = "votes"
    team = "TABS"
    email = ""

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(

    encrypt_and_insert_data(db, env_aead, table_name, team, email)

def encrypt_and_insert_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
    team: str,
    email: str,
) -> None:
    time_cast =
    # Use the envelope AEAD primitive to encrypt the email, using the team name as
    # associated data. Encryption with associated data ensures authenticity
    # (who the sender is) and integrity (the data has not been tampered with) of that
    # data, but not its secrecy. (see RFC 5116 for more info)
    encrypted_email = env_aead.encrypt(email.encode(), team.encode())
    # Verify that the team is one of the allowed options
    if team != "TABS" and team != "SPACES":
        logger.error(f"Invalid team specified: {team}")

    # Preparing a statement before hand can help protect against injections.
    stmt = sqlalchemy.text(
        f"INSERT INTO {table_name} (time_cast, team, voter_email)"
        " VALUES (:time_cast, :team, CONVERT(varbinary(max), :voter_email, 0))"

    # Using a with statement ensures that the connection is always released
    # back into the pool at the end of statement (even if an error occurs)
    with db.connect() as conn:
        conn.execute(stmt, time_cast=time_cast, team=team, voter_email=encrypted_email)
    print(f"Vote successfully cast for '{team}' at time {time_cast}!")

Fragen Sie die Datenbank ab und entschlüsseln Sie die gespeicherten Daten.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.sql.DataSource;

public class QueryAndDecryptData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see

    String tableName = "votes123";

    // Initialize database connection pool and create table if it does not exist
    // See for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    // Insert row into table to test
    // See for setup details
        pool, envAead, tableName, "SPACES", "");

    queryAndDecryptData(pool, envAead, tableName);

  public static void queryAndDecryptData(DataSource pool, Aead envAead, String tableName)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
              "SELECT TOP(5) team, time_cast, voter_email FROM %s ORDER BY time_cast DESC;",
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        ResultSet voteResults = voteStmt.executeQuery();

        System.out.println("Team\tTime Cast\tEmail");
        while ( {
          String team = voteResults.getString(1);
          Timestamp timeCast = voteResults.getTimestamp(2);

          // Use the envelope AEAD primitive to encrypt the email, using the team name as
          // associated data. This binds the encryption of the email to the team name, preventing
          // associating an encrypted email in one row with a team name in another row.
          String email = new String(envAead.decrypt(voteResults.getBytes(3), team.getBytes()));

          System.out.println(String.format("%s\t%s\t%s", team, timeCast, email));
import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db
from .encrypt_and_insert_data import encrypt_and_insert_data

def main() -> None:
    db_user = os.environ["DB_USER"]  # e.g. "root", "sqlserver"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. ""

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see

    table_name = "votes"
    team = "TABS"
    email = ""

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(

    encrypt_and_insert_data(db, env_aead, table_name, team, email)
    query_and_decrypt_data(db, env_aead, table_name)

def query_and_decrypt_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
) -> None:
    with db.connect() as conn:
        # Execute the query and fetch all results
        recent_votes = conn.execute(
            f"SELECT TOP(5) team, time_cast, voter_email FROM {table_name} "
            "ORDER BY time_cast DESC"

        print("Team\tEmail\tTime Cast")

        for row in recent_votes:
            team = row[0]
            # Use the envelope AEAD primitive to decrypt the email, using the team name as
            # associated data. Encryption with associated data ensures authenticity
            # (who the sender is) and integrity (the data has not been tampered with) of that
            # data, but not its secrecy. (see RFC 5116 for more info)
            email = env_aead.decrypt(row[2], team).decode()
            time_cast = row[1]

            # Print recent votes

  1. Sie können den Zugriff auch auf Instanz- oder Datenbankebene beschränken.