AsyncIO for the working PyGame programmer (part II)
In part I, we learned about asyncio
and coroutines. Functions that are declared with async def
are called coroutine functions. You can use them to create coroutine objects, and schedule these as coroutine tasks in an event loop. Although an asyncio
event loop runs single-threaded, it can execute multiple coroutines concurrently, by scheduling coroutines to run when i/o events arrive, and suspending them whenever they await
another coroutine.
Let's look at an extended version of the example game from the last post. Instead of printing a line to standard output, we simulate an online API by using the requests
module to send ten HTTP POST
messages to httpbin.org1.
If you play this version, you will probably notice a stuttering delay when you get the achievement, even when your network connection is quite fast. The HTTP requests happen sequentially, and they block2 until the response has arrived.
import pygame import requests import getpass pygame.init() blob_yposition=30 blob_yspeed=0 achievement=False gravity=1 screen_size=640,480 screen=pygame.display.set_mode(screen_size) clock=pygame.time.Clock() running=True flying_frames=0 best=0 color=(50,50,50) font=pygame.font.SysFont("Helvetica Neue,Helvetica,Ubuntu Sans,Bitstream Vera Sans,DejaVu Sans,Latin Modern Sans,Liberation Sans,Nimbus Sans L,Noto Sans,Calibri,Futura,Beteckna,Arial", 16) while running: clock.tick(30) events=pygame.event.get() for e in events: if e.type==pygame.QUIT: running=False if e.type==pygame.KEYDOWN and e.key==pygame.K_UP: blob_yspeed+=10 # ... # move sprites around, collision detection, etc blob_yposition+=blob_yspeed blob_yspeed-=gravity if blob_yposition<=30: blob_yspeed=0 blob_yposition=30 flying_frames=0 else: flying_frames+=1 if flying_frames>best: best=flying_frames if not achievement and best>300: # 10 seconds payload=dict(name=getpass.getuser(), title="ten seconds", subtitle="last for ten seconds without touching the ground") try: for i in range(10): response = requests.post("https://httpbin.org/post", data=payload) if response.status_code == requests.codes.ok: print("achievement posted") print(response.content) else: print("something went wrong") except requests.exceptions.ConnectionError: print("something went wrong") achievement=True color=(100,0,0) if blob_yposition>480: blob_yposition=480 blob_yspeed=-1*abs(blob_yspeed) # ... # draw screen.fill((255,255,255)) pygame.draw.rect(screen,color, pygame.Rect(screen_size[0]/2, screen_size[1]-blob_yposition, 18,25)) fps=clock.get_fps() message=f"current:{flying_frames//30}, best:{best//30}, fps:{fps:.2f}" surf=font.render(message, True, (0,0,0)) screen.blit(surf,(0,0)) pygame.display.update() print("Thank you for playing!")
This sequence diagram shows the problem. During the updating phase of the game loop, we wait for a reply from the server. Even if we sent the request to the server after we drew and updated the frame, any waiting longer than 33 milliseconds would inevitably delay the next frame after that.
Here is a version of the same game, but with aiohttp
instead of requests. We use the run_once()
snippet shown in part I to for fine-grained control over the event-loop.
Instead of directly running the asynchronous task/coroutine in the updating part of our game loop, we only create a coroutine object and schedule it as a task with loop.create_task()
, without actually running it, e.g. with loop.run_until_complete()
.
Then, after updating the game state and drawing on the screen, we only invoke run_once()
to respond to events. We run the event loop before clock.tick()
, but after rendering.
import pygame import aiohttp # instead of requests import asyncio import getpass pygame.init() blob_yposition=30 blob_yspeed=0 achievement=False gravity=1 screen_size=640,480 screen=pygame.display.set_mode(screen_size) clock=pygame.time.Clock() running=True flying_frames=0 best=0 color=(50,50,50) font=pygame.font.SysFont("Helvetica Neue,Helvetica,Ubuntu Sans,Bitstream Vera Sans,DejaVu Sans,Latin Modern Sans,Liberation Sans,Nimbus Sans L,Noto Sans,Calibri,Futura,Beteckna,Arial", 16) # NEW CODE loop = asyncio.get_event_loop() async def post_achievement(): # Note that it says "async def", # post_achievement() creates a coroutine that you can't run directly # you have to create a task in an event loop to run it payload=dict(name=getpass.getuser(), title="ten seconds", subtitle="last for ten seconds without touching the ground") async with aiohttp.ClientSession() as session: # await returns the control flow to the event loop # until there is data available response= await session.post('http://httpbin.org/post', data=payload) if response.status==200: print("achievement posted") # await again, maybe response body is large body = await response.read() print(body) return body else: print("something went wrong") def run_once(loop): loop.call_soon(loop.stop) loop.run_forever() while running: clock.tick(30) events=pygame.event.get() for e in events: if e.type==pygame.QUIT: running=False if e.type==pygame.KEYDOWN and e.key==pygame.K_UP: blob_yspeed+=10 # ... # move sprites around, collision detection, etc blob_yposition+=blob_yspeed blob_yspeed-=gravity if blob_yposition<=30: blob_yspeed=0 blob_yposition=30 flying_frames=0 else: flying_frames+=1 if flying_frames>best: best=flying_frames # 300 frames=10 seconds if not achievement and best>300: # NEW CODE # Create coroutines and add them to the event loop as tasks # This does not actually run the coroutines yet! for i in range(10): loop.create_task(post_achievement()) achievement=True color=(100,0,0) if blob_yposition>480: blob_yposition=480 blob_yspeed=-1*abs(blob_yspeed) # ... # draw screen.fill((255,255,255)) pygame.draw.rect(screen,color, pygame.Rect(screen_size[0]/2, screen_size[1]-blob_yposition, 18,25)) fps=clock.get_fps() # NEW CODE # display number of currently running tasks in event loop ntasks=len(asyncio.Task.all_tasks(loop)) message=f"current:{flying_frames//30}, best:{best//30}, fps:{fps:.2f}, tasks:{ntasks}" surf=font.render(message, True, (0,0,0)) screen.blit(surf,(0,0)) pygame.display.update() # tell event loop to run once # if there are no i/o events, this might return right away # if there are events or tasks that don't need to wait for i/o, then # run ONE task until the next "await" statement run_once(loop) # we run this *after* display.update(), but *before* # clock.tick(fps) and getting input events. This way i/o only eats # into the time when clock.tick(fps) would wait anyway. while len(asyncio.Task.all_tasks(loop)): run_once(loop) loop.shutdown_asyncgens() loop.close() print("Thank you for playing!")
If you play this game, or look at the source code, you will see that multiple tasks with HTTP requests are scheduled at the same time, instead of sequentially. Unless you have a very slow connection or upload huge files so that connection throughput becomes a bottleneck, this reduces waiting times immensely, because all waiting happens at once. In games, you usually need low latency, but not a lot of throughput.
As you can see in this sequence diagram, this still runs single-threaded, but as long as there is no response from the server, run_once()
will just immediately return and not slow down your game loop!
In part III, we will look at how this could have been done with threads, and which pitfalls of threading can also apply to coroutines. Stay tuned!
Footnotes:
This service by the developer of requests
just echoes back your request parameters as JSON. Don't overuse it. Your login name from getpass is just takes as an example. If you don't want to your username to show up in somebody's server logs, please edit the script!
I used requests
here because it is a well-tested and reasonably optimised library with a non-blocking alternative. If you don't care about blocking, requests
is really nice. Not every problem needs asyncio
.