关于redis‐py的Bug解析 - CongGreat/async-await GitHub Wiki
问题复现
import asyncio
from asyncio import run
from redis.asyncio import Redis
from redis.asyncio.connection import BlockingConnectionPool, ConnectionPool
async def handler(r):
    """Issue a single GET for the key "foo" through the Redis client ``r``."""
    # Deliberately left disabled: per the write-up, enabling this extra yield
    # point makes the problem disappear.
    # await sleep(0)
    await r.get("foo")
async def sleep(time):
    """Suspend the calling coroutine for ``time`` seconds via ``asyncio.sleep``."""
    await asyncio.sleep(time)
async def main():
    """Reproduce the mixed-up replies on a single shared Redis connection.

    Sets two keys, starts ``handler`` as a task, yields once, cancels the
    task, and then reads both keys back over the same connection so the
    printed values can be compared with what was stored.
    """
    async with Redis(single_connection_client=True) as r:
        await r.set('bar', 'bar')
        await r.set('foo', 'foo')
        task = asyncio.create_task(handler(r))
        await sleep(0)  # simulate I/O: yield once so the handler task can run
        task.cancel()
        try:
            # Await the task created above so that its CancelledError (if the
            # cancellation took effect) is raised here.
            await task
            print("业务层任务未取消")
        except asyncio.CancelledError:
            print('业务层任务已取消')
        # These reads reuse the same connection as the cancelled GET.
        print('bar:', await r.get('bar'))
        print('foo:', await r.get("foo"))
# Script entry point: run the reproduction coroutine to completion.
if __name__ == '__main__':
    run(main())
使用redis-py(4.4.0及以上版本会出现问题,4.3.5及以下版本不会出现问题)执行上述代码的时候,结果出现混乱。
原因
先上直接原因:4.3.5和4.4.0对比,4.3.5对所有错误进行了连接断开处理,在4.4.0版本进行了修改,对asyncio.CancelledError单独捕获,却没有进行断开处理。导致这个带有buffer的连接被后续继续使用,出现了返回值混乱现象。
# redis-py源码(redis/asyncio/connection.py)
# 4.3.5
async def read_response_without_lock(self, disable_decoding: bool = False):
    """Read the response from a previously sent command"""
    try:
        # Bound the read with socket_timeout when one is configured.
        if self.socket_timeout:
            async with async_timeout.timeout(self.socket_timeout):
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )
        else:
            response = await self._parser.read_response(
                disable_decoding=disable_decoding
            )
    except asyncio.TimeoutError:
        await self.disconnect()
        raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
    except OSError as e:
        await self.disconnect()
        raise ConnectionError(
            f"Error while reading from {self.host}:{self.port} : {e.args}"
        )
    except BaseException:
        # Catch-all: every other failure, including task cancellation
        # (asyncio.CancelledError), disconnects before re-raising.
        await self.disconnect()
        raise
    if self.health_check_interval:
        # Python 3.6 is special-cased to use get_event_loop instead of
        # get_running_loop.
        if sys.version_info[0:2] == (3, 6):
            func = asyncio.get_event_loop
        else:
            func = asyncio.get_running_loop
        # Schedule the next health check relative to the loop's current time.
        self.next_health_check = func().time() + self.health_check_interval
    # An error reply is handed back as a ResponseError instance; raise it here.
    if isinstance(response, ResponseError):
        raise response from None
    return response
# 4.4.0
async def read_response(
    self,
    disable_decoding: bool = False,
    timeout: Optional[float] = None,
):
    """Read the response from a previously sent command"""
    # An explicit per-call timeout takes precedence over socket_timeout.
    read_timeout = timeout if timeout is not None else self.socket_timeout
    try:
        if read_timeout is not None:
            async with async_timeout.timeout(read_timeout):
                response = await self._parser.read_response(
                    disable_decoding=disable_decoding
                )
        else:
            response = await self._parser.read_response(
                disable_decoding=disable_decoding
            )
    except asyncio.TimeoutError:
        if timeout is not None:
            # user requested timeout, return None
            return None
        # it was a self.socket_timeout error.
        await self.disconnect(nowait=True)
        raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
    except OSError as e:
        await self.disconnect(nowait=True)
        raise ConnectionError(
            f"Error while reading from {self.host}:{self.port} : {e.args}"
        )
    except asyncio.CancelledError:
        # need this check for 3.7, where CancelledError
        # is subclass of Exception, not BaseException
        # NOTE: unlike the other handlers in this method, cancellation is
        # re-raised without calling self.disconnect(), so the connection
        # stays open after a cancelled read.
        raise
    except Exception:
        await self.disconnect(nowait=True)
        raise
    if self.health_check_interval:
        # Schedule the next health check relative to the loop's current time.
        next_time = asyncio.get_running_loop().time() + self.health_check_interval
        self.next_health_check = next_time
    # An error reply is handed back as a ResponseError instance; raise it here.
    if isinstance(response, ResponseError):
        raise response from None
    return response
