ストリーミング アップロード

Cloud Storage では、データをファイルに保存することなく、データをバケットにストリーミングできます。これは、プロセスからアップロード データを生成する場合や、状況に応じてオブジェクトを圧縮する場合など、アップロードの開始時に最終的なサイズがわからないデータをアップロードする場合に役に立ちます。

ストリーミング時にチェックサム検証を使用する

チェックサムはアップロードの最初のリクエストでしか提供されないため、ストリーミング時に Cloud Storage のチェックサム検証を利用できないことも少なくありません。チェックサム検証は常に使用することをおすすめします。ストリーミング アップロードの完了後に手動で行うこともできますが、転送の完了後に検証を行った場合、破損を確認して削除する前に、破損したデータがアクセスされる可能性があります。

アップロードが完了してデータがアクセス可能になる前にチェックサムを検証する必要がある場合は、ストリーミング アップロードを使用しないでください。オブジェクトの最終処理の前にチェックサム検証を行う別のアップロード オプションを使用する必要があります。

必要なロール

アップロードのストリーミングに必要な権限を取得するには、次のいずれかのロールの付与を管理者に依頼します。

  • オブジェクト保持ロックを含むアップロードの場合は、バケットに対する Storage オブジェクト管理者(roles/storage.objectAdmin)IAM ロールの付与を管理者に依頼します。

  • それ以外の場合は、バケットに対するストレージ オブジェクト ユーザー(roles/storage.objectUser)の IAM ロールの付与を管理者に依頼します。

これらの事前定義ロールには、Cloud Storage へのアップロードのストリーミングに必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

  • storage.objects.create
  • storage.objects.delete
    • この権限は、既存のオブジェクトを上書きするアップロードにのみ必要です。
  • storage.objects.list
    • この権限は、ここで説明する操作を Google Cloud CLI で行う場合にのみ必要です。
  • storage.objects.setRetention
    • この権限は、オブジェクト保持ロックを含むアップロードにのみ必要です。

これらの権限は、他の事前定義ロールカスタムロールを使用して取得することもできます。

バケットに対するロールの付与については、バケットで IAM を使用するをご覧ください。

ストリーミング アップロード

次の例は、プロセスから Cloud Storage オブジェクトに対してストリーミング アップロードを行う方法を示しています。

コンソール

Google Cloud コンソールでは、ストリーミング アップロードはサポートされていません。代わりに gcloud CLI を使用してください。

コマンドライン

  1. データを gcloud storage cp コマンドに連結し、ソース URL に対してダッシュを使用します。

    PROCESS_NAME | gcloud storage cp - gs://BUCKET_NAME/OBJECT_NAME

    ここで

    • PROCESS_NAME は、データを収集するプロセスの名前です。例: collect_measurements
    • BUCKET_NAME は、オブジェクトを含むバケットの名前です。例: my_app_bucket
    • OBJECT_NAME は、データから作成されるオブジェクトの名前です。例: data_measurements

クライアント ライブラリ

C++

詳細については、Cloud Storage C++ API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
[](gcs::Client client, std::string const& bucket_name,
   std::string const& object_name, int desired_line_count) {
  std::string const text = "Lorem ipsum dolor sit amet";
  gcs::ObjectWriteStream stream =
      client.WriteObject(bucket_name, object_name);

  for (int lineno = 0; lineno != desired_line_count; ++lineno) {
    // Add 1 to the counter, because it is conventional to number lines
    // starting at 1.
    stream << (lineno + 1) << ": " << text << "\n";
  }

  stream.Close();

  StatusOr<gcs::ObjectMetadata> metadata = std::move(stream).metadata();
  if (!metadata) throw std::move(metadata).status();
  std::cout << "Successfully wrote to object " << metadata->name()
            << " its size is: " << metadata->size()
            << "\nFull metadata: " << *metadata << "\n";
}

C#

詳細については、Cloud Storage C# API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


using Google.Cloud.Storage.V1;
using System;
using System.IO;

public class UploadFileSample
{
    public void UploadFile(
        string bucketName = "your-unique-bucket-name",
        string localPath = "my-local-path/my-file-name",
        string objectName = "my-file-name")
    {
        var storage = StorageClient.Create();
        using var fileStream = File.OpenRead(localPath);
        storage.UploadObject(bucketName, objectName, null, fileStream);
        Console.WriteLine($"Uploaded {objectName}.");
    }
}

Go

詳細については、Cloud Storage Go API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// streamFileUpload uploads an object via a stream.
func streamFileUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	b := []byte("Hello world.")
	buf := bytes.NewBuffer(b)

	ctx, cancel := context.WithTimeout(ctx, time.Second*50)
	defer cancel()

	// Upload an object with storage.Writer.
	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.ChunkSize = 0 // note retries are not supported for chunk size 0.

	if _, err = io.Copy(wc, buf); err != nil {
		return fmt.Errorf("io.Copy: %w", err)
	}
	// Data can continue to be added to the file until the writer is closed.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "%v uploaded to %v.\n", object, bucket)

	return nil
}

Java

詳細については、Cloud Storage Java API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamObjectUpload {

  public static void streamObjectUpload(
      String projectId, String bucketName, String objectName, String contents) throws IOException {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The string of contents you wish to upload
    // String contents = "Hello world!";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    byte[] content = contents.getBytes(StandardCharsets.UTF_8);
    try (WriteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap(content));
      System.out.println(
          "Wrote to " + objectName + " in bucket " + bucketName + " using a WriteChannel.");
    }
  }
}

Node.js

詳細については、Cloud Storage Node.js API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

/**
 * TODO(developer): Uncomment the following lines before running the sample
 */
