Hlavní navigace

Jednoduchá korutina v C++ 20

5. 2. 2023 22:03 (aktualizováno) Ondřej Novák

Jak už víme, korutiny do C++ byly „doručeny“ ve stavu, kdy je nelze použít jak se říká „po vybalení z krabice“. Je to spíš taková stravebnice, něco jako LEGO, ovšem bez návodu. Návodů na internetu lze najít mnoho, ať v podobě tutoriálů, nebo celých knihoven, které lze používat bez nároků na hlubší znalosti fungování pod kapotou. Myslím si, že vědět, co se uvnitř děje a mít schopnost naprogramovat si podporu korutin z dodaných kostiček se prostě dobrému programátorovi bude hodit.

Na začátek malá perlička z mé zkušenosti. Svou první korutinu jsem použil v komerčním projektu už v roce 1998. Bylo to ve hře Brány Skeldalu. Přesněji, jsou tam programátorské „hacky“, které by se ke korutinám daly přirovnat. Nejvýznamnější použití korutiny bylo pro vykreslování animovaných sekvencí do hry. Pro kompresi animací se používaj mnou navržený algoritmus (inspirace formátem FLIC), který však měl komplikovaný stav, a existovala dekompresní funkce masivně využívající proměnné v zásobníku funkce. Aby bylo možné „krokovat“ generování framů animace a mezi framy provádět herní logiku a další operace, bylo použito alternativního zásobníku pro přehrávání animací. Vždy se vykreslil frame animace, pak se korutina uspala, přepnul se zásobník a program pokračoval herní logikou. Při dalším frame se korutina vzbudila, vygenerovala další frame a zase se uspala. To vše bylo relizováno v single-threadovém kódu psaném v jazyce C. V tomto případě šlo o stackfull korutinu. Od té doby různé operační systémy přicházely s podporou takových korutin, jako například v linuxu makecontext, nebo ve Windows systém fiberů. Stavět kód na tomto systému se moc nedalo zejména kvůli přenositelnosti takového kódu. Ten se mohl omezovat jen na operační systémy windows a linux a ještě každý fungoval jinak. Navzdory tomu, že doručena byla pouze stavebnice, je situace pro C++ 20 teď o mnoho lepší.

Pojďme si napsat jednoduchou korutinu v C++ 20, tedy spíše sadu tříd na podporu korutin a ukažme si na tom, proč je ten systém tak komplikovaný a přitom silný. Já vím, že v jiných jazycích stačí označit funkci jako korutinu a jejím zavoláním získat proměnou typu task nebo promise – v závislosti na jazyku. Například v javascriptu stačí před deklarace funkce vložit klíčové slovo async a je to

async function fetch_json_data(url) {
    let request  = await fetch(url);
    return await request.json()
}

Proč tohle jednoduše nejde v C++? Tím jedním důvodem je, že korutiny potřebují nějaký runtime, který často přichází s jazykem, ve kterém korutiny používáme. Například zmíněný javascript vyžaduje, aby takový kód běžel nad jakýmsi dispatcherem, který spravuje a zařazuje jednotlivé korutiny k provedení. Jiný runtime v javascriptu v zásadě není možný. Jazyk C++ se snaží vyjít vstříct široké škále užití na nejrůznějších platformách, ať už jde platformy s moderním operačním systémem, až po embeded kód pro jednoúčelová, často hloupá zařízení disponující jednočipovým počítačem a s minimálním nebo žádným operačním systémem. Dokážu si představit korutiny například na Arduinu.

Korutina detached_task – minimální implementace

detached_task example_coro(int i, std::string s) {
    //...
    co_await ...
}

Tato funkce vrací detached_task. Proč jsem zvolil tento název prozradím dále.Korutina musí použít aspoň jedno klíčové slovo vyhrazené korutinám. co_await, co_return, co_yield. I kdyby žádné z nich nepotřebovala, může použít na konci bloku prázdný co_return.

Co se nyní začne dít?

Překladač se bude zajímat o definici typu detached_task. To by měla být nejspíš class nebo struct. Jedná se o návratovou hodnotu, takže lze očekávat, že pokud takovou funkci zavolám, obdržím hodnotu tohoto typu. Je třeba rovnou upřesnit, že zavolání korutiny může být okamžitě vyřízeno vrácením této hodnoty, aniž by byla korutina vůbec spuštěna. K tomu se taky dostaneme. Návratová hodnota by měla představovat nějaké pojítko s běžící korutinou – její instanci – a bude se nám hodit k získání skutečného výsledku korutiny.

V tomto jednoduchém příkladě detached_task představuje běžící korutinu, která je „detached“, podobně jako funguje thread::detach(),kdy se vlákno odpojí od volajícího, tak i tato korutina nebude nijak spojena s tím, kdo jí zavolal. Deklarace takové korutiny splňuje minimální sadu povinných definicí.

