from __future__ import annotations
import sys
import time
from typing import Any
from google.api_core.extended_operation import ExtendedOperation
from google.cloud import compute_v1
def wait_for_extended_operation(
    operation: ExtendedOperation, verbose_name: str = "operation", timeout: int = 300
) -> Any:
    """
    Block until an extended (long-running) operation finishes and return its result.

    A failed operation is reported on sys.stderr and then surfaced as an
    exception. Warnings attached to the operation are only printed to
    sys.stderr; they do not prevent the result from being returned.

    Args:
        operation: the long-running operation to wait on.
        verbose_name: (optional) a more descriptive name of the operation,
            used only in the error and warning messages.
        timeout: how long (in seconds) to wait for the operation to finish.
            If None, wait indefinitely.

    Returns:
        Whatever `operation.result()` returns.

    Raises:
        The exception received from `operation.exception()`, or a RuntimeError
        carrying `operation.error_message` if no exception is set but the
        operation has an `error_code`.
        If the operation takes longer than `timeout` seconds to complete,
        a `concurrent.futures.TimeoutError` will be raised.
    """

    def _report(text: str) -> None:
        # Every diagnostic goes to stderr and is flushed immediately.
        print(text, file=sys.stderr, flush=True)

    outcome = operation.result(timeout=timeout)

    if operation.error_code:
        _report(
            f"Error during {verbose_name}: [Code: {operation.error_code}]: {operation.error_message}"
        )
        _report(f"Operation ID: {operation.name}")
        # Prefer the exception attached to the operation; fall back to a
        # generic RuntimeError when none is available.
        failure = operation.exception()
        if not failure:
            failure = RuntimeError(operation.error_message)
        raise failure

    reported_warnings = operation.warnings
    if reported_warnings:
        _report(f"Warnings during {verbose_name}:\n")
        for item in reported_warnings:
            _report(f" - {item.code}: {item.message}")

    return outcome
def add_extended_memory_to_instance(
    project_id: str, zone: str, instance_name: str, new_memory: int
) -> compute_v1.Instance:
    """
    Modify an existing VM to use extended memory.

    The instance is stopped first if it is not already stopped. Its machine
    type is then rewritten so that the trailing memory component becomes
    `<new_memory>-ext`, and the modified definition is sent to the API.

    Args:
        project_id: project ID or project number of the Cloud project you want to use.
        zone: name of the zone the instance is located in. For example: "us-west3-b"
        instance_name: name of the existing virtual machine (VM) instance to modify.
        new_memory: the amount of memory for the VM instance, in megabytes.

    Returns:
        Instance object, fetched again after the update operation has finished.

    Raises:
        RuntimeError: if the machine type of the instance does not look like
            an N1, N2 or N2D type. `wait_for_extended_operation` may also
            raise for a failed stop or update operation.
        TimeoutError: if the instance is still not reported as stopped
            5 minutes after the stop operation completed.
    """
    instance_client = compute_v1.InstancesClient()
    instance = instance_client.get(
        project=project_id, zone=zone, instance=instance_name
    )

    # NOTE(review): this is a substring match against the whole machine_type
    # value, so it also matches if the marker appears anywhere else in that
    # string - presumably a resource URL; confirm whether a stricter check on
    # the type name itself is wanted.
    if not any(
        family in instance.machine_type for family in ("n1-", "n2-", "n2d-")
    ):
        raise RuntimeError("Extra memory is available only for N1, N2 and N2D CPUs.")

    # Make sure that the machine is turned off
    stopped_states = (
        instance.Status.TERMINATED.name,
        instance.Status.STOPPED.name,
    )
    if instance.status not in stopped_states:
        operation = instance_client.stop(
            project=project_id, zone=zone, instance=instance_name
        )
        wait_for_extended_operation(operation, "instance stopping")

        # Waiting for the instance to be turned off. The status is checked
        # right after every fetch, so a stopped instance is never followed by
        # an extra sleep or a spurious timeout. A monotonic clock keeps the
        # deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + 300  # 5 minutes
        while True:
            instance = instance_client.get(
                project=project_id, zone=zone, instance=instance_name
            )
            if instance.status in stopped_states:
                break
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Instance {instance_name} did not stop within 5 minutes."
                )
            time.sleep(2)

    # Modify the machine definition, remember that extended memory is available only for N1, N2 and N2D CPUs
    machine_type = instance.machine_type
    # An instance that already uses extended memory ends in "-ext"; drop that
    # marker first so the split below removes the old memory value instead of
    # the marker (which would yield "...-<old>-<new>-ext").
    if machine_type.endswith("-ext"):
        machine_type = machine_type[: -len("-ext")]
    type_prefix, _previous_memory = machine_type.rsplit("-", maxsplit=1)
    instance.machine_type = f"{type_prefix}-{new_memory}-ext"
    # TODO: If you prefer to use the CustomMachineType helper class, uncomment this code
    # and comment out the machine type rewrite above
    # Using CustomMachineType helper
    # cmt = CustomMachineType.from_str(instance.machine_type)
    # cmt.memory_mb = new_memory
    # cmt.extra_memory_used = True
    # instance.machine_type = str(cmt)

    operation = instance_client.update(
        project=project_id,
        zone=zone,
        instance=instance_name,
        instance_resource=instance,
    )
    wait_for_extended_operation(operation, "instance update")

    return instance_client.get(project=project_id, zone=zone, instance=instance_name)