自動電話応対を実現する連携サーバーをGoで実装する
はじめに
RightTouchでは、電話での音声自動応対を行うVoicebotプロダクトを開発しています。
Webアプリケーションと異なり、Voicebotはユーザーの電話機や企業が持つ電話基盤とサーバとの連携が必須になりますが、その際にはSIPやRTPといったプロトコルが用いられます。
ただ、通常のNode.js等で作られるサーバはHTTPやWebSocketといったインターフェースしか備えていないことが多いため、SIP/RTPとこれらのプロトコルを変換するための何かしらのプロキシ層が必要となります。
代表的なプロキシにはTwilioなどのSaaSがありますが、高額な費用や国外リージョンであることなどデメリットも多いです。また、AsteriskのようなPBXのOSSを自前でホスティングする選択肢もありますが、やりたいことに対して機能が多すぎ、また複雑な設定ファイルの管理が必要など、課題もあります。
本記事では、上記課題を踏まえ、シンプルにSIP/RTPとWebSocketの変換だけを行うような軽量なプロキシを自前実装してみましたので、その概要と工夫点を紹介します。
おおまかな設計
今回実装するサーバーは、クラウドPBXサービスであるGenesys Cloudから電話を受け、音声ボットが動作している別のサーバーにWebSocketで接続して音声の受け渡しを行うことが目的です。
具体的には、以下のような流れで動作します。
- PBXクライアントとSIP通信を張り、SDPで通信方法を交換する
- 1が完了したら音声受信用のポートを開き、PBXクライアントから音声をRTPで受け取る
- 音声ボットサーバーとWebSocket通信を張り、2で受け取った音声を転送し続ける
- 音声ボットサーバーからは自動音声のデータをWebSocketで受信する
- 受け取った自動音声データをRTPに乗せ、PBXクライアントに転送する
音声ボットサーバーは、実際には自動音声を送信しつつ架電元の音声を聞いて処理を行うため、手順2、3、4、5は全て並列で行われます。また実際にはこの後SMS送信や電話転送などの手順が加わりますが、今回の記事では割愛します。
なぜGo?
今回実装するサーバーは二つの異なるエンティティ(PBXクライアントと音声ボットサーバー)と通信を行うため、マルチスレッドでの処理が求められます。マルチスレッドプログラミングが容易かつSIPやSDP、RTPやWebSocketの枯れた実装がすでに存在するプログラミング言語を調査した結果、Goを採用することに決めました。
サーバーの実装
ここからは、実際のコードを示しつつサーバーの実装を進めていきます。
SIP/SDP連携
まずはPBXクライアントから電話を受け取るためのSIP/SDP連携を実装します。
SIP(Session Initiation Protocol)とは、主にIP上で電話接続を行う、いわゆるVoIP通信の際に用いられるプロトコルです。今回のような自動電話対応のケースでは、PBXクライアントからSIPでのリクエストが届き、サーバーはそれに応答する形で通信が確立します。リクエストとレスポンスのペイロードには音声をやりとりするための情報(音声プロトコル、周波数、ポート番号など)がSDP(Session Description Protocol)というプロトコルの形式に沿って記述されます。
SIPのシーケンス
INVITEの受信とハンドラーの登録
サーバーはまずクライアントからのメッセージを待ち、受信したらメッセージタイプに応じてハンドラーに処理を振り分けていきます。今回SIPの通信にはemiago/sipgoを利用します。
ua, err := sipgo.NewUA(
sipgo.WithUserAgent("sample-useragent"),
)
if err != nil {
log.Fatal().Err(err).Msg("Fail to setup user agent")
}
srv, err := sipgo.NewServer(ua)
if err != nil {
log.Fatal().Err(err).Msg("Fail to setup server handle")
}
ctx := context.TODO()
srv.OnInvite(func(req *sip.Request, tx sip.ServerTransaction) {
SipOnInvite(req, tx, ctx)
})
ここではInviteに対するハンドラーのみ登録していますが、実際はACKやBYEなど他のメッセージタイプに対してもハンドラーを登録します。
INVITEハンドラーの実装
INVITEハンドラー(上のコードのSipOnInvite
)は以下の処理を担います。
- 認証情報のチェック
- Status 100 Tryingメッセージの返信
- INVITEのSDPを読み、対応するSDPと共にStatus 200 OKメッセージを返信
- 架電元番号など必要な情報をSDPから取得してWebSocket通信用スレッドを作成
- RTP通信のためのポート番号をSDPから取得してRTP通信用スレッドを作成
それぞれを実装したハンドラーが以下になります。
func SipOnInvite(req *sip.Request, tx sip.ServerTransaction, ctx context.Context) {
// リクエストの認証情報をチェック
// 認証情報が無い/間違っている場合401 Unauthorizedを返す
ok := SipCheckAuth(req, tx)
if !ok {
return
}
// 100 Tryingを返す(PBXクライアントによってはこれを返す必要がない場合もある)
resp100 := sip.NewResponseFromRequest(req, 100, "Trying", nil)
errResp100 := tx.Respond(resp100)
if errResp100 != nil {
log.Fatal().Err(errResp100).Msg("Error sending 100 Trying response")
return
}
// SDPをデコードしてレスポンスを生成、200 OKを返す
sess := decodeSdp(string(req.Body()))
respBody := encodeSdp(sess.Origin.SessionID, sess.Origin.SessionVersion)
resp := sip.NewSDPResponseFromRequest(req, respBody.Bytes())
resp.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("<sip:%s:%s>", HOST_ADDRESS, HOST_PORT)))
errResp200 := tx.Respond(resp)
if errResp200 != nil {
log.Fatal().Err(errResp200).Msg("Error sending 200 to INVITE")
}
// 必要な情報をSDPから取得しWebSocket/RTPスレッドを生成
// 架電元番号やプロジェクトIDなどを元にWebSocketサーバーのURLを決定するが、詳細な実装は省く
wsUrl := getWsUrl(sess)
rtpSend := make(chan *rtp.Packet)
wsSend := make(chan []byte)
ws_thread.WsClient(wsUrl, rtpSend, wsSend)
dst_addr_rtp := fmt.Sprintf("%s:%d", sess.Connection.Address, sess.Media[0].Port)
src_port_rtp := strconv.FormatInt(int64(respBody.Media[0].Port), 10)
rtp_thread.RtpSocket(&src_port_rtp, &dst_addr_rtp, rtpSend, wsSend)
}
WebSocket通信のために必要な情報は音声ボットサーバーの実装にも依存するため詳細は省きますが、この流れでSIPの通信が確立します。
SDPのエンコーダ/デコーダ実装
今回はPBXクライアントが一つに定まっているため、ある程度決め打ちでSDPを書きました。特段詰まった点などはなく実装も簡単なためコードは貼りませんが、SDPエンコーダ/デコーダにはpixelbender/go-sdpを利用しています。今回は音声コーデックはulaw(PCMU)の8000Hz、DTMF(キーパッド入力の情報)は8000Hzで通信を行います。
WebSocket/RTP通信の実装
続いてSIP通信が確立した後の処理を実装していきます。
スレッドの分割
SIP連携に関しては通信開始時にのみ発生する通信のためメインスレッドで処理しますが、その後のRTPとWebSocketの処理については常時接続なためそれぞれwrite/readのスレッドに分けて行います。それぞれのスレッドの役割は以下のように分類されます。
- WebSocket Writeスレッド
- RTP Readスレッドから架電元の音声を受信し音声ボットサーバーに送信
- WebSocket Readスレッド
- 音声ボットサーバーから自動音声を受信しRTP Writeスレッドに転送
- RTP Writeスレッド
- WebSocket Readスレッドから自動音声を受信しPBXクライアントに送信
- RTP Readスレッド
- PBXクライアントから架電元の音声を受信しWebSocket Writeスレッドに転送
文字で書くといささかややこしいですが、図で示すと以下のようになります。
前の章でmainスレッドでのSIP連携が完了しています。スレッド間の通信にはGoのチャネル(chan
)を利用します。
RTPスレッドの実装
SIP連携が完了したら、まずRTP通信用のソケットを作ります。RTPのライブラリにはWebRTCの実装で名高いpionのpion/rtpを利用しています。
func RtpSocket(srcPort *string, dstAddress *string, rtpSend chan *rtp.Packet, wsSend chan []byte) {
srcAddr, err := net.ResolveUDPAddr("udp", ":"+*srcPort)
if err != nil {
log.Err(err).Msg("Failed to resolve src UDP address")
}
conn, err := net.ListenUDP("udp", srcAddr)
if err != nil {
log.Err(err).Str("addr", srcAddr.String()).Msg("Failed to listen")
}
log.Info().Int("port", srcAddr.Port).Msg("Listening RTP packets")
dstAddr, err := net.ResolveUDPAddr("udp", *dstAddress)
if err != nil {
log.Err(err).Str("addr", *dstAddress).Msg("Failed to resolve dst UDP address")
}
log.Info().Str("addr", dstAddr.String()).Msg("RTP destination address")
go WriteThread(conn, dstAddr, rtpSend)
go ReadThread(conn, wsSend)
}
このRtpSocket関数はSIPのINVITEハンドラーから呼ばれ、SDPから得たsrcポートとdstポートを用いてUDPソケットを作成します。最後の2行でread/writeスレッドを呼び、以降はこの2つのスレッドで処理が行われます。
RTP Writeスレッドの実装
func WriteThread(conn *net.UDPConn, dstAddress *net.UDPAddr, rtpSend chan *rtp.Packet) {
defer conn.Close()
for {
packet, ok := <-rtpSend // 基本はrtp_thread.Writerからの入力なのでパケット分割済み
if !ok {
return
}
data, err := packet.Marshal()
if err != nil {
log.Err(err).Msg("Failed to marshal RTP packet")
}
_, err = conn.WriteToUDP(data, dstAddress)
if err != nil {
log.Err(err).Msg("Failed to send RTP packet")
}
}
}
RTP Writeスレッドは、チャネルを通して送られてくる自動音声のRTPパケットをPBXクライアントに送信するだけの非常に簡単な処理を行います。チャネルから来るメッセージは以下に示すrtp_thread.Writer
関数での処理のため、全て適切なサイズのRTPパケットに整形されています。
type WriterType struct {
Send func([]byte, chan *rtp.Packet)
}
func Writer() WriterType {
seq := uint16(rand.Intn(55535))
timeStamp := uint32(0)
ssrc := rand.Uint32()
fragmentSize := shared.ULAW_RATE / 50 // 20ms
Send := func(data []byte, c chan *rtp.Packet) {
fragments := Fragment(data, &fragmentSize) // Fragment関数の中身は省くが要はdataを指定されたfragmentSizeで区切る
log.Info().Int("vduration(ms)", len(fragments)*FRAGMENT_MS).Msg("Sending audio fragments")
for i, fragment := range fragments {
var p *rtp.Packet
if i == 0 {
p = Package(&fragment, &seq, &timeStamp, &ssrc, true)
} else {
p = Package(&fragment, &seq, &timeStamp, &ssrc, false)
}
c <- p
// 一気にパケットを送ると再生されないので19msごとに送る
// sleepではなくバッファーのようにした方が良いかも
time.Sleep((FRAGMENT_MS - 1) * time.Millisecond)
seq++
timeStamp += uint32(len(fragment))
}
}
return WriterType{Send}
}
WebSocket Readスレッドは自動音声を受信したらRTP Writeスレッドに転送したいわけですが、RTP WriteスレッドはRTPパケットを送る処理のみを行います。そこで両スレッドの橋渡し的に自動音声の分割とRTPパケット生成を行うのがこのrtp_thread.Writer
関数です。
RTP Readスレッドの実装
func ReadThread(conn *net.UDPConn, wsSend chan []byte) {
defer conn.Close()
buffer := make([]byte, 2048)
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Err(err).Msg("Failed to read RTP packets")
}
// strip the first 12 bytes of the RTP header
data := buffer[12:n]
if err != nil {
log.Err(err).Msg("Failed to encode audio")
}
base64Encoded := base64.StdEncoding.EncodeToString(data)
// BotMessageはWS通信に用いられる構造体
msg := BotMessage{
Type: "audio",
Data: BotMessageData{
Audio: base64Encoded,
},
}
jsonEncoded, derr := json.Marshal(msg)
if derr != nil {
log.Err(err).Msg("Failed to encode audio")
}
wsSend <- jsonEncoded
}
}
RTP ReadスレッドはPBXクライアントから音声を受信し、ペイロードをbase64エンコードしてWebSocket Writerスレッドに送っています。パケットの分割など複雑な処理がないため、RTP readスレッド-WS Writeスレッド間にrtp_thread.Writer
のような関数は設けていません。
ちなみに、SIPでの通信が確立した瞬間からRTPパケットが継続的に送られ続けるため、音声ボットサーバーには架電元が喋っていようがいまいがRTPパケットが転送され続けています。
WebSocketスレッドの実装
RTP同様に、WebSocketもまず通信を確立し、その後read/writeスレッドを起動する処理を行います。WebSocketライブラリはgorilla/websocketを利用しています。
詳細な実装は、WebSocketサーバ側の実装にも依存するため、ここでは省略します。
サーバー実装のまとめ
以上の実装で一通りPBXクライアントと音声ボットサーバーの中継サーバーとしての役割は果たせるかと思います。今回はSIP/SDP、RTP over UDP、WebSocketと複数のプロトコルを用いましたが、PBXクライアントの種類によっては中継サーバーを必要とせず、直接自動応対のサーバーと通信が可能な場合もあります。
おわりに
Genesys Cloudをはじめとした電話系サービスの多くはドキュメントが整備されておらず、ユーザーの記事も少ないためデバッグが難しいというのが今回最も苦労した点でした。おそらく世の中の電話エンジニアは筆者と同じようにパケットキャプチャと睨めっこしながら苦しんでシステムを構築しているかと思いますが、この記事の一部でも参考になれば幸いです。
Discussion