클라이언트 측 암호화에 관한 정보

이 페이지에서는 Cloud SQL에서 클라이언트 측 암호화를 구현하는 방법을 설명합니다.

개요

클라이언트 측 암호화는 Cloud SQL에 데이터를 기록하기 전에 데이터를 암호화하는 작업입니다. 애플리케이션만 복호화할 수 있는 방식으로 Cloud SQL 데이터를 암호화할 수 있습니다.

클라이언트 측 암호화를 사용 설정하려면 다음 옵션을 사용하세요.

  1. Cloud Key Management Service(Cloud KMS)에 저장된 암호화 키를 사용합니다.
  2. 애플리케이션에 로컬로 저장된 암호화 키를 사용합니다.

이 주제에서는 가장 원활한 키 관리 옵션을 제공하는 첫 번째 옵션을 사용하는 방법을 설명합니다. Google에서는 Cloud KMS에 암호화 키를 만들고 Google의 오픈소스 암호화 라이브러리인 Tink를 사용하여 봉투 암호화를 구현합니다.

클라이언트 측 암호화가 필요한 이유는 무엇인가요?

열 수준 1에서 Cloud SQL 데이터를 보호하려면 클라이언트측 암호화가 필요합니다. 이름과 신용카드 번호를 포함하는 테이블이 있다고 가정해 보겠습니다. 사용자에게 이 테이블에 대한 액세스 권한을 부여하려고 하지만 사용자가 신용카드 번호를 보는 것은 원하지 않을 수 있습니다. 클라이언트 측 암호화를 사용하여 번호를 암호화할 수 있습니다. 사용자가 Cloud KMS의 암호화 키에 대한 액세스 권한을 부여받지 않는 한 신용카드 정보를 읽을 수 없습니다.

Cloud KMS를 사용하여 키 만들기

Cloud KMS를 사용하면 Google Cloud Platform에서 키를 만들고 관리할 수 있습니다.

Cloud KMS는 다양한 키 유형을 지원합니다. 클라이언트 측 암호화의 경우 대칭 키를 만들어야 합니다.

애플리케이션에 Cloud KMS의 키에 대한 액세스 권한을 부여하려면 애플리케이션에서 사용하는 서비스 계정에 cloudkms.cryptoKeyEncrypterDecrypter 역할을 부여해야 합니다. gcloud에서는 다음 명령어를 사용하여 이 작업을 수행합니다.

gcloud kms keys add-iam-policy-binding key \
--keyring=key-ring \
--location=location \
--member=serviceAccount:service-account-name@example.domain.com \
--role=roles/cloudkms.cryptoKeyEncrypterDecrypter

KMS 키를 사용하여 직접 데이터를 암호화할 수 있지만 여기에서는 봉투 암호화라는 더 유연한 솔루션을 사용합니다. 그러면 Cloud Key Management Service API가 지원할 수 있는 최대 메시지 크기인 64KB보다 긴 메시지를 암호화할 수 있습니다.

Cloud KMS 봉투 암호화

봉투 암호화에서 KMS 키는 키 암호화 키(KEK) 역할을 합니다. 즉, 실제 데이터를 암호화하는 데 사용되는 데이터 암호화 키(DEK)를 암호화하는 데 사용됩니다.

Cloud KMS에서 KEK를 만든 후 각 메시지를 암호화하려면 다음을 수행해야 합니다.

  • 로컬에서 데이터 암호화 키(DEK)를 생성합니다.
  • 이 DEK를 로컬에서 사용하여 메시지를 암호화합니다.
  • Cloud KMS를 호출하여 KEK로 DEK를 암호화(래핑)합니다.
  • 암호화된 데이터와 래핑된 DEK를 저장합니다.

이 주제에서는 봉투 암호화를 처음부터 구현하는 대신 Tink를 사용합니다.

Tink

Tink는 고수준 암호화 API를 제공하는 다국어 교차 플랫폼 라이브러리입니다. Tink의 봉투 암호화로 데이터를 암호화하려면 Cloud KMS의 KEK를 가리키는 키 URI 및 Tink가 KEK를 사용할 수 있도록 하는 사용자 인증 정보를 Tink에 제공해야 합니다. Tink는 DEK를 생성하고, 데이터를 암호화하고, DEK를 래핑하고, 암호화된 데이터와 래핑된 DEK가 있는 단일 암호문을 반환합니다.

