将视频注释流式传输到存储空间

通过 Video Intelligence API，您可以指定一个 Cloud Storage 存储桶，用于存储流式视频注释的结果。

以下代码示例演示了如何配置视频注释请求，以便将结果存储在 Cloud Storage 中。

Java

如需向 Video Intelligence 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅“为本地开发环境设置身份验证”。


import com.google.api.gax.rpc.BidiStream;
import com.google.cloud.videointelligence.v1p3beta1.StreamingAnnotateVideoRequest;
import com.google.cloud.videointelligence.v1p3beta1.StreamingAnnotateVideoResponse;
import com.google.cloud.videointelligence.v1p3beta1.StreamingFeature;
import com.google.cloud.videointelligence.v1p3beta1.StreamingLabelDetectionConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingStorageConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingVideoConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingVideoIntelligenceServiceClient;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

/**
 * Sample that streams a local video file to the Video Intelligence streaming API with label
 * detection enabled and asks the service to store the annotation results in Cloud Storage.
 */
public class StreamingAnnotationToStorage {

  /**
   * Performs streaming label detection on a local video file and prints the storage URI reported
   * by each response.
   *
   * <p>The whole file is read into memory and then sent in chunks of at most 5MB.
   *
   * @param filePath path to the local video file to stream
   * @param gcsUri Cloud Storage location (for example {@code gs://BUCKET_ID}) used as the
   *     annotation result storage directory
   * @throws IOException if the file cannot be read or the client cannot be created
   * @throws TimeoutException declared for callers; not thrown directly by the code in this method
   * @throws StatusRuntimeException presumably raised when the underlying gRPC call fails
   */
  static void streamingAnnotationToStorage(String filePath, String gcsUri)
      throws IOException, TimeoutException, StatusRuntimeException {
    // String filePath = "path_to_your_video_file";
    // String gcsUri = "gs://BUCKET_ID";

    try (StreamingVideoIntelligenceServiceClient client =
        StreamingVideoIntelligenceServiceClient.create()) {

      Path path = Paths.get(filePath);
      byte[] data = Files.readAllBytes(path);
      // Set the chunk size to 5MB (recommended less than 10MB).
      final int chunkSize = 5 * 1024 * 1024;

      StreamingStorageConfig streamingStorageConfig =
          StreamingStorageConfig.newBuilder()
              .setEnableStorageAnnotationResult(true)
              .setAnnotationResultStorageDirectory(gcsUri)
              .build();

      StreamingLabelDetectionConfig labelConfig =
          StreamingLabelDetectionConfig.newBuilder().setStationaryCamera(false).build();

      StreamingVideoConfig streamingVideoConfig =
          StreamingVideoConfig.newBuilder()
              .setFeature(StreamingFeature.STREAMING_LABEL_DETECTION)
              .setLabelDetectionConfig(labelConfig)
              .setStorageConfig(streamingStorageConfig)
              .build();

      BidiStream<StreamingAnnotateVideoRequest, StreamingAnnotateVideoResponse> call =
          client.streamingAnnotateVideoCallable().call();

      // The first request must **only** contain the video configuration:
      call.send(
          StreamingAnnotateVideoRequest.newBuilder().setVideoConfig(streamingVideoConfig).build());

      // Subsequent requests must **only** contain the video data.
      // Send the data in chunks. The last chunk is sized to the bytes that remain, so no
      // zero padding is appended to the video content. Advancing by the actual length also
      // keeps the offset from ever exceeding data.length (no int overflow).
      for (int offset = 0; offset < data.length; ) {
        int length = Math.min(chunkSize, data.length - offset);
        call.send(
            StreamingAnnotateVideoRequest.newBuilder()
                .setInputContent(ByteString.copyFrom(data, offset, length))
                .build());
        offset += length;
      }

      // Tell the service you are done sending data
      call.closeSend();

      // Iterate over the responses and print where each one says the results are stored.
      for (StreamingAnnotateVideoResponse response : call) {
        System.out.format("Storage Uri: %s\n", response.getAnnotationResultsUri());
      }
    }
  }
}

