All posts

Transcribing Twilio Voice Calls in Real-Time with Deepgram

Transcribing Twilio Voice Calls in Real-Time with Deepgram

Twilio is a very popular voice platform, and Deepgram is a great automatic speech recognition (ASR) solution, so there is great value in integrating the two. This tutorial will guide you through building an integration that allows multiple client subscribers to watch live transcripts from ongoing Twilio calls. The code for this tutorial is located here.

Pre-requisites

You will need:

Setting Up A TwiML Bin

We need to tell Twilio to fork audio data from calls going to your Twilio number to the server we are going to write. This is done via “TwiML Bin”s. In the Twilio Console, search for TwiML Bin, and click “Create TwiML Bin.”

Navigate to your TwiML Bins.

Give the TwiML Bin a “Friendly Name” - something like “Streaming” or “Deepgram Streaming,” and then make the contents of the TwiML Bin the following:

<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Start>
    <Stream url="wss://INSERT_YOUR_SERVER_URL/twilio" track="both_tracks"/>
  </Start>
  <Say voice="woman" language="en">"This call may be monitored or recorded for quality purposes."</Say>
  <Dial>+11231231234</Dial>
</Response>

Replace the number in the Dial section with the phone number you want incoming calls to be forwarded to (this should not be your Twilio number, it should be the number of a real phone in your possession!). Then, where it says INSERT_YOUR_SERVER_URL type in the URL where you will be running your server. Without having to setup an AWS or DigitalOcean server, you can use ngrok to expose a local server. To expose port 5000 on your computer, you can use ngrok as follows:

ngrok http 5000

ngrok will then tell you the public URL which points to your localhost:5000. Your URL may end up looking something like: c52e-71-212-124-133.ngrok.io.

Now, we need to connect this TwiML Bin to your Twilio phone number. Go to the “Develop” tab on the left side of the Twilio Console, navigate to Phone Numbers -> Manage -> Active numbers, and click on your Twilio number in the list. Then, under the field “A Call Comes In,” click the drop-down and select “TwiML Bin”; for the field directly next to this one, click the drop-down and select “Streaming” (or whatever your TwiML Bin’s “Friendly Name” is). Finally, click “Save” at the bottom of the Twilio Console. Everything on Twilio’s side should now be set up, and we are ready to move on to the Deepgram integration server!

The Twilio Proxy Server

Let’s take a look at the system we are building here:

The big picture.

We have pairs of callers, inbound and outbound, and, for each call passing through Twilio’s servers, Twilio is able to fork the audio from the call to our proxy server via websockets. Our server then has to do some light processing of that audio, forward it on to Deepgram, receive transcripts back from Deepgram, and forward those transcripts on to potentially multiple clients who are subscribed to watch the call’s transcripts. So in order to view real-time transcripts in a client application, our backend server must maintain a minimum of three websockets connections - we can see how this can get complicated, especially when dealing with many concurrent Twilio calls and subscribed clients!

Download the code from this repository. It contains a single file, twilio.py!

Let’s look at the code (make sure to replace INSERT_YOUR_DEEPGRAM_API_KEY with your Deepgram API Key):

import asyncio
import base64
import json
import sys
import websockets
import ssl
from pydub import AudioSegment

subscribers = {}

def deepgram_connect():
 extra_headers = {
  'Authorization': 'Token INSERT_YOUR_DEEPGRAM_API_KEY'
 }
 deepgram_ws = websockets.connect('wss://api.deepgram.com/v1/listen?encoding=mulaw&sample_rate=8000&channels=2&multichannel=true', extra_headers = extra_headers)

 return deepgram_ws