详细分析
redis-py, asyncio, redis-server交互图
sequenceDiagram
redis-py ->> +asyncio:建立redis conn,send command
asyncio ->> +redis-server:调用writer,写入redis
redis-server ->> asyncio:asyncio调用reader/feed_data(buffer),redis-server返回response
asyncio ->> asyncio: signal notify,调用reader/feed(buffer)
asyncio ->> redis-py:返回response给redis-py
-
task.cancel() 取消了正在运行中的协程,而这个协程包含2个步骤的IO, 发送和接收, 发送:将redis业务层命令发送到redis服务,即将redis命令写入到socket流中,接收:接收redis服务返回的报文,即将报文读到缓存buffer中。
-
在asyncio中,封装了reader和writer2个对象对socket进行读写,分别在2个协程中进行操作,redis-py的异步连接正是基于asyncio。
-
当asyncio连接到redis服务后,建立一个conn对象,管理着其内部所有的信息,(此时reader和writer对象已经就绪,监听redis服务端口的socket),
-
当进行业务层的redis操作时,通过writer将命令发送到redis服务,redis执行命令后并回复报文。reader将报文写入到缓存buffer中,再异步读这个buffer回复给业务层,最后删除这个buffer。(注意:这个异步读取Buffer的协程会一直异步阻塞)
-
在发送了命令后,取消这个业务层的task,那这个task内部正在运行的协程被取消(正是这个异步阻塞的reader读buffer的这个协程)就会抛出asyncio.CancelledError,导致这个reader没有清理buffer。上层业务也收到这个task被取消了。但是其实它的命令已经发送并且报文已经收到,取消的是读buffer发送到业务层和删除buffer的协程。所以导致这个连接conn对象下的buffer就会残留上一个被取消的报文。最终导致了取值混乱的现象。
redis-py源码(redis/asyncio/client.py),先发送命令,后解析返回报文的
async def _send_command_parse_response(self, conn, command_name, *args, **options):
    """
    Send a command and parse the response
    """
    # Two separate awaits: the command is sent first, and only afterwards is
    # the reply awaited. Cancelling during the second await therefore happens
    # after the first one has already completed.
    await conn.send_command(*args)
    return await self.parse_response(conn, command_name, **options)
(redis/asyncio/connection.py)reader读取buffer的信息
async def read_from_socket(self):
    # Read up to self._read_size bytes from the stream; this await is a
    # suspension point while waiting for data.
    buffer = await self._stream.read(self._read_size)
    # An empty or non-bytes result is reported as a server-closed connection.
    if not buffer or not isinstance(buffer, bytes):
        raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
    # Hand the raw bytes to the reply parser's reader.
    self._reader.feed(buffer)
    # data was read from the socket and added to the buffer.
    # return True to indicate that data was read.
    return True
(asyncio/streams.py)reader读取buffer中的信息,注释部分为信号函数,如果没有buffer,reader就会异步阻塞,等待有信息再读取。业务层取消task,就是取消了此处异步阻塞的协程,导致后面del buffer未执行,前一个命令的buffer未清除。
async def read(self, n=-1):
    """Read up to `n` bytes from the stream.
    If n is not provided, or set to -1, read until EOF and return all read
    bytes. If the EOF was received and the internal buffer is empty, return
    an empty bytes object.
    If n is zero, return empty bytes object immediately.
    If n is positive, this function try to read `n` bytes, and may return
    less or equal bytes than requested, but at least one byte. If EOF was
    received before any byte is read, this function returns empty byte
    object.
    Returned value is not limited with limit, configured at stream
    creation.
    If stream was paused, this function will automatically resume it if
    needed.
    """
    # Re-raise an exception previously recorded on the stream.
    if self._exception is not None:
        raise self._exception
    if n == 0:
        return b''
    if n < 0:
        # This used to just loop creating a new waiter hoping to
        # collect everything in self._buffer, but that would
        # deadlock if the subprocess sends more than self.limit
        # bytes. So just call self.read(self._limit) until EOF.
        blocks = []
        while True:
            block = await self.read(self._limit)
            if not block:
                break
            blocks.append(block)
        return b''.join(blocks)
    # Wait point (called the "signal function" in the write-up): reached only
    # when nothing is buffered and EOF has not been seen. If the awaiting task
    # is cancelled here, CancelledError propagates out of this await and the
    # statements below do not run for this call.
    if not self._buffer and not self._eof:
        await self._wait_for_data('read')
    # This will work right even if buffer is less than n bytes
    data = bytes(self._buffer[:n])
    # Remove the bytes just copied out so they are not returned again.
    del self._buffer[:n]
    self._maybe_resume_transport()
    return data
解决方案:
- 让这个task任务执行完,即不取消task。通过其他方式来实现这个逻辑。
- handler函数加一个sleep(0)就不会出现这个问题。如上述原因,这个问题只会在以下情况出现:协程A发送出了redis命令,并且收到了回复报文写到了buffer中,正在读取和删除buffer的时候,另一个协程B在业务层取消了A协程。倘若在业务层发送命令之前或者已经删除buffer之后取消,都不会出现这个问题。上述代码是单一连接的情况,并且在逻辑部分是简化操作,而在实际业务中,有很多IO操作,所以需要大量的连接和大量的取消操作,才能恰好出现此类问题。
- 修改redis-py源码,捕获asyncio.CancelledError错误时,向上层抛错,业务层报错,重置这次业务操作
- 修改redis-py源码,在捕获到asyncio.CancelledError错误时,就将当前连接断开,重新连接。那么这个连接对象的reader中的buffer就会释放。缺点就是在这种取消task的时候,会降低连接池效率。
redis-py 4.3.5版本和4.4.0版本产生差异的原因,就是此处。前者对异常统一断开了连接,后者只是把异常原样抛出,没有断开连接。
在最新的版本中,redis-py维护者已经将这个问题修复,方法和(4)一样。 https://github.com/redis/redis-py/pull/2695/files
总结
综上, 在asyncio中使用协程任务并发,异步操作的时候,对协程状态的改变应当谨慎,它只能改变用户态的协程状态,而无法对已经正在内核态发生调用或已执行IO正在回调的任务进行改变。应该自行对任务asyncio.CancelledError的错误进行处理。