Tink는 AEAD API를 사용하여 C++, 자바, Go, Python에서 봉투 암호화를 지원합니다.

public interface Aead{
  byte[] encrypt(final byte[] plaintext, final byte[] associatedData)
  throws
  byte[] decrypt(final byte[] ciphertext, final byte[] associatedData)
  throws
}

일반 메시지/암호문 인수 외에도 암호화 및 복호화 메서드는 추가 관련 데이터를 지원합니다. 이 인수는 암호문을 데이터 조각과 연결하는 데 사용할 수 있습니다. 예를 들어 user-id 필드와 encrypted-medical-history 필드가 있는 데이터베이스가 있다고 가정해 보겠습니다. 이 경우 의료 기록을 암호화할 때 user-id 필드를 연결된 데이터로 사용해야 할 수 있습니다. 이렇게 하면 공격자가 한 사용자의 의료 기록을 다른 사용자로 이동할 수 없습니다. 또한 쿼리를 실행할 때 올바른 데이터 행이 있는지 확인하는 데도 사용됩니다.

샘플

이 섹션에서는 클라이언트 측 암호화를 사용하는 유권자 정보 데이터베이스의 샘플 코드를 살펴봅니다. 샘플 코드는 다음을 수행하는 방법을 보여줍니다.

  • 데이터베이스 테이블 및 연결 풀 만들기
  • 봉투 암호화를 위한 Tink 설정
  • Cloud KMS의 KEK로 Tink의 봉투 암호화를 사용하여 데이터 암호화 및 복호화

시작하기 전에

  1. 다음 안내에 따라 Cloud SQL 인스턴스를 만듭니다. 생성한 연결 문자열, 데이터베이스 사용자, 데이터베이스 비밀번호를 기록해 둡니다.

  2. 안내에 따라 애플리케이션용 데이터베이스를 만듭니다. 데이터베이스 이름을 기록해 둡니다.

  3. 안내에 따라 애플리케이션의 KMS 키를 만듭니다. 생성된 키의 리소스 이름을 복사합니다.

  4. 안내에 따라 'Cloud SQL 클라이언트' 권한으로 서비스 계정을 만듭니다.

  5. 안내에 따라 서비스 계정에 키에 대한 'Cloud KMS CryptoKey 암호화/복호화' 권한을 추가합니다.

연결 풀을 만들고 데이터베이스에 새 테이블을 만듭니다.

자바


import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class CloudSqlConnectionPool {

  public static DataSource createConnectionPool(String dbUser, String dbPass, String dbName,
      String instanceConnectionName) {
    HikariConfig config = new HikariConfig();
    config.setDataSourceClassName("com.microsoft.sqlserver.jdbc.SQLServerDataSource");
    config.setUsername(dbUser); // e.g. "root", "sqlserver"
    config.setPassword(dbPass); // e.g. "my-password"
    config.addDataSourceProperty("databaseName", dbName);

    // The Cloud SQL Java Connector provides SSL encryption so 
    // it should be disabled at the driver level
    config.addDataSourceProperty("encrypt", "false");

    config.addDataSourceProperty("socketFactoryClass",
        "com.google.cloud.sql.sqlserver.SocketFactory");
    config.addDataSourceProperty("socketFactoryConstructorArg", instanceConnectionName);
    DataSource pool = new HikariDataSource(config);
    return pool;
  }

  public static void createTable(DataSource pool, String tableName) throws SQLException {
    // Safely attempt to create the table schema.
    try (Connection conn = pool.getConnection()) {

      String stmt = String.format("IF NOT EXISTS("
          + "SELECT * FROM sysobjects WHERE name='%s' and xtype='U')"
          + "CREATE TABLE %s ("
          + "vote_id INT NOT NULL IDENTITY,"
          + "time_cast DATETIME NOT NULL,"
          + "team VARCHAR(6) NOT NULL,"
          + "voter_email VARBINARY(255)"
          + "PRIMARY KEY (vote_id));", tableName, tableName);
      try (PreparedStatement createTableStatement = conn.prepareStatement(stmt);) {
        createTableStatement.execute();
      }
    }
  }
}

Python

import pytds
import sqlalchemy
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import Table