class detached_task {
public:
    struct promise_type {
        detached_task get_return_object() noexcept {return {};}
        std::suspend_never initial_suspend() noexcept {return {};}
        void return_void() {}
        void unhandled_exception() {std::terminate();}
        std::suspend_never final_suspend() noexcept {return {};}
    };
};

Samotná třída je úplně prázdná. Překladač se zajímá o typ promise_type, který představuje jakousi ovládací třídu korutiny a vlastně tak definuje typ korutiny a její vlastnosti. Obsahuje některé povinné funkce, které se počas života korutiny volají, tak jak korutina prochází různými etapami svého běhu. Funkce není potřeba deklarovat virtual, v tomto případě by bylo možné je deklarovat static a ještě constexpr. Specifikace noexcept je doporučená všude tam, kde se neočekává výjimka a je povinná na final_suspend(). Korutina jako celek nevyhazuje výjimky, třída promise_type si musí výjimky ošetřit – viz dále.

detached_task get_return_object() noexcept {return {};}

Funkce get_return_object() musí zkonstruovat návratovou hodnotu z korutiny (ne výsledek), tedy v našem případě samotnou instanci detached_task. Ta je v tomto případě prázdná, takže konstrukce neobsahuje žádný kód. Tady je to výjimka, jinak drtivá většina korutin si musí do své návratové hodnoty uložit nějakou referenci na inicializovanou korutinu, pokud s ní chtějí později komunikovat.

Std::suspend_never initial_suspend() noexcept {return {};}

Tato funkce se volá před spuštěním korutiny a očekává, že zkonstruujeme awaitera, na který se následně zavolá co_await. K awaiterům se dostaneme. Standardní knihovna nabízí dva obecné awaitery – std::suspend_always, std::suspend_never. Jejich název je dostatečně vypovídající, ten první vždy pozastaví korutinu, ten druhý nikdy. V tomto případě nemáme nikoho, kdo by pozastavenou korutinu probudil, takže jedinou možností je std::suspend_never, tedy korutina se ihned spustí bez čekání


void return_void() {} void unhandled_exception() {std::terminate();}

Jedna z těchto funkcí se volá podle výsledku běhu korutiny. Funkce return_void() se volá, když korutina doběhne do svého konce, v tomto případě vrací void. Existuje varianta return_value(val), která se zavolá, pokud korutina vrací nějakou hodnotu. Funkce unhandled_exception() se volá, když je korutina ukončena výjimkou. V tomto případě necháme program ukončit, stejně jako kdyby korutina byla deklarovaná jako noexcept. Běžně se ale výjimka odchytává a přeposílá se volajícímu, což tady nejde, jelikož korutina je detachovaná. Kód téhle korutiny si musí výjimku odchytit přes try-catch.

std::suspend_never final_suspend() noexcept {return {};}

Tato funkce se volá jako poslední po předání výsledku. Smyslem funkce je rozhodnout, zda instance korutiny zůstane zachována po doběhu korutiny, nebo se zničí automaticky. Vrací se opět awaiter a provede se na něj co_await. Pokud awaiter nepozastaví korutinu, tak korutina zamíří do své záhuby a je implicitně zdestruována. Pokud je korutina zastavena v tomto bodě, už ji nelze obnovit, a musí být zničena explicitně (pomocí funkce .destroy() – viz dále). Do awaiteru však můžeme přidat další chytrý kód, který může provádět nějakou komunikaci s vnějším světem, například notifikovat jinou korutinu nebo vlákno o výsledku běhu. V tomto případě ale není nikdo, kdo by o detachované korutině chtěl něco vědět a tak ji po jejím dokončení necháme implicitně zničit.

Provedení korutiny může vypadat nějak takto (příklad)

     hlavní vlákno                                     vedlejší vlákno
           │                                                  │
           │                                                  │
           │                                                  │
           ▼                                                  │
       ┌───────┐          ┌─────────────────────────┐         │
       │ call  ├─────────►│ detached_task_promise() │         │
       └───────┘          └───────────┬─────────────┘         │
                                      │                       │
                          ┌───────────▼─────────────┐         │
           ┌─------------ │ get_return_object()     │         │
           │              └───────────┬─────────────┘         │
