"""
Example of using the OS Login API to apply public SSH keys for a service
account, and use that service account to run commands on a remote
instance over SSH. This example uses zonal DNS names to address instances
on the same internal VPC network.
"""
from __future__ import annotations
import argparse
import subprocess
import time
from typing import Optional
import uuid
from google.cloud import oslogin_v1
import requests
# URL queried (see `main`) to look up the service account email when no
# account is passed in explicitly. Assembled from its path segments.
SERVICE_ACCOUNT_METADATA_URL = "/".join(
    (
        "http://metadata.google.internal",
        "computeMetadata",
        "v1",
        "instance",
        "service-accounts",
        "default",
        "email",
    )
)

# Headers sent along with the metadata request above.
HEADERS = {
    "Metadata-Flavor": "Google",
}
def execute(
    cmd: list[str],
    cwd: Optional[str] = None,
    capture_output: bool = False,
    env: Optional[dict] = None,
    raise_errors: bool = True,
) -> tuple[int, str]:
    """
    Run an external command (wrapper for Python subprocess).

    Standard error is merged into standard output, so the captured output
    contains both streams.

    Args:
        cmd: The command to be run.
        cwd: Directory in which to run the command.
        capture_output: Should the command output be captured and returned or just ignored.
        env: Environmental variables passed to the child process.
        raise_errors: Should errors in run command raise exceptions.

    Returns:
        Return code and captured output. The output is an empty string when
        `capture_output` is False.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status and `raise_errors` is True.
    """
    print(f"Running command: {cmd}")
    process = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE if capture_output else subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
        check=raise_errors,
    )
    # `process.stdout` is None when the output was sent to DEVNULL; normalize
    # it so the function always honours its `tuple[int, str]` return type.
    output = process.stdout or ""
    returncode = process.returncode

    # Only reachable with a non-zero status when raise_errors is False,
    # because check=True raises before we get here.
    if returncode:
        print(f"Command returned error status {returncode}")
        if capture_output:
            print(f"With output: {output}")

    return returncode, output
def create_ssh_key(
    oslogin_client: oslogin_v1.OsLoginServiceClient,
    account: str,
    expire_time: int = 300,
) -> str:
    """
    Generates a temporary SSH key pair and applies it to the specified account.

    The key pair is written to `/tmp` under a random name. If any step fails
    after the key path has been chosen, the key files are removed before the
    error is re-raised, because the caller never learns the path and so
    cannot clean them up itself.

    Args:
        oslogin_client: OS Login client object.
        account: Name of the service account this key will be assigned to.
            This should be in form of `users/<service_account_username>`.
        expire_time: How many seconds from now should this key be valid.

    Returns:
        The path to private SSH key. Public key can be found by appending `.pub`
        to the file name.

    Raises:
        subprocess.CalledProcessError: If `ssh-keygen` fails.
        Exception: Whatever the OS Login client raises when the key import
            fails is propagated unchanged.
    """
    private_key_file = f"/tmp/key-{uuid.uuid4()}"
    public_key_file = f"{private_key_file}.pub"

    try:
        execute(["ssh-keygen", "-t", "rsa", "-N", "", "-f", private_key_file])

        with open(public_key_file) as original:
            public_key = original.read().strip()

        # Expiration time is in microseconds.
        expiration = int((time.time() + expire_time) * 1000000)

        request = oslogin_v1.ImportSshPublicKeyRequest()
        request.parent = account
        request.ssh_public_key.key = public_key
        request.ssh_public_key.expiration_time_usec = expiration

        print(f"Setting key for {account}...")
        oslogin_client.import_ssh_public_key(request)

        # Let the key properly propagate
        time.sleep(5)
    except BaseException:
        # Don't leave private key material behind when the key could not be
        # handed over to the caller. Cleanup is best-effort (non-zero exit
        # codes are ignored) and the original error is always re-raised.
        execute(["shred", private_key_file], raise_errors=False)
        execute(["rm", "-f", private_key_file, public_key_file], raise_errors=False)
        raise

    return private_key_file