def init_tcp_connection_engine(
    db_user: str, db_pass: str, db_name: str, db_host: str
) -> sqlalchemy.engine.base.Engine:
    # Remember - storing secrets in plaintext is potentially unsafe. Consider using
    # something like https://cloud.google.com/secret-manager/docs/overview to help keep
    # secrets secret.

    # Extract host and port from db_host
    host_args = db_host.split(":")
    db_hostname, db_port = host_args[0], int(host_args[1])

    def connect_with_pytds() -> pytds.Connection:
        return pytds.connect(
            db_hostname,  # e.g. "127.0.0.1"
            user=db_user,  # e.g. "my-database-user"
            password=db_pass,  # e.g. "my-database-password"
            database=db_name,  # e.g. "my-database-name"
            port=db_port,  # e.g. 1433
            bytes_to_unicode=False,  # disables automatic decoding of bytes
        )

    pool = sqlalchemy.create_engine(
        # This allows us to use the pytds sqlalchemy dialect, but also set the
        # bytes_to_unicode flag to False, which is not supported by the dialect
        "mssql+pytds://",
        creator=connect_with_pytds,
    )

    print("Created TCP connection pool")
    return pool


def init_db(
    db_user: str,
    db_pass: str,
    db_name: str,
    db_host: str,
    table_name: str,
) -> sqlalchemy.engine.base.Engine:
    db = init_tcp_connection_engine(db_user, db_pass, db_name, db_host)

    # Create tables (if they don't already exist)
    if not db.has_table(table_name):
        metadata = sqlalchemy.MetaData(db)
        Table(
            table_name,
            metadata,
            Column("vote_id", Integer, primary_key=True, nullable=False),
            Column("voter_email", sqlalchemy.types.VARBINARY, nullable=False),
            Column("time_cast", DateTime, nullable=False),
            Column("team", sqlalchemy.types.VARCHAR(6), nullable=False),
        )
        metadata.create_all()

    print(f"Created table {table_name} in db {db_name}")
    return db

Tink로 봉투 AEAD 프리미티브를 초기화합니다.

자바


import com.google.crypto.tink.Aead;
import com.google.crypto.tink.KmsClient;
import com.google.crypto.tink.aead.AeadConfig;
import com.google.crypto.tink.aead.AeadKeyTemplates;
import com.google.crypto.tink.aead.KmsEnvelopeAead;
import com.google.crypto.tink.integration.gcpkms.GcpKmsClient;
import java.security.GeneralSecurityException;

public class CloudKmsEnvelopeAead {

  public static Aead get(String kmsUri) throws GeneralSecurityException {
    AeadConfig.register();

    // Create a new KMS Client
    KmsClient client = new GcpKmsClient().withDefaultCredentials();

    // Create an AEAD primitive using the Cloud KMS key
    Aead gcpAead = client.getAead(kmsUri);

    // Create an envelope AEAD primitive.
    // This key should only be used for client-side encryption to ensure authenticity and integrity
    // of data.
    return new KmsEnvelopeAead(AeadKeyTemplates.AES128_GCM, gcpAead);
  }
}

Python

import logging

import tink
from tink import aead
from tink.integration import gcpkms

logger = logging.getLogger(__name__)


def init_tink_env_aead(key_uri: str, credentials: str) -> tink.aead.KmsEnvelopeAead:
    aead.register()

    try:
        gcp_client = gcpkms.GcpKmsClient(key_uri, credentials)
        gcp_aead = gcp_client.get_aead(key_uri)
    except tink.TinkError as e:
        logger.error("Error initializing GCP client: %s", e)
        raise e

    # Create envelope AEAD primitive using AES256 GCM for encrypting the data
    # This key should only be used for client-side encryption to ensure authenticity and integrity
    # of data.
    key_template = aead.aead_key_templates.AES256_GCM
    env_aead = aead.KmsEnvelopeAead(key_template, gcp_aead)

    print(f"Created envelope AEAD Primitive using KMS URI: {key_uri}")

    return env_aead

데이터를 암호화하고 데이터베이스에 삽입합니다.

자바


import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import javax.sql.DataSource;