┌──────────▼───────┐                  │                       │
│  detached_task   │      ┌───────────▼─────────────┐         │
└──────────────────┘      │ initial_suspend()       │         │
                          └───────────┬─────────────┘         │
                                      │                       │
                                kód korutiny                  │
                                      │                       │
                          ┌───────────▼─────────────┐         │
           ┌──────────────┤ co_await (suspend)      │         │
           │              └─────────────────────────┘         │
           │                                                  │
           │              ┌─────────────────────────┐         │
           │              │ resume()                │◄────────┘
           │              └───────────┬─────────────┘
           │                          │
           │                    kód korutiny
           │                          │
           │        ┌─────────────────▼───────────────────┐
           │        │ return_void()/unhandled_exception() │
           │        └─────────────────┬───────────────────┘
           │                          │
           │              ┌───────────▼─────────────┐
           │              │ final_suspend()         │
           │              └───────────┬─────────────┘
           │                          │
           │              ┌───────────▼─────────────┐
           │              │ destroy()               ├─────────┐
           │              └─────────────────────────┘         │
           │                                                  │
           ▼                                                  ▼

Vedlejším vláknem samozřejmě může být i hlavní vlákno. Použití vláken korutiny neimplikují, avšak asynchronní operace jsou často realizovány v samostatném vlákně. Odlišný způsob běhu mají generátory, které se často střídají na jednom vláknu.

Ve schématu jsem chtěl hlavně zdůraznit místo, kdy korutina běží a v jakém okamžiku uvolní vlákno, které ji zavolalo. Je to v okamžiku, kdy provede první co_await (nebo co_yield) který bude požadovat uspání korutiny. Pokud by korutina skončila aniž by takovou operaci vyvolala, pak se celá korutina provede v rámci svého zavolání. Ze schématu je vidět, že synchronizace s koncem korutiny nemusí být jednoduchá.

Korutina subtask<T>

Dostáváme se k praktičtější verze korutiny. Její název si vysvětlíme později. Tento typ korutiny by nám měl umožnit volat a čekat na asynchronní operace, tak jak jsme na to zvyklí v javascriptu. Níže uvedený příklad se jen snaží napodobit stejný příklad z javascriptu, nepředstavuje tedy existující kód, ale pokud bych implementoval json::Value a HTTPCli­ent::fetch(), tak by mohl úplně normálně fungovat.

subtask<json::Value> fetch_json_data(std::string url) {
    auto request  = co_await HTTPClient::fetch(url);
    co_return co_await request.json()
}

Začátek deklarace typu korutiny.

template<typename T>
struct subtask_promise;

template<typename T>
class subtask {
public:
    using promise_type = subtask_promise<T>;
…
};

V příkladu se snažím ukázat, že promise_type nemusí být deklarovat uvnitř hlavní třídy. Můžeme třídu aliasovat pomocí klíčového slova using. Zvýší to čitelnost kódu.

Je třeba si ujasnit, jak se uloží výsledek korutiny. Jak už jsme si ukázali výše, korutina může vracet void, může také vracet hodnotu, ale může také vyhodit výjimku. Dokud není korutina dokončena, výsledek není definovaný. Každopádně si pro výsledek musíme někde v paměti držet prostor, aby se měl kam uložit, ať už to bude hodnota, nebo výjimka. Druhým rozhodnutím je, kde ten prostor bude alokován. Může být alokován na haldě, může být alokován v instanci subtask_promise nebo instanci subtask. První variantu bych zavrhl, protože alokace zdržují a je to zbytečné, jsem schopni najít místo bez extra alokace. Například pokud zajistíme, že subtask_promise nebude zničena před vyzvednutím výsledku, můžeme výsledek alokovat tam. To zajistíme vhodně napsaným awaiterem pro final_suspend. Není vyloučena ani varianta ukládat výsledek přímo v subtask, pokud zajistíme, že tato instance bude existovat při ukončení korutiny. A to zajistit lze, viz další část kódu. Výhodou držení výsledku přímo na subtask umožňuje implementovat další vlastnost. Například můžeme zkonstruovat subtask již s hodnotou, a tím se vyhnout volání korutiny v situacích, kdy to není potřeba. Protože i když korutiny jsou „lehčí“ než vlákna, pořád jejích správu zaplatíme nějakými takty CPU navíc.

template<typename T>
class subtask {
public:
    using promise_type = subtask_promise<T>;
    using storage = std::variant<std::monostate, T, std::exception_ptr>;


    subtask(std::coroutine_handle<promise_type> h):_coro_handle(h) {}
    subtask(const subtask &) = delete;
    subtask &operator=(const subtask &) = delete;
    subtask(subtask &&other)
        :_coro_handle(std::exchange(other._coro_handle,{}))
        ,_value(std::move(other._value)) {}
    ~subtask() {
        if (_coro_handle) _coro_handle.destroy();
    }
protected: friend class subtask_promise<T>; std::coroutine_handle<promise_type> _coro_handle; storage _value; };

Pro uložení výsledku použijeme typ std::variant. Ten má tři varianty: std::monostate, T a std::exception_ptr. První stav je prázdná třída, která hraje roli „neplatné hodnoty“ (jakoby null) a je konstruovaná jako výchozí. Druhé dva stavy nám dávají možnost uložit buď vlastní hodnotu nebo výjimku. V deklaraci nehledejte způsob vyzvednutí výsledku, doplním je později.