// The ID of your GCS bucket
// const bucketName = 'your-unique-bucket-name';

// The new ID for your GCS file
// const destFileName = 'your-new-file-name';

// The content to be uploaded in the GCS file
// const contents = 'your file content';

// Imports the Google Cloud client library
const {Storage} = require('@google-cloud/storage');

// Import Node.js stream
const stream = require('stream');

// Creates a client
const storage = new Storage();

// Get a reference to the bucket
const myBucket = storage.bucket(bucketName);

// Create a reference to a file object
const file = myBucket.file(destFileName);

// Create a pass through stream from a string
const passthroughStream = new stream.PassThrough();
passthroughStream.write(contents);
passthroughStream.end();

async function streamFileUpload() {
  passthroughStream.pipe(file.createWriteStream()).on('finish', () => {
    // The file upload is complete
  });

  console.log(`${destFileName} uploaded to ${bucketName}`);
}

streamFileUpload().catch(console.error);

PHP

詳細については、Cloud Storage PHP API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

use Google\Cloud\Storage\StorageClient;
use Google\Cloud\Storage\WriteStream;

/**
 * Upload a chunked file stream.
 *
 * @param string $bucketName The name of your Cloud Storage bucket.
 *        (e.g. 'my-bucket')
 * @param string $objectName The name of your Cloud Storage object.
 *        (e.g. 'my-object')
 * @param string $contents The contents to upload via stream chunks.
 *        (e.g. 'these are my contents')
 */
function upload_object_stream(string $bucketName, string $objectName, string $contents): void
{
    $storage = new StorageClient();
    $bucket = $storage->bucket($bucketName);
    $writeStream = new WriteStream(null, [
        'chunkSize' => 1024 * 256, // 256KB
    ]);
    $uploader = $bucket->getStreamableUploader($writeStream, [
        'name' => $objectName,
    ]);
    $writeStream->setUploader($uploader);
    $stream = fopen('data://text/plain,' . $contents, 'r');
    while (($line = stream_get_line($stream, 1024 * 256)) !== false) {
        $writeStream->write($line);
    }
    $writeStream->close();

    printf('Uploaded %s to gs://%s/%s' . PHP_EOL, $contents, $bucketName, $objectName);
}

Python

詳細については、Cloud Storage Python API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

from google.cloud import storage


def upload_blob_from_stream(bucket_name, file_obj, destination_blob_name):
    """Uploads bytes from a stream or other file-like object to a blob."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The stream or file (file-like object) from which to read
    # import io
    # file_obj = io.BytesIO()
    # file_obj.write(b"This is test data.")

    # The desired name of the uploaded GCS object (blob)
    # destination_blob_name = "storage-object-name"

    # Construct a client-side representation of the blob.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Rewind the stream to the beginning. This step can be omitted if the input
    # stream will always be at a correct position.
    file_obj.seek(0)

    # Upload data from the stream to your bucket.
    blob.upload_from_file(file_obj)

    print(
        f"Stream data uploaded to {destination_blob_name} in bucket {bucket_name}."
    )

Ruby

詳細については、Cloud Storage Ruby API のリファレンス ドキュメントをご覧ください。

Cloud Storage に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


# The ID of your GCS bucket
# bucket_name = "your-unique-bucket-name"

# The stream or file (file-like object) from which to read
# local_file_obj = StringIO.new "This is test data."

# Name of a file in the Storage bucket
# file_name   = "some_file.txt"

require "google/cloud/storage"

storage = Google::Cloud::Storage.new
bucket  = storage.bucket bucket_name

local_file_obj.rewind
bucket.create_file local_file_obj, file_name

puts "Stream data uploaded to #{file_name} in bucket #{bucket_name}"

REST API

JSON API

ストリーミング アップロードを実行するには、次のいずれかの方法を使用します。

  • 次の調整を行った再開可能なアップロード

    • ファイルデータ自体をアップロードする場合は、複数のチャンクのアップロードを使用します。

    • 最終チャンクになるまで合計のファイルサイズがわからないため、中間チャンクの Content-Range ヘッダーの合計ファイルサイズに * を使用します。

      たとえば、最初にアップロードしたチャンクのサイズが 512 KiB の場合、チャンクの Content-Range ヘッダーは bytes 0-524287/* になります。最初のチャンクのアップロード後に 64,000 バイト残っている場合は、残りのバイトを含む最終チャンクを送信します。Content-Range ヘッダーには bytes 524288-588287/588288 という値が設定されます。

  • 次の調整を行った単一リクエストのアップロード:

XML API

ストリーミング アップロードを実行するには、次のいずれかの方法を使用します。

  • XML API マルチパート アップロード

  • 次の調整を行った再開可能なアップロード:

    • ファイルデータ自体をアップロードする場合は、複数のチャンクのアップロードを使用します。

    • 最終チャンクになるまで合計のファイルサイズがわからないため、中間チャンクの Content-Range ヘッダーの合計ファイルサイズに * を使用します。

      たとえば、最初にアップロードしたチャンクのサイズが 512 KiB の場合、チャンクの Content-Range ヘッダーは bytes 0-524287/* になります。最初のチャンクのアップロード後に 64,000 バイト残っている場合は、残りのバイトを含む最終チャンクを送信します。Content-Range ヘッダーには bytes 524288-588287/588288 という値が設定されます。

  • 次の調整を行った単一リクエストのアップロード:

    • Transfer-Encoding: chunked ヘッダーを含め、Content-Length ヘッダーを除外します。

    • 仕様に従ってリクエストを作成し、オブジェクト データが利用可能になったらチャンクで送信します。

    • リクエストで Authorization ヘッダーに署名が使用されている場合、ストリーミング アップロードを実行できません。

次のステップ