python yield scheduler - modrpc/info GitHub Wiki

Yield-From Example 2: A Scheduler for Generator-Based Threads
=============================================================

Just for fun, let's write a thread scheduler.

Each thread will be a generator. Whenever a thread wants to suspend
itself, it will do a 'yield'. We won't make any use of values sent
or received by yields in this example.

We'll start with a global variable to hold the currently running
thread.

  current = None

We'll also want a queue of threads that are waiting to run.

  ready_list = []

The first thing we'll want is a way of getting a thread into
the scheduling system.

  def schedule(g):
    ready_list.append(g)

The core loop of the scheduler will repeatedly take the thread at the
head of the queue and run it until it yields.

  def run():
    global current
    while ready_list:
      g = ready_list[0]
      current = g
      try:
        next(g)
      except StopIteration:
        unschedule(g)
      else:
        expire_timeslice(g)

If the thread is still at the head of the ready list after it has
yielded, we move it to the end, so that the ready threads run in
round-robin fashion.

  def expire_timeslice(g):
    if ready_list and ready_list[0] is g:
      del ready_list[0]
      ready_list.append(g)
 
When the thread finishes, we use the following function to remove
it from the scheduling system.

  def unschedule(g):
    if g in ready_list:
      ready_list.remove(g)

We've got enough so far to try a simple test.

  def person(name, count):
    for i in range(count):
      print(name, "running")
      yield

  schedule(person("John", 2))
  schedule(person("Michael", 3))
  schedule(person("Terry", 4))
  
  run()

We can run this with present-day Python, and we get

  John running
  Michael running
  Terry running
  John running
  Michael running
  Terry running
  Michael running
  Terry running
  Terry running
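For comparison, here is a self-contained sketch of the same round-robin policy in Python 3. It swaps the list for collections.deque (our substitution, not part of the original), and the hypothetical run_round_robin() and this version of person() record the trace in a list so that it can be checked.

```python
from collections import deque

def run_round_robin(threads):
    """Run generator-based threads round-robin until all have finished."""
    ready = deque(threads)
    while ready:
        g = ready.popleft()     # take the thread at the head of the queue
        try:
            next(g)             # run it until it yields
        except StopIteration:
            continue            # finished: drop it
        ready.append(g)         # still alive: move it to the back

log = []

def person(name, count):
    for _ in range(count):
        log.append(name + " running")
        yield

run_round_robin([person("John", 2), person("Michael", 3), person("Terry", 4)])
print("\n".join(log))
```

One design difference: this loop pops a thread before running it, so a thread that wanted to block itself would need some way to stop the loop from re-appending it. The original keeps the running thread at the head of ready_list, where unschedule() can simply remove it.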


Waiting for Resources
---------------------

Things get more interesting when our threads do something non-trivial.

Let's turn our people into dining philosophers. For this we'll need a
way to represent forks (the eating kind, not the Unix kind) and a way
for a thread to wait for one to become available.

But before launching into that, let's add two more functions to our
scheduler that will come in useful.

  def block(queue):
    queue.append(current)
    unschedule(current)

This removes the currently running thread from the ready list and
adds it to a list that you specify.

  def unblock(queue):
    if queue:
      g = queue.pop(0)
      schedule(g)

This removes the thread at the head of the specified list, if
any, and adds it to the ready list.

Now we can start implementing an eating utensil.

  class Utensil:
  
    def __init__(self, id):
      self.id = id
      self.available = True
      self.queue = []

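The utensil has a flag indicating whether it's available, and a queue
of threads waiting to use it. To acquire a utensil, we first check
whether it is available. If it is, we simply mark it as being in use.
If not, we block the current thread on the queue and then yield. We
only get to run again once the holder releases the utensil and hands
it to us, so it is still marked as in use, and it's now ours.

    def acquire(self):
      if self.available:
        self.available = False
      else:
        block(self.queue)
        yield
        # release() handed the utensil straight to us; it stays marked in use

To release the utensil, we unblock the thread at the head of the queue,
if any, passing the utensil directly to it; only when nobody is waiting
do we mark it as available. (If we marked it available first and then
unblocked a waiter, another thread could grab it before the waiter got
to run, and both would believe they held it.)

    def release(self):
      if self.queue:
        unblock(self.queue)
      else:
        self.available = True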
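To check mutual exclusion without the philosophers, here is a self-contained Python 3 sketch that bundles a condensed copy of the scheduler with a Utensil whose release() hands the utensil directly to the first waiting thread instead of marking it available. The worker() threads and the holders/max_holders counters are hypothetical test scaffolding that record how many threads hold the utensil at once.

```python
current = None
ready_list = []

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def block(queue):
    queue.append(current)
    unschedule(current)

def unblock(queue):
    if queue:
        schedule(queue.pop(0))

def run():
    global current
    while ready_list:
        g = current = ready_list[0]
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            if ready_list and ready_list[0] is g:   # expire the timeslice
                ready_list.append(ready_list.pop(0))

class Utensil:
    def __init__(self, id):
        self.id = id
        self.available = True
        self.queue = []

    def acquire(self):
        if self.available:
            self.available = False   # free: take it without yielding
        else:
            block(self.queue)
            yield                    # resumed only when release() hands it over

    def release(self):
        if self.queue:
            unblock(self.queue)      # hand it directly to the next waiter
        else:
            self.available = True

holders = 0
max_holders = 0

def worker(fork, hold_for):
    global holders, max_holders
    yield from fork.acquire()
    holders += 1
    max_holders = max(max_holders, holders)
    for _ in range(hold_for):
        yield                        # keep holding the fork for a while
    holders -= 1
    fork.release()

fork = Utensil(0)
for _ in range(3):
    schedule(worker(fork, 2))
run()
print(max_holders, fork.available)
```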

Next we need a life cycle for a philosopher.

  def philosopher(name, lifetime, think_time, eat_time, left_fork, right_fork):
    for i in range(lifetime):
      for j in range(think_time):
        print(name, "thinking")
        yield
      print(name, "waiting for fork", left_fork.id)
      yield from left_fork.acquire()
      print(name, "acquired fork", left_fork.id)
      print(name, "waiting for fork", right_fork.id)
      yield from right_fork.acquire()
      print(name, "acquired fork", right_fork.id)
      for j in range(eat_time):
        # They're Python philosophers, so they eat spam rather than spaghetti
        print(name, "eating spam")
        yield
      print(name, "releasing forks", left_fork.id, "and", right_fork.id)
      left_fork.release()
      right_fork.release()
    print(name, "leaving the table")

Now we can set up a scenario.

  forks = [Utensil(i) for i in range(3)]
  schedule(philosopher("Plato", 7, 2, 3, forks[0], forks[1]))
  schedule(philosopher("Socrates", 8, 3, 1, forks[1], forks[2]))
  schedule(philosopher("Euclid", 5, 1, 4, forks[2], forks[0]))
  run()

When this was written, Python had no yield from (it was added later,
in Python 3.3, by PEP 380), so this couldn't be run as it stands.
However, we can test it by substituting for-loops such as

      for _ in left_fork.acquire(): yield
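A for-loop substitution like that only works because acquire() returns no value and nobody sends values into it. A slightly fuller hand expansion that also captures a subgenerator's return value might look like the Python 3 sketch below; it still ignores send() and throw(), which the real yield from also forwards to the subgenerator. The names sub, delegate_natively and delegate_by_hand are illustrative.

```python
def sub():
    yield "a"
    yield "b"
    return 42                  # delivered as StopIteration.value

def delegate_natively():
    result = yield from sub()
    yield ("result", result)

def delegate_by_hand():
    it = iter(sub())
    while True:
        try:
            value = next(it)   # advance the subgenerator
        except StopIteration as e:
            result = e.value   # its return value
            break
        yield value            # pass each yielded value through to our caller
    yield ("result", result)

print(list(delegate_natively()))
print(list(delegate_by_hand()))
```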

After doing this, the output is

  Plato thinking
  Socrates thinking
  Euclid thinking
  Plato thinking
  Socrates thinking
  Euclid waiting for fork 2
  Euclid acquired fork 2
  Euclid waiting for fork 0
  Euclid acquired fork 0
  Euclid eating spam
  ...etc...


Waiting for External Events
---------------------------

So far our thread system has been completely self-absorbed and unable
to deal with the outside world. Let's arrange things so that, if there
are no threads ready to run, the scheduler will wait for some file
to become readable or writable using select(). It's easiest to do
this by writing a new main loop that builds on the previous one.

  def run2():
    while 1:
      run()
      if not wait_for_event():
        return

We will need a data structure to hold threads waiting for files. Each
file needs two queues associated with it, for threads waiting to read
and write respectively.

  class FdQueues:

    def __init__(self):
      self.readq = []
      self.writeq = []

We will keep a mapping from file objects to their associated FdQueues
instances.

  fd_queues = {}

The following function retrieves the queues for a given fd, creating
new ones if they don't already exist.

  def get_fd_queues(fd):
    q = fd_queues.get(fd)
    if q is None:
      q = FdQueues()
      fd_queues[fd] = q
    return q
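The same lookup-or-create step could also be written with collections.defaultdict, which calls FdQueues to build an entry the first time a key is looked up. This is an alternative we are suggesting, not what the original does; the integer key below merely stands in for a file object, since any hashable object works as a key.

```python
from collections import defaultdict

class FdQueues:
    def __init__(self):
        self.readq = []
        self.writeq = []

fd_queues = defaultdict(FdQueues)

def get_fd_queues(fd):
    return fd_queues[fd]       # created on first access, reused afterwards

q1 = get_fd_queues(3)
q2 = get_fd_queues(3)
print(q1 is q2, len(fd_queues))
```

One caveat: with a defaultdict, every subscript lookup creates an entry as a side effect, so code that only wants to test for an entry must use `in`, as close_fd() already does.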

Now we can write a new pair of scheduling primitives to block on
a file.

  def block_for_reading(fd):
    block(get_fd_queues(fd).readq)

  def block_for_writing(fd):
    block(get_fd_queues(fd).writeq)

It's expected that the thread calling these will immediately yield
afterwards. We could incorporate the yield into these functions, but
we'll be building higher level functions on top of these shortly, and
it will be more convenient to do the yield there.

We'll also want a way of removing a file from the fd_queues when
we've finished with it, so we'll add a function to close it and
clean up.

  def close_fd(fd):
    if fd in fd_queues:
      del fd_queues[fd]
    fd.close()

Now we can write wait_for_event(). It's a bit long-winded, but fairly
straightforward. We build lists of file objects having nonempty read
or write queues, pass them to select(), and for each one that's ready,
we unblock the thread at the head of the relevant queue. If there are
no threads waiting on any files, we return False to tell the scheduler
there's no more work to do.

  def wait_for_event():
    from select import select
    read_fds = []
    write_fds = []
    for fd, q in fd_queues.items():
      if q.readq:
        read_fds.append(fd)
      if q.writeq:
        write_fds.append(fd)
    if not (read_fds or write_fds):
      return False
    read_fds, write_fds, _ = select(read_fds, write_fds, [])
    for fd in read_fds:
      unblock(fd_queues[fd].readq)
    for fd in write_fds:
      unblock(fd_queues[fd].writeq)
    return True
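To see what select() hands back, here is a small standalone Python 3 check using a connected pair from socket.socketpair(): a socket with unread data appears in the readable list, and a socket with free send-buffer space appears in the writable list. The timeouts are our addition for the demo; wait_for_event() passes none and so waits indefinitely.

```python
import socket
from select import select

a, b = socket.socketpair()
try:
    # Nothing sent yet: b has nothing to read, a has send-buffer space.
    before_r, before_w, _ = select([b], [a], [], 0)
    a.sendall(b"ping")
    # Wait (at most 5 s) for b to become readable.
    after_r, _, _ = select([b], [], [], 5)
    data = b.recv(1024)
finally:
    a.close()
    b.close()
print(before_r, before_w, after_r, data)
```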

At this point we can try a quick test to see if everything works
so far.

  from sys import stdin

  def loop():
    while True:
      print("Waiting for input")
      block_for_reading(stdin)
      yield
      print("Input is ready")
      line = stdin.readline()
      print("Input was:", repr(line))
      if not line:
        break

  schedule(loop())
  run2()

Sample session:

  Waiting for input
  asdf
  Input is ready
  Input was: 'asdf\n'
  Waiting for input
  qwer
  Input is ready
  Input was: 'qwer\n'
  Waiting for input
  Input is ready
  Input was: ''
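Because that test depends on typing at a terminal, here is a self-contained Python 3 variant that runs unattended. It bundles condensed copies of the pieces above (with only block_for_reading) and replaces stdin with one end of a socket.socketpair(); the reader and writer threads and the log list are hypothetical. The reader blocks on its socket; once the writer has yielded twice and sent some data, nothing is left on the ready list, so run2() falls through to wait_for_event(), whose select() wakes the reader.

```python
import socket
from select import select

current = None
ready_list = []
fd_queues = {}                 # file object -> FdQueues
log = []                       # records what the threads did

class FdQueues:
    def __init__(self):
        self.readq = []
        self.writeq = []

def schedule(g):
    ready_list.append(g)

def unschedule(g):
    if g in ready_list:
        ready_list.remove(g)

def block(queue):
    queue.append(current)
    unschedule(current)

def unblock(queue):
    if queue:
        schedule(queue.pop(0))

def run():
    global current
    while ready_list:
        g = current = ready_list[0]
        try:
            next(g)
        except StopIteration:
            unschedule(g)
        else:
            if ready_list and ready_list[0] is g:   # expire the timeslice
                ready_list.append(ready_list.pop(0))

def get_fd_queues(fd):
    q = fd_queues.get(fd)
    if q is None:
        q = fd_queues[fd] = FdQueues()
    return q

def block_for_reading(fd):
    block(get_fd_queues(fd).readq)

def wait_for_event():
    read_fds = [fd for fd, q in fd_queues.items() if q.readq]
    write_fds = [fd for fd, q in fd_queues.items() if q.writeq]
    if not (read_fds or write_fds):
        return False           # nobody is waiting on a file: all done
    r, w, _ = select(read_fds, write_fds, [])
    for fd in r:
        unblock(fd_queues[fd].readq)
    for fd in w:
        unblock(fd_queues[fd].writeq)
    return True

def run2():
    while True:
        run()
        if not wait_for_event():
            return

def reader(sock):
    log.append("reader: waiting")
    block_for_reading(sock)
    yield
    log.append("reader: got %r" % sock.recv(1024))

def writer(sock):
    for i in range(2):
        log.append("writer: step %d" % i)
        yield
    sock.sendall(b"hello")
    log.append("writer: sent")

a, b = socket.socketpair()
schedule(reader(b))
schedule(writer(a))
run2()
a.close()
b.close()
print("\n".join(log))
```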

It's not a very convincing test yet, though, since there's only
one thread, so let's play around with some sockets and build a
multithreaded server.


A Spam Server
-------------

We're going to implement the following protocol. The client sends the
word "SPAM" followed by a number, and the server replies with "100
SPAM FOLLOWS" and the corresponding number of repetitions of the
phrase "spam glorious spam". If the requested number is not greater
than zero or the request is malformed, the server replies "400 WE ONLY
SERVE SPAM".
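To pin the protocol down, here is a hedged sketch of the reply rules as a plain Python 3 function, with no sockets or scheduling involved. spam_reply is a hypothetical helper; unlike the server below, it works on str rather than bytes and builds the whole reply at once.

```python
def spam_reply(line):
    """Return the complete reply the protocol calls for, given one request line."""
    tokens = line.split()
    if len(tokens) == 2 and tokens[0] == "SPAM":
        try:
            n = int(tokens[1])
        except ValueError:
            n = 0              # malformed count: treat it like a non-positive one
        if n > 0:
            return "100 SPAM FOLLOWS\n" + "spam glorious spam\n" * n
    return "400 WE ONLY SERVE SPAM\n"

print(spam_reply("SPAM 3"), end="")
print(spam_reply("EGGS"), end="")
```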

We could do with some higher-level functions for blocking operations
on sockets, so let's write a few. First, accepting a connection from
a listening socket.

  def sock_accept(sock):
    block_for_reading(sock)
    yield
    return sock.accept()

Now reading a line of text from a socket. We keep reading until the
data ends with a newline or EOF is reached. (We're assuming that the
client will wait for a reply before sending another line, so we don't
have to worry about reading too much.) We also close the socket on
EOF, since we won't be reading from it again after that.

  def sock_readline(sock):
    buf = b""                  # sockets deliver bytes in Python 3
    while buf[-1:] != b"\n":
      block_for_reading(sock)
      yield
      data = sock.recv(1024)
      if not data:
        break
      buf += data
    if not buf:
      close_fd(sock)
    return buf
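The following sketch drives sock_readline() by hand in Python 3 (where sockets deliver bytes, hence the b"" literals) to show how a line arriving in two pieces is reassembled and how EOF closes the socket. block_for_reading() and close_fd() are simplified stand-ins, and resume_to_end() is a hypothetical helper.

```python
import socket

def block_for_reading(sock):
    pass  # stand-in: in this demo we resume the generator by hand

def close_fd(sock):
    sock.close()  # simplified: no fd_queues bookkeeping here

def sock_readline(sock):
    buf = b""
    while buf[-1:] != b"\n":   # b""[-1:] is b"", so an empty buffer keeps reading
        block_for_reading(sock)
        yield
        data = sock.recv(1024)
        if not data:           # EOF
            break
        buf += data
    if not buf:
        close_fd(sock)
    return buf

def resume_to_end(g):
    """Resume g and return its return value, failing if it yields again."""
    try:
        next(g)
    except StopIteration as e:
        return e.value
    raise AssertionError("sock_readline() wanted more data")

a, b = socket.socketpair()
g = sock_readline(b)
next(g)                        # run up to the first yield
a.sendall(b"SPAM ")
next(g)                        # reads b"SPAM "; no newline yet, so it yields again
a.sendall(b"3\n")
line = resume_to_end(g)

a.close()                      # the peer closes: the next read sees EOF
g = sock_readline(b)
next(g)
eof = resume_to_end(g)
print(line, eof)
```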

Writing data to a socket. We loop until all the data has been written.
We don't use sendall(), because it might block, and we don't want to
hold up other threads.

  def sock_write(sock, data):
    while data:
      block_for_writing(sock)
      yield
      n = sock.send(data)
      data = data[n:]
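Here is a standalone Python 3 check that the loop copes with partial sends. It puts one end of a socket.socketpair() into non-blocking mode, so that send() accepts only what fits in the socket buffer, and replaces the scheduler with a small select() loop that resumes sock_write() when the socket is writable and drains the other end when it is readable. block_for_writing() is a no-op stand-in, and the BlockingIOError guard is our addition for the rare case where send() accepts nothing at all.

```python
import socket
from select import select

def block_for_writing(sock):
    pass  # stand-in: the driver loop below does the waiting

def sock_write(sock, data):
    while data:
        block_for_writing(sock)
        yield
        try:
            n = sock.send(data)        # may send only part of data
        except BlockingIOError:
            n = 0                      # added guard: nothing sent, wait again
        data = data[n:]

a, b = socket.socketpair()
a.setblocking(False)
payload = b"x" * 1_000_000             # bigger than a typical socket buffer
received = bytearray()
sends = 0

g = sock_write(a, payload)
next(g)                                # run up to the first yield
done = False
while not done or len(received) < len(payload):
    r, w, _ = select([b], [] if done else [a], [], 5)
    if not (r or w):
        raise TimeoutError("no progress for 5 seconds")
    if b in r:
        received += b.recv(65536)      # drain the other end so a has room again
    if a in w:
        sends += 1
        try:
            next(g)                    # one send() call, then yield again
        except StopIteration:
            done = True
a.close()
b.close()
print(sends, "send() calls")
```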

Now we're ready to write the main loop of the server. It will set up a
listening socket, then repeatedly accept connections and spawn a
thread to handle each one.

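  from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

  port = 4200

  def listener():
    lsock = socket(AF_INET, SOCK_STREAM)
    lsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    lsock.bind(("", port))
    lsock.listen(5)
    while True:
      csock, addr = yield from sock_accept(lsock)
      print("Listener: Accepted connection from", addr)
      # Non-blocking, so that send() returns a partial count instead of
      # stalling every thread when the socket buffer is full
      csock.setblocking(False)
      schedule(handler(csock))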

The handler function handles the interaction with one client session.

  def handler(sock):
    while True:
      line = yield from sock_readline(sock)
      if not line:
        break
      try:
        n = parse_request(line)
        yield from sock_write(sock, b"100 SPAM FOLLOWS\n")
        for i in range(n):
          yield from sock_write(sock, b"spam glorious spam\n")
      except BadRequest:
        yield from sock_write(sock, b"400 WE ONLY SERVE SPAM\n")
  
The handler uses the following function to parse the request and check
it for validity.

  class BadRequest(Exception):
    pass
  
  def parse_request(line):
    tokens = line.split()      # line is bytes, as returned by sock_readline()
    if len(tokens) != 2 or tokens[0] != b"SPAM":
      raise BadRequest
    try:
      n = int(tokens[1])       # int() also accepts bytes such as b"3"
    except ValueError:
      raise BadRequest
    if n < 1:
      raise BadRequest
    return n

All we need to do now is spawn the main loop and run the scheduler.

  schedule(listener())
  run2()
  
At this point, I got fed up with expanding yield-from statements by
hand, and wrote a program to do it for me. It's called yfpp.py and you
can find it in the attached zip file.

Here's a sample client session:

  % telnet localhost 4200
  Trying 127.0.0.1...
  Connected to localhost.
  Escape character is '^]'.
  SPAM 3
  100 SPAM FOLLOWS
  spam glorious spam
  spam glorious spam
  spam glorious spam
  EGGS
  400 WE ONLY SERVE SPAM
  ^]
  telnet> Connection closed.
  %


Conclusions
-----------

The yield-from statement makes it possible to write thread code using
generators almost the same way as you would write ordinary code.

Whether it's any easier or clearer than using things like yield
Call(g(x)) and yield Return(x) is debatable. However, I think this
example does show that the implementation of a generator-based
scheduler can be very clean and simple when yield-from is available,
and if it is suitably optimised, probably more efficient as well.
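For readers who haven't met the alternative mentioned here: in a trampoline-style scheduler, a thread calls a subroutine by yielding a Call object and returns a value by yielding a Return object, and the scheduler keeps a stack of suspended callers. The following is our own minimal Python 3 sketch of that idea, not any particular library's API; Call, Return, run_trampoline and run_to_end are illustrative definitions, and the same computation is written both ways.

```python
class Call:
    """Request: run gen as a subroutine and send its result back to us."""
    def __init__(self, gen):
        self.gen = gen

class Return:
    """Request: finish this subroutine with the given value."""
    def __init__(self, value):
        self.value = value

def run_trampoline(gen):
    """Drive one thread to completion, handling Call/Return explicitly."""
    stack = []                         # suspended callers
    value = None                       # what to send into gen next
    while True:
        try:
            request = gen.send(value)
        except StopIteration:
            request = Return(None)     # falling off the end returns None
        value = None
        if isinstance(request, Call):
            stack.append(gen)          # suspend the caller
            gen = request.gen          # and start the callee
        elif isinstance(request, Return):
            if not stack:
                return request.value   # the outermost call has finished
            gen = stack.pop()          # resume the caller,
            value = request.value      # handing it the result
        # a bare yield (request is None) simply resumes the same generator

def add(x, y):
    yield                              # a scheduling point
    yield Return(x + y)

def compute():
    a = yield Call(add(1, 2))
    b = yield Call(add(a, 10))
    yield Return(a * b)

# The same computation written with yield from:
def add_yf(x, y):
    yield
    return x + y

def compute_yf():
    a = yield from add_yf(1, 2)
    b = yield from add_yf(a, 10)
    return a * b

def run_to_end(gen):
    try:
        while True:
            next(gen)
    except StopIteration as e:
        return e.value

print(run_trampoline(compute()), run_to_end(compute_yf()))
```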

All the code presented here is included in a runnable form in the
attached file Threads.zip.

-- 
Greg