Zbytek deklarace definuje konstruktory subtask. Hlavní konstruktor očekává handle korutiny, kterou spravuje. Instanci nejde kopírovat, nejde přiřadit, ale lze jí přesouvat, protože oba překladače GCC a Clang vyžadují deklaraci move-konstruktoru. Destruktor naši korutinu zničí, pokud instance drží platné handle. Tady se může objevit otázka, jak zabezpečit, aby se někdo nepokoušel zničit běžící korutinu, ale ukážeme si, že taková situace běžným použitím nemůže nastat.

Ještě si navrhněme promisu

template<typename T>
struct subtask_promise {
    std::coroutine_handle<> _caller;
    subtask<T> *_future = nullptr;

    subtask<T> get_return_object() noexcept {
        return subtask<T>(std::coroutine_handle<subtask_promise>::from_promise(*this));
    }
    std::suspend_always initial_suspend() noexcept {return {};}

    void return_value(T &&val) {
        _future->_value = std::move(val);
    }
    void return_value(const T &val) {
        _future->_value = val;
    }
    void unhandled_exception() {
        _future->_value = std::current_exception();
    }
    struct final_awaiter: std::suspend_always {
        final_awaiter(subtask_promise &owner):_owner(owner) {}
        subtask_promise &_owner;
        std::coroutine_handle<> await_suspend(std::coroutine_handle<> my_handle) noexcept {
            auto caller = _owner._caller;
            _owner._future->_coro_handle = {};
            my_handle.destroy();
            return caller;
        }
    };

    final_awaiter final_suspend() noexcept {return final_awaiter(*this);}

};

Dodám, že to je téměř kompletní deklarace. Vysvětleme si jednotlivé části

  • _caller drží handle korutiny, která naši korutinu volá a čeká na výsledek. Je automaticky inicializovaný na nullptr

  • _future je pointer na svázanou instanci subtask. (je významu future). A tam budeme ukládat hodnotu. Na začátku je inicializovaná na nullptr. Nastavení tohoto pointeru bude mít na zodpovědnost subtask (nastaví tam sám sebe).

  • get_return_object() konstruuje instanci subtask. Není tam žádný chyták, snad jen způsob, jak je this převedeno na handle korutiny.

  • initial_suspent() požaduje zastavení korutiny před spuštěním. To proto, že nemůžeme korutinu spustit, dokud není nastaven _caller a _future. A v tuto chvíli to neumíme nastavit. Spuštění korutiny bude řídít subtask.

  • return_value() ve dvou variantách umožňuje uložit jak rvalue tak lvalue. Lze to dát do jednoho příkazu pomocí C++20 konceptu, ale nechci se teď zamotávat do konceptů. Výsledek se ukládá přímo do subtask<> do _value, který jsme pro to rezervovali

  • unhandled_exception() zachytává výjimku a ukládá ji do _value – pamatujete, je to variant.

  • final_suspend() a final_awaiter dělají trochu víc činnosti. final_awaiter dědí std::suspend_always, ale přepisuje chování await_suspend, který se volá jako úplně poslední operace této korutiny. Zároveň korutina není na konci implicitně zničena. Funkce await_suspend dostane jako parametr handle korutiny, která je přerušena (což je ta naše) a musí vrátit handle korutiny, která má pokračovat. A tady použijeme _caller. Tím dojde k předání řízení do korutiny, která čeká na výsledek. Ještě před tím ale vynulujme handle v subtask a korutinu zničíme přes .destroy(). Tím uvolníme paměť, protože instance korutiny již dále není potřeba. Volající totiž může chtít výsledky korutiny cachovat jako celý subtask a držet s tím frame celé korutiny by bylo paměťově náročné.

Implementace čekání na co_await

Důvod proč se korutina jmenuje subtask je ten, že primární účel korutiny je fungovat jako sub-korutina nějakého nadřazené úlohy, která je také korutinou a která potřebuje zavolat asynchronní operaci skrze tuto sub-korutinu. Sama tedy musí čekat na její výsledek. Korutina subtask tak musí fungovat jako awaiter, tedy implementovat potřebné funkce tak, aby bylo možné použít instanci subtask jako argument operátoru co_await.

subtask<int> delej_neco(...) { ... }

detached_task jina_korutina() {
int x = co_await delej_neco(...);
...;
}

Níže uvedený kód je vložen doprostřed deklarace subtask, jak je naznačeno

