Stream video annotation to storage

The Video Intelligence API enables you to specify a Cloud Storage bucket that will hold the results of your streaming video annotation.

The following code sample demonstrates how to configure your video annotation request to store the result in Cloud Storage.

Java

To authenticate to Video Intelligence, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.api.gax.rpc.BidiStream;
import com.google.cloud.videointelligence.v1p3beta1.StreamingAnnotateVideoRequest;
import com.google.cloud.videointelligence.v1p3beta1.StreamingAnnotateVideoResponse;
import com.google.cloud.videointelligence.v1p3beta1.StreamingFeature;
import com.google.cloud.videointelligence.v1p3beta1.StreamingLabelDetectionConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingStorageConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingVideoConfig;
import com.google.cloud.videointelligence.v1p3beta1.StreamingVideoIntelligenceServiceClient;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StreamingAnnotationToStorage {

  // Perform streaming video detection for explicit content
  static void streamingAnnotationToStorage(String filePath, String gcsUri)
      throws IOException, TimeoutException, StatusRuntimeException {
    // String filePath = "path_to_your_video_file";
    // String gcsUri = "gs://BUCKET_ID";

    try (StreamingVideoIntelligenceServiceClient client =
        StreamingVideoIntelligenceServiceClient.create()) {

      Path path = Paths.get(filePath);
      byte[] data = Files.readAllBytes(path);
      // Set the chunk size to 5MB (recommended less than 10MB).
      int chunkSize = 5 * 1024 * 1024;
      int numChunks = (int) Math.ceil((double) data.length / chunkSize);

      StreamingStorageConfig streamingStorageConfig =
          StreamingStorageConfig.newBuilder()
              .setEnableStorageAnnotationResult(true)
              .setAnnotationResultStorageDirectory(gcsUri)
              .build();

      StreamingLabelDetectionConfig labelConfig =
          StreamingLabelDetectionConfig.newBuilder().setStationaryCamera(false).build();

      StreamingVideoConfig streamingVideoConfig =
          StreamingVideoConfig.newBuilder()
              .setFeature(StreamingFeature.STREAMING_LABEL_DETECTION)
              .setLabelDetectionConfig(labelConfig)
              .setStorageConfig(streamingStorageConfig)
              .build();

      BidiStream<StreamingAnnotateVideoRequest, StreamingAnnotateVideoResponse> call =
          client.streamingAnnotateVideoCallable().call();

      // The first request must **only** contain the audio configuration:
      call.send(
          StreamingAnnotateVideoRequest.newBuilder().setVideoConfig(streamingVideoConfig).build());

      // Subsequent requests must **only** contain the audio data.
      // Send the requests in chunks
      for (int i = 0; i < numChunks; i++) {
        call.send(
            StreamingAnnotateVideoRequest.newBuilder()
                .setInputContent(
                    ByteString.copyFrom(
                        Arrays.copyOfRange(data, i * chunkSize, i * chunkSize + chunkSize)))
                .build());
      }

      // Tell the service you are done sending data
      call.closeSend();

      for (StreamingAnnotateVideoResponse response : call) {
        System.out.format("Storage Uri: %s\n", response.getAnnotationResultsUri());
      }
    }
  }
}

Node.js

To authenticate to Video Intelligence, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const path = 'Local file to analyze, e.g. ./my-file.mp4';
// const outputUri = 'Path to output, e.g. gs://path_to_output';

const {StreamingVideoIntelligenceServiceClient} =
  require('@google-cloud/video-intelligence').v1p3beta1;
const fs = require('fs');

// Instantiates a client
const client = new StreamingVideoIntelligenceServiceClient();
// Streaming configuration
const configRequest = {
  videoConfig: {
    feature: 'STREAMING_LABEL_DETECTION',
    storageConfig: {
      enableStorageAnnotationResult: true,
      annotationResultStorageDirectory: outputUri,
    },
  },
};
const readStream = fs.createReadStream(path, {
  highWaterMark: 5 * 1024 * 1024, //chunk size set to 5MB (recommended less than 10MB)
  encoding: 'base64',
});
//Load file content
const chunks = [];
readStream
  .on('data', chunk => {
    const request = {
      inputContent: chunk.toString(),
    };
    chunks.push(request);
  })
  .on('close', () => {
    // configRequest should be the first in the stream of requests
    stream.write(configRequest);
    for (let i = 0; i < chunks.length; i++) {
      stream.write(chunks[i]);
    }
    stream.end();
  });

const stream = client.streamingAnnotateVideo().on('data', response => {
  //Gets annotations for video
  console.log(
    `The annotation is stored at: ${response.annotationResultsUri} `
  );
});

Python

To authenticate to Video Intelligence, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.cloud import videointelligence_v1p3beta1 as videointelligence

# path = 'path_to_file'
# output_uri = 'gs://path_to_output'

client = videointelligence.StreamingVideoIntelligenceServiceClient()

# Set streaming config specifying the output_uri.
# The output_uri is the prefix of the actual output files.
storage_config = videointelligence.StreamingStorageConfig(
    enable_storage_annotation_result=True,
    annotation_result_storage_directory=output_uri,
)
# Here we use label detection as an example.
# All features support output to GCS.
config = videointelligence.StreamingVideoConfig(
    feature=(videointelligence.StreamingFeature.STREAMING_LABEL_DETECTION),
    storage_config=storage_config,
)

# config_request should be the first in the stream of requests.
config_request = videointelligence.StreamingAnnotateVideoRequest(
    video_config=config
)

# Set the chunk size to 5MB (recommended less than 10MB).
chunk_size = 5 * 1024 * 1024

# Load file content.
stream = []
with io.open(path, "rb") as video_file:
    while True:
        data = video_file.read(chunk_size)
        if not data:
            break
        stream.append(data)

def stream_generator():
    yield config_request
    for chunk in stream:
        yield videointelligence.StreamingAnnotateVideoRequest(input_content=chunk)

requests = stream_generator()

# streaming_annotate_video returns a generator.
# The default timeout is about 300 seconds.
# To process longer videos it should be set to
# larger than the length (in seconds) of the stream.
responses = client.streaming_annotate_video(requests, timeout=600)

for response in responses:
    # Check for errors.
    if response.error.message:
        print(response.error.message)
        break

    print("Storage URI: {}".format(response.annotation_results_uri))