📞

Amazon Connect で留守番電話を作る (2025-01)

2025/03/06に公開

Amazon Connectで留守番電話を作りました。
基本的には 参考記事 の手法を踏襲していますが、一部異なる部分もあるのでメモを残します。

  • 通話に関する情報を取得し S3 に保存する
  • Amazon Connect から Kinesis Video Streams に通話音声を流す
  • 通話終了後に Kinesis の当該通話のストリームから音声ファイルに変換し S3 に保存する

留守番電話のフロー

Amazon Connect のフローエディタで作成する留守番電話フローは概ね次のようになります。

  1. 着信
  2. AWS Lambda 関数を呼び出す: 通話に関する情報を取得し S3 に保存
  3. プロンプトの再生: 「ピーという音の後、○○秒以内に・・・」
  4. プロンプトの再生: 開始音
  5. メディアストリーミングの開始
  6. プロンプトの再生: 規定秒数待つ
  7. メディアストリーミングの停止
  8. 切断

通話に関する情報の取得と保存は着信直後に当該処理を行う lambda を呼び出して行います。

一方 Kinesis の通話ストリームから抽出し音声ファイルに変換・保存する機能も lambda で実装しましたが、フローからは呼び出しません。通話相手が規定時間より早く通話を切ってしまうような場合もこの関数が確実に実行されるようにする必要があるからです。

音声ファイル保存処理のタイミングについて

EventBridge を用いて DISCONNECTED イベントから lambda を呼び出すようにしました。Connect のイベントから contactId が取得できますので、これと着信時に保存した通話情報を用いて Kinesis のストリームにアクセスします。

通話に関する情報を取得する関数の例

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const zp = (n) => ('0' + n).slice(-2);

export const handler = async (event) => {    
    console.log(JSON.stringify(event));

    const region = process.env['S3_REGION'];
    const bucketName = process.env['S3_BUCKET'];
    const path = process.env['S3_PATH'];
     
    const contactData = event.Details.ContactData;
    const audio = contactData.MediaStreams.Customer.Audio;
    const streamInformation = {
        contactId:           contactData.ContactId,
        customerEndpoint:    contactData.CustomerEndpoint.Address,
        systemEndpoint:      contactData.SystemEndpoint.Address,
        startFragmentNumber: audio.StartFragmentNumber,
        startTimestamp:      audio.StartTimestamp,
        stopFragmentNumber:  audio.StopFragmentNumber,
        stopTimestamp:       audio.StopTimestamp,
        streamARN:           audio.StreamARN,
    };

    const s3 = new S3Client({region:region});
    const d = new Date();
    const key = `${path}${d.getFullYear()}/${zp(d.getMonth()+1)}/${zp(d.getDate())}/${zp(d.getHours())}-${zp(d.getMinutes())}-${zp(d.getSeconds())}_${contactData.ContactId}.json`;
    const body = JSON.stringify(streamInformation);
    const params = {
        Bucket: bucketName,
        Key: key,
        Body: body
    };
    
    await s3.send(new PutObjectCommand(params));
    return {};
};

音声ファイルの変換・保存処理を行う関数の例