class subtask {
    //
    // ... kód před tím
    //
    bool await_ready() const noexcept {
        return _value.index() != 0;
    }
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> my_handle) {
        promise_type &p = _coro_handle.promise();
        p._caller = my_handle;
        p._future = this;
        return _coro_handle;
    }

    T &await_resume() {
        switch (_value.index()) {
            case 1: return std::get<1>(_value);
            case 2: std::rethrow_exception(std::get<2>(_value));
            default: throw std::runtime_error("Value is not ready");
        }
    }
   
protected: // // ... kód na konci // }

Pojďme si to projít povinné funkce

  • await_ready – se volá jako první funkce při zpracování co_await. Vrací bool a měl by vrátit true pokud je již výsledek k dispozici. V tomto případě se může stát, že někdo zavolá co_await dvakrát na stejný subtask, přičemž podruhé je již výsledek k dispozici a zabráníme tak obnovení již ukončené korutiny. Pokud je _value ve stavu std::monostate, výsledek ještě není k dispozici.

  • await_suspend – je ten okamžik, kdy se naše korutina spustí, protože volající korutina se právě uspala a čeká na výsledek. V kódu si nejprve vyzvedneme referenci na naši subtask_promise a vyplníme tam _caller (handle volajícího je předáno parametrem) a _future (což je this). Máme vše hotovo a vrácením našeho handle předáme řízení do volané korutiny a ta se spustí. Pokud doběhne, použije se _future na uložení výsledku a _caller k obnovení současné korutiny. (zajímavost: Optimalizovaný kód předává řízení přes instrukci JMP)

  • await_resume – funkce se zavolá po obnovení volající korutiny. Tato funkce předá výsledek který je vrácen z co_await. Musí též ošetřit chycenou výjimku. Podle indexu ve _value buď vracíme výsledek, nebo vyhazujeme výjimku. Stav 0 by neměl nastat i tak ho ošetříme (třeba kdyby někdo zavolal await_resume přímo).

Tyhle tři funkce definují jedno volání co_await. Teprve čekáním na výsledek se korutina spouští, podobně jako režim „defered“ u std::promise.

Příklad diagramu, jak řízení kódu probíhá

                   │
                   │
                   │
                   ▼
               ┌───────┐              ┌─────────────────────────────┐
               │ call  ├─────────────►│ subtask_promise             │
               └───────┘              └────────────┬────────────────┘
                                                   │
                                      ┌────────────▼────────────────┐
     +--------------------------------│ get_return_object           │
     :                                └────────────┬────────────────┘
     :                                             │
     ▼                                ┌────────────▼────────────────┐
┌───────┐         ┌───────────────────┤ co_await initial_suspend()  │
│       │         │                   └─────────────────────────────┘
│       │         ▼
│       │   ┌───────────┐await_suspend┌─────────────────────────────┐
│       ├──►│ co_await  ├────────────►│ kód korutiny                │
│       │   └───────────┘             │                             │
│       │                             │                             │
│       │                             │                             │
│       │                             └────────────┬────────────────┘
│       │                                          │
│subtask│                             ┌────────────▼────────────────┐
│       │                             │  co_await (suspend)         ├────────────────► async operace
│       │                             └─────────────────────────────┘
│       │                          -------------------------------------
│       │
│       │                             ┌─────────────────────────────┐
│       │                             │ resume()                    │◄──────────────── dokončeno
│       │                             └────────────┬────────────────┘
│       │                                          │
│       │                             ┌────────────▼────────────────┐
│       │                             │ kód korutiny                │
│       │◄--------+                   │                             │
│       │         :                   │                             │
│       │         :                   │                             │
└───────┘         :                   └────────────┬────────────────┘
    :             :                                │
    :             :                   ┌────────────▼────────────────┐
    :             +-------------------│ return_value()              │
    :                                 └────────────┬────────────────┘
    :                                              │
    :     ┌───────────────┐           ┌────────────▼────────────────┐
    +---─►│ await_resume  │◄──────────┤ final_awaiter() + destroy() │
          └───────┬───────┘           └─────────────────────────────┘
                  │
                  │
                  │
                  │
                  │
                  ▼

Diagram ukazuje vyvolání korutiny, která dále čeká na asynchroní operaci. Vyřízení asynchroní operace probíhá v běžném vlákně. Dokončené operace může proběhnout v jiném vlákně. Na diagramu si všimněte, jak probíhá volání. Nejprve je korutina vytvořena a teprve použitím co_await je korutina spuštěna. Původní korutina je uspána a teprve po dokončení volané korutiny je obnoven běh volající korutiny přes await_resume. Tohle uspořádání má jednu významnou výhodu: Není potřeba řešit komunikaci mezi vlákny, i když ke změně vlákna může dojít. V daném okamžiku vždy běží pouze jedna korutina, nedochází k překryvu běhu obou korutin. Dokonce přístup do subtask probíhá serializovaně.

Volání korutiny z vnějšího světa

