Asyncio servers in Python

From what I have read, the “normal” architecture for a web server is to assign a single thread to each client. This is mostly OK, but beyond a certain number of clients performance drops (because of per-thread memory overhead, context-switching costs, and other things). The specific limit is hard to guess and depends on the OS, the application, the hardware and so on.
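To make the thread-per-client model concrete, here is a minimal sketch using the standard library's socketserver module. The class names and the demo() helper are made up for illustration, and it is a plain echo server rather than a real web server:

```python
import socket
import socketserver
import threading


class LineEchoHandler(socketserver.StreamRequestHandler):
    """One instance per connected client, each running in its own thread."""

    def handle(self):
        # Blocking reads are fine here: only this client's thread waits.
        for line in self.rfile:
            self.wfile.write(line)


class ThreadPerClientServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    daemon_threads = True  # idle client threads do not keep the process alive


def demo():
    """Start the server on a free port, send one line, return the echo."""
    server = ThreadPerClientServer(("127.0.0.1", 0), LineEchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        with socket.create_connection(server.server_address) as sock:
            sock.sendall(b"ping\n")
            return sock.makefile("rb").readline()
    finally:
        server.shutdown()
        server.server_close()
```

Every connection costs one OS thread here, which is exactly the overhead that starts to hurt with thousands of clients.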

Asynchronous servers were supposedly a solution to this problem, but I didn’t really believe it. As far as I understood, you could always buy more frontend boxes and scale that way.

But today I actually wrote an async server. It is very simple and has no error handling (it took me less than an hour, including reading the manuals). The protocol is very simple too:

  1. The protocol is line-based.
  2. Any client may send a line of text to the server.
  3. That line is then sent to every other client.
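The three rules above boil down to two small operations: cutting the incoming byte stream into lines, and picking who receives each line. A rough sketch follows; split_lines and recipients are hypothetical helper names, not part of the server shown later:

```python
def split_lines(buffer: bytes):
    """Split raw bytes into complete lines plus an unfinished remainder.

    Returns (lines, rest): every entry of `lines` ends with a newline,
    and `rest` holds bytes that do not form a full line yet.
    """
    *complete, rest = buffer.split(b"\n")
    return [line + b"\n" for line in complete], rest


def recipients(sender, clients):
    """Everyone except the sender gets the line."""
    return [client for client in clients if client is not sender]
```

In the actual server, asyncio's reader.readline() does the framing, so only the "everyone except the sender" part has to be written by hand.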

It was written in Python 3.5; the server uses PEP 492 (the async/await syntax) and the asyncio library. Go ahead and read the PEP; there is even a working example (which I based mine on).

I didn’t do extensive tests, but it seems that this server easily handles 4000 connections on a single core, with a throughput of about 40K messages per second (still on a single core).

Note

I wouldn’t rely on these numbers much. To do proper tests I would have to:

  1. Add error handling to the server.
  2. Change the protocol to acknowledge results.
  3. Write some proper tests.

The tests consisted of running 4 processes, each opening 1000 sockets to the server and then writing one message per 100 ms from yet another thread. Results were observed by looking at the output :)
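For reference, a load generator of this kind can also be sketched with asyncio itself. This is not the script used for the numbers above (that one used processes and threads); run_load and its parameters are hypothetical, and it assumes each connection sends its own messages at a fixed interval:

```python
import asyncio


async def run_load(host, port, n_clients, n_messages, interval):
    """Open n_clients connections; each sends n_messages lines.

    Every client waits `interval` seconds between messages.
    Returns the total number of lines sent.
    """

    async def one_client(index):
        reader, writer = await asyncio.open_connection(host, port)
        for k in range(n_messages):
            writer.write("client {} message {}\n".format(index, k).encode())
            await writer.drain()
            await asyncio.sleep(interval)
        writer.close()
        await writer.wait_closed()
        return n_messages

    sent = await asyncio.gather(*(one_client(i) for i in range(n_clients)))
    return sum(sent)
```

Something like asyncio.run(run_load("localhost", 1234, 1000, 100, 0.1)) would approximate one of the four test processes. Note that these clients never read what the server broadcasts back, so a long run would need a reader task per connection as well.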

The server works as follows:

  1. It keeps a set of all connections (sockets, in essence).
  2. Each received line is sent to every other socket.

Server code is here:

# -*- coding: utf-8 -*-

import asyncio
from asyncio import StreamReader, StreamWriter
from multiprocessing import Process, Lock, Condition


class AsynChat(object):

  def __init__(self, port, client_may_end_connection=False):
    self.clients = set()
    self.port = port
    self.client_may_end_connection = client_may_end_connection
    self.__loop = None

  async def handle_connection(self, reader:StreamReader, writer:StreamWriter):
    self.clients.add(writer)
    while True:
      data = await reader.readline()
      if not data:
        # An empty read means the client closed the connection (EOF).
        self.clients.discard(writer)
        writer.close()
        return
      print(data.strip())
      if self.client_may_end_connection and data.strip() == b'END':
        print("STOP")
        self.__loop.stop()
      # Iterate over a snapshot: the set may change while we await drain().
      for w in list(self.clients):
        if w is writer:
          continue
        w.write(data)
        await w.drain()

  async def echo_server(self):
    await asyncio.start_server(self.handle_connection, 'localhost', self.port)

  @classmethod
  def run_in_process(cls, *args, **kwargs) -> Process:
    c = Condition(Lock())
    def build_and_run(*args, **kwargs):
      ac = cls(*args, **kwargs)
      ac.run_loop(c)

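    p = Process(target=build_and_run, args=args, kwargs=kwargs)
    # Hold the condition while starting the child, so that its notify()
    # cannot fire before we are waiting (wait() would then block forever).
    with c:
      p.start()
      c.wait()

    return p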

  def run_loop(self, c:Condition=None):
    self.__loop = asyncio.get_event_loop()
    # Start listening before entering the main loop.
    self.__loop.run_until_complete(self.echo_server())

    def notif():
      # Tell the parent process (see run_in_process) that the server is up.
      with c:
        c.notify()

    try:
      if c is not None:
        self.__loop.call_soon(notif)
      self.__loop.run_forever()
    finally:
      self.__loop.close()



if __name__ == "__main__":
  a = AsynChat(1234, True)
  a.run_loop()
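To see the broadcast behaviour without opening several terminals, here is a self-contained sketch that runs a stripped-down server and two clients in one event loop. It is not the class above: handle, demo and the module-level clients set are made-up names, and it uses asyncio.run, so it assumes Python 3.7 or newer rather than 3.5:

```python
import asyncio

clients = set()  # writers of all currently connected clients


async def handle(reader, writer):
    clients.add(writer)
    try:
        while True:
            line = await reader.readline()
            if not line:  # EOF: the client went away
                break
            # Snapshot the set, it may change while we await drain().
            for other in list(clients):
                if other is not writer:
                    other.write(line)
                    await other.drain()
    finally:
        clients.discard(writer)
        writer.close()


async def demo():
    """Connect two clients; return what the second one receives."""
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    r1, w1 = await asyncio.open_connection("127.0.0.1", port)
    r2, w2 = await asyncio.open_connection("127.0.0.1", port)
    await asyncio.sleep(0.05)  # give the server time to register both
    w1.write(b"hello\n")
    await w1.drain()
    got = await asyncio.wait_for(r2.readline(), timeout=2)
    for w in (w1, w2):
        w.close()
    server.close()
    return got
```

Calling asyncio.run(demo()) should hand back the line the first client sent, as seen by the second client.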