Coroutines

Coroutines are implemented on top of generators, so see the generator notes for how generators themselves are implemented.

Data structures

PyCoroObject:

/* _PyGenObject_HEAD defines the initial segment of generator
   and coroutine objects. */
#define _PyGenObject_HEAD(prefix)                                           \
    PyObject_HEAD                                                           \
    /* Note: gi_frame can be NULL if the generator is "finished" */         \
    PyFrameObject *prefix##_frame;                                          \
    /* True if generator is being executed. */                              \
    char prefix##_running;                                                  \
    /* The code object backing the generator */                             \
    PyObject *prefix##_code;                                                \
    /* List of weak reference. */                                           \
    PyObject *prefix##_weakreflist;                                         \
    /* Name of the generator. */                                            \
    PyObject *prefix##_name;                                                \
    /* Qualified name of the generator. */                                  \
    PyObject *prefix##_qualname;                                            \
    _PyErr_StackItem prefix##_exc_state;

typedef struct {
    _PyGenObject_HEAD(cr)
    PyObject *cr_origin;
} PyCoroObject;

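As the definition shows, PyCoroObject is almost identical to the generator object (PyGenObject): it shares the same _PyGenObject_HEAD and only adds one trailing field, cr_origin.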
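The shared layout is visible from Python as well. The following is a small sketch, not taken from the original notes: the helper functions coro_fn and gen_fn are hypothetical, and it only checks that each cr_* attribute of a coroutine has a gi_* counterpart on a generator, while cr_origin has none.

```python
async def coro_fn():
    return 1

def gen_fn():
    yield 1

coro = coro_fn()
gen = gen_fn()

# Fields that come from the shared _PyGenObject_HEAD show up under both prefixes.
shared = ["frame", "running", "code"]
shared_ok = all(
    hasattr(coro, f"cr_{name}") and hasattr(gen, f"gi_{name}")
    for name in shared
)
print("shared fields present:", shared_ok)

# cr_origin is the extra trailing field; generators have no equivalent.
coro_has_origin = hasattr(coro, "cr_origin")
gen_has_origin = hasattr(gen, "gi_origin")
print("coroutine has cr_origin:", coro_has_origin)
print("generator has gi_origin:", gen_has_origin)

# Close both objects so Python does not warn about a never-awaited coroutine.
coro.close()
gen.close()
```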

Starting with a simple example

Python code:

import asyncio

async def f():
    await asyncio.sleep(5)
    return 10

print(f.__code__.co_flags)
print(asyncio.run(f()))

The corresponding bytecode:

  1           0 LOAD_CONST               0 (0)
              2 LOAD_CONST               1 (None)
              4 IMPORT_NAME              0 (asyncio)
              6 STORE_NAME               0 (asyncio)

  3           8 LOAD_CONST               2 (<code object f at 0x7f06909329d0, file "mydemo/coro_demo.py", line 3>)
             10 LOAD_CONST               3 ('f')
             12 MAKE_FUNCTION            0
             14 STORE_NAME               1 (f)

  7          16 LOAD_NAME                2 (print)
             18 LOAD_NAME                0 (asyncio)
             20 LOAD_METHOD              3 (run)
             22 LOAD_NAME                1 (f)
             24 CALL_FUNCTION            0
             26 CALL_METHOD              1
             28 CALL_FUNCTION            1
             30 POP_TOP
             32 LOAD_CONST               1 (None)
             34 RETURN_VALUE

Disassembly of <code object f at 0x7f06909329d0, file "mydemo/coro_demo.py", line 3>:
  4           0 LOAD_GLOBAL              0 (asyncio)
              2 LOAD_METHOD              1 (sleep)
              4 LOAD_CONST               1 (5)
              6 CALL_METHOD              1
              8 GET_AWAITABLE
             10 LOAD_CONST               0 (None)
             12 YIELD_FROM
             14 POP_TOP

  5          16 LOAD_CONST               1 (10)
             18 RETURN_VALUE

The code object's co_flags is 195 (0xc3), which is CO_COROUTINE | CO_NOFREE | CO_NEWLOCALS | CO_OPTIMIZED (0x80 | 0x40 | 0x02 | 0x01).

The only coroutine-specific opcode here is GET_AWAITABLE.
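The flag value can be decoded without memorising the bit positions. This is a minimal sketch that uses dis.COMPILER_FLAG_NAMES from the standard library; the exact set of flags depends on the Python version (newer releases no longer set CO_NOFREE, for example), so the printed value may differ from 195.

```python
import dis
import inspect

async def f():
    return 10

flags = f.__code__.co_flags

# Map every set bit back to its name, e.g. "COROUTINE" for 0x80.
names = [name for bit, name in sorted(dis.COMPILER_FLAG_NAMES.items())
         if flags & bit]
print(hex(flags), names)

# The same check the interpreter performs: is CO_COROUTINE set?
is_coroutine_function = bool(flags & inspect.CO_COROUTINE)
print(is_coroutine_function)
```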

async

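The async keyword simply marks a function as one that returns a coroutine: the compiler adds CO_COROUTINE to the code object's co_flags, and that flag is what distinguishes a coroutine function from a generator function.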
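A short sketch of what the flag means in practice: calling an async function only builds the coroutine object, and the body runs when something drives it. The list named ran is a hypothetical helper used only to observe when the body executes.

```python
import inspect

ran = []  # records when the function body actually executes

async def f():
    ran.append("body")
    return 10

coro = f()                              # only creates the coroutine object
created_without_running = (ran == [])   # the body has not run yet

# Drive the coroutine by hand, the same way a generator would be driven.
try:
    coro.send(None)
except StopIteration as stop:
    result = stop.value                 # the return value travels in StopIteration

print(inspect.iscoroutinefunction(f), created_without_running, result)
```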

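async creates a new coroutine object (PyCoro_New)

Calling f() creates a coroutine object. The process is the same as for creating a generator; the relevant code is: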

/* Handle generator/coroutine/asynchronous generator */
if (co->co_flags & (CO_GENERATOR | CO_COROUTINE | CO_ASYNC_GENERATOR)) {
    PyObject *gen;
    int is_coro = co->co_flags & CO_COROUTINE;

    /* Don't need to keep the reference to f_back, it will be set
      * when the generator is resumed. */
    Py_CLEAR(f->f_back);

    /* Create a new generator that owns the ready to run frame
      * and return that as the value. */
    if (is_coro) {
        gen = PyCoro_New(f, name, qualname);
    } else if (co->co_flags & CO_ASYNC_GENERATOR) {
        gen = PyAsyncGen_New(f, name, qualname);
    } else {
        gen = PyGen_NewWithQualName(f, name, qualname);
    }
    if (gen == NULL) {
        return NULL;
    }

    _PyObject_GC_TRACK(f);

    return gen;
}

retval = _PyEval_EvalFrame(tstate, f, 0);

PyObject *
PyCoro_New(PyFrameObject *f, PyObject *name, PyObject *qualname)
{
    // Note: gen_new_with_qualname creates a PyGenObject, but PyCoroObject has almost the same layout
    // (it only adds one trailing field, cr_origin), so the function can be reused to create a PyCoroObject.
    PyObject *coro = gen_new_with_qualname(&PyCoro_Type, f, name, qualname);
    if (!coro) {
        return NULL;
    }

    PyThreadState *tstate = _PyThreadState_GET();
    int origin_depth = tstate->coroutine_origin_tracking_depth;

    if (origin_depth == 0) {
        ((PyCoroObject *)coro)->cr_origin = NULL;
    } else {
        PyObject *cr_origin = compute_cr_origin(origin_depth);
        ((PyCoroObject *)coro)->cr_origin = cr_origin;
        if (!cr_origin) {
            Py_DECREF(coro);
            return NULL;
        }
    }

    return coro;
}

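As the code above shows, PyCoro_New creates a PyCoroObject with gen_new_with_qualname and then, if origin tracking is enabled (origin_depth > 0), stores information about the innermost origin_depth frames in coro->cr_origin.

compute_cr_origin collects that information for the innermost origin_depth frames; each entry is a (filename, line number, function name) tuple.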
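Origin tracking can be observed from Python through sys.set_coroutine_origin_tracking_depth. The sketch below is illustrative only: make is a hypothetical wrapper added so that the creating frame has a recognisable name, and the depth of 2 is an arbitrary choice.

```python
import sys

async def f():
    return 10

def make():
    # The coroutine object is created in this frame.
    return f()

old_depth = sys.get_coroutine_origin_tracking_depth()
sys.set_coroutine_origin_tracking_depth(2)   # record up to 2 frames
try:
    coro = make()
    # Tuple of (filename, lineno, funcname) entries, most recent call first.
    origin = coro.cr_origin
finally:
    sys.set_coroutine_origin_tracking_depth(old_depth)

coro.close()  # avoid the "never awaited" warning
for filename, lineno, funcname in origin:
    print(funcname, lineno)
```

With the depth left at its default of 0, cr_origin stays None, which matches the origin_depth == 0 branch in PyCoro_New.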

await

The await keyword compiles to the following bytecode:

 8 GET_AWAITABLE
10 LOAD_CONST               0 (None)
12 YIELD_FROM

The handler for GET_AWAITABLE:

case TARGET(GET_AWAITABLE): {
    PREDICTED(GET_AWAITABLE);
    PyObject *iterable = TOP();
    PyObject *iter = _PyCoro_GetAwaitableIter(iterable);

    if (iter == NULL) {
        int opcode_at_minus_3 = 0;
        if ((next_instr - first_instr) > 2) {
            opcode_at_minus_3 = _Py_OPCODE(next_instr[-3]);
        }
        format_awaitable_error(tstate, Py_TYPE(iterable),
                                opcode_at_minus_3,
                                _Py_OPCODE(next_instr[-2]));
    }

    Py_DECREF(iterable);

    if (iter != NULL && PyCoro_CheckExact(iter)) {
        PyObject *yf = _PyGen_yf((PyGenObject*)iter);
        if (yf != NULL) {
            /* `iter` is a coroutine object that is being
                awaited, `yf` is a pointer to the current awaitable
                being awaited on. */
            // yf != NULL means iter has already started and is currently suspended in an await
            Py_DECREF(yf);
            Py_CLEAR(iter);
            _PyErr_SetString(tstate, PyExc_RuntimeError,
                              "coroutine is being awaited already");
            /* The code below jumps to `error` if `iter` is NULL. */
        }
    }

    SET_TOP(iter); /* Even if it's NULL */

    if (iter == NULL) {
        goto error;
    }

    PREDICT(LOAD_CONST);
    DISPATCH();
}

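As you can see, await is essentially YIELD_FROM, which once again shows how closely coroutines and generators are related. GET_AWAITABLE replaces the awaitable on top of the stack with its iterator (for a coroutine object, the coroutine itself), and that object becomes the receiver of YIELD_FROM.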
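Because await delegates like yield from, a chain of coroutines can be driven by hand with send, with no event loop involved. The following sketch assumes a hypothetical awaitable class named Ticket whose __await__ is a generator; a value yielded at the bottom of the chain surfaces at the outermost send call.

```python
class Ticket:
    """Hypothetical awaitable; __await__ must return an iterator."""

    def __init__(self, label):
        self.label = label

    def __await__(self):
        # This yield suspends the whole await chain and hands the label
        # to whoever is driving the outermost coroutine.
        reply = yield self.label
        return reply

async def inner():
    return await Ticket("inner waiting")

async def outer():
    value = await inner()
    return value * 2

coro = outer()
first = coro.send(None)       # runs until the innermost yield
try:
    coro.send(21)             # the value becomes the result of the yield
except StopIteration as stop:
    final = stop.value

print(first, final)
```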

eventloop

The event loop's methods fall into the following groups:

  • Starting and stopping the loop

    • run_until_complete
    • run_forever
    • stop
    • close
  • Scheduling callbacks and timers

    • call_soon
    • call_soon_threadsafe
    • call_later
    • call_at
  • Creating futures and tasks

    • create_future
    • create_task
  • Networking

    • create_connection
    • create_datagram_endpoint
    • create_unix_connection
    • create_server
    • create_unix_server
    • connect_accepted_socket
  • Watching file descriptors (I/O multiplexing)

    • add_reader
    • remove_reader
    • add_writer
    • remove_writer
  • Working with sockets directly

    • sock_recv
    • sock_recv_into
    • sock_sendall
    • sock_connect
    • sock_accept
    • sock_sendfile
  • DNS

    • getaddrinfo
    • getnameinfo
  • pipe

    • connect_read_pipe
    • connect_write_pipe
  • Signals

    • add_signal_handler
    • remove_signal_handler
  • Running work in thread and process pools

    • run_in_executor
    • set_default_executor
  • Error handling

    • set_exception_handler
    • get_exception_handler
    • default_exception_handler
    • call_exception_handler
  • Debug mode

    • get_debug
    • set_debug
  • Subprocesses

    • subprocess_exec
    • subprocess_shell

Official event loop documentation

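Scheduling

The event loop supports timed callbacks: call_at schedules a callback at an absolute time, call_later schedules it a given delay from now, and call_soon schedules it for the next iteration of the loop.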
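A small sketch of the three calls side by side. The delays (10 ms, 30 ms, and 60 ms for stopping the loop) are arbitrary values chosen for illustration; the callbacks simply record the order in which they run.

```python
import asyncio

order = []
loop = asyncio.new_event_loop()
try:
    # Registered first, but due last: relative delay of 30 ms.
    loop.call_later(0.03, order.append, "call_later")
    # Absolute deadline on the loop's own clock, 10 ms from now.
    loop.call_at(loop.time() + 0.01, order.append, "call_at")
    # No deadline: runs on the next loop iteration.
    loop.call_soon(order.append, "call_soon")
    # Stop the loop once all three callbacks have had time to run.
    loop.call_later(0.06, loop.stop)
    loop.run_forever()
finally:
    loop.close()

print(order)
```

The callbacks run in deadline order, not registration order.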

At its simplest, the core of the event loop is just a loop. Part of the code:

class BaseEventLoop(events.AbstractEventLoop):

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
    
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        # Remove cancelled timers from the _scheduled queue
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Compute the select timeout: normally the earliest deadline in _scheduled minus the current time
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        # BaseEventLoop does not implement _selector or _process_events; subclasses must provide them.
        # See BaseSelectorEventLoop in selector_events.py and _UnixSelectorEventLoop in unix_events.py.
        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # self._clock_resolution is time.get_clock_info('monotonic').resolution; on my system it is 1e-9 (1 ns).
        # self.time() returns time.monotonic() in seconds, so adding the resolution barely changes the result.
        # The loop below moves every timer in _scheduled that is due (_when < end_time) to the _ready queue.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        # Run the callbacks in the _ready queue
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

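The core of the event loop is run_forever and _run_once.

The event loop keeps two queues:

  • _scheduled

    _scheduled is the timer queue. It is a priority queue built with the standard library's heapq and ordered by each timer's deadline. On every iteration, the timers that are due are popped from _scheduled and appended to _ready. Because _scheduled is a heap, the earliest timer is always at index 0 and each pop costs O(log n).

  • _ready

    _ready holds the callbacks that are ready to run; they are executed on the next iteration of the loop. It is a collections.deque from the standard library.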
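The interplay of the two queues can be sketched in a few dozen lines. MiniLoop below is a toy written for illustration, not asyncio's implementation: it has no I/O polling, no cancellation, and no thread safety, and it sleeps with time.sleep where the real loop would block in the selector.

```python
import heapq
import time
from collections import deque

class MiniLoop:
    """Toy scheduler with a timer heap and a ready queue."""

    def __init__(self):
        self._scheduled = []   # heap of (when, seq, callback, args)
        self._ready = deque()  # FIFO of (callback, args)
        self._seq = 0          # tie-breaker so callbacks are never compared

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_at(self, when, callback, *args):
        self._seq += 1
        heapq.heappush(self._scheduled, (when, self._seq, callback, args))

    def call_later(self, delay, callback, *args):
        self.call_at(time.monotonic() + delay, callback, *args)

    def _run_once(self):
        # Nothing ready: sleep until the earliest timer is due.
        if not self._ready and self._scheduled:
            time.sleep(max(0.0, self._scheduled[0][0] - time.monotonic()))
        # Move every due timer from the heap to the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))
        # Run only the callbacks that were ready at this point.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_until_idle(self):
        while self._ready or self._scheduled:
            self._run_once()

events = []
loop = MiniLoop()
loop.call_later(0.02, events.append, "later-20ms")
loop.call_later(0.01, events.append, "later-10ms")
loop.call_soon(events.append, "soon")
loop.run_until_idle()
print(events)
```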

call_at/call_later/call_soon

The code for call_at:

    def call_at(self, when, callback, *args, context=None):
        """Like call_later(), but uses an absolute time.

        Absolute time corresponds to the event loop's time() method.
        """
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_at')
        timer = events.TimerHandle(when, callback, args, self, context)
        if timer._source_traceback:
            del timer._source_traceback[-1]
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

The code for TimerHandle:

class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        assert when is not None
        super().__init__(callback, args, loop, context)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        info = super()._repr_info()
        pos = 2 if self._cancelled else 1
        info.insert(pos, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def cancel(self):
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return a scheduled callback time.

        The time is an absolute timestamp, using the same time
        reference as loop.time().
        """
        return self._when

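The code above is fairly easy to follow: TimerHandle objects compare by _when, which is what lets heapq keep them ordered by deadline.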
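The ordering can be checked directly on the handles that call_at returns. This sketch uses arbitrary deadlines 30 and 60 seconds in the future and cancels both timers before they fire, since only the comparison behaviour is of interest.

```python
import asyncio
import heapq

loop = asyncio.new_event_loop()
try:
    now = loop.time()
    late = loop.call_at(now + 60, print, "late")
    early = loop.call_at(now + 30, print, "early")

    # __lt__ compares the _when deadlines.
    early_is_smaller = early < late

    # heapq relies on that ordering to keep the earliest timer at index 0.
    heap = [late, early]
    heapq.heapify(heap)
    first_is_early = heap[0] is early
    first_when_offset = heap[0].when() - now

    # Neither timer should ever fire in this demonstration.
    late.cancel()
    early.cancel()
finally:
    loop.close()

print(early_is_smaller, first_is_early, first_when_offset)
```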

How coroutines are executed

Future

class Future:

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None
    _cancel_message = None
    # A saved CancelledError for later chaining as an exception context.
    _cancelled_exc = None

    __log_traceback = False

    def __init__(self, *, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []


    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
            raise self._exception
        return self._result

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.

The code above is heavily abridged.

A future is the intermediary between coroutines, tasks, and the event loop.
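One detail of the Future code is easy to miss: set_result does not call the done callbacks itself, it hands them to call_soon. A minimal sketch that makes this visible, using a hypothetical list named log to record when the callback runs:

```python
import asyncio

log = []
loop = asyncio.new_event_loop()
try:
    fut = loop.create_future()
    fut.add_done_callback(lambda f: log.append(f.result()))

    fut.set_result("done")
    # The future is finished, but the callback has only been queued.
    ran_immediately = bool(log)

    # Let the loop iterate so the queued callback gets its turn.
    loop.run_until_complete(asyncio.sleep(0))
    ran_after_iteration = list(log)
finally:
    loop.close()

print(ran_immediately, ran_after_iteration)
```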

Task

class Task(futures._PyFuture):  
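    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()
        self._loop.call_soon(self.__step, context=self._context)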
    
    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.

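The code above is heavily abridged.

Task inherits from Future, and its most important method is __step.

In Task's __init__, the call self._loop.call_soon(self.__step, context=self._context) registers __step in the event loop's _ready queue. On the loop's first iteration __step runs, which starts the coroutine wrapped by the Task.

When the coroutine reaches an await it usually yields a Future. The Task adds a callback to that Future, namely __wakeup, and __wakeup in turn calls __step.

In this way __step keeps resuming the coroutine until it finishes. When the coroutine returns normally, StopIteration is raised and __step stores exc.value as the Task's result via set_result.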
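The step/wakeup cycle can be reduced to a toy driver. MiniTask below is a hypothetical stand-in for asyncio.Task that handles only the happy path: it ignores cancellation, contexts, and exceptions raised by the awaited future, and it assumes anything other than None that the coroutine yields is a Future.

```python
import asyncio

class MiniTask:
    """Toy version of the Task step loop; not asyncio's implementation."""

    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self.result_future = loop.create_future()
        self.steps = 0
        loop.call_soon(self._step)          # first step on the next iteration

    def _step(self):
        self.steps += 1
        try:
            yielded = self._coro.send(None)  # resume the coroutine
        except StopIteration as stop:
            self.result_future.set_result(stop.value)
        except Exception as exc:
            self.result_future.set_exception(exc)
        else:
            if yielded is None:
                # Bare yield: give up control for one loop iteration.
                self._loop.call_soon(self._step)
            else:
                # Assumed to be a Future: resume once it is done.
                yielded._asyncio_future_blocking = False
                yielded.add_done_callback(self._wakeup)

    def _wakeup(self, future):
        self._step()

async def work(loop):
    fut = loop.create_future()
    loop.call_later(0.01, fut.set_result, 5)
    value = await fut                       # suspends until the timer fires
    return value + 5

loop = asyncio.new_event_loop()
try:
    task = MiniTask(work(loop), loop)
    result = loop.run_until_complete(task.result_future)
finally:
    loop.close()

print(result, task.steps)
```

The coroutine is stepped twice: once to reach the await, and once more after the future's done callback wakes it up.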

Example: asyncio.sleep

@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

When the delay expires, the event loop runs futures._set_result_unless_cancelled, the timer callback that was registered with call_later. Its code is:

def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    fut.set_result(result)

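The first time await future runs, it yields the future itself. When the task receives the future, it calls future.add_done_callback to register a callback that resumes the coroutine at await future; at that point await future completes and returns result.

When future.set_result runs, it triggers the callback registered by the task (not by calling it directly, but by queuing it on the event loop through loop.call_soon).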
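The same pattern can be reproduced with public APIs only. The sketch below defines a hypothetical my_sleep that mirrors the structure of asyncio.sleep (a future plus a call_later timer) without the private helpers; the 50 ms delay is an arbitrary value.

```python
import asyncio
import time

async def my_sleep(delay, result=None):
    """Hypothetical re-implementation of the non-zero-delay path."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def _set_result():
        # Same guard as _set_result_unless_cancelled.
        if not future.cancelled():
            future.set_result(result)

    handle = loop.call_later(delay, _set_result)
    try:
        return await future       # the task parks here until the timer fires
    finally:
        handle.cancel()           # harmless if the timer has already run

async def main():
    start = time.monotonic()
    value = await my_sleep(0.05, "woke up")
    return value, time.monotonic() - start

value, elapsed = asyncio.run(main())
print(value, round(elapsed, 3))
```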

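Summary

Understanding how generators are implemented gets you halfway to understanding coroutines. The other half is how the event loop works, along with Task and Future.

Besides the event loop, Task, and Future, asyncio also contains many coroutine-based implementations of network operations. I have not read that code yet, but it is not needed to understand how coroutines themselves are implemented.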