Korutiny typu subtask umíme zavolat z jiné korutiny. Ale jak takovou korutinu zavolat z běžné funkce? Běžná funkce nemůže používat co_await. Je zde několik možností. Můžeme navrhnout nový typ korutiny, kterou bude možné řídit z běžné funkce voláním nějakých členských funkcí (třeba start(), wait(), atd). Lze použít detached_task a synchronizaci s vnějším světem zajistit přes std::promise, kterou předáme jako parametr do korutiny tohoto typu. Tato korutina by pak na svém konci nastavila výsledek do předané promisy.

subtask<int> delej_neco(...) { ... }

detached_task bootstrap(std::promise<int> result) {
int x = co_await delej_neco(...);
result.set_value(x);
}

A nebo definici task_promise mírně upravíme a přidáme novou funkci.

Nejprve upravíme task_promise.

template<typename T>
struct subtask_promise {
    std::coroutine_handle<> _caller;
    subtask<T> *_future = nullptr;
    std::atomic<bool> _sync = {false};

    //...

    struct final_awaiter: std::suspend_always {
        final_awaiter(subtask_promise &owner):_owner(owner) {}
        subtask_promise &_owner;
        std::coroutine_handle<> await_suspend(std::coroutine_handle<> ) noexcept {
            auto caller = _owner._caller;
            if (caller)  {
                _owner._future->_coro_handle = {};
            my_handle.destroy();
                return caller;
            }
            _owner._sync.store(true, std::memory_order_release);
            _owner._sync.notify_all();
            return std::noop_coroutine();
        }
    };
//...
};

Korutina musí rozpoznat, jak byla spouštěna, jestli korutinou nebo běžnou funkci. Předpokládejme, že korutina spuštěná z běžné funkce bude mít prázdný _caller . Je ale nutné předpokládat, že synchronizace s funkcí může probíhat napříč vlákny. Pokud korutina může dokončit běh v jiném vlákně, je třeba pozastavit vlákno volající funkce do doby, než bude výsledek k dispozici. Pro synchronizaci použijeme nové rozhraní C++ 20 na atomických proměnnéch, rozhraní wait() a notify_all(). Dokud korutina neskončí, je proměnná _sync nastavena na false. Ve final_awaiter se testuje, zda korutina má handle volajícího. Pokud jej má, pokračuje jako doposud, zničením stavu korutiny a předání řízení volajícímu. Pokud jej ale nemá, předpokládá se, že volajícím byla běžná funkce, jejíž vlákno nyní čeká na výsledek. Musí nastavit _sync na true (použijeme me­mory_order_release, aby se veškeré zápisy provedly před nastavením na true) a notifikovat čekající vlákno. Jako výsledek vrací speciální konstantu std::noop_coroutine(), což provede ukončení operace .resume(), která čeká v nadřazeném frame zásobníku (a která zařídila probuzení korutiny)

Takto můžeme synchronizovat volající vlákno s korutinou. Napíšeme si ještě funkci do subtask která to zařídí.

