协程与纤程

协程与纤程

概念

纤程 (Fiber) 和 协程 (Coroutine) 可用于实现并发编程(这又可实现异步 I/O)。它们之间的共同点在于:

轻量级: 与操作系统线程相比,纤程和协程更加轻量级,创建和切换的开销更低。

用户空间: 纤程和协程的调度发生在用户空间,而非内核空间,减少了系统调用的开销。

并发性: 虽然不是真正的并行,但纤程和协程可以通过交替执行任务来模拟并发执行效果。

区别:

调度控制:

注:由于这里是概念性叙述,不太能对底层堆栈加以描述

纤程: 概念上来说是一种轻量级(用户态)线程。因此调度完全由程序员控制。需显式指定何时挂起和恢复纤程。 纤程重点解决线程的问题。传统线程往往用于执行单个 I/O 操作,这时阻塞会导致整个线程挂起。 线程是时间片的最小单位。通过将多个纤程派发给单个(或多个)线程,由纤程执行 I/O 操作,能够提高线程时间片利用率。

协程: 调度由程序员和编程语言的运行时系统共同控制。协程可以在特定操作(如 I/O 操作)阻塞时自动挂起,并在操作完成后恢复,无需程序员显式指定。 这种自动挂起显然不是用户控制的,而是低层库提供。但也可手动挂起(主动让出控制流)和恢复来控制执行的流程。 其优势在于避免进程可能的的竞争态,因此无需关心是否是单线程还是多线程。

协程可能更接近异步编程,因为无需(一般也不允许手动)管理上下文(切换),低层库提供的 API 背后可能使用多个线程(比如线程池),也可能包装了操作系统提供的非阻塞 I/O 接口异步很大程度上是通过事件驱动,而这又可以基于状态机。协程天然带有这种状态转移的特性。

实现方式:

纤程: 通常在用户空间实现,作为更轻量的线程替代方案。

协程: 可以是基于栈式 (stackful) 或非栈式 (stackless) 的。

栈式协程: 拥有自己的调用栈,类似于轻量级线程,可以在函数调用之间保存状态。

非栈式协程: 没有自己的调用栈,状态保存在编译器生成的协程对象中。

非对称纤程: 纤程只能将控制权让给调用方,因为纤程需主动让出控制权。

对称纤程: 纤程启动后,调用方获得控制器,用于控制纤程的暂停和恢复。

一句话来说,纤程在于轻量化线程,而协程在于子程序的自动调度。

参考:https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4024.pdf

Goroutine

Goroutine 的模型可能类似于纤程,但 goroutine 栈(创建,销毁和自动增长)和 goroutine 切换是 runtime 自动调度的。

线程栈需要手动控制:pthread_attr_setstack

其特点在于没有共享内存,个人认为其通过信道通信的方式很大程度上模拟了协程的挂起和恢复。

以 C/C++ 为例

ucontext 库提供用户进程内部的 context 控制,可视为更先进的 setjmp/longjmp。(参考)

c// in ucontext.h

/* Userlevel context. */

typedef struct ucontext_t

{

unsigned long __ctx(uc_flags);

struct ucontext_t *uc_link; // uc_link指向当前context 结束时待恢复的上下文

stack_t uc_stack; // context 使用的栈

sigset_t uc_sigmask; // context 要阻塞的信号合集

mcontext_t uc_mcontext; // 机器特定的 context 表示

} ucontext_t;

cpp#include

#include

ucontext_t mainContext, fiberContext;

void FiberFunc() {

for (int i = 0; i < 5; ++i) {

std::cout << "Fiber: " << i << std::endl;

swapcontext(&fiberContext, &mainContext); // 切换回主上下文

}

}

int main() {

// 初始化纤程上下文

getcontext(&fiberContext);

fiberContext.uc_stack.ss_sp = malloc(64 * 1024);

fiberContext.uc_stack.ss_size = 64 * 1024;

fiberContext.uc_link = &mainContext;

makecontext(&fiberContext, FiberFunc, 0);

// 切换到纤程上下文

swapcontext(&mainContext, &fiberContext);

// 主上下文逻辑

for (int i = 0; i < 3; ++i) {

std::cout << "Main: " << i << std::endl;

swapcontext(&mainContext, &fiberContext);

}

// 释放内存

free(fiberContext.uc_stack.ss_sp);

return 0;

}Win32 提供 Fiber API

cpp#include

#include

void FiberProc(void* parameter) {

for (int i = 0; i < 5; ++i) {

std::cout << "Fiber: " << i << std::endl;

SwitchToFiber(parameter); // 切换回主纤程

}

}

int main() {

// 创建纤程栈

void* fiberStack = VirtualAlloc(nullptr, 64 * 1024, MEM_COMMIT, PAGE_READWRITE);

// 创建纤程

void* fiber = CreateFiber(0, FiberProc, nullptr);

// 切换到纤程

SwitchToFiber(fiber);

// 主纤程逻辑

for (int i = 0; i < 3; ++i) {

std::cout << "Main: " << i << std::endl;

SwitchToFiber(fiber);

}

// 清理纤程

DeleteFiber(fiber);

VirtualFree(fiberStack, 0, MEM_RELEASE);

return 0;

}Boost.Fiber

cpp#include

#include

void fiber_function() {

for (int i = 0; i < 5; ++i) {

std::cout << "Fiber: " << i << std::endl;

boost::this_fiber::yield(); // 切换到其他纤程

}

}

int main() {

boost::fibers::fiber f(fiber_function);

for (int i = 0; i < 3; ++i) {

std::cout << "Main: " << i << std::endl;

f.join(); // 等待纤程 f 结束

}

return 0;

}C++20 Coroutines / Boost.Coroutine

cpp#include

#include

struct Task {

struct promise_type {

std::suspend_always initial_suspend() { return {}; }

std::suspend_always final_suspend() noexcept { return {}; }

void return_void() {}

void unhandled_exception() {}

Task get_return_object() { return {}; }

};

};

Task foo() {

std::cout << "foo() begin" << std::endl;

co_await std::suspend_always{};

std::cout << "foo() end" << std::endl;

}

int main() {

auto f = foo();

std::cout << "main() continues" << std::endl;

return 0;

}

cpp#include

#include

struct CountingCoroutine {

struct promise_type {

int current_value = 0;

CountingCoroutine get_return_object() { return CountingCoroutine(std::coroutine_handle::from_promise(*this)); }

std::suspend_always initial_suspend() { return {}; }

std::suspend_always final_suspend() noexcept { return {}; }

void return_void() {}

void unhandled_exception() {}

std::suspend_always yield_value(int value) {

current_value = value;

return {};

}

};

std::coroutine_handle coroutine;

CountingCoroutine(std::coroutine_handle handle) : coroutine(handle) {}

~CountingCoroutine() {

if (coroutine)

coroutine.destroy();

}

int getValue() const {

return coroutine.promise().current_value;

}

void resume() {

coroutine.resume();

}

};

CountingCoroutine generateNumbers() {

for (int i = 0; i < 10; ++i) {

//移交控制权

co_yield i;

}

}

int main() {

CountingCoroutine counter = generateNumbers();

while (counter.coroutine) {

std::cout << "Value: " << counter.getValue() << std::endl;

//回到 co_yield 继续执行

counter.resume();

}

return 0;

}

风雨相关