はじめまして、CyberAgent AI Lab Intaractive Agentチームの技術研究員の大平といいます。
この記事は CyberAgent Developers Advent Calendar 2023 1日目の記事です。
ChatGPTの登場以降、自然なチャット対話はAPI呼び出しだけで簡単に実装できるようになりました。
更に人間のようなインタラクションを実現しようとすれば、音声対話に発展させたいと思う方も多いかと思われます。
しかし実際にLLMを使って音声対話システムを構築してみると、そのレスポンスの遅さに不満を感じることになります。
この記事ではよくあるシンプルなLLMを用いた音声対話に対していくつかの工夫を施し、その応答速度をできるだけ早めてみようという試みになります。
よくある構成として、以下を用います。
- 音声認識 Google STT
- LLM ChatGPT 3.5-turbo
- 音声合成 voicevox
記事ではざっくり流れを追いますが、実際に動かしてみたい人はgithubをみてみてください。
音声認識
マイクから入ってきた音声をリアルタイム音声認識するドキュメントは、公式をたどっていくとこのリンクにたどり着きます。基本これをそのまま使いますが、ちょっとだけ修正します。
wget https://github.com/GoogleCloudPlatform/python-docs-samples/raw/main/speech/microphone/transcribe_streaming_infinite.py -O stt/google_stt.py
まず、listen_print_loopという関数を以下のように修正
before
def listen_print_loop(responses: object, stream: object) -> object:
[中略]
if result.is_final:
sys.stdout.write(GREEN)
sys.stdout.write("\033[K")
sys.stdout.write(str(corrected_time) + ": " + transcript + "\n")
stream.is_final_end_time = stream.result_end_time
stream.last_transcript_was_final = True
# Exit recognition if any of the transcribed phrases could be
# one of our keywords.
if re.search(r"\b(exit|quit)\b", transcript, re.I):
sys.stdout.write(YELLOW)
sys.stdout.write("Exiting...\n")
stream.closed = True
break
else:
sys.stdout.write(RED)
sys.stdout.write("\033[K")
sys.stdout.write(str(corrected_time) + ": " + transcript + "\r")
stream.last_transcript_was_final = False
return transcript
after
def listen_print_loop(responses: object, stream: object, callback_interim: object, callback_final: object) -> object:
[中略]
if result.is_final:
sys.stdout.write(GREEN)
sys.stdout.write("\033[K")
sys.stdout.write(str(corrected_time) + ": " + transcript + "\n")
if callback_final != None:
callback_final(transcript)
stream.is_final_end_time = stream.result_end_time
stream.last_transcript_was_final = True
# Exit recognition if any of the transcribed phrases could be
# one of our keywords.
if re.search(r"\b(exit|quit)\b", transcript, re.I):
sys.stdout.write(YELLOW)
sys.stdout.write("Exiting...\n")
stream.closed = True
break
else:
sys.stdout.write(RED)
sys.stdout.write("\033[K")
sys.stdout.write(str(corrected_time) + ": " + transcript + "\r")
if callback_interim != None:
callback_interim(transcript)
stream.last_transcript_was_final = False
return transcript
Google STTの出力にはinterimと呼んでいる音声認識の途中結果と、finalと呼んでいる音声認識の最終結果があります。それぞれが出力されたときに別モジュールにデータを渡すためcallbackを呼び出すようにしました。また公式のミスなのかあるいは意図的なのかはわかりませんが return transcript
の記述位置がstreamのfor文の中になっており、このままでは精度の悪いデータしか得られないため return transcript
をfor文の外に出しています。
LLM
2023年11月現在の公式のサンプルに対話履歴のみをくっつけたシンプルなものです。記事のコンセプトが応答速度高速化なので、gpt-4よりも速いと噂のgpt-3.5-turboで試しています。
from openai import OpenAI
class ChatGPT():
def __init__(self, valid_stream) -> None:
self.client = OpenAI()
self.dialogue_history = []
self.valid_stream = valid_stream
def get(self, user_utterance):
self.dialogue_history.append({"role": "user", "content": user_utterance})
completion = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=self.dialogue_history,
stream = self.valid_stream
)
return completion
def set_agent_utterance(self, agent_utterance):
self.dialogue_history.append({"role": "assistant", "content": agent_utterance})
音声合成
voicevoxは mac, windows, LinuxのすべてのOSでアプリが提供されています。アプリ版voicevoxは起動すると裏ではhttp serverとして機能するのでそれを使用します。extract_wav_lengthでは生成される音声データの秒数を取得しています。後ほど使用します。
import requests
import json
import time
def get_audio_query(text, speaker = 1):
query_payload = {"text": text, "speaker": speaker}
while True:
try:
url = "http://localhost:50021/audio_query"
r = requests.post(url, params=query_payload, timeout=(10.0, 300.0))
if r.status_code == 200:
return r.json()
except requests.exceptions.ConnectionError:
print('fail connect...', url)
time.sleep(0.1)
def run_synthesis(query_data, speaker = 1):
synth_payload = {"speaker": speaker}
while True:
try:
url = "http://localhost:50021/synthesis"
r = requests.post(url, params=synth_payload, data=json.dumps(query_data), timeout=(10.0, 300.0))
if r.status_code == 200:
return r.content
except requests.exceptions.ConnectionError:
print('fail connect...', url)
time.sleep(0.1)
def extract_wav_length(query_data):
length = 0
for accent_phrase in query_data["accent_phrases"]:
for mora in accent_phrase["moras"]:
if mora["consonant_length"] != None:
length += mora["consonant_length"]
if mora["vowel_length"] != None:
length += mora["vowel_length"]
return length
def get_audio_file_from_text(text):
query_data = get_audio_query(text)
return run_synthesis(query_data), extract_wav_length(query_data)
シンプルな音声対話システム
構成要素が出揃ったので、さっそく音声対話システムを構築していきましょう。
何も考えずとりあえず実装すると以下のようになります。
from stt import google_stt
from llm import chatgpt
from tts import voicevox
import threading
from playsound import playsound
import time
class Main():
def __init__(self) -> None:
stt_thread = threading.Thread(target=google_stt.main, args=(self.callback_interim, self.callback_final,))
self.llm = chatgpt.ChatGPT(valid_stream=False)
self.latest_user_utterance = None
self.finished_user_speeching = False
# 計測用
self.time_user_speeching_end = None
stt_thread.start()
def wait(self):
thread_list = threading.enumerate()
thread_list.remove(threading.main_thread())
for thread in thread_list:
thread.join()
def callback_interim(self, user_utterance):
self.latest_user_utterance = user_utterance
def callback_final(self, user_utterance):
self.time_user_speeching_end = time.time()
self.latest_user_utterance = user_utterance
threading.Thread(target=self.main_process, args=(self.latest_user_utterance,)).start()
def main_process(self, user_utterance):
llm_result = self.llm.get(user_utterance)
if type(llm_result) == str:
wav_data, _ = voicevox.get_audio_file_from_text(llm_result)
self.audio_play(wav_data)
def audio_play(self, wav_data):
with open("tmp.wav", mode='bw') as f:
f.write(wav_data)
if self.time_user_speeching_end != None:
print("応答までの時間", time.time() - self.time_user_speeching_end)
self.time_user_speeching_end = None
playsound("tmp.wav")
if __name__ == '__main__':
ins = Main()
ins.wait()
Google STTがfinalを出力するのを待ち、LLMを起動します。LLMからの応答がstringで返ってくるので、それを音声合成にかけます。音声合成が終了すると再生します。
これにかかった時間が以下です。
Agent発話が短文 | 3.46s | |
Agent発話が長文 | 4.37s |
相手の発話を待つ、という確たる意識がないと厳しいくらいの遅さですね。
では今からこれを改善していきます。
1. Google STT の final を待たない
Google STTはinterimですらざっくり0.8sくらいの遅延があり、finalに至っては数秒になることもあります。(経験上、インターネットの速度に大きく影響を受けます)
ここで問題になるのが「finalを使わないなら、どのタイミングをユーザの発話が終わったとみなすのか」ということです。
interimはユーザ発話中の音声認識の途中結果という立ち位置であり、これをトリガーにするとユーザが気分良く話しているときに割り込んでしまう恐れがあります。話を最後まで聞かない人は嫌われます。
この「ユーザの発話区間検出」のために別のモジュールを実装します。
import pyaudio as pa
import webrtcvad
# https://pypi.org/project/webrtcvad-wheels/
# sample_rateは 8000, 16000, 32000 or 48000 Hzのいずれか
RATE=16000
# duration timeは10, 20, 30 msのいずれか
BUFFER_SIZE=160 # 10ms
class GOOGLE_WEBRTC():
def __init__(self):
## ストリーム準備
self.audio = pa.PyAudio()
self.stream = self.audio.open(
rate=RATE,
channels=2,
format=pa.paInt16,
input=True,
frames_per_buffer=BUFFER_SIZE
)
if self.stream == None:
raise EnvironmentError("audio streamが開けませんでした")
# 無音区間検出
self.vad = webrtcvad.Vad()
self.thread_alive = True
def vad_loop(self, callback):
self.before_result = False
while self.thread_alive:
## ストリームからデータを取得
audio_data = self.stream.read(BUFFER_SIZE, exception_on_overflow = False)
vad_result = self.vad.is_speech(audio_data, RATE)
if vad_result != self.before_result:
if callback != None:
callback(vad_result)
self.before_result = vad_result
def shutdown(self):
self.thread_alive = False
self.stream.stop_stream()
self.stream.close()
self.audio.terminate()
Google webrtcです。これは音声が入ったタイミングでTrueになり、音声が途切れればFalseになります。VADは音声区間検出と言われますが、音声認識より時間応答に優れています。とくにGoogle webrtcは精度が良いと評判です。
ただ精度が良すぎるのか、音声以外の「音」にも反応します。キーボードをカタカタ叩くだけでもONOFF切り替わります。そのためGoogle STTのinterimが存在し(すでにユーザは発話中であるという確信が持てていて)、Google webrtcがOFFになったタイミング(発話が終了した直後である)で実行するようにしてみます。
from stt import google_stt
from vad import google_vad
from llm import chatgpt
from tts import voicevox
import threading
from playsound import playsound
import time
class Main():
def __init__(self) -> None:
self.valid_stream = False
vad = google_vad.GOOGLE_WEBRTC()
vad_thread = threading.Thread(target=vad.vad_loop, args=(self.callback_vad, ))
stt_thread = threading.Thread(target=google_stt.main, args=(self.callback_interim, self.callback_final,))
self.llm = chatgpt.ChatGPT(valid_stream=self.valid_stream)
self.latest_user_utterance = None
self.finished_user_speeching = False
# 計測用
self.time_user_speeching_end = None
stt_thread.start()
vad_thread.start()
def wait(self):
thread_list = threading.enumerate()
thread_list.remove(threading.main_thread())
for thread in thread_list:
thread.join()
def callback_interim(self, user_utterance):
self.latest_user_utterance = user_utterance
def callback_final(self, user_utterance):
self.latest_user_utterance = user_utterance
def callback_vad(self, flag):
if flag == True:
self.latest_user_utterance = None
elif self.latest_user_utterance != None:
self.time_user_speeching_end = time.time()
threading.Thread(target=self.main_process, args=(self.latest_user_utterance,)).start()
def main_process(self, user_utterance):
llm_result = self.llm.get(user_utterance)
if self.valid_stream == False:
agent_utterance = llm_result.choices[0].message.content
wav_data, _ = voicevox.get_audio_file_from_text(agent_utterance)
self.audio_play(wav_data)
def audio_play(self, wav_data):
start_time = time.time()
with open("tmp.wav", mode='bw') as f:
f.write(wav_data)
if self.time_user_speeching_end != None:
print("応答までの時間", time.time() - self.time_user_speeching_end)
self.time_user_speeching_end = None
playsound("tmp.wav")
if __name__ == '__main__':
ins = Main()
ins.wait()
VADがOFFになったタイミングでmain_procesを実行するように修正しました。ちゃんと実装するならVADがOFFになっているのにinterimが到達しない場合も想定すべきですが、検証用コードなので今回はいいでしょう。
その場合の結果がこちらです。
Agent発話が短文 | 2.45s |
1秒くらい短くなりましたね。
ただこの方法はGoogle STTと併用する場合には一部デメリットもあります。Google STTはinterimと比べるとfinalの精度がいいので、精度の悪い音声認識結果に対して応答することになります。
他の音声認識AIとかだとそうでもないものも多いです。
2. 音声合成をGPUで行う
シンプルな解決策です。voicevoxにはGPUモードとCPUモードを切り替えらるので、これを行ってみます。
お手持ちのPCにNVIDIA製のGPUがはいっており、CUDA系の環境が整っていればアプリ上から設定可能なはずです。
GPU動作にすることで、特に長文の時のパフォーマンスが向上します。(なのに長文データを取り忘れました)
Agent発話が短文 | 1.41s |
もちろんAgentの発話内容がそれぞれ違うので厳密な比較はできないですが、ついに1秒台になりました。
3. 音声合成を文節区切りで行う
今までの方法だと、いかにGPUで音声合成が行われているといっても、Agentの発話がものすごい長文になるとかなりの時間がかかります。音声合成の時間だけではなく、LLMが文章を生成する時間も比例して大きくなってしまうためです。そのためLLMが文章を生成した傍らからどんどん音声合成していくことでより速度を向上させることができます。
リアルタイム生成が可能な音声合成システムも徐々にできてきているようですがまだまだ数は少ないです。既存の音声合成システムを活かすならLLMの出力をstreamで受け取り、一定の(音声合成の品質が劣化しない程度の)長さになるまでためておき、音声合成を行う仕組みを使ってみます。
リアルタイム生成が不可能な音声合成システムに対して1文字ごとに音声合成して音声をつなぎ合わせるという方法を使うと、音声の品質がめちゃめちゃ低下するので注意してくださいね。
from stt import google_stt
from vad import google_vad
from llm import chatgpt
from tts import voicevox
import threading
from playsound import playsound
import time
class Main():
def __init__(self) -> None:
self.valid_stream = True
vad = google_vad.GOOGLE_WEBRTC()
vad_thread = threading.Thread(target=vad.vad_loop, args=(self.callback_vad, ))
stt_thread = threading.Thread(target=google_stt.main, args=(self.callback_interim, self.callback_final,))
self.llm = chatgpt.ChatGPT(valid_stream=self.valid_stream)
self.latest_user_utterance = None
self.finished_user_speeching = False
# 計測用
self.time_user_speeching_end = None
stt_thread.start()
vad_thread.start()
def wait(self):
thread_list = threading.enumerate()
thread_list.remove(threading.main_thread())
for thread in thread_list:
thread.join()
def callback_interim(self, user_utterance):
print("interim", user_utterance)
self.latest_user_utterance = user_utterance
def callback_final(self, user_utterance):
print("final", user_utterance)
self.latest_user_utterance = user_utterance
def callback_vad(self, flag):
print("vad", flag)
if flag == True:
self.latest_user_utterance = None
elif self.latest_user_utterance != None:
self.time_user_speeching_end = time.time()
threading.Thread(target=self.main_process, args=(self.latest_user_utterance,)).start()
def main_process(self, user_utterance):
llm_result = self.llm.get(user_utterance)
if self.valid_stream == False:
agent_utterance = llm_result.choices[0].message.content
wav_data, wav_length = voicevox.get_audio_file_from_text(agent_utterance)
self.audio_play(wav_data, wav_length)
else:
u = ""
for chunk in llm_result:
word = chunk.choices[0].delta.content
if word == None:
break
u += word
for split_word in ["、","。", "?", "!"]:
if split_word in u:
print(u)
wav_data, wav_length = voicevox.get_audio_file_from_text(u)
self.audio_play(wav_data, wav_length)
u = ""
if u != "":
wav_data, wav_length = voicevox.get_audio_file_from_text(u)
self.audio_play(wav_data, wav_length)
def audio_play(self, wav_data, wav_length):
start_time = time.time()
with open("tmp.wav", mode='bw') as f:
f.write(wav_data)
if self.time_user_speeching_end != None:
print("応答までの時間", time.time() - self.time_user_speeching_end)
self.time_user_speeching_end = None
playsound("tmp.wav")
while time.time() - start_time < wav_length:
pass
if __name__ == '__main__':
ins = Main()
ins.wait()
こちらの方法では、1ユーザ発話に対して複数回Agentが音声を再生することになります。発話速度よりも音声合成の速度の方が速いので、何も考えずに再生しまくると声が重なってしまいます。そのため生成した音声データの秒数だけ待つ必要があります。
Agent発話が短文 | 0.887s | |
Agent発話が長文 | 1.22s |
ついに1秒を切り始めましたね。
ここまで来るとほぼ かかっている時間=LLMの初期読み込みの時間 と言えます。
ただ、ChatGPTの初期読み込みの時間は結構不安定なので、1s 切りが安定して出てくることはあまりありません。
となると次はLLMに手を加える必要が出てきます。
4. オンプレLLM
ChatGPTの使用を諦めて、オンプレLLMを用います。
一般的に買えるGPUに乗れるようにし、速度も向上させるためなるべく小さめのモデルを使用したうえで、llama.cppを用いて更に軽量化します。
今回はこちらのモデルを使用させていただきました。
まずはllama.cppをダウンロードしビルドします。
# bash
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
make -j 8 LLAMA_CUBLAS=1
次に、LLMのデータをダウンロードします。
# python
import huggingface_hub
huggingface_hub.snapshot_download(repo_id='elyza/ELYZA-japanese-Llama-2-7b-instruct', cache_dir="original_models")
ダウンロードされたデータをGGUFモデルに変換し、その後8bitに軽量化します。
終わったら生成されたデータ gguf-models/elyza-q8.gguf
をpythonから読み込みやすい位置に移動しておいてください。
python3 convert.py original_models/models--elyza--ELYZA-japanese-Llama-2-7b-instruct/snapshots/48fa08b3098a23d3671e09565499a4cfbaff1923 --outfile gguf-models/elyza.gguf
./quantize gguf-models/elyza.gguf gguf-models/elyza-q8.gguf q8_0
llamaを読み込んでstream形式で出力できるようにします。
n_gpu_layers
が大きければ大きいほど早くはなりますが、メモリが大きなGPUが必要になります。0にすればGPUのないPCでも動きます。
# python
from llama_cpp import Llama
class Llama2():
def __init__(self, valid_stream=True) -> None:
self.llama = Llama(model_path="llm/models/elyza-q8.gguf", n_gpu_layers=50)
self.valid_stream = valid_stream
def get(self, user_utterance):
streamer = self.llama.create_chat_completion(
[{"role":"user", "content": f"""[INST] <>\nあなたはアシスタントです。\n<>\n\n{user_utterance}[/INST]"""}],
stream=self.valid_stream
)
return streamer
def set_agent_utterance(self, agent_utterance):
pass
最後に、これを読み込んで音声対話システムとして実行してみましょう。
from stt import google_stt
from vad import google_vad
from llm import llama2
from tts import voicevox
import threading
from playsound import playsound
import time
class Main():
def __init__(self) -> None:
self.valid_stream = True
vad = google_vad.GOOGLE_WEBRTC()
vad_thread = threading.Thread(target=vad.vad_loop, args=(self.callback_vad, ))
stt_thread = threading.Thread(target=google_stt.main, args=(self.callback_interim, self.callback_final,))
self.llm = llama2.Llama2(valid_stream=self.valid_stream)
self.latest_user_utterance = None
self.finished_user_speeching = False
# 計測用
self.time_user_speeching_end = None
stt_thread.start()
vad_thread.start()
def wait(self):
thread_list = threading.enumerate()
thread_list.remove(threading.main_thread())
for thread in thread_list:
thread.join()
def callback_interim(self, user_utterance):
print("interim", user_utterance)
self.latest_user_utterance = user_utterance
def callback_final(self, user_utterance):
print("final", user_utterance)
self.latest_user_utterance = user_utterance
def callback_vad(self, flag):
print("vad", flag)
if flag == True:
self.latest_user_utterance = None
elif self.latest_user_utterance != None:
self.time_user_speeching_end = time.time()
threading.Thread(target=self.main_process, args=(self.latest_user_utterance,)).start()
def main_process(self, user_utterance):
llm_result = self.llm.get(user_utterance)
if self.valid_stream == False:
# llama2のライブラリは旧バージョンのOpenAIに仕様を合わせているので微調整が必要
agent_utterance = llm_result["choices"][0]["message"]["content"]
wav_data, wav_length = voicevox.get_audio_file_from_text(agent_utterance)
self.audio_play(wav_data, wav_length)
else:
u = ""
for chunk in llm_result:
# llama2のライブラリは旧バージョンのOpenAIに仕様を合わせているので微調整が必要
if not "content" in chunk["choices"][0]["delta"]:
continue
word = chunk["choices"][0]["delta"]["content"]
if word == None:
break
u += word
for split_word in ["、","。", "?", "!"]:
if split_word in u:
print(u)
wav_data, wav_length = voicevox.get_audio_file_from_text(u)
self.audio_play(wav_data, wav_length)
u = ""
if u != "":
wav_data, wav_length = voicevox.get_audio_file_from_text(u)
self.audio_play(wav_data, wav_length)
def audio_play(self, wav_data, wav_length):
start_time = time.time()
with open("tmp.wav", mode='bw') as f:
f.write(wav_data)
if self.time_user_speeching_end != None:
print("応答までの時間", time.time() - self.time_user_speeching_end)
self.time_user_speeching_end = None
playsound("tmp.wav")
while time.time() - start_time < wav_length:
pass
if __name__ == '__main__':
ins = Main()
ins.wait()
結果はこちら
Agent発話が短文 | 0.195s | |
Agent発話が長文 | 0.368s |
即応答、と言っても差し支えないほどの速さになりました。
ここまで来ると速度の違和感は全く無いですね。(内容の違和感はでてきてしまいましたが)
最後に
CyberAgent AI Lab Intaractive Agentチームでは、フィールド環境で動作する音声対話システムの研究開発を行っています。また本記事で執筆したことに加え、さらにいくつかの改良を行った音声対話システムを言語・音声理解と対話処理研究会 SIG-SLUDで展示予定です。もしご興味がございましたらご来場・ご連絡いただけたらと思います。