Class [[nodiscard]] subtask {
    //
    // ... kód před tím
//

T &join() { if (!await_ready()) { promise_type &p = _coro_handle.promise(); p._future = this; _coro_handle.resume(); p._sync.wait(false, std::memory_order_acquire); } return await_resume(); }
protected: // // ... kód na konci
// }

Funkce se jmenuje join(), a to proto, že napodobuje API k threadům, které také musíme joinovat. Funkce vrací výsledek korutiny (referenci na storage). Kód napodobuje implementaci operace co_await. Na počátku se provede await_ready(). Pokud výsledek není k dispozici, získá se promise, vyplní se _future, ale nevyplňuje se _caller. Následně se korutina spustí přes funkci resume()a pak už stačí jen počkat, až se proměnná _sync nastaví na true (použije se memory_order_acquire, aby se všechna čtení v tomto vlákně provedla až po signálu). Jako výsledek se pouze zavola await_resume().

subtask<int> example_subtask(int v) {...co_return ... };

int v = example_subtask(42).join()

Následující diagram ukazuje volání korutiny z normální funkce

                 │
                 │
                 ▼
             ┌───────┐               ┌─────────────────────────────┐
             │ call  ├──────────────►│ subtask_promise             │
             └───────┘               └────────────┬────────────────┘
                                                  │
                                     ┌────────────▼────────────────┐
    +--------------------------------│ get_return_object           │
    :                                └────────────┬────────────────┘
    :                                             │
    :                                ┌────────────▼────────────────┐
    :            ┌───────────────────┤ co_await initial_suspend()  │
    :            │                   └─────────────────────────────┘
    ▼            ▼
┌───────┐  ┌─────────┐  resume()     ┌─────────────────────────────┐
│       ├─►│ join()  ├──────────────►│ kód korutiny                │
│       │  └─────────┘               │                             │
│       │                            │                             │
│       │                            │                             │
│       │                            └────────────┬────────────────┘
│       │                                         │
│       │                            ┌────────────▼────────────────┐
│       │                            │  co_await (suspend)         ├────────────────►
│       │                            └─────────────────────────────┘          async operace
│       │        ┌──────────────────────────────────────────────────────────────────
│       │        │
│subtask│        │
│       │        │
│       │        │                   ┌─────────────────────────────┐            dokončeno
│       │        │                   │ resume()                    │◄────────────────
│       │        │                   └────────────┬────────────────┘
│       │        │                                │
│       │        │                   ┌────────────▼────────────────┐
│       │        │                   │ kód korutiny                │
│       │◄──-----│------+            │                             │
│       │        │      :            └────────────┬────────────────┘
└───────┘        │      :                         │
   :             │      :            ┌────────────▼────────────────┐
   :             │      +------------│ return_value()              │
   :             ▼                   └────────────┬────────────────┘
   :    ┌──────────────┐                          │
   :    │    wait()    │ notify_all  ┌────────────▼────────────────┐
   +--─►│              │◄─-----------│ final_awaiter()             │
        ├──────────────┤             └────────────┬────────────────┘
        │  destroy()   │                          │
        │await_resume()│                          │
        └────────┬─────┘                          │
                 │                                └────────────────────────────────────►
                 │
                 ▼

V tomto diagramu se předpokládá, že asynchronní operace je v běžném vlákně pouze zahájena a je dokončena v jiném vlákně (pokud by tak nebylo, synchronizace by nebyla nutná). V okamžiku, kdy korutina zavolá co_await na nějakou asynchronní operaci, řízení je vráceno do volající funkce, která zastaví vlákno na operaci wait(). Korutina pak pokračuje po dokončení asynchronní operace a po uložení výsledku provede notify_all() aby čekající vlákno mohlo pokračovat. Destrukci korutiny má v tomto případě na starost funkce join(), protože musí mít přístupný příznak _sync i potom, co korutina doběhne.

Co zbývá?

Pro tenhle typ korutiny zbývá ještě jedna (větší) práce. Korutina neumí subtask<void> i když by se to hodilo. Ano, v tomhle je to trochu nezáživné. Bude třeba naprogramovat specializaci na void a zřejmě úplně znovu. Je tam spousta definicí jinak. Například místo return_value tam bude return_void, nebo je zbytečné použití std::variant, když stačí jen uložit výjimku a bool stav o dokončení. Můžete si to zkusit za domácí úkol dopsat.

Trigger

Když jsem si připravoval ukázkový kód, v tomto bodě jsem chtěl napsat nějaký příklad. Nějakou korutinu, která provede „dummy“ asynchronní operaci. Ale uvědomil jsem si, že jakákoliv asynchronní operace potřebuje svého awaitera. A v tuto chvíli umíme čekat pouze na jinou korutinu, neumíme čekat na jinou událost. Bylo by možné napsat „dummy“ awaitera, který by simuloval asynchronní operaci. A nebo do článku přidám ještě jeden užitečný nástroj a to trigger.

  • Korutina může čekat na trigger (co_await trigger)

  • Trigger lze jednorázově spustit, čímž se probudí čekající korutina.

  • Trigger musí řešit i situaci, kdy je spuštěn v době, kdy žádná korutina (ještě) nečeká – řešit race-condition

  • Trigger by měl předat hodnotu (nebo výjimku) při své aktivaci.

(takže je to vlastně futura nebo condition_variable pro korutiny)

template<typename T>
struct trigger {

    std::atomic<std::coroutine_handle<> > _waiting_coro = {};
    std::variant<std::monostate, T, std::exception_ptr> _value;

    bool await_ready() const noexcept {
        return _waiting_coro.load() != nullptr;
    }
    bool await_suspend(std::coroutine_handle<> h) {
        std::coroutine_handle<> cond;
        return _waiting_coro.compare_exchange_strong(cond,h);
    }

    T &await_resume() {
        switch (_value.index()) {
            case 1: return std::get<1>(_value);
            case 2: std::rethrow_exception(std::get<2>(_value));
            default: throw std::runtime_error("Value is not ready");
        }
    }

    void set_value(T val) {
        _value = std::move(val);
        auto wt = _waiting_coro.exchange(std::noop_coroutine());
        if (wt) wt.resume();
    }
    void set_exception(std::exception_ptr e) {
        _value = std::move(e);
        auto wt = _waiting_coro.exchange(std::noop_coroutine());
        if (wt) wt.resume();
    }
};

Protože trigger je awaiter, definuje všechny tři povinné funkce. Tady jsme nuceni použít atomic<> a to proto, že budeme synchronizovat mezi vlákny (většinou asynchronní operace probíhají napříč vlákny). Obsahem proměnné bude handle čekající korutiny. Když nikdo nečeká, je zde nullptr. Pokud korutina čeká, je zde handle korutiny. Pokud je výsledek k dispozici, je zde std::noop_coroutine a brání nám k zahájení čekání.

  • await_ready – hodnota je k dispozic pokud ve _waiting_coro je něco jiného než nullptr. Při správném použití zde totiž může být pouze nullptr nebo std::no­op_coroutine, přičemž to druhé pouze v případě, že výsledek je již k dispozici (trigger byl spuštěn)

  • await_suspend – všimněte si, že vrací bool – což je varianta povolená standardem – zde vrací true pokud se korutina má uspat, nebo false pokud se uspávání má přerušit a korutina má pokračovat v běhu. To je ideální místo na provedení atomického compare+exchange. Pouze v případě, že ve _waiting_coro bylo nullptr dojde k nastavení proměnné a vrátí se true, jinak se vrátí false. Tím eliminujeme race-condition. Pokud korutina stihne await_suspend, bude registrována aby mohla být probuzena. Pokud to nestihne, funkce vrátí false a korutina může pokračovat, protože trigger byl mezitím aktivován.

  • await_resume – je stejné jako u subtask

  • set_value, set_exception – nastavuje hodnotu a poté atomické exchange s noop_coroutine. Tahle operace atomicky vyzvedne handle čekající korutiny a zároveň zablokuje další registraci. Takže výsledkem je buď handle korutiny, kterou je třeba probudit, nebo nullptr, v takovém případě korutina zřejmě nestačila dojít až k registraci, a dojde tam později, přičemž zjistí, že výsledek je již k dispozici a nemusí čekat.

Použití triggeru je celkem jednoduché. Korutina jej deklaruje, předá pointer nebo referenci nějakému zařízení, který provede asynchronní operaci. A pote začne čekat na výsledek přes co_await. Jakmile je asynchronní operace hotová, asynchronní zařízení zavolá set_value nebo set_ex­ception, podle výsledku, čímž dojde k probuzení čekající korutiny a ta může pokračovat v činnosti a zpracovat výsledek. Pozor na to, že korutina může pokračovat v jiném vlákně. Dokonce to, v jakém vlákně bude pokračovat, není deterministické.

A nyní ke konečnému příkladu použití

subtask<int> example_coro(int v) {
    trigger<int> res;
    std::thread t([&]{
        std::this_thread::sleep_for(std::chrono::seconds(1));
        res.set_value(v * 2);
    });
    t.detach();
    co_return co_await res;
}

subtask<int> example_coro2(int v) {
    std::cout << "coro started at thread " << std::this_thread::get_id() << std::endl;
    int res = co_await example_coro(v);
    std::cout << "coro exiting at thread " << std::this_thread::get_id() << std::endl;
    co_return res*res;

}

int main(int argc, char **argv) {
    int res = example_coro2(42).join();
    std::cout << res << std::endl;

    return 0;
}

Celý kód najdete na [1] 

Příklad pouze simuluje asynchronní operaci vláknem, které čeká 1 sekundu, než vydá výsledek. V tomto případě jde o vynásobení předané hodnoty 2×. Druhá korutina pak vezme výsledek první korutiny a ten umocní. Zároveň reportuje ve kterém vlákně pokračuje.

coro started at thread 140556988609472
coro exiting at thread 140556988593728
7056

Závěrem.

Výše uvedené příklady korutin ukazují jen malou část toho, co systém korutin v C++ nabízí. Standard zatím nepřišel s již implementovanými běžnými typy korutin. Je to možná také tím, že „běžné typy“ nejsou zatím známe, protože mnoho lidí teprve teď zjišťuje co takovým systémem mohou docílit. Moje implementace subtask není jedinou možností a má určité nevýhody, například se velice špatně ladí a v korutině nevygenerujete použitelný stack-trace. Korutinu nelze sdílet a čekat může jen jeden subjekt současně. Další typy korutin najdete v mé knihovně, o které jsem psal v předchozích dílech[2]. Snad tento článek vnesl trochu světla do tématu korutin.

Za domácí úkol rozšiřte subtask tak, aby bylo možné přímo nastavit hodnotu bez volání korutiny. To se může hodit například v rozhraní, kde návratová hodnota je předepsaná, přesto výsledná implementace nechce nebo nemusí použít korutinu k provedení operace a k vrácení výsledku.

PS: Clang++ ještě ve verzi 15 má chybu při práci s std::this_thread::get_id(), kdy nesprávně cachuje ID vlákna přes co_await, takže vám bude vypisovat stejné ID threadu.

[1] https://godbolt.org/z/sc6zrxs8c

[2] https://github.com/ondra-novak/coroutines_classes

Sdílet