import { Buffer } from 'node:buffer';
import { S3Client, ListObjectsV2Command, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
import { ConnectClient, DescribeContactCommand, StopContactStreamingCommand } from "@aws-sdk/client-connect";

import { Decoder } from 'ebml';
import { KinesisVideoClient, GetDataEndpointCommand } from '@aws-sdk/client-kinesis-video';
import { KinesisVideoMediaClient, GetMediaCommand } from "@aws-sdk/client-kinesis-video-media";
import { KinesisVideoArchivedMediaClient, ListFragmentsCommand, GetMediaForFragmentListCommand } from "@aws-sdk/client-kinesis-video-archived-media";

const zp = (n) => ('0' + n).slice(-2);
const heading = (word) => word.padStart(9, " ");

export const handler = async (event) => {
    console.log(JSON.stringify(event));

    const region = process.env['S3_REGION'];
    const bucketName = process.env['S3_BUCKET'];
    const getKey = process.env['S3_KEY_GET'];
    const putKey = process.env['S3_KEY_PUT'];

    // トリガーを Connect からのイベントに変更したので event から contactId などが得られる
    const contactId = event['detail']['contactId'];
    const instanceId = event['detail']['tags']['aws:connect:instanceId'];

    // S3に保存した通話情報からストリーミング開始時刻を取得
    // なおフロー内で保存処理をストリーミング開始直後に呼び出すようになったため
    // 終了時刻 endTimestamp が記録されなくなった
    const now = new Date();
    // TODO: 日付変更直後にこの関数が呼び出された場合、ファイルは前日のフォルダに作られている可能性がある
    const dir = `${now.getFullYear()}/${zp(now.getMonth()+1)}/${zp(now.getDate())}`;
    const prefix = `${getKey}${dir}/`;
    const s3 = new S3Client({region: region});
    const info = await getContactInfo(contactId, s3, bucketName, prefix);
 
    if (info != null) {
        const startTimestamp = parseInt(info.startTimestamp);
        const endTimestamp = info['endTimestamp'] ? parseInt(info.endTimestamp) : now.getTime();
        const mediaData = await getMediaData(info.streamName, startTimestamp, endTimestamp);
        const raw = await getRawMedia(mediaData);
        const wav = Converter.createWav(raw, 8000);
 
        // WAVファイルをS3に保存する
        let tagging = ''; // 付加情報をタグに追加する
        tagging += "customerEndpoint=" + info.customerEndpoint + '&';
        tagging += "systemEndpoint=" + info.systemEndpoint + '&';
        tagging += "startTimestamp=" + info.startTimestamp;
        const key_wav = `${putKey}${dir}/${info.fileName.replace(/\.json$/i, '.wav')}`;
        await s3.send(new PutObjectCommand({
            Bucket: bucketName,
            Key: key_wav,
            Body: Buffer.from(wav.buffer),
            Tagging: tagging
        }));
        // TODO: 録音データについてどこかに通知するなど
    }
    return {};
};

async function getContactInfo(contactId, s3, bucketName, prefix) {
    const records = await s3.send(new ListObjectsV2Command({
        Bucket: bucketName,
        Prefix: prefix,
    }));
    if (! records.Contents) {
        return null;
    }
 
    for (const record of records.Contents) {
        // console.log(record);
        const p = record.Key.split('/');
        if (p.length < 2 || ! p.at(-1).includes('_')) {
            continue;
        }
        const fileName = p.at(-1);
        // console.log(`get ${record.Key}, ${fileName}`);
        const c = fileName.split('_');
        if (c.length < 2 || !c[1].startsWith(contactId)) {
            // console.warn(`skip ${fileName}`);
            continue;
        };

        const data = await s3.send(new GetObjectCommand({
            Bucket: bucketName,
            Key: record.Key
        }));
        const infoStr = await data.Body.transformToString();
        const info = JSON.parse(infoStr);
        info.streamName = info.streamARN.split('stream/')[1].split('/')[0];
        info.fileName = fileName;
        return info;
    }
    return null;
}

async function getRawMedia(mediaData) {
    const decoder = new Decoder();
    let chunks = [];
    decoder.on('data', chunk => {
        if(chunk[1].name == 'SimpleBlock'){
            chunks.push(chunk[1].data);
        }
    });
    const bytes = await mediaData.Payload.transformToByteArray();
    // console.log(`bytes.length: ${bytes.length}`);
    decoder.write(Buffer.from(bytes));
     
    // chunksの結合
    const margin = 4; // 各chunkの先頭4バイトを破棄する
    var sumLength = 0;
    chunks.forEach( chunk => {
        sumLength += chunk.byteLength - margin;
    })
    var sample = new Uint8Array(sumLength);
    var pos = 0;
    chunks.forEach(chunk => {
        let tmp = new Uint8Array(chunk.byteLength - margin);
        for(var e = 0; e < chunk.byteLength -  margin; e++){
            tmp[e] = chunk[e + margin];
        }
        sample.set(tmp, pos);
        pos += chunk.byteLength - margin;
 
    })
    console.log(`sample.length: ${sample.length}`);
    return sample.buffer;
}

async function getMediaData(streamName, startTimestamp, endTimestamp) {
    const region = process.env['S3_REGION'];

    // ListFragments用のEndpointの取得
    const kinesisvideo = new KinesisVideoClient({region: region});
    let listFragmentsEndParams = {
        APIName: "LIST_FRAGMENTS",
        StreamName: streamName
    };
    const listFragmentsEnd = await kinesisvideo.send(new GetDataEndpointCommand(listFragmentsEndParams));

    const listFragmentsClient = new KinesisVideoArchivedMediaClient({endpoint: listFragmentsEnd.DataEndpoint, region:region});
    let listFragmentsParams = {
        FragmentSelector: {
            FragmentSelectorType: "PRODUCER_TIMESTAMP",
            TimestampRange: {
                StartTimestamp: new Date(startTimestamp),
                EndTimestamp: new Date(endTimestamp),
            }
        },
        StreamName: streamName
    };
    // ListFragmentsデータの取得
    const listFragments = await listFragmentsClient.send(new ListFragmentsCommand(listFragmentsParams));

    // GetMediaForFragmentList用のEndpointの取得
    let mediaEndParams = {
        APIName: "GET_MEDIA_FOR_FRAGMENT_LIST",
        StreamName: streamName
    };
    const mediaEnd = await kinesisvideo.send(new GetDataEndpointCommand(mediaEndParams));

    const mediaClient = new KinesisVideoArchivedMediaClient({endpoint: mediaEnd.DataEndpoint, region:region});

    listFragments.Fragments.sort(function(a, b){
        return (a.ProducerTimestamp > b.ProducerTimestamp ? 1 : -1);
    });
    let fragmentNumberArray = [];
    listFragments.Fragments.forEach(function(fragment) {
        fragmentNumberArray.push(fragment["FragmentNumber"]);
    });

    const mediaParams = {
        Fragments: fragmentNumberArray,
        StreamName: streamName
    };
    const media = await mediaClient.send(new GetMediaForFragmentListCommand(mediaParams));
    return media;
}

class Converter {
    // WAVファイルの生成
    static createWav(samples, sampleRate) {
        const len = samples.byteLength;
        const view = new DataView(new ArrayBuffer(44 + len));
        this._writeString(view, 0, 'RIFF');
        view.setUint32(4, 32 + len, true);
        this._writeString(view, 8, 'WAVE');
        this._writeString(view, 12, 'fmt ');
        view.setUint32(16, 16, true);
        view.setUint16(20, 1, true); // リニアPCM
        view.setUint16(22, 1, true); // モノラル
        view.setUint32(24, sampleRate, true); 
        view.setUint32(28, sampleRate * 2, true);
        view.setUint16(32, 2, true);
        view.setUint16(34, 16, true);
        this._writeString(view, 36, 'data');
        view.setUint32(40, len, true);
        let offset = 44;
        const srcView = new DataView(samples);
        for (var i = 0; i < len; i+=4, offset+=4) {
            view.setInt32(offset, srcView.getUint32(i));
        }
        return view;
    }
     
    static _writeString(view, offset, string) {
        for (var i = 0; i < string.length; i++) {
          view.setUint8(offset + i, string.charCodeAt(i));
        }
    }
}

おまけ:代表電話を作る

留守番電話フローの手前でキューやエージェントの状態確認を行い、対応可能な人員がいる場合は通話をキューに渡すようにすると良いでしょう。また、通話に関する情報取得の lambda はメールや Slack 等への着信通知の実行にも使えるでしょう。

参考にした記事

https://dev.classmethod.jp/articles/amazon-connect-voice-mail-from-kinesis-video-stream/

https://qiita.com/echolimitless/items/e5044c73656f8ae29f93#目次

https://qiita.com/enumura1/items/4b2bc98870cd95daca23

https://dev.classmethod.jp/articles/amazon-connect-kinesis-video-stream-media-data/

Discussion