public class EncryptAndInsertData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see
    // https://cloud.google.com/kms/docs/quickstart

    String team = "TABS";
    String tableName = "votes";
    String email = "hello@example.com";

    // Initialize database connection pool and create table if it does not exist
    // See CloudSqlConnectionPool.java for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See CloudKmsEnvelopeAead.java for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    encryptAndInsertData(pool, envAead, tableName, team, email);
  }

  public static void encryptAndInsertData(
      DataSource pool, Aead envAead, String tableName, String team, String email)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
          String.format(
              "INSERT INTO %s (team, time_cast, voter_email) VALUES (?, ?, ?);", tableName);
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        voteStmt.setString(1, team);
        voteStmt.setTimestamp(2, new Timestamp(new Date().getTime()));

        // Use the envelope AEAD primitive to encrypt the email, using the team name as
        // associated data. This binds the encryption of the email to the team name, preventing
        // associating an encrypted email in one row with a team name in another row.
        byte[] encryptedEmail = envAead.encrypt(email.getBytes(), team.getBytes());
        voteStmt.setBytes(3, encryptedEmail);

        // Finally, execute the statement. If it fails, an error will be thrown.
        voteStmt.execute();
        System.out.println(String.format("Successfully inserted row into table %s", tableName));
      }
    }
  }
}

Python

import datetime
import logging
import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db


logger = logging.getLogger(__name__)


def main() -> None:
    db_user = os.environ["DB_USER"]  # e.g. "root", "sqlserver"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. "127.0.0.1"

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see
    # https://cloud.google.com/kms/docs/quickstart

    table_name = "votes"
    team = "TABS"
    email = "hello@example.com"

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(
        db_user,
        db_pass,
        db_name,
        table_name,
        instance_connection_name,
        db_socket_dir,
        db_host,
    )

    encrypt_and_insert_data(db, env_aead, table_name, team, email)


def encrypt_and_insert_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
    team: str,
    email: str,
) -> None:
    time_cast = datetime.datetime.now(tz=datetime.timezone.utc)
    # Use the envelope AEAD primitive to encrypt the email, using the team name as
    # associated data. Encryption with associated data ensures authenticity
    # (who the sender is) and integrity (the data has not been tampered with) of that
    # data, but not its secrecy. (see RFC 5116 for more info)
    encrypted_email = env_aead.encrypt(email.encode(), team.encode())
    # Verify that the team is one of the allowed options
    if team != "TABS" and team != "SPACES":
        logger.error(f"Invalid team specified: {team}")
        return

    # Preparing a statement before hand can help protect against injections.
    stmt = sqlalchemy.text(
        f"INSERT INTO {table_name} (time_cast, team, voter_email)"
        " VALUES (:time_cast, :team, CONVERT(varbinary(max), :voter_email, 0))"
    )

    # Using a with statement ensures that the connection is always released
    # back into the pool at the end of statement (even if an error occurs)
    with db.connect() as conn:
        conn.execute(stmt, time_cast=time_cast, team=team, voter_email=encrypted_email)
    print(f"Vote successfully cast for '{team}' at time {time_cast}!")

데이터베이스를 쿼리하고 저장된 데이터를 복호화합니다.

자바


import com.google.crypto.tink.Aead;
import java.security.GeneralSecurityException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import javax.sql.DataSource;

public class QueryAndDecryptData {

  public static void main(String[] args) throws GeneralSecurityException, SQLException {
    // Saving credentials in environment variables is convenient, but not secure - consider a more
    // secure solution such as Cloud Secret Manager to help keep secrets safe.
    String dbUser = System.getenv("DB_USER"); // e.g. "root", "mysql"
    String dbPass = System.getenv("DB_PASS"); // e.g. "mysupersecretpassword"
    String dbName = System.getenv("DB_NAME"); // e.g. "votes_db"
    String instanceConnectionName =
        System.getenv("INSTANCE_CONNECTION_NAME"); // e.g. "project-name:region:instance-name"
    String kmsUri = System.getenv("CLOUD_KMS_URI"); // e.g. "gcp-kms://projects/...path/to/key
    // Tink uses the "gcp-kms://" prefix for paths to keys stored in Google Cloud KMS. For more
    // info on creating a KMS key and getting its path, see
    // https://cloud.google.com/kms/docs/quickstart

    String tableName = "votes123";

    // Initialize database connection pool and create table if it does not exist
    // See CloudSqlConnectionPool.java for setup details
    DataSource pool =
        CloudSqlConnectionPool.createConnectionPool(dbUser, dbPass, dbName, instanceConnectionName);
    CloudSqlConnectionPool.createTable(pool, tableName);

    // Initialize envelope AEAD
    // See CloudKmsEnvelopeAead.java for setup details
    Aead envAead = CloudKmsEnvelopeAead.get(kmsUri);

    // Insert row into table to test
    // See EncryptAndInsert.java for setup details
    EncryptAndInsertData.encryptAndInsertData(
        pool, envAead, tableName, "SPACES", "hello@example.com");

    queryAndDecryptData(pool, envAead, tableName);
  }