async def twilio_handler(twilio_ws):
 audio_queue = asyncio.Queue()
 callsid_queue = asyncio.Queue()

 async with deepgram_connect() as deepgram_ws:

  async def deepgram_sender(deepgram_ws):
   print('deepgram_sender started')
   while True:
    chunk = await audio_queue.get()
    await deepgram_ws.send(chunk)

  async def deepgram_receiver(deepgram_ws):
   print('deepgram_receiver started')
   # we will wait until the twilio ws connection figures out the callsid
   # then we will initialize our subscribers list for this callsid
   callsid = await callsid_queue.get()
   subscribers[callsid] = []
   # for each deepgram result received, forward it on to all
   # queues subscribed to the twilio callsid
   async for message in deepgram_ws:
    for client in subscribers[callsid]:
     client.put_nowait(message)

   # once the twilio call is over, tell all subscribed clients to close
   # and remove the subscriber list for this callsid
   for client in subscribers[callsid]:
    client.put_nowait('close')

   del subscribers[callsid]

  async def twilio_receiver(twilio_ws):
   print('twilio_receiver started')
   # twilio sends audio data as 160 byte messages containing 20ms of audio each
   # we will buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance
   BUFFER_SIZE = 20 * 160
   # the algorithm to deal with mixing the two channels is somewhat complex
   # here we implement an algorithm which fills in silence for channels if that channel is either
   #   A) not currently streaming (e.g. the outbound channel when the inbound channel starts ringing it)
   #   B) packets are dropped (this happens, and sometimes the timestamps which come back for subsequent packets are not aligned)
   inbuffer = bytearray(b'')
   outbuffer = bytearray(b'')
   inbound_chunks_started = False
   outbound_chunks_started = False
   latest_inbound_timestamp = 0
   latest_outbound_timestamp = 0
   async for message in twilio_ws:
    try:
     data = json.loads(message)
     if data['event'] == 'start':
      start = data['start']
      callsid = start['callSid']
      callsid_queue.put_nowait(callsid)
     if data['event'] == 'connected':
      continue
     if data['event'] == 'media':
      media = data['media']
      chunk = base64.b64decode(media['payload'])
      if media['track'] == 'inbound':
       # fills in silence if there have been dropped packets
       if inbound_chunks_started:
        if latest_inbound_timestamp + 20 < int(media['timestamp']):
         bytes_to_fill = 8 * (int(media['timestamp']) - (latest_inbound_timestamp + 20))
         # NOTE: 0xff is silence for mulaw audio
         # and there are 8 bytes per ms of data for our format (8 bit, 8000 Hz)
         inbuffer.extend(b'\xff' * bytes_to_fill)
       else:
        # make it known that inbound chunks have started arriving
        inbound_chunks_started = True
        latest_inbound_timestamp = int(media['timestamp'])
        # this basically sets the starting point for outbound timestamps
        latest_outbound_timestamp = int(media['timestamp']) - 20
       latest_inbound_timestamp = int(media['timestamp'])
       # extend the inbound audio buffer with data
       inbuffer.extend(chunk)
      if media['track'] == 'outbound':
       # make it known that outbound chunks have started arriving
       outbound_chunked_started = True
       # fills in silence if there have been dropped packets
       if latest_outbound_timestamp + 20 < int(media['timestamp']):
        bytes_to_fill = 8 * (int(media['timestamp']) - (latest_outbound_timestamp + 20))
        # NOTE: 0xff is silence for mulaw audio
        # and there are 8 bytes per ms of data for our format (8 bit, 8000 Hz)
        outbuffer.extend(b'\xff' * bytes_to_fill)
       latest_outbound_timestamp = int(media['timestamp'])
       # extend the outbound audio buffer with data
       outbuffer.extend(chunk)
     if data['event'] == 'stop':
      break

     # check if our buffer is ready to send to our audio_queue (and, thus, then to deepgram)
     while len(inbuffer) >= BUFFER_SIZE and len(outbuffer) >= BUFFER_SIZE:
      asinbound = AudioSegment(inbuffer[:BUFFER_SIZE], sample_width=1, frame_rate=8000, channels=1)
      asoutbound = AudioSegment(outbuffer[:BUFFER_SIZE], sample_width=1, frame_rate=8000, channels=1)
      mixed = AudioSegment.from_mono_audiosegments(asinbound, asoutbound)

      # sending to deepgram via the audio_queue
      audio_queue.put_nowait(mixed.raw_data)

      # clearing buffers
      inbuffer = inbuffer[BUFFER_SIZE:]
      outbuffer = outbuffer[BUFFER_SIZE:]
    except:
     break

   # the async for loop will end if the ws connection from twilio dies
   # and if this happens, we should forward an empty byte to deepgram
   # to signal deepgram to send back remaining messages before closing
   audio_queue.put_nowait(b'')

  await asyncio.wait([
   asyncio.ensure_future(deepgram_sender(deepgram_ws)),
   asyncio.ensure_future(deepgram_receiver(deepgram_ws)),
   asyncio.ensure_future(twilio_receiver(twilio_ws))
  ])

  await twilio_ws.close()

async def client_handler(client_ws):
 client_queue = asyncio.Queue()

 # first tell the client all active calls
 await client_ws.send(json.dumps(list(subscribers.keys())))

 # then recieve from the client which call they would like to subscribe to
 # and add our client's queue to the subscriber list for that call
 try:
  # you may want to parse a proper json input here
  # instead of grabbing the entire message as the callsid verbatim
  callsid = await client_ws.recv()
  callsid = callsid.strip()
  if callsid in subscribers:
   subscribers[callsid].append(client_queue)
  else:
   await client_ws.close()
 except:
  await client_ws.close()

 async def client_sender(client_ws):
  while True:
   message = await client_queue.get()
   if message == 'close':
    break
   try:
    await client_ws.send(message)
   except:
    # if there was an error, remove this client queue
    subscribers[callsid].remove(client_queue)
    break

 await asyncio.wait([
  asyncio.ensure_future(client_sender(client_ws)),
 ])

 await client_ws.close()

async def router(websocket, path):
 if path == '/client':
  print('client connection incoming')
  await client_handler(websocket)
 elif path == '/twilio':
  print('twilio connection incoming')
  await twilio_handler(websocket)

def main():
 # use this if using ssl
# ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# ssl_context.load_cert_chain('cert.pem', 'key.pem')
# server = websockets.serve(router, '0.0.0.0', 443, ssl=ssl_context)

 # use this if not using ssl
 server = websockets.serve(router, 'localhost', 5000)

 asyncio.get_event_loop().run_until_complete(server)
 asyncio.get_event_loop().run_forever()

if __name__ == '__main__':
 sys.exit(main() or 0)

This server uses the Python websocket library to connect to Twilio, Deepgram, and client applications, and the asyncio library to handle concurrent connections. The server has two routes: /twilio and /client. As we have configured in our TwiML Bin, Twilio will be connecting to and sending audio data to the /twilio endpoint, and we will use the /client endpoint for client applications which will watch the streaming transcripts.

The server uses a dictionary, called subscribers, to handle concurrent connected clients. Specifically, subscribers is a dictionary whose keys are Twilio callSids which uniquely identify calls, and whose values are a list of queues for clients who are “subscribed” to those calls (i.e. watching for streaming transcripts from those calls).

To dive into the code, let’s look at the client_handler function. When a client connects to the /client endpoint, the client_handler function will first send a websocket message to the client listing the callSids of all currently streaming calls. The function then waits to receive a websocket message which it expects to be the callSid of the call that the client wants to view live transcripts for (and if the function does not receive a valid callSid, it will bail). Having received a valid callSid, the function then inserts this client’s queue into the subscribers dictionary and starts an async task which reads from this queue, sending transcription results back to the client via websocket messages, or gracefully closing the websocket connection if the message “close” was received on the queue.

Now let’s jump into the more involved twilio_handler function. This function handles incoming websocket connections from Twilio, and begins by setting up a queue for audio data, and a queue to handle passing the incoming callSid between async tasks. It then connects to Deepgram and sets up three async tasks: deepgram_receiver, deepgram_sender, and twilio_receiver (we will never send websocket messages back to Twilio, hence no “twilio_sender” task).

The twilio_receiver task handles incoming websocket messages from Twilio. Before Twilio sends audio data, it will send some metadata as part of a start event. One of these pieces of metadata is the callSid of the call, and we will pass that on to the deepgram_receiver task via a queue. Then, when Twilio starts streaming media (i.e. audio) events, we will perform some logic to buffer and mix this audio. In particular, Twilio will stream audio in via separate inbound and outbound audio tracks; we must make sure we mix these two audio tracks together as correct stereo audio to pass on to Deepgram. Some issues arise if call packets are dropped from one of these tracks, and logic is implemented with ample comments to deal with this without having the two channels in the mixed stereo audio get out of sync. Finally, with correctly mixed audio buffers prepared, twilio_receiver will pass this audio on to the deepgram_sender task via a queue. The deepgram_sender task then simply passes this audio on to Deepgram via the Deepgram websocket handle.

Finally, we get to the deepgram_receiver task. In order to pass transcripts from Deepgram on to subscribed clients, we must first know the callSid of the call, so the first thing deepgram_receiver does is wait to obtain this from the twilio_receiver via a queue. Once the callSid is obtained, the deepgram_receiver is then able to forward on all transcription results from Deepgram to all clients subscribed to that callSid. It does this via another queue, which is handled by the async task defined in client_handler, and thus we come full circle.

Running the Server and Testing with WebSocat

To run the server, first pip3 install the websockets, pydub, and asyncio libraries, and then run:

python3 twilio.py

If you are running this on your own cloud server, make sure port 5000 is accessible. If you followed the optional suggestion of using ngrok, this should be all set up simply by running ngrok http 5000 on a separate terminal.

To quickly test the integration, start a call to your Twilio number - this call will be forwarded to the phone number in the Dial section of your TwiML Bin, so you will need two phones (so feel free to grab a friend, or set up a Google Voice account or something similar!).

After the phone call has started, use a tool like websocat to connect to ws://localhost:5000/client. Upon connecting, the server should output a list of the callSids of ongoing calls (it should be a list of exactly one call at this point); reply to the server with one of these callSids and watch the Deepgram transcription responses roll in! You can start multiple clients and have them all subscribe to the same callSid to see how a concurrent system could work.

Using websocat to view the transcripts.

Further Development

The Deepgram-Twilio integration design presented here is slightly opinionated, in the interest of getting a reasonably complete demo up and running. You may want to factor in authentication, as the /client endpoint explained here is completely unauthenticated. You also may want to find an alternate way of labelling calls to subscribe to - instead of grabbing callSids, one could subscribe directly to Twilio numbers, but this would require extra Twilio API integration to look up the status of calls to your Twilio numbers.

Another clear next step would be to develop a proper client application. Programs like websocat are fantastic for testing, but you will likely want to design a front-end application which handles selecting callSids to subscribe to, parses and formats the Deepgram transcription response, and possibly other features.

If you have any questions, please feel free to reach out on Twitter - we’re @DeepgramDevs.

More with these tags:

Share your feedback

Thank you! Can you tell us what you liked about it? (Optional)

Thank you. What could we have done better? (Optional)

We may also want to contact you with updates or questions related to your feedback and our product. If don't mind, you can optionally leave your email address along with your comments.

Thank you!

We appreciate your response.