def run_ssh(cmd: str, private_key_file: str, username: str, hostname: str) -> str:
    """
    Runs a command on a remote system.

    The SSH invocation is attempted up to three times, waiting ten seconds
    between attempts. Each attempt is limited to ten seconds.

    Args:
        cmd: command to be run.
        private_key_file: private SSH key to be used for authentication.
        username: username to be used for authentication.
        hostname: hostname of the machine you want to run the command on.

    Returns:
        Output of the executed command (stdout and stderr combined).

    Raises:
        subprocess.CalledProcessError: If the last attempt exits with a
            non-zero status.
        subprocess.TimeoutExpired: If the last attempt times out.
    """
    ssh_command = [
        "ssh",
        "-i",
        private_key_file,
        "-o",
        "StrictHostKeyChecking=no",
        "-o",
        "UserKnownHostsFile=/dev/null",
        f"{username}@{hostname}",
        cmd,
    ]
    print(f"Running ssh command: {' '.join(ssh_command)}")

    max_tries = 3
    retry_delay = 10  # seconds to wait before the next attempt

    tries = 0
    while True:
        tries += 1
        try:
            ssh = subprocess.run(
                ssh_command,
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                check=True,
                # NOTE(review): this replaces the whole child environment, not
                # just SSH_AUTH_SOCK - presumably to keep any ssh-agent out of
                # the picture; confirm that is intended.
                env={"SSH_AUTH_SOCK": ""},
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as err:
            if tries >= max_tries:
                if isinstance(err, subprocess.CalledProcessError):
                    print(
                        f"Failed to run SSH command (return code: {err.returncode}). "
                        f"Output received: {err.output}"
                    )
                else:
                    print("Failed to run SSH - timed out.")
                # Bare raise keeps the original traceback intact.
                raise
            # Only wait when another attempt will actually follow.
            time.sleep(retry_delay)
        else:
            return ssh.stdout
def main(
    cmd: str,
    project: str,
    instance: Optional[str] = None,
    zone: Optional[str] = None,
    account: Optional[str] = None,
    hostname: Optional[str] = None,
    oslogin: Optional[oslogin_v1.OsLoginServiceClient] = None,
) -> str:
    """
    Runs a command on a remote system.

    A temporary SSH key pair is created and registered for the account, used
    for a single SSH session, and removed again afterwards.

    Args:
        cmd: command to be executed on the remote host.
        project: name of the project in which the remote instance is hosted.
        instance: name of the remote system instance.
        zone: zone in which the remote system resides. I.e. us-west3-a
        account: account to be used for authentication. If not provided, it
            is looked up via SERVICE_ACCOUNT_METADATA_URL.
        hostname: hostname of the remote system. If not provided, it is built
            from `instance`, `zone` and `project`.
        oslogin: OSLogin service client object. If not provided, a new client will be created.

    Returns:
        The command's output.

    Raises:
        ValueError: If `hostname` is not given and `instance`, `zone` or
            `project` is missing, so no hostname can be built.
        requests.RequestException: If the service account lookup fails or
            times out.
        subprocess.CalledProcessError, subprocess.TimeoutExpired: If the SSH
            command ultimately fails.
    """
    # Resolve the target first so that bad arguments fail before any key is
    # generated or any API is called.
    if not hostname:
        if not (instance and zone and project):
            raise ValueError(
                "Either hostname, or all of instance, zone and project, "
                "must be provided."
            )
        # Create the hostname of the target instance using the instance name,
        # the zone where the instance is located, and the project that owns the
        # instance.
        hostname = f"{instance}.{zone}.c.{project}.internal"

    # Create the OS Login API object.
    if oslogin is None:
        oslogin = oslogin_v1.OsLoginServiceClient()

    # Identify the service account ID if it is not already provided.
    if not account:
        response = requests.get(
            SERVICE_ACCOUNT_METADATA_URL, headers=HEADERS, timeout=10
        )
        # Don't mistake an error page for an account name.
        response.raise_for_status()
        account = response.text.strip()

    if not account.startswith("users/"):
        account = f"users/{account}"

    # Create a new SSH key pair and associate it with the service account.
    private_key_file = create_ssh_key(oslogin, account)
    try:
        # Using the OS Login API, get the POSIX username from the login profile
        # for the service account.
        profile = oslogin.get_login_profile(name=account)
        username = profile.posix_accounts[0].username

        # Run a command on the remote instance over SSH.
        result = run_ssh(cmd, private_key_file, username, hostname)

        # Print the command line output from the remote instance.
        print(result)
        return result
    finally:
        # Shred the private key and delete the pair. Non-zero exit codes are
        # ignored so that one failing step neither skips the remaining
        # cleanup nor hides an error raised in the block above.
        execute(["shred", private_key_file], raise_errors=False)
        execute(["rm", private_key_file], raise_errors=False)
        execute(["rm", f"{private_key_file}.pub"], raise_errors=False)
if __name__ == "__main__":
    # Command-line options, in the order they are registered with argparse.
    # Each option name matches a parameter of `main`.
    cli_options = {
        "--cmd": {
            "default": "uname -a",
            "help": "The command to run on the remote instance.",
        },
        "--project": {"help": "Your Google Cloud project ID."},
        "--zone": {"help": "The zone where the target instance is located."},
        "--instance": {"help": "The target instance for the ssh command."},
        "--account": {"help": "The service account email."},
        "--hostname": {
            "help": "The external IP address or hostname for the target instance.",
        },
    }

    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    for flag, options in cli_options.items():
        cli.add_argument(flag, **options)

    # The parsed namespace holds exactly cmd, project, zone, instance,
    # account and hostname, so it can be forwarded to `main` as keywords.
    main(**vars(cli.parse_args()))