  public static void queryAndDecryptData(DataSource pool, Aead envAead, String tableName)
      throws GeneralSecurityException, SQLException {

    try (Connection conn = pool.getConnection()) {
      String stmt =
          String.format(
              "SELECT TOP(5) team, time_cast, voter_email FROM %s ORDER BY time_cast DESC;",
              tableName);
      try (PreparedStatement voteStmt = conn.prepareStatement(stmt); ) {
        ResultSet voteResults = voteStmt.executeQuery();

        System.out.println("Team\tTime Cast\tEmail");
        while (voteResults.next()) {
          String team = voteResults.getString(1);
          Timestamp timeCast = voteResults.getTimestamp(2);

          // Use the envelope AEAD primitive to encrypt the email, using the team name as
          // associated data. This binds the encryption of the email to the team name, preventing
          // associating an encrypted email in one row with a team name in another row.
          String email = new String(envAead.decrypt(voteResults.getBytes(3), team.getBytes()));

          System.out.println(String.format("%s\t%s\t%s", team, timeCast, email));
        }
      }
    }
  }
}

Python

import os

import sqlalchemy
import tink

from .cloud_kms_env_aead import init_tink_env_aead
from .cloud_sql_connection_pool import init_db
from .encrypt_and_insert_data import encrypt_and_insert_data


def main() -> None:
    db_user = os.environ["DB_USER"]  # e.g. "root", "sqlserver"
    db_pass = os.environ["DB_PASS"]  # e.g. "mysupersecretpassword"
    db_name = os.environ["DB_NAME"]  # e.g. "votes_db"

    # Set if connecting using TCP:
    db_host = os.environ["DB_HOST"]  # e.g. "127.0.0.1"

    # Set if connecting using Unix sockets:
    db_socket_dir = os.environ.get("DB_SOCKET_DIR", "/cloudsql")

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]
    # e.g. "project-name:region:instance-name"

    credentials = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    key_uri = "gcp-kms://" + os.environ["GCP_KMS_URI"]
    # e.g. "gcp-kms://projects/...path/to/key
    # Tink uses the "gcp-kms://" prefix for paths to keys stored in Google
    # Cloud KMS. For more info on creating a KMS key and getting its path, see
    # https://cloud.google.com/kms/docs/quickstart

    table_name = "votes"
    team = "TABS"
    email = "hello@example.com"

    env_aead = init_tink_env_aead(key_uri, credentials)
    db = init_db(
        db_user,
        db_pass,
        db_name,
        table_name,
        instance_connection_name,
        db_socket_dir,
        db_host,
    )

    encrypt_and_insert_data(db, env_aead, table_name, team, email)
    query_and_decrypt_data(db, env_aead, table_name)


def query_and_decrypt_data(
    db: sqlalchemy.engine.base.Engine,
    env_aead: tink.aead.KmsEnvelopeAead,
    table_name: str,
) -> None:
    with db.connect() as conn:
        # Execute the query and fetch all results
        recent_votes = conn.execute(
            f"SELECT TOP(5) team, time_cast, voter_email FROM {table_name} "
            "ORDER BY time_cast DESC"
        ).fetchall()

        print("Team\tEmail\tTime Cast")

        for row in recent_votes:
            team = row[0]
            # Use the envelope AEAD primitive to decrypt the email, using the team name as
            # associated data. Encryption with associated data ensures authenticity
            # (who the sender is) and integrity (the data has not been tampered with) of that
            # data, but not its secrecy. (see RFC 5116 for more info)
            email = env_aead.decrypt(row[2], team).decode()
            time_cast = row[1]

            # Print recent votes
            print(f"{team}\t{email}\t{time_cast}")


  1. 인스턴스 또는 데이터베이스 수준에서 액세스를 제한할 수도 있습니다.