Streaming transfers

Cloud Storage supports streaming transfers, which let you stream data to and from your Cloud Storage account without requiring that the data first be saved to a file. Streaming is useful in the following cases:

  • You want to upload data but don't know its final size at the start of the upload, such as when generating the upload data from a process or when compressing an object on the fly.

  • You want to download data from Cloud Storage into a process.

Using checksum validation when streaming

Because a checksum can only be supplied in the initial request of an upload, it's often not feasible to use Cloud Storage's checksum validation when streaming. It's recommended that you always use checksum validation, and you can do so manually after a streaming upload completes; however, validating after the transfer completes means that any corrupted data remains accessible during the time it takes to confirm the corruption and remove it.
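
One way to perform this manual check is sketched below in Python, assuming the google-cloud-storage and google-crc32c packages and a retained copy of the streamed bytes; the function and variable names are illustrative:

import base64

import google_crc32c
from google.cloud import storage

def verify_after_streaming_upload(bucket_name, blob_name, local_data):
    """Compare the object's stored CRC32C with one computed locally."""
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)  # fetches metadata

    # Cloud Storage reports CRC32C as a base64-encoded, big-endian value.
    checksum = google_crc32c.Checksum()
    checksum.update(local_data)
    local_crc32c = base64.b64encode(checksum.digest()).decode("utf-8")

    if blob.crc32c != local_crc32c:
        # Remove the corrupted object before anything downstream reads it.
        blob.delete()
        raise ValueError(f"Checksum mismatch for {blob_name}; object removed")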

If you require checksum validation prior to the upload completing and the data becoming accessible, then you should not use a streaming upload. Instead, use an upload option that performs checksum validation prior to finalizing the object.

Similarly, you should not use a streaming download if you require checksum validation prior to the download completing and the data becoming accessible. This is because streaming downloads use the Range header, and Cloud Storage does not perform checksum validation on such requests.

Prerequisites

Prerequisites vary depending on the tool you use:

Console

In order to complete this guide using the Google Cloud console, you must have the proper IAM permissions. If the bucket you want to access for streaming exists in a project that you did not create, you might need the project owner to give you a role that contains the necessary permissions.

For a list of permissions required for specific actions, see IAM permissions for the Google Cloud console.

For a list of relevant roles, see Cloud Storage roles. Alternatively, you can create a custom role that has specific, limited permissions.

Command line

In order to complete this guide using a command-line utility, you must have the proper IAM permissions. If the bucket you want to access for streaming exists in a project that you did not create, you might need the project owner to give you a role that contains the necessary permissions.

For a list of permissions required for specific actions, see IAM permissions for gsutil commands.

For a list of relevant roles, see Cloud Storage roles. Alternatively, you can create a custom role that has specific, limited permissions.

Code samples

In order to complete this guide using the Cloud Storage client libraries, you must have the proper IAM permissions. If the bucket you want to access for streaming exists in a project that you did not create, you might need the project owner to give you a role that contains the necessary permissions. Unless otherwise noted, client library requests are made through the JSON API.

For a list of permissions required for specific actions, see IAM permissions for JSON methods.

For a list of relevant roles, see Cloud Storage roles. Alternatively, you can create a custom role that has specific, limited permissions.

REST API

JSON API

In order to complete this guide using the JSON API, you must have the proper IAM permissions. If the bucket you want to access for streaming exists in a project that you did not create, you might need the project owner to give you a role that contains the necessary permissions.

For a list of permissions required for specific actions, see IAM permissions for JSON methods.

For a list of relevant roles, see Cloud Storage roles. Alternatively, you can create a custom role that has specific, limited permissions.

Streaming uploads

The following examples show how to perform a streaming upload from a process to a Cloud Storage object:

Console

The Google Cloud console does not support streaming uploads. Use the gcloud CLI instead.

Command line

gcloud

  1. Pipe the data to the gcloud storage cp command and use a dash for the source URL:

    PROCESS_NAME | gcloud storage cp - gs://BUCKET_NAME/OBJECT_NAME

    Where:

    • PROCESS_NAME is the name of the process from which you are collecting data. For example, collect_measurements.
    • BUCKET_NAME is the name of the bucket that contains the object. For example, my_app_bucket.
    • OBJECT_NAME is the name of the object that is created from the data. For example, data_measurements.

gsutil

  1. Pipe the data to the gsutil cp command and use a dash for the source URL:

    PROCESS_NAME | gsutil cp - gs://BUCKET_NAME/OBJECT_NAME

    Where:

    • PROCESS_NAME is the name of the process from which you are collecting data. For example, collect_measurements.
    • BUCKET_NAME is the name of the bucket that contains the object. For example, my_app_bucket.
    • OBJECT_NAME is the name of the object that is created from the data. For example, data_measurements.

Code samples

C++

For more information, see the Cloud Storage C++ API reference documentation.

namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
[](gcs::Client client, std::string const& bucket_name,
   std::string const& object_name, int desired_line_count) {
  std::string const text = "Lorem ipsum dolor sit amet";
  gcs::ObjectWriteStream stream =
      client.WriteObject(bucket_name, object_name);

  for (int lineno = 0; lineno != desired_line_count; ++lineno) {
    // Add 1 to the counter, because it is conventional to number lines
    // starting at 1.
    stream << (lineno + 1) << ": " << text << "\n";
  }

  stream.Close();

  StatusOr<gcs::ObjectMetadata> metadata = std::move(stream).metadata();
  if (!metadata) throw std::runtime_error(metadata.status().message());
  std::cout << "Successfully wrote to object " << metadata->name()
            << " its size is: " << metadata->size()
            << "\nFull metadata: " << *metadata << "\n";
}

C#

For more information, see the Cloud Storage C# API reference documentation.


using Google.Cloud.Storage.V1;
using System;
using System.IO;
using System.Text;

public class UploadObjectFromMemorySample
{
    public void UploadObjectFromMemory(
        string bucketName = "your-unique-bucket-name",
        string objectName = "file-name",
        string contents = "Hello world!")
    {
        var storage = StorageClient.Create();
        byte[] byteArray = Encoding.UTF8.GetBytes(contents);
        // Stream the in-memory data directly to Cloud Storage; no local
        // file is involved.
        using var stream = new MemoryStream(byteArray);
        storage.UploadObject(bucketName, objectName, "application/octet-stream", stream);
        Console.WriteLine($"Uploaded {objectName}.");
    }
}

Go

For more information, see the Cloud Storage Go API reference documentation.

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// streamFileUpload uploads an object via a stream.
func streamFileUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %v", err)
	}
	defer client.Close()

	b := []byte("Hello world.")
	buf := bytes.NewBuffer(b)

	ctx, cancel := context.WithTimeout(ctx, time.Second*50)
	defer cancel()

	// Upload an object with storage.Writer.
	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.ChunkSize = 0 // note retries are not supported for chunk size 0.

	if _, err = io.Copy(wc, buf); err != nil {
		return fmt.Errorf("io.Copy: %v", err)
	}
	// Data can continue to be added to the file until the writer is closed.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %v", err)
	}
	fmt.Fprintf(w, "%v uploaded to %v.\n", object, bucket)

	return nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.


import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamObjectUpload {

  public static void streamObjectUpload(
      String projectId, String bucketName, String objectName, String contents) throws IOException {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The string of contents you wish to upload
    // String contents = "Hello world!";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    byte[] content = contents.getBytes(StandardCharsets.UTF_8);
    try (WriteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap(content));
      System.out.println(
          "Wrote to " + objectName + " in bucket " + bucketName + " using a WriteChannel.");
    }
  }
}

Node.js

For more information, see the Cloud Storage Node.js API reference documentation.

/**
 * TODO(developer): Uncomment the following lines before running the sample
 */
// The ID of your GCS bucket
// const bucketName = 'your-unique-bucket-name';

// The new ID for your GCS file
// const destFileName = 'your-new-file-name';

// The content to be uploaded in the GCS file
// const contents = 'your file content';

// Imports the Google Cloud client library
const {Storage} = require('@google-cloud/storage');

// Import Node.js stream
const stream = require('stream');

// Creates a client
const storage = new Storage();

// Get a reference to the bucket
const myBucket = storage.bucket(bucketName);

// Create a reference to a file object
const file = myBucket.file(destFileName);

// Create a pass through stream from a string
const passthroughStream = new stream.PassThrough();
passthroughStream.write(contents);
passthroughStream.end();

async function streamFileUpload() {
  // Wait for the pipe to finish so that the success message prints only
  // after the upload is complete.
  await new Promise((resolve, reject) => {
    passthroughStream
      .pipe(file.createWriteStream())
      .on('finish', resolve)
      .on('error', reject);
  });

  console.log(`${destFileName} uploaded to ${bucketName}`);
}

streamFileUpload().catch(console.error);

PHP

For more information, see the Cloud Storage PHP API reference documentation.

use Google\Cloud\Storage\StorageClient;
use Google\Cloud\Storage\WriteStream;

/**
 * Upload a chunked file stream.
 *
 * @param string $bucketName The name of your Cloud Storage bucket.
 *        (e.g. 'my-bucket')
 * @param string $objectName The name of your Cloud Storage object.
 *        (e.g. 'my-object')
 * @param string $contents The contents to upload via stream chunks.
 *        (e.g. 'these are my contents')
 */
function upload_object_stream(string $bucketName, string $objectName, string $contents): void
{
    $storage = new StorageClient();
    $bucket = $storage->bucket($bucketName);
    $writeStream = new WriteStream(null, [
        'chunkSize' => 1024 * 256, // 256KB
    ]);
    $uploader = $bucket->getStreamableUploader($writeStream, [
        'name' => $objectName,
    ]);
    $writeStream->setUploader($uploader);
    $stream = fopen('data://text/plain,' . $contents, 'r');
    while (($line = stream_get_line($stream, 1024 * 256)) !== false) {
        $writeStream->write($line);
    }
    $writeStream->close();

    printf('Uploaded %s to gs://%s/%s' . PHP_EOL, $contents, $bucketName, $objectName);
}

Python

For more information, see the Cloud Storage Python API reference documentation.

from google.cloud import storage

def upload_blob_from_stream(bucket_name, file_obj, destination_blob_name):
    """Uploads bytes from a stream or other file-like object to a blob."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The stream or file (file-like object) from which to read
    # import io
    # file_obj = io.BytesIO()
    # file_obj.write(b"This is test data.")

    # The desired name of the uploaded GCS object (blob)
    # destination_blob_name = "storage-object-name"

    # Construct a client-side representation of the blob.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Rewind the stream to the beginning. This step can be omitted if the input
    # stream will always be at a correct position.
    file_obj.seek(0)

    # Upload data from the stream to your bucket.
    blob.upload_from_file(file_obj)

    print(
        f"Stream data uploaded to {destination_blob_name} in bucket {bucket_name}."
    )

Ruby

For more information, see the Cloud Storage Ruby API reference documentation.


# The ID of your GCS bucket
# bucket_name = "your-unique-bucket-name"

# The stream or file (file-like object) from which to read
# local_file_obj = StringIO.new "This is test data."

# Name of a file in the Storage bucket
# file_name   = "some_file.txt"

require "google/cloud/storage"

storage = Google::Cloud::Storage.new
bucket  = storage.bucket bucket_name

local_file_obj.rewind
bucket.create_file local_file_obj, file_name

puts "Stream data uploaded to #{file_name} in bucket #{bucket_name}"

REST API

JSON API

To perform a streaming upload, follow the instructions for performing a resumable upload, with the following considerations:

  • When uploading the file data itself, use a multiple chunk upload.

  • Since you don't know the total file size until you reach the final chunk, use a * for the total file size in the Content-Range header of intermediate chunks.

    For example, if the first chunk you upload has a size of 512 KiB, the Content-Range header for the chunk is bytes 0-524287/*. If your upload has 64000 bytes remaining after the first chunk, you then send a final chunk that contains the remaining bytes and has a Content-Range header with the value bytes 524288-588287/588288. A sketch of this request sequence follows this list.
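
This minimal Python sketch assumes the requests library and a session_uri previously returned when the resumable upload session was initiated; the function name is illustrative, and every chunk except the last must be a multiple of 256 KiB:

import requests

def upload_in_two_chunks(session_uri, first_chunk, last_chunk):
    sent = len(first_chunk)
    # Intermediate chunk: the total object size is still unknown, so use "*".
    requests.put(
        session_uri,
        data=first_chunk,
        headers={"Content-Range": f"bytes 0-{sent - 1}/*"},
    )
    total = sent + len(last_chunk)
    # Final chunk: the total object size is now known, so state it.
    response = requests.put(
        session_uri,
        data=last_chunk,
        headers={"Content-Range": f"bytes {sent}-{total - 1}/{total}"},
    )
    response.raise_for_status()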

XML API

To perform a streaming upload, use one of the following methods:

  • An XML API multipart upload

  • A resumable upload, with the following adjustments:

    • When uploading the file data itself, use a multiple chunk upload.

    • Since you don't know the total file size until you reach the final chunk, use a * for the total file size in the Content-Range header of intermediate chunks.

      For example, if the first chunk you upload has a size of 512 KiB, the Content-Range header for the chunk is bytes 0-524287/*. If your upload has 64000 bytes remaining after the first chunk, you then send a final chunk that contains the remaining bytes and has a Content-Range header with the value bytes 524288-588287/588288.

Streaming downloads

The following examples show how to perform a download from a Cloud Storage object to a process:

Console

The Google Cloud console does not support streaming downloads. Use the gcloud CLI instead.

Command line

gcloud

  1. Run the gcloud storage cp command using a dash for the destination URL, then pipe the data to the process:

    gcloud storage cp gs://BUCKET_NAME/OBJECT_NAME - | PROCESS_NAME

    Where:

    • BUCKET_NAME is the name of the bucket that contains the object. For example, my_app_bucket.
    • OBJECT_NAME is the name of the object you are streaming to the process. For example, data_measurements.
    • PROCESS_NAME is the name of the process into which you are feeding data. For example, analyze_data.

You can also stream data from a Cloud Storage object to a standard Linux command like sort:

gcloud storage cp gs://my_app_bucket/data_measurements - | sort

gsutil

  1. Run the gsutil cp command using a dash for the destination URL, then pipe the data to the process:

    gsutil cp gs://BUCKET_NAME/OBJECT_NAME - | PROCESS_NAME

    其中:

    • BUCKET_NAME 是包含对象的存储分区的名称。例如 my_app_bucket
    • OBJECT_NAME 是您要流式传输到进程的对象的名称。例如 data_measurements
    • PROCESS_NAME 是您要向其输送数据的进程的名称。例如 analyze_data

您还可以将数据从 Cloud Storage 对象流式传输到标准 Linux 命令,比如 sort

gsutil cp gs://my_app_bucket/data_measurements - | sort

Code samples

C++

For more information, see the Cloud Storage C++ API reference documentation.

namespace gcs = ::google::cloud::storage;
[](gcs::Client client, std::string const& bucket_name,
   std::string const& object_name) {
  gcs::ObjectReadStream stream = client.ReadObject(bucket_name, object_name);

  int count = 0;
  std::string line;
  while (std::getline(stream, line, '\n')) {
    ++count;
  }

  std::cout << "The object has " << count << " lines\n";
}

C#

For more information, see the Cloud Storage C# API reference documentation.


using Google.Cloud.Storage.V1;
using System;
using System.IO;

public class DownloadFileSample
{
    public void DownloadFile(
        string bucketName = "your-unique-bucket-name",
        string objectName = "my-file-name",
        string localPath = "my-local-path/my-file-name")
    {
        var storage = StorageClient.Create();
        using var outputFile = File.OpenWrite(localPath);
        storage.DownloadObject(bucketName, objectName, outputFile);
        Console.WriteLine($"Downloaded {objectName} to {localPath}.");
    }
}

Go

For more information, see the Cloud Storage Go API reference documentation.


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// downloadFileIntoMemory downloads an object.
func downloadFileIntoMemory(w io.Writer, bucket, object string) ([]byte, error) {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storage.NewClient: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*50)
	defer cancel()

	rc, err := client.Bucket(bucket).Object(object).NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("Object(%q).NewReader: %v", object, err)
	}
	defer rc.Close()

	data, err := io.ReadAll(rc)
	if err != nil {
		return nil, fmt.Errorf("io.ReadAll: %v", err)
	}
	fmt.Fprintf(w, "Blob %v downloaded.\n", object)
	return data, nil
}

Java

For more information, see the Cloud Storage Java API reference documentation.


import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class StreamObjectDownload {

  public static void streamObjectDownload(
      String projectId, String bucketName, String objectName, String targetFile)
      throws IOException {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The path to the file to download the object to
    // String targetFile = "path/to/your/file";
    Path targetFilePath = Paths.get(targetFile);

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    try (ReadChannel reader = storage.reader(BlobId.of(bucketName, objectName));
        FileChannel targetFileChannel =
            FileChannel.open(targetFilePath, StandardOpenOption.WRITE)) {

      ByteStreams.copy(reader, targetFileChannel);

      System.out.println(
          "Downloaded object "
              + objectName
              + " from bucket "
              + bucketName
              + " to "
              + targetFile
              + " using a ReadChannel.");
    }
  }
}

Node.js

For more information, see the Cloud Storage Node.js API reference documentation.

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of your GCS bucket
// const bucketName = 'your-unique-bucket-name';

// The ID of your GCS file
// const fileName = 'your-file-name';

// The filename and file path where you want to download the file
// const destFileName = '/local/path/to/file.txt';

// Imports the Google Cloud client library
const {Storage} = require('@google-cloud/storage');

// Import the Node.js file system module
const fs = require('fs');

// Creates a client
const storage = new Storage();

async function streamFileDownload() {
  // The example below demonstrates how we can reference a remote file, then
  // pipe its contents to a local file.
  // Once the stream is created, the data can be piped anywhere (process, stdout, etc.)
  await new Promise((resolve, reject) => {
    storage
      .bucket(bucketName)
      .file(fileName)
      .createReadStream() // stream is created
      .pipe(fs.createWriteStream(destFileName))
      .on('finish', resolve)
      .on('error', reject);
  });

  console.log(
    `gs://${bucketName}/${fileName} downloaded to ${destFileName}.`
  );
}

streamFileDownload().catch(console.error);

PHP

For more information, see the Cloud Storage PHP API reference documentation.

use Google\Cloud\Storage\StorageClient;

/**
 * Download an object from Cloud Storage and save it as a local file.
 *
 * @param string $bucketName The name of your Cloud Storage bucket.
 *        (e.g. 'my-bucket')
 * @param string $objectName The name of your Cloud Storage object.
 *        (e.g. 'my-object')
 * @param string $destination The local destination to save the object.
 *        (e.g. '/path/to/your/file')
 */
function download_object(string $bucketName, string $objectName, string $destination): void
{
    $storage = new StorageClient();
    $bucket = $storage->bucket($bucketName);
    $object = $bucket->object($objectName);
    $object->downloadToFile($destination);
    printf(
        'Downloaded gs://%s/%s to %s' . PHP_EOL,
        $bucketName,
        $objectName,
        basename($destination)
    );
}

Python

For more information, see the Cloud Storage Python API reference documentation.

from google.cloud import storage

def download_blob_to_stream(bucket_name, source_blob_name, file_obj):
    """Downloads a blob to a stream or other file-like object."""

    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object (blob)
    # source_blob_name = "storage-object-name"

    # The stream or file (file-like object) to which the blob will be written
    # import io
    # file_obj = io.BytesIO()

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client-side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` in that it doesn't
    # retrieve metadata from Google Cloud Storage. As we don't use metadata in
    # this example, using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_file(file_obj)

    print(f"Downloaded blob {source_blob_name} to file-like object.")

    return file_obj
    # Before reading from file_obj, remember to rewind with file_obj.seek(0).

Ruby

For more information, see the Cloud Storage Ruby API reference documentation.

# Downloads a blob to a stream or other file-like object.

# The ID of your GCS bucket
# bucket_name = "your-unique-bucket-name"

# Name of a file in the Storage bucket
# file_name   = "some_file.txt"

# The stream or file (file-like object) to which the contents will be written
# local_file_obj = StringIO.new

require "google/cloud/storage"

storage = Google::Cloud::Storage.new
bucket  = storage.bucket bucket_name
file    = bucket.file file_name

file.download local_file_obj, verify: :none

# rewind the object before starting to read the downloaded contents
local_file_obj.rewind
puts "The full downloaded file contents are: #{local_file_obj.read.inspect}"

REST API

JSON API

To perform a streaming download, follow the instructions for downloading an object, with the following considerations:

  • Before beginning the download, retrieve the object's metadata and save the object's generation number. Include this generation number in each of your requests to ensure that you don't download data from two different generations in the event the original is overwritten.

  • Use the Range header in your request to retrieve a portion of the overall object, which you can send to the desired local process.

  • Continue making requests for successive portions of the object until the entire object has been retrieved; a sketch of this pattern follows this list.
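
This minimal Python sketch assumes the requests library; authentication headers are omitted, the function name and chunk size are illustrative, and the object name must already be URL-encoded:

import requests

def stream_download(bucket, obj, chunk_size=256 * 1024):
    url = f"https://storage.googleapis.com/storage/v1/b/{bucket}/o/{obj}"
    # Retrieve the metadata first and pin the generation number.
    meta = requests.get(url).json()
    generation, size = meta["generation"], int(meta["size"])

    start = 0
    while start < size:
        end = min(start + chunk_size, size) - 1
        # Each ranged request names the pinned generation explicitly.
        resp = requests.get(
            url,
            params={"alt": "media", "generation": generation},
            headers={"Range": f"bytes={start}-{end}"},
        )
        resp.raise_for_status()
        yield resp.content  # hand this portion off to the local process
        start = end + 1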

XML API

To perform a streaming download, follow the instructions for downloading an object, with the following considerations:

  • Before beginning the download, retrieve the object's metadata and save the object's generation number. Include this generation number in each of your requests to ensure that you don't download data from two different generations in the event the original is overwritten.

  • Use the Range header in your request to retrieve a portion of the overall object, which you can send to the desired local process.

  • Continue making requests for successive portions of the object until the entire object has been retrieved.

What's next