Building Application with FreeSwitch: Python socket - kerbsx/fs GitHub Wiki
Instead of using the ESL library that comes with FreeSWITCH, let's implement our own in pure Python. The ESL library shipped with FreeSWITCH is implemented in C and exposed as a Python module through SWIG; a pure Python implementation is easier to debug.
The code below is adapted from the greenswitch implementation, but without the gevent parts and without the event handler. As with the FreeSWITCH ESL library, event handling is left to the application that uses this library.
import socket
import logging
from six.moves.urllib.parse import unquote
class NotConnectedError(Exception):
    """Signal that an operation was attempted without an open connection."""
class ESLEvent(object):
    """Headers parsed from one block of ``Key: value`` text.

    After construction the parsed result is available as ``self.headers``,
    a dict mapping each header name to its URL-decoded, stripped value.
    """

    def __init__(self, data):
        """Parse *data* (a str of newline-separated header lines)."""
        self.parse_data(data)

    def parse_data(self, data):
        """Populate ``self.headers`` from the raw text in *data*.

        A line containing ``': '`` starts a new header. Any other line is
        treated as a continuation of the previous header's value and is
        appended to it after a newline. Lines that appear before the first
        header cannot be attached to anything and are ignored.
        """
        headers = {}
        key = None
        raw_value = ''
        # Split into lines *before* URL-decoding: decoding first would let an
        # encoded newline / ': ' inside a value masquerade as an extra header.
        for line in data.strip().splitlines():
            if ': ' in line:
                key, raw_value = line.split(': ', 1)
                key = unquote(key).strip()
            elif key is None:
                # Continuation line with no header to continue (for example a
                # bare reply body); previously this crashed on None.strip().
                continue
            else:
                raw_value += '\n' + line
            headers[key] = unquote(raw_value).strip()
        self.headers = headers
class ESL:
    """Small blocking client for a FreeSWITCH event socket.

    Typical use: create the object, call :meth:`connect`, then iterate over
    :meth:`receive_events`. While iterating, the client answers the server's
    ``auth/request`` with ``auth <password>``, then sends ``event plain all``;
    once the server confirms the subscription, every subsequent block of
    headers is yielded to the caller as an ``ESLEvent``.

    Incoming data is split into blocks on empty lines only; no
    ``Content-Length`` based framing is performed.
    """

    def __init__(self, host, port, password=None):
        """Store connection parameters; no network activity happens here."""
        self.host = host
        self.port = port
        self.password = password
        self.connected = False
        # Set by connect(); kept as None until then so that use before
        # connecting can be detected instead of raising AttributeError.
        self.sock = None
        self.sock_file = None
        self._run = True
        self._EOL = '\n'
        # Becomes True once the server acknowledged the event subscription.
        self._event_ready = False

    def connect(self, timeout=None):
        """Open the TCP connection to ``(host, port)``.

        *timeout* (seconds, or None for no limit) applies to the connection
        attempt only; afterwards the socket is switched back to blocking
        mode. Any exception raised while connecting is propagated after the
        half-open socket has been closed.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout)
            sock.connect((self.host, self.port))
            sock.settimeout(None)
            sock_file = sock.makefile()
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        self.sock = sock
        self.sock_file = sock_file
        self.connected = True
        # Reset per-connection state so the object can be reconnected after
        # an earlier connection ended.
        self._run = True
        self._event_ready = False

    def close(self):
        """Stop the receive loop and release the socket."""
        self._run = False
        self._close()

    def _close(self):
        """Mark the client disconnected and close file wrapper and socket."""
        self.connected = False
        for resource in (self.sock_file, self.sock):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Best-effort cleanup: the connection is already unusable.
                logging.debug("Error closing connection.", exc_info=True)

    def send(self, data):
        """Send the command *data* followed by the blank-line terminator.

        Raises:
            NotConnectedError: if the client is not connected.
        """
        if not self.connected:
            raise NotConnectedError()
        raw_msg = (data + self._EOL * 2).encode('utf-8')
        # sendall() keeps writing until the whole message is out; plain
        # send() may transmit only a prefix and truncate the command.
        self.sock.sendall(raw_msg)

    def receive_events(self):
        """Yield an ``ESLEvent`` for every block received after the handshake.

        Blocks that arrive before the subscription is confirmed are consumed
        by :meth:`_authenticate` and not yielded. The generator ends when the
        peer closes the connection, when reading fails, or when the client is
        stopped; in the first two cases the socket is closed.

        Raises:
            NotConnectedError: if :meth:`connect` has never been called.
        """
        if self.sock_file is None:
            raise NotConnectedError()
        lines = []
        while self._run:
            try:
                data = self.sock_file.readline()
            except Exception:
                # Kept quiet by default (the original silenced this), but
                # still available when debug logging is enabled.
                logging.debug("Error reading from socket.", exc_info=True)
                self._run = False
                self._close()
                break
            if not data:
                # readline() returning '' means the peer closed the stream.
                if self.connected:
                    logging.error("Error receiving data, is FreeSWITCH running?")
                self._close()
                break
            # Empty line: the block collected so far is complete.
            if data == self._EOL:
                event = ESLEvent(''.join(lines))
                lines = []
                if not self._event_ready:
                    self._authenticate(event)
                else:
                    yield event
                continue
            lines.append(data)

    def _authenticate(self, event):
        """Drive the login / subscription handshake for one received block.

        * ``auth/request``  -> reply with ``auth <password>``
        * ``+OK accepted``  -> request ``event plain all``
        * ``+OK event listener enabled plain`` -> mark the client ready
        * ``-ERR ...``      -> log the rejection
        """
        content_type = event.headers.get('Content-Type')
        if content_type == 'auth/request':
            self.send('auth %s' % self.password)
        elif content_type == 'command/reply':
            # A reply without Reply-Text must not abort the handshake.
            reply = event.headers.get('Reply-Text', '')
            if reply == '+OK accepted':
                print('Ready to go !')
                self.send('event plain all')
            elif reply == '+OK event listener enabled plain':
                self._event_ready = True
            elif reply.startswith('-ERR'):
                logging.error("FreeSWITCH rejected handshake command: %s", reply)
        print(event.headers)
# Demo entry point: connect to the given event socket and print the headers
# of every event received, separated by blank lines.
if __name__ == '__main__':
    import sys

    # Fail with a usage hint instead of an IndexError traceback when
    # arguments are missing.
    if len(sys.argv) < 4:
        sys.stderr.write('usage: %s HOST PORT PASSWORD\n' % sys.argv[0])
        sys.exit(2)

    host = sys.argv[1]
    port = int(sys.argv[2])
    password = sys.argv[3]

    ies = ESL(host, port, password)
    ies.connect(timeout=10)
    for event in ies.receive_events():
        print(event.headers)
        print()