CCAI 스크립트 작성

CCAI 스크립트를 사용하면 스트리밍 오디오 데이터를 실시간으로 스크립트 텍스트로 변환할 수 있습니다. Agent Assist는 텍스트를 기반으로 추천을 제공하므로 오디오 데이터를 사용하려면 먼저 변환해야 합니다. CCAI Insights에서 스크립트가 생성된 스트리밍 오디오를 사용하여 상담사 대화에 관한 실시간 데이터 (예: 주제 모델링)를 수집할 수도 있습니다.

CCAI와 함께 사용할 스트리밍 오디오를 스크립트로 변환하는 방법에는 두 가지가 있습니다. SIPREC 기능을 사용하거나 오디오 데이터를 페이로드로 사용하여 gRPC를 호출하는 것입니다. 이 페이지에서는 gRPC 호출을 사용하여 스트리밍 오디오 데이터를 텍스트로 변환하는 프로세스를 설명합니다.

CCAI 스크립트는 Speech-to-Text 스트리밍 음성 인식을 사용하여 작동합니다. Speech-to-Text는 표준 및 고급 등 여러 인식 모델을 제공합니다. CCAI 스크립트는 고급 전화 통화 모델과 함께 사용되는 경우에만 GA 수준에서 지원됩니다.

기본 요건

대화 프로필 만들기

대화 프로필을 만들려면 Agent Assist 콘솔을 사용하거나 ConversationProfile 리소스에서 create 메서드를 직접 호출합니다.

CCAI 스크립트의 경우 대화에서 오디오 데이터를 전송할 때 ConversationProfile.stt_config를 기본 InputAudioConfig로 구성하는 것이 좋습니다.

대화 런타임에 스크립트 가져오기

대화 런타임에 스크립트를 가져오려면 대화의 참여자를 만들고 각 참여자의 오디오 데이터를 전송해야 합니다.

참여자 만들기

참여자에는 세 가지 유형이 있습니다. 역할에 관한 자세한 내용은 참조 문서를 참고하세요. participant에서 create 메서드를 호출하고 role를 지정합니다. END_USER 또는 HUMAN_AGENT 참여자만 스크립트를 가져오는 데 필요한 StreamingAnalyzeContent를 호출할 수 있습니다.

오디오 데이터 전송 및 스크립트 가져오기

StreamingAnalyzeContent를 사용하여 다음 매개변수를 사용하여 참여자의 오디오를 Google에 전송하고 스크립트를 가져올 수 있습니다.

  • 스트림의 첫 번째 요청은 InputAudioConfig여야 합니다. 여기에서 구성된 필드는 ConversationProfile.stt_config의 해당 설정을 재정의합니다. 두 번째 요청이 있을 때까지 오디오 입력을 전송하지 마세요.

    • audioEncodingAUDIO_ENCODING_LINEAR_16 또는 AUDIO_ENCODING_MULAW로 설정해야 합니다.
    • model: 오디오를 텍스트로 변환하는 데 사용할 Speech-to-Text 모델입니다. 이 필드를 telephony로 설정합니다.
    • 최적의 스크립트 품질을 위해 singleUtterancefalse로 설정해야 합니다. singleUtterancefalse인 경우 END_OF_SINGLE_UTTERANCE를 기대해서는 안 되지만 StreamingAnalyzeContentResponse.recognition_result 내의 isFinal==true를 사용하여 스트림을 절반으로 닫을 수 있습니다.
    • 선택적 추가 매개변수: 다음 매개변수는 선택사항입니다. 이러한 매개변수에 액세스하려면 Google 담당자에게 문의하세요.
      • languageCode: 오디오의 language_code입니다. 기본값은 en-US입니다.
      • alternativeLanguageCodes: 오디오에서 감지될 수 있는 추가 언어입니다. Agent Assist는 language_code 필드를 사용하여 오디오 시작 부분에서 언어를 자동으로 감지하고 이후 모든 대화에서 해당 언어를 사용합니다. alternativeLanguageCodes 필드를 사용하면 Agent Assist에서 선택할 수 있는 옵션을 더 지정할 수 있습니다.
      • phraseSets: Speech-to-Text 모델 적응 phraseSet 리소스 이름입니다. CCAI 스크립트로 모델 적응을 사용하려면 먼저 Speech-to-Text API를 사용하여 phraseSet를 만들고 여기에 리소스 이름을 지정해야 합니다.
  • 오디오 페이로드가 포함된 두 번째 요청을 전송한 후에는 스트림에서 일부 StreamingAnalyzeContentResponses를 수신해야 합니다.

    • StreamingAnalyzeContentResponse.recognition_result에서 is_finaltrue로 설정된 경우 스트림을 반쯤 닫거나 Python과 같은 일부 언어에서 전송을 중지할 수 있습니다.
    • 스트림을 반쯤 닫으면 서버는 최종 스크립트가 포함된 응답과 잠재적인 Dialogflow 추천 또는 Agent Assist 추천을 다시 전송합니다.
  • 최종 스크립트는 다음 위치에서 확인할 수 있습니다.

  • 이전 스트림이 닫힌 후 새 스트림을 시작합니다.

    • 오디오 재전송: 새 스트림 시작 시간에 대한 is_final=true가 포함된 응답의 마지막 speech_end_offset 후에 생성된 오디오 데이터는 최상의 스크립트 품질을 위해 StreamingAnalyzeContent로 다시 전송해야 합니다.
  • 다음은 스트림의 작동 방식을 보여주는 다이어그램입니다.

스트리밍 인식 요청 코드 샘플

다음 코드 샘플은 스트리밍 텍스트 변환 요청을 보내는 방법을 보여줍니다.

Python