Node.js

如需向 Video Intelligence 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅“为本地开发环境设置身份验证”。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const path = 'Local file to analyze, e.g. ./my-file.mp4';
// const outputUri = 'Path to output, e.g. gs://path_to_output';

const videoIntelligence = require('@google-cloud/video-intelligence');
const fs = require('fs');

const {StreamingVideoIntelligenceServiceClient} = videoIntelligence.v1p3beta1;

// Chunk size set to 5MB (recommended less than 10MB).
const CHUNK_SIZE_BYTES = 5 * 1024 * 1024;

// Client used to open the streaming annotation call.
const client = new StreamingVideoIntelligenceServiceClient();

// Configuration message: label detection, with results stored under outputUri.
const storageConfig = {
  enableStorageAnnotationResult: true,
  annotationResultStorageDirectory: outputUri,
};
const configRequest = {
  videoConfig: {
    feature: 'STREAMING_LABEL_DETECTION',
    storageConfig,
  },
};

// Read the local file in base64-encoded chunks and collect one request per chunk.
const pendingRequests = [];
const fileReader = fs.createReadStream(path, {
  highWaterMark: CHUNK_SIZE_BYTES,
  encoding: 'base64',
});

fileReader.on('data', chunk => {
  pendingRequests.push({inputContent: chunk.toString()});
});

fileReader.on('close', () => {
  // The configuration must be the first message written to the stream,
  // followed by the content requests in the order they were read.
  stream.write(configRequest);
  for (const request of pendingRequests) {
    stream.write(request);
  }
  stream.end();
});

// Open the streaming call; each response reports where its annotations were stored.
const stream = client.streamingAnnotateVideo();
stream.on('data', response => {
  console.log(
    `The annotation is stored at: ${response.annotationResultsUri} `
  );
});

Python

如需向 Video Intelligence 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅“为本地开发环境设置身份验证”。

from google.cloud import videointelligence_v1p3beta1 as videointelligence

# path = 'path_to_file'
# output_uri = 'gs://path_to_output'

client = videointelligence.StreamingVideoIntelligenceServiceClient()

# Set streaming config specifying the output_uri.
# The output_uri is the prefix of the actual output files.
storage_config = videointelligence.StreamingStorageConfig(
    enable_storage_annotation_result=True,
    annotation_result_storage_directory=output_uri,
)
# Here we use label detection as an example.
# All features support output to GCS.
config = videointelligence.StreamingVideoConfig(
    feature=(videointelligence.StreamingFeature.STREAMING_LABEL_DETECTION),
    storage_config=storage_config,
)

# config_request should be the first in the stream of requests.
config_request = videointelligence.StreamingAnnotateVideoRequest(
    video_config=config
)

# Set the chunk size to 5MB (recommended less than 10MB).
chunk_size = 5 * 1024 * 1024

# Load file content.
# The built-in open() is used so the sample does not depend on an `io` import
# (the snippet never imports `io`, so `io.open` raised NameError).
stream = []
with open(path, "rb") as video_file:
    while True:
        data = video_file.read(chunk_size)
        if not data:
            break
        stream.append(data)


def stream_generator():
    """Yield the config request first, then one content request per loaded chunk."""
    yield config_request
    for chunk in stream:
        yield videointelligence.StreamingAnnotateVideoRequest(input_content=chunk)


requests = stream_generator()

# streaming_annotate_video returns a generator.
# The default timeout is about 300 seconds.
# To process longer videos it should be set to
# larger than the length (in seconds) of the stream.
responses = client.streaming_annotate_video(requests, timeout=600)

for response in responses:
    # Check for errors.
    if response.error.message:
        print(response.error.message)
        break

    print("Storage URI: {}".format(response.annotation_results_uri))