Agent Assist에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google Cloud Dialogflow API sample code using the StreamingAnalyzeContent
API.

Also please contact Google to get credentials of this project and set up the
credential file json locations by running:
export GOOGLE_APPLICATION_CREDENTIALS=<cred_json_file_location>

Example usage:
    export GOOGLE_CLOUD_PROJECT='cloud-contact-center-ext-demo'
    export CONVERSATION_PROFILE='FnuBYO8eTBWM8ep1i-eOng'
    export GOOGLE_APPLICATION_CREDENTIALS='/Users/ruogu/Desktop/keys/cloud-contact-center-ext-demo-78798f9f9254.json'
    python streaming_transcription.py

Then started to talk in English, you should see transcription shows up as you speak.

Say "Quit" or "Exit" to stop.
"""

import os
import re
import sys

from google.api_core.exceptions import DeadlineExceeded

import pyaudio

from six.moves import queue

import conversation_management
import participant_management

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
CONVERSATION_PROFILE_ID = os.getenv("CONVERSATION_PROFILE")

# Audio recording parameters
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10)  # 100ms
RESTART_TIMEOUT = 160  # seconds
MAX_LOOKBACK = 3  # seconds

YELLOW = "\033[0;33m"


class ResumableMicrophoneStream:
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk_size):
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        self._buff = queue.Queue()
        self.is_final = False
        self.closed = True
        # Count the number of times the stream analyze content restarts.
        self.restart_counter = 0
        self.last_start_time = 0
        # Time end of the last is_final in millisec since last_start_time.
        self.is_final_offset = 0
        # Save the audio chunks generated from the start of the audio stream for
        # replay after restart.
        self.audio_input_chunks = []
        self.new_stream = True
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )

    def __enter__(self):
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, *args, **kwargs):
        """Continuously collect data from the audio stream, into the buffer in
        chunksize."""

        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        """Stream Audio from microphone to API and to local buffer"""
        try:
            # Handle restart.
            print("restart generator")
            # Flip the bit of is_final so it can continue stream.
            self.is_final = False
            total_processed_time = self.last_start_time + self.is_final_offset
            processed_bytes_length = (
                int(total_processed_time * SAMPLE_RATE * 16 / 8) / 1000
            )
            self.last_start_time = total_processed_time
            # Send out bytes stored in self.audio_input_chunks that is after the
            # processed_bytes_length.
            if processed_bytes_length != 0:
                audio_bytes = b"".join(self.audio_input_chunks)
                # Lookback for unprocessed audio data.
                need_to_process_length = min(
                    int(len(audio_bytes) - processed_bytes_length),
                    int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8),
                )
                # Note that you need to explicitly use `int` type for substring.
                need_to_process_bytes = audio_bytes[(-1) * need_to_process_length :]
                yield need_to_process_bytes

            while not self.closed and not self.is_final:
                data = []
                # Use a blocking get() to ensure there's at least one chunk of
                # data, and stop iteration if the chunk is None, indicating the
                # end of the audio stream.
                chunk = self._buff.get()

                if chunk is None:
                    return
                data.append(chunk)
                # Now try to the rest of chunks if there are any left in the _buff.
                while True:
                    try:
                        chunk = self._buff.get(block=False)

                        if chunk is None:
                            return
                        data.append(chunk)

                    except queue.Empty:
                        break
                self.audio_input_chunks.extend(data)
                if data:
                    yield b"".join(data)
        finally:
            print("Stop generator")


def main():
    """start bidirectional streaming from microphone input to Dialogflow API"""
    # Create conversation.
    conversation = conversation_management.create_conversation(
        project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID
    )

    conversation_id = conversation.name.split("conversations/")[1].rstrip()

    # Create end user participant.
    end_user = participant_management.create_participant(
        project_id=PROJECT_ID, conversation_id=conversation_id, role="END_USER"
    )
    participant_id = end_user.name.split("participants/")[1].rstrip()

    mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
    print(mic_manager.chunk_size)
    sys.stdout.write(YELLOW)
    sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
    sys.stdout.write("End (ms)       Transcript Results/Status\n")
    sys.stdout.write("=====================================================\n")

    with mic_manager as stream:
        while not stream.closed:
            terminate = False
            while not terminate:
                try:
                    print(f"New Streaming Analyze Request: {stream.restart_counter}")
                    stream.restart_counter += 1
                    # Send request to streaming and get response.
                    responses = participant_management.analyze_content_audio_stream(
                        conversation_id=conversation_id,
                        participant_id=participant_id,
                        sample_rate_herz=SAMPLE_RATE,
                        stream=stream,
                        timeout=RESTART_TIMEOUT,
                        language_code="en-US",
                        single_utterance=False,
                    )

                    # Now, print the final transcription responses to user.
                    for response in responses:
                        if response.message:
                            print(response)
                        if response.recognition_result.is_final:
                            print(response)
                            # offset return from recognition_result is relative
                            # to the beginning of audio stream.
                            offset = response.recognition_result.speech_end_offset
                            stream.is_final_offset = int(
                                offset.seconds * 1000 + offset.microseconds / 1000
                            )
                            transcript = response.recognition_result.transcript
                            # Half-close the stream with gRPC (in Python just stop yielding requests)
                            stream.is_final = True
                            # Exit recognition if any of the transcribed phrase could be
                            # one of our keywords.
                            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                                sys.stdout.write(YELLOW)
                                sys.stdout.write("Exiting...\n")
                                terminate = True
                                stream.closed = True
                                break
                except DeadlineExceeded:
                    print("Deadline Exceeded, restarting.")

            if terminate:
                conversation_management.complete_conversation(
                    project_id=PROJECT_ID, conversation_id=conversation_id
                )
                break


if __name__ == "__main__":
    main()