Hlavní navigace

C++20 Korutiny - Přepis knihovny CoCls

12. 4. 2023 9:13 (aktualizováno) Ondřej Novák

Uplynul nějaký čas kdy jsem se musel věnovat jiným úkolům, ale ke korutinám jsem se vrátil. Často se vyplatí nechat téma „uležet“ a vrátit se později s úplně jiným pohledem na celou věc. Zvlášť, když člověk za to dobu spoustu věcí zapomene a pak když se k tomu vrátí, má najednou pocit, že je všechno až zbytečně  komplexní (překomplikované). Takže jsem se rozhodl svou knihovnu na korutiny lehce přepracovat.

Novou knihovnu najdete na nové adrese a jmenuje se jednoduše CoCls [1]. Stejný je i namespace, ve kterém jsou všechny třídy deklarované. Celou knihovnu tvoří jen hlavičkové soubory, které lze do projektu přímo includovat bez nutnosti kompilovat a připojovat libku. Do projektu jsem také kromě příkladů použití přidal i adresář obsahující testy. Testy tedy zdaleka nepokrývají veškerou funkcionalitu, to se snad v budoucnu zlepší.

Co se změnilo?

Snažil jsem se výrazně redukovat funkcionalitu a komplexitu mé původní knihovny, kterou jsem zde v předchozích postech popisoval. Původní projekt totiž začal bobtnat a komplexita rostla, a já mám radši, když je projekt co nejvíc KISS (Keep it simple stupid)

  • Jen dva typy korutin - knihovna nabízí pouze korutinu async<> a generator<>. Ukázalo se, že to postačuje. Jak název napovídá async<> je určen pro korutiny provádějící asynchronní operace, zatímco generator<> představuje korutiny, které něco generují, tedy v této korutině lze používat co_yield.
  • Odstranění resumption policy - Nakonec se ukázalo, že se s tím špatně pracuje. Nová knihovna nabízí jiný způsob jak plánovat probouzení korutin pomocí suspend_point a o tom se ještě zmíním.
  • Přepsání base awaiteru - Původní implementace používala klasicky systém dědění z interface. Podařilo se mi zbavit se virtuálních funkcí, které vždy přináší další level indirekce. Překladač bez virtuálních funkcí lépe optimalizuje kód – a překvapilo mne, jak dobře optimalizace fungují. 
  • Odstranění globálního alokátoru - Původní knihovna nabízela speciální alokátor pro rámce korutin. Tuto funkcionalitu jsem odstranil, protože se častěji používaly ad-hoc alokátory typu reusable_storage, případně jsem korutiny navrhoval jako šablony, kterým lze vhodný alokátor předat parametrem.
  • Důraz na pattern future-promise - namísto, aby vzniklo mnoho druhů korutin, které se liší víceméně jen tím, jak se nakládá s future objektem korutiny, oba nabízené typy korutin spolupracují s future-promise třídami. Výrazně to pomohlo používání korutin ve  třídách používající polymorfism – virtuální funkce a interfacy.
  • Zůstaly mutex, queue, thread_pool, scheduler, publisher-subscriber. Přibyl další objekt signal. Odebral jsem barrier, cancelable coroutine, lazy, task, dispatcher, detached a subtask. Buď jsem k tomu nenašel využití, nebo funkcionalita může být implementovaná jinak. Některé třídy jsem přepsal, aby používali future-promise – leckde to překvapivě kód výrazně zjednodušilo, například u queue.
  • nové mechanismy coro_queue suspend_point

Jak se teď budí korutiny

Namísto explicitní specifikaci resumption policy jsem se rozhodl ponechat pouze jedinou policy a to (v původním označení) „queued“. Tato policy je implementovaná v podobě singleton třídy coro_queue. Ta pracuje s kontextem aktuálního vlákna (thread_local) tak, že eviduje zda vlákno exekuuje korutinu, nebo normální kód. V tzv „coro-mode“ stačí, aby v zásobníku vlákna byl aspoň jediný frame patřící běžící korutině a nezáleží na to, v jak hluboko v zásobníku se ten frame nachází.

  • normálním režimu se jakákoliv spuštěna nebo probuzená korutina začne okamžitě provádět, čímž přeruší kód v aktuálním vlákně. Zároveň se tím vlákno přepne do „coro-mode“ a v něm zůstane, dokud existuje minimálně jedna běžící korutina nebo korutina připravená k běhu
  • režimu „coro-mode“ se evidují korutiny, které by měly být probuzeny, ale aktuální vlákno provádí jinou korutinu. Tyto korutiny jsou „připravené“ a zpravidla se na aktuálním vlákně řadí do fronty. Jde tedy o formu kooperativního multitaskingu. Přepnutí mezi korutinami dochází vždy, když je aktuální korutina přerušena, nebo skončí.

Samozřejmě jsem zvažoval i možnost plánování korutin napříč vlákny, tak jak se to děje v jiných jazycích. Mohl bych se inspirovat gorutinami z jazyka Go. Avšak tohle řešení má nevýhody. Vyžadovalo by to zavedení globálního thread poolu. Já se však snažím naprogramovat knihovnu, nikoliv framework, takže s globálními objekty bych v tomto případě byl velice opatrný. Představte si, že by každý malý program používající mou knihovnu startoval thread pool s desítkou běžících vláken! Také by to znamenalo, že všechny korutiny by musely automaticky předpokládat, že poběží paralelně, což by vedlo na nutnost používat zámky na všech úrovních. Ten hlavní „tahák“ (selling point) pro korutiny je ale v možnosti kooperativního multitaskingu, který často postačuje a kde naopak využíváme toho, že korutiny nepoběží paralelně a zámky nepotřebujeme.

Oba požadavky mi ani neseděly do konceptu KISS.

To ovšem neznamená, že by to v rámci nabízených nástrojů nešlo zařídit. Jde jen o to, že je to ve výchozím stavu řešeno pouze v rámci aktuálního vlákna (viz dále)

Kooperativní multitasking – funkce pause()

Tzv. režim „coro-mode“ byl zaveden hlavně kvůli kooperativního multitaskingu. Pokud nějaký kód je implementován jako korutina, lze předpokládat, že právě kvůli její schopnosti se uspat na nějaké asynchronní operaci. Takže pokud v rámci tohoto kódu se nějaká jiná korutina má probudit, nebudu jí budit okamžitě, ale nachystám si tu korutinu k případnému probuzení, jakmile současná korutina se uspí. Mám tak k dispozici frontu korutin čekajícíchna svou příležitost. Zároveň tím dochází k optimalizaci používání zásobníku. Pokud by tento mechanismus neexistoval, každá probuzená korutina by si alokovala nový rámce v zásobníku a najdou se i algoritmy pro korutiny, které by v takovém případě vyčerpaly celý zásobník.

Aby připravená korutina mohla běžet, musí jiná korutina být uspána. Může se ale stát, že běžící korutina nemá nic, na čem by se mohla uspat. Proto vznikl příkaz pause(), který voláme spolu s co_await. Aktuálně běžící korutina se uspí, čímž uvolní místo korutině připravené ve frontě, zároveň sama sebe zařadí na konec fronty.

co_await pause();

Jednoduchý příklad kooperativního multitaskingu

#include <iostream>
#include <cocls/future.h>

cocls::async<void> test_task(int id) {
    for (int j = 0; j < 10; j++) {
        for (int i = 0; i < id; i++) std::cout << "\t";
        std::cout << j << std::endl;
        co_await cocls::pause();
    }
}


cocls::future<void> test_cooperative() {
   for (int i = 0; i < 5; i++) {
       test_task(i).detach();
   }
   co_return;
}


int main(int, char **) {
    test_cooperative().join();

}

Pro zahájení kooperativního multitaskingu je třeba nejprve spustit korutinu, která následně spustí všechny korutiny, které se mají ve vlákně střídat. Pak skončí. V rámci „coro-mode“ vlákna nedojde k ukončení běhu korutiny dokud existují připravené korutiny. Následně tedy běží 5 korutin které se přes pause() střídají v běhu. Teprve ukončením poslední korutiny dojde k návratu do main() a k ukončení programu.

Suspend point – plánování korutin ručně

V knihovně existuje mnoho míst, kde zavoláním funkce nebo metody dojde k probuzení korutiny. Pokud je to v rámci normálního vlákna, dojde k okamžitému spuštění dané korutiny, pokud je to v rámci „coro-mode“, taková korutina se pouze připraví k běhu zařazením do fronty v aktuálním vlákně. Ani jedno řešení nemusí být v danou chvíli vhodné. V normálním vlákně může dojít ke spuštění korutinu uvnitř nějakého zámku a zablokovat tím daný prostředek na delší dobu. V „coro-mode“ by zase mohlo dojít k vyhladovění dané korutiny. Řešení nabízí tzv. suspend_point, který je vracen právě v situaci, kdy nějaká funkce připravila korutinu (nebo i množinu korutin) k běhu, avšak tyto korutiny ještě nebyly probuzeny. Objekt slouží volajícímu, aby naplánoval vhodný bod ke svému uspání a ke spuštění připravených korutin, přičemž tento objekt zároveň nese seznam korutin, které je třeba spustit. Příklad:

Mějme korutinu, která čeká na future a k ní existuje svázaná promise. Pokud některá funkce nastaví hodnotu promise, stane se korutina čekající na future „připravena k běhu“. Funkce nastavující promise vrátí suspend_point volajícímu. Ten může provést co_await na tomto objektu a tím je uspán a namísto toho je spuštěna připravená korutina a ta si může hodnotu vyzvednou. Je to taková operace ručního přepnutí.

cocls::promise<int> p = ... // nějaká promise
co_await p(42); //nastav svázanou future na 42 a přepni do čekající korutiny

Pokud se toto stane ve funkci, která není korutinou, může být suspend_point poslán volajícímu, který už může být korutinou a kde lze provést uspání.

cocls::suspend_point<bool> resolve() {
   cocls::promise<int> p = ... // nějaká promise
   return p(42);
}

cocls::async<void> coro() {
   //...
   co_await resolve()
   //...
}

Objekt suspend_point lze libovolně přenášet kódem i mezi vlákny a dokud není provedeno co_await, nejsou připravené korutiny spuštěny. Pokud však suspend_point skončí v normálním vlákně, pak jeho zahozením (destrukcí) dojde k přerušení běhu aktuálního vlákna a k probuzení všech čekajících korutin v tomto vlákně (v rámci jeho destruktoru), takže připravené korutiny se nakonec probudí. Vhodnou správou objektu suspend_point však můžeme vybrat místo, kde nám přerušení vadí nejméně.

Reakce na suspend_point je nepovinná. Pokud je objekt při návratu z funkce zahozen, připravené korutiny jsou spuštěny podle pravidel v aktuálním vlákně. I tak má smysl objekt používat, například umožňuje přenést odpovědnost za spuštění korutin mimo tělo volaného, tedy vně případných zámků a vně kontextu funkce, který by jinak mohly ovlivnit.

Objekt suspend_point je optimalizován pro 3 připravené korutiny, kdy nepotřebuje alokovat paměť na haldě. Pro víc jak 3 připravené korutiny pak alokuje pole na haldě. Pokud se mi v kódu sejde víc suspend pointů, pak se dají slučovat pomocí operátoru <<

//připrav první korutinu
suspend_point<void> p1 (coro1.detach());
//připrav druhou korutinu
suspend_point<void> p2 (coro2.detach());
//sloučený bod
suspend_point<void> c;
//sluč do c
c << p1 << p2;
//přidej tam ještě třetí korutinu
c << coro3.detach();
//a teď se uspi a nechť ostatní běží
co_await c;

Suspend point v thread poolu

Jen krátce se zmíním o možnost nechat suspend point aktivovat v thread poolu. Pak skutečně dojde k tomu, že se čekající korutiny spustí v tomto thread poolu a to tak, že každá dostane přidělené jedno vlákno (případně se zařadí do fronty na volné vlákno). 

cocls::thread_pool pool;
//sloučený bod
suspend_point<void> c = ... // viz předchozí příklad

pool.resume(c);   //spusť všechny tři korutiny ve třech vláknech

Suspend point s parametrem

Protože suspend_point<> zabere návratovou hodnotu funkce, nebylo by možné předat výsledek funkce, proto suspend_point umožňuje svázat objekt s libovolnou hodnotou, jejíž typ se specifikuje parametrem šablony

  • suspend_point<void>
  • suspend_point<int>
  • suspend_point<bool>
  • suspend_point<std::string>
  • atd…

Tato hodnota je vrácena jako výsledek operace co_await po tom, co je aktuální korutina opět probuzena. Mimo korutinu je výsledek dostupný přímo přiřazením objektu do proměnné patřičného typu

Proč se to jmenuje suspend_point, když to ve skutečnosti nosí připravené korutiny, které chtějí probudit?

Název jsem si nechal poradit přes ChatGPT :-)

Ve skutečnosti hlavním smyslem je čitelnost u rozhraní. Pokud máte rozhraní

suspend_point<bool> cancel_event(ident id);

…pak na první pohled je vidět, že funkce by ráda, aby se volající uspal (ona sama to udělat nemůže, protože není korutinou), tím by mohla dokončit požadovanou operaci, která zřejmě probíhá v korutině. Předává mu tedy bod uspání, a očekává, že volající se skutečně uspí. Vstupuje do toho i ona „nepovinnost“, volající to nemusí udělat, a pak se vše řídí pravidly popsanými výše. Zároveň z popisu rozhraní vidíme, že výsledkem operace je i nějaký boolean, který by mohl nést nějakou informaci.

Korutina async<T>

Pojďme se nyní podívat na vlastní korutiny, nejprve tedy na async<T>. Parametrem šablony je návratová hodnota z korutiny, přičemž povolen je i typ void: async<void>

async<int> korutina(int arg) {
    co_return arg+42;
}

Pokud takovou korutinu zavoláme, dojde pouze ke konstrukci objektu async<int> ale samotná korutina se nespustí. Chová se to jako konstruktor, který vytvoří objekt korutiny s předanými parametry. Pro spuštění musíme udělat ještě jeden krok, máme přitom na výběr:

  • Spustit v režimu „detach“ - v tomto režimu se korutina spustí bez možnosti další synchronizace a i s tím, že výsledek korutiny bude zahozen. Je to tedy spíš vhodné pro void korutiny, ale není zakázáno takto spouštět i korutiny vracející výsledek. Prostě se na konci zahodí. Tato funkce vrací suspend_point, tedy samotným spuštěním se korutina stane „připravenou k běhu“ a spustí se až když suspend_point přeruší aktuální vlákno.To nám umožňuje spuštění korutiny ještě lépe plánovat (třeba spustit přímo v jiném vlákně).
  • Spustit a svázat promise – start(promise) - v tomto režimu se korutina spustí a když skončí, její výsledek nastaví promise. I tato funkce vrací suspend_point 
  • Spustit a vrátit future<T> – jde o přímější způsob spuštění, která se hodí do výrazu za return funkce která vrací future<T>. Na tuto future může volající následně čekat. Tento typ spuštění neumožňuje použít suspend_point, korutina se spustí okamžitě.
  • Spustit a počkat na výsledek přes co_await - v rámci korutiny, pokud zavoláme co_await korutina(x), dojde k uspání současné korutiny a ke spuštění nové korutiny. Současná korutina se probudí až když nová korutina skončí a vrátí výsledek. Výsledkem operace co_await je pak návratová hodnota
  • Spustit synchronně - k tomu slouží funkce join(). Tím se spustí korutina v současném vlákně a to vlákno zůstane zablokované, dokud korutina nevrátí výsledek. A to i v případě, že by exekuce korutiny byla přesunuta do jiného vlákna.  Výsledkem funkce je návratová hodnota
korutina(0).detach();
//-----------
promise<int> p = ... //získej promise někde
korutina(1).start(promise);
//-----------
future<int> f = korutina(2).start();
int result = co_await f;
//-----------
int result = co_await korutina(3);
//-----------
int result = korutina(4).join();

Objekt async<> lze před spuštěním korutiny přesouvat pomocí std::move(). Jakmile je však korutina spuštěna, pak se objekt stane neplatným a nemůže být použit k přístupu k dané korutině. Spuštěním se totiž stav korutiny přesune „do útrob“ knihovny, kde je potřeba mít k němu výhradní přístup.

Korutina async<T> ve službách future<T>

Knihovna umožňuje použít zkratku pro převod async<T> na future<T>. Objekt future<T> často budeme používat jako návratovou hodnotu z mnoha asynchronních funkcí – spíš než samotný async<T>, který má mít jen lokální použití. Pokud však naše korutina je vždy spouštěna přes start() a vrací future<T> pak lze návratovou future<T> použít místo async<T> a přesto psát kód pro korutinu. To není zavedení nového typu korutiny, jde jen o zkratku, protože takto deklarovanou korutinu stále implementuje async<T>, byť to tam není uvedeno.

cocls::future<int> korutina2(int x) {
    co_return x+42;
}

Výše uvedený kód je ekvivalentní zápisu:

cocls::async<int> korutina2_coro(int x) {
   co_return x+42;
}

cocls::future<int> korutina2(int x) {
   return korutina2_coro(x).start();
}

Návrhový vzor future-promise

Skutečným důvodem k přepsání a zjednodušení knihovny cocls byl větší důraz na návrhový vzor future-promise. Na začátek bych chtěl upozornit, že zde nepopisuju std::future a std::promise ale objekty knihovny cocls. Tedy cocls::future a cocls::promise. Jejich účel je sice podobný, ale významně se liší rozhraním i způsobem použitím a také tím, že cocls::future podporuje operaci co_await

Vysvětlení návrhového vzoru

Tento vzor umožňuje vzdálenou synchronizaci mezi kódem, který produkuje nějakou hodnotu a kódem, který tu hodnotu konzumuje. Jedná se však o jednorázovou synchronizaci, tedy jakmile je hodnota produkována, předá se čekajícímu kódu a tím je vztah mezi těmito části kódu ukončen. Pokud by měl vztah pokračovat, například jde o stream hodnot, pak je lepší použít frontu (queue). V praxi ale jednorázovou synchronizaci používáme častěji, protože je jednodušší, často žádáme a čekáme na jednu hodnotu, více hodnot zpravidla požadujeme v cyklu po jedné a odpadá například problém s „rušením vztahu“. Je to podobné jako request-response. Ke každému requestu (volání funkce) jedna odpověď (produkování jedné hodnoty). Jednorázové použití též zjednodušuje vlastní implementaci, kterou lze realizovat plně lock-free (tedy bez použití zámků)

  • Future - představuje „placeholder“ pro budoucí hodnotu. Bývá často vracen z funkcí, které provádí asynchronní výpočet. Volající musí na hodnotu počkat – a k tomu může použít co_await. Tím do hry vstupují korutiny, čekáním na výsledek dojde k uspání korutiny a vlákno, ve kterém korutina běžela může být použito k něčemu jinému. Jakmile je výsledek k dispozici, je korutina probuzena a může si z future<T> vyzvednout výsledek výpočtu
  • Promise - představuje slib volaného, že hodnotu dodá. Promise je svázaná se svou future. Je to zároveň objekt, kterým lze nakonec hodnotu nastavit a tím slib splnit. V knihovně cocls se tento objekt chová jako funkce. Zavoláním funkce promise se předá výsledek svázané future. Zároveň se tento vztah ukončí (svázání se přeruší)

Abstrakce s korutinami

Pokud navrhujeme nějaké abstraktní rozhraní, používání typů představující korutiny jako návratové hodnoty je velmi nepraktické. Narušuje to celý smysl abstrakce, protože předepisuje, aby daná funkce v patřičném rozhraní byla implementována jako korutina. U rozhraní, která nabízí více implementací však zpravidla nechceme vynucovat způsob implementace. Objekt future<T> tak představuje vyšší úroveň abstrakce, pouze oznamuje, že výsledek nějakého volání bude dostupný později. To jestli na druhé straně rozhraní bude spuštěna korutina, nebo se výsledek spočítá jinak  není pro použití rozhraní rozhodující. Možností je i synchronní výpočet, protože objekt future<T> lze vytvořit již s nastavenou hodnotou.

Příklad jednoho rozhraní pro asynchronní streamy

class IStream {
public:
    virtual ~IStream() = default;

    virtual cocls::future<std::string_view> read() = 0;
    virtual void put_back(std::string_view buff) = 0;

    virtual cocls::future<bool> write(std::string_view buffer) = 0;
    virtual cocls::future<bool> write_eof() = 0;

    virtual cocls::suspend_point<void> shutdown() = 0;

};

Při implementaci cocls::future<T> jsem se snažil brát maximální ohled na výkon. Hlavním požadavkem bylo, aby objekt nealokoval žádnou paměť na haldě. Objekt často „sedí“ ve frame korutiny a s výhodou využijeme faktu, že nikam neodejde, dokud není slib splněn a promise zavolána. Pro fungující synchronizaci potřebujeme pevný bod v paměti. Z toho důvodu jsem objektu future<T> zakázal přesun i kopírování. Je zřejmé, že nemůžeme potřebovat, aby objekt měnil adresu, to by velice komplikovalo synchronizaci. V knihovně STL se tento problém řeší právě tím, že se sdílený prostor alokuje na haldě. Alokace však stojí drahocenný výkon procesoru.

 Tato „drobnost“ způsobuje, že práce s objektem není v některých situacích intuitivní a postupně si rozebereme problémy, které to přináší i nastíníme řešení. Chtěl jsem jen na začátek odůvodnit mé strategické rozhodnutí. Z vlastní zkušenosti dodám, že v drtivé většině případů tato omezení nejsou překážkou. A opět i tady platí, že programátor má možnost se z tohoto omezení vymanit a vybrat si. Pokud potřebuje aby se s objektem dalo hýbat, musí si jej alokovat na heapu ručně.

Jak už jsem psal, vždy vzniká dvojice future a promise a zpravidla promise zůstává na straně implementace dané funkce, zatímco future je vrácena z funkce. I když má future<T> zakázané kopírování i přesun, nebrání to vracet future<T> jako hodnotu a to díky garantovanému copy elision [2], které bylo zavedeno už v C++17. Podmínkou je, že objekt konstruujeme ve výrazu return. A toho se využívá při psaní funkcí, které future<T> vrací.

cocls::future<int> calc_async(int x) {
    //------------ vytvoř future a promise
    return [&](cocls::promise<int> promise) {
        //------------- práce s promise -----------
        std::thread t([promise = std::move(promise), x] () mutable {
            promise(x+42);
        });
        t.detach();
    };
}

Pro konstrukci future<T> použijeme lambda funkci, která obdrží svázanou promise. Tu můžeme přesunout například do vlákna, kde se odehraje příslušný výpočet. Objekt promise je přesouvatelný ale není kopírovatelný, proto musíme použít std::move(). Vlastní funkce vrátí objekt future<T> bez hodnoty a v okamžiku dokončení výpočtu ve vlákně se hodnota nastaví skrze právě svázanou promise()

Předávání chyb přes promise()

Kromě výsledku lze přes promise předat i výjimku. Třeba následujícím způsobem

cocls::future<int> calc_async(int x) {
    return [&](cocls::promise<int> promise) {
        std::thread t([promise = std::move(promise), x] () mutable {
            try {
                promise(run_long_calculation(x));
            } catch (...) {
                promise(std::current_exception());
            }
        });
        t.detach();
    };
}

Pokud by ve funkci run_long_calculation došlo k výjimce, vlákno, ve kterém výpočet běží, nemá jak výjimku zpracovat. Zde můžeme výjimku zachytit funkcí přes std::current_exception() a s výsledkem zavolat promisu. Tato výjimka je pak předana do future<int> místo výsledku a při pokusu získat výsledek se vyhodí v čekající funkci a ta na ní může patřičně reagovat.

Nedokončení výpočtu a dropnutí promisy

Co se stane, když je promise zničena, aniž by došlo k jejímu zavolání? I to je ošetřeno. Došlo k porušení slibu a objekt future na to zareaguje. Přepne do stavu, kdy sice signalizuje nastavení hodnoty, ale hodnotu nemá, a ani výjimku. V případě pokusu načíst hodnotu se vyhazuje výjimka cocls::await_cance­led_exception(). V rámci rozhraní objektu future<T> lze testovat tento případ i explicitně. Je zde funkce has_value(), která vrací true, pokud future<T> má hodnotu nebo výjimku, a false pokud je to ten druhý případ.  Objekt future se v tomto případě chová jako optional<>. V korutině můžeme použít co_await future.has_value() se stejným významem s tím, že korutina se uspí, pokud future<T> ještě stále na svou hodnotu čeká.

Dropnutí promisy lze provést i ručně

promise(cocls::drop);

… se stejným výsledkem. Tohoto se hojně využívá u generátorů, které na konci svého běhu vrací future<T> bez hodnoty čímž oznamují své dokončení (konec generování)

Práce s future<T>

Pokud objekt future<T> propadne do korutiny, není nic jednoduššího, než použít co_await k získání výsledku. Dokonce ani není nutné proměnnou deklarovat

int result = co_await calc_async(123)

Tento zápis způsobí, že korutina se uspí dokud není znám výsledek výpočtu. Toto je asi nejčastější použití.

Pokud objekt future<T> musíme řešit mimo korutinu, je zde k dispozici funkce wait() která zablokuje aktuální vlákno, dokud se hodnota future<T> nenastaví. Výsledek je pak vrácen. Stejně se chová i operátor * (dereference) 

int result = *calc_async(123)
// int result = calc_async(123).wait()

Pozor na to, že synchronní čekání bude fungovat i v korutině, kde je ale kontraproduktivní, proto je třeba kód navrhnout tak, aby v případě, že kód běží v korutině bylo na čekání použit co_await namísto dereference nebo funkce wait(). Žádný interní mechanismus nám to neohlídá. Všude tam, kde chceme použít wait() se ujistěte, jestli náhodou nejde o korutinu.

Na druhou stranu, dereferenci (*) lze použít i v korutině na místě, kde už bezpečně víme, že future hodnotu má.

cocls::future<int> f = calc_async();
if (co_await f.has_value()) { //čekáme na hodnotu
    std::cout << *f << std::endl; //tady už ji máme

Komplikace při práce s future<T>

Protože objekt future<T> není kopírovatelný ani přesouvatelný, špatně se s ním pracuje na mnoha místech. Ona nepřesouvatelnost znamená, že není možné použít například std::make_unique<> čímž by se dalo zajistit jeho přesouvatelnost. Vlastně jediná možnost je použít staré dobré new

std::unique_ptr fut(new auto(calc_async(123)));

//nyní je fut přesouvatelná

Jiné možnosti se vlastně nenabízí a viditelnost new v kódu nebudí důvěru (z hlediska code review). Proto bylo rozhraní future<T> obohaceno o některé zkratky  (nechci říkat hacky, i když se to slovo tady nabízí), které práci usnadňují. Jsou zpravidla založené na použití new (v placement verzi)

1. Konstrukce lambda funkcí

Jedna verze už byla představena a to konstrukce pomocí lambda funkce, která obdrží promise objekt. Další možností je konstrukce pomocí lambda funkce, která vrací future<T>. To oceníme tam, kde potřebujeme jakoby „přesunout“ objekt do nějakého kontejneru, ačkoliv o přesun ve skutečnosti nejde.

auto movable = std::make_unique<cocls::future<int> >([&]{return calc_async(123);});

std::optional<future<T> > opt_fut;
opt_fut.emplace([&]{return calc_async(123);});

2. Přiřazení lambda funkcí

Důvod, proč nefunguje přiřazení je ten, že přiřazení vyžaduje přesouvatelný objekt. Proto byl zaveden operátor << který umožňuje přiřadit do proměnné future<T> pomocí lambda funkce

cocls::future<int> result;
int i;
do {
    result << [&]{return calc_async(i++);};
    std::cout << (co_await result) << std::endl;
} while (*result != 0);

Aby bylo možné do future<T> takto přiřadit, je nutné, aby proměnná nebyla ve stavu, kdy čeká na hodnotu, tedy nikde „nelítá“ reference v podobě svázané promise.

Příklad: Awaiter, který čte ze streamu a vrací znaky – ty načítáme přes co_await, ačkoliv by se mohlo zdát, že používání objektu bude značně neefektivní, opak je pravdou, překladače jsou fakt dobré a pochopí, co to má dělat. Používá se přiřazení do future<T> (odebral jsem nepodstatné věci). 

class CharacterReader: public cocls::awaiter {
public:
    CharacterReader(Stream s):_s(s) {}
    bool await_ready() const {
        return _ptr != _end;
    }
    bool await_suspend(std::coroutine_handle<> h) {
        set_handle(h);
        _fut << [&]{return _s.read();}; //přiřazení do future
        return _fut.subscribe(this);
    }
    int await_resume() {
        if (_ptr == _end) {
            std::string_view &text = _fut.value();
            if (text.empty()) return -1;
            _ptr = text.data();
            _end = _ptr+text.size();
        }
        return *(_ptr++);
    }
    ~CharacterReader() {
        _s.put_back(std::string_view(_ptr, _end-_ptr));
    }

protected:
    Stream _s;
    cocls::future<std::string_view> _fut;
    const char *_ptr = nullptr;
    const char *_end = nullptr;
};

//použití
CharacterReader rd(stream);
char a = co_await rd;
char b = co_await rd;
....

3. discard výsledku

Vrácena future<T> se musí vždy zpracovat, počkat na hodnotu a teprve pak se může zničit. Předčasné zničení je chyba a vede na UB (a debugu na assert). Co když potřebujeme výsledek zahodit, a tedy nečekat zbytečně na dokončení výpočtu. Pokud nemůžeme problém obejít tím, že zavolat async<T> přímo v režimu detach(), pak zbývá cocls::discard(). I v tomto případě použijeme lambdu.

cocls::discard([&]{return calc_async(123);});

Funkce discard funguje tak, že pro future<T> alokuje místo na heapu a jakmile je nastavena hodnota, tak paměť automaticky dealokuje. Tímto se můžeme zbavit závazku čekat na výsledek future<T>

3. shared_future<T>

Někdy potřebujeme future sdílet mezi mnoha vlastníky, a k tomu použijeme shared_future<T>. Protože se zpravidla tento objekt nevyskytuje jako návratová hodnota (používáme future<T>), je potřeba opět použít trik s lambdou pro konstrukci objektu

cocls::shared_future<T> fut([&]{return calc_async(123);});

Sdílená futura již může být kopírována, čímž dochází k jejímu sdílení. Je zřejmé, že v tomto případě se prostor pro future alokuje na heapu. Sdílená futura má také tu vlastnost, že během čekání na výsledek je započtena jedna reference navíc, takže není chybou zahození všech referencí před nastavením výsledku. Prostě jakmile je nastaven výsledek a v ten okamžik neexistuje žádná reference, pak je zároveň futura zahozena. Sdílené futury mohou najít uplatnění u cache, kdy můžeme slot v cache alokovat před naplněním hodnotou a každý, kdo do cache sáhne později bude vědět, zda musí na naplnění cache čekat, nebo už je výsledek k dispozici. Každopádně bude vidět, že záznam v cache už existuje a není třeba jej zakládat.

Generátory

Poslední co zmíním a co doznalo změny, je generator. Ten nabízí synchronní ale i asynchronní generátor, a také generátor s argumentem (v obou variantách). 

  • Synchronní generátor - nikdy nepoužije co_await a používá pouze co_yield. Takový generátor se jednoduše plánuje, protože se spolu s volajícím střídá v aktuálním vlákně. Generátor zahajuje a ukončuje generování ve stejném vlákně, z perspektivy volajícího se to jeví jako volání funkce
  • Asynchronní generátor - může použít co_await a může dokončit generování v jiném vlákně. To může komplikovat volání generátoru a vyzvedávání výsledku

Generátor může být s parametrem nebo bez parametru (klasický)

  • Klasický generátor cocls::genetator<T> - generátor generuje hodnoty pouze na základě parametrů předaných během jeho konstrukce, komunikace s volajícím je jednosměrná, tedy hodnota putuje z generátoru k volajícímu.
  • Generátor s argumentem cocls::generator<T, Arg> – generátor navíc pro každý cyklus generování obdrží argument zaslaný volajícím, komunikace je tedy dvoustranná. Volající pošle argument a generátor pak výsledek

Generátory lze dělit i z hlediska počtu cyklů

  • Konečný generátor – má konečný počet cyklů, pak se zastaví a nějakým způsobem notifikuje volajícího, že další hodnoty nejsou k dispozici. Volající musí být schopen na to reagovat, pokud na to není připraven, řeší se to zpravidla výjimkou
  • Nekonečný generátor - nemá konečný počet cyklů, může generovat do nekonečna. Příkladem může být nekonečná řada čísel. Volající nesmí čekat na konec generování, jinak dojde k nekonečné smyčce.

Oba typy generátorů lze kdykoliv přerušit. Je třeba to udělat v době, kdy generátor čeká v co_yield. Stačí pouze generátor zdestruovat – automaticky se zavolají destruktory všech objektů vytvořených během generování, ale je třeba dát pozor, aby kód generátoru byl „exception safe“ – minimálně kolem co_yield – protože se to chová jako by nastala nezachytitelná výjimka.

Z hlediska API lze použít synchronní a asynchronní přístup

  • Synchronní přístup - zahrnuje volání generátoru a čekání na hodnotu. Pokud je synchronně volán asynchronní generátor, jde o blokující operaci
  • Asynchronní přístup - zahrnuje volání generátoru přes co_await. Pokud je takto volán asynchronní generátor, dojde k uspání korutiny při čekání na výsledek. Čekající korutina pak může být probuzena v jiném vlákně.

Základní API nabízí funkce .next() a .value()

cocls::generator<int> fibo(int count) {
    int a = 0;
    int b = 1;
    for(int i = 0;i < count; i++) {
        int c = a+b;
        co_yield c;
        a = b;
        b = c;
    }
}

int main() {
    cocls::generator<int> gen = fibo(15);
    while (gen.next()) {
        std::cout << gen.value() << std::endl;
    }
    return 0;
}

V asynchronním režimu lze funkci .next() zavolat přes co_await

cocls::generator<int> gen = fibo(15);
while (co_await gen.next()) {
    std::cout << gen.value() << std::endl;
}

Generátor nabízí i přístup přes iterátor. Ten je dostupný jen v synchronním režimu. Iterátory pak otevírají cestu k použití ranged-for. Pozor na to, jestli je generátor nekonečný, pak zde dojde k nekonečné smyčce

cocls::generator<int> gen = fibo(20);
for (auto value: gen) std::cout<<value<<std::endl;

Generátor se však může chovat i jako funkce, kterou voláme vždy, když potřebujeme aby generátor vygeneroval další hodnotu.  V takovém případě generátor vrací future<T>

cocls::generator<int> gen = fibo();
for (int i = 0; i< 10; i++) {
   cocls::future<int> res = gen();
   std::cout << res.wait() << std::endl;
}

U konečných generátorů musíme detekovat konec.

cocls::generator<int> gen = fibo();
while(true) {
   cocls::future<int> res = gen();
   if (res) {  //res.has_value()
       std::cout << *res << std::endl;
   } else {
       break;
   }
}

V asynchronním přístupu je třeba použít co_await na has_value()

cocls::generator<int> gen = fibo();
while(true) {
   cocls::future<int> res = gen();
   if (co_await res.has_value()) {
       std::cout << *res << std::endl;
   } else {
       break;
   }
}

Nebo to řešit přes výjimku

cocls::generator<int> gen = fibo();
try {
    for (int i = 0; i< 10; i++) {
        int res = co_await gen();
        std::cout << res << std::endl;
    }
} catch (const cocls::await_canceled_exception &) {
    std::cout << "Done" << std::endl;
}

Typický běh generátoru

  1. Generátor je zkonstruován avšak funkce není spuštěna
  2. První zavolání generátoru spustí funkci. Očekává se, že funkce vygeneruje první hodnotu a předá ji přes co_yield
  3. Hodnota se předává referenci a bez const, takže hodnotu lze nejen přečíst, ale přesunout z generátoru – std::move()
  4. Při dalším zavolání kód pokračuje za co_yield. Očekává se, že generátor vygeneruje další hodnotu z opět ji předá přes co_yield
  5. Pokud generátor ukončí svou činnost, pak volajícímu je ohlášen konec generování
  6. Pokud dojde k výjimce počas generování, pak se při přístupu na hodnotu výjimka vyhodí, a navíc se to považuje za ukončení generování
  7. Generátor nevrací hodnotu přes co_return

Generátor s argumentem – odlišnosti

Generátor s argumentem se volá s argumentem buď přes .next(arg) nebo prostým zavoláním gen(arg). Předat lze pouze jeden argument, pokud chcete předat víc argumentů, pak použijte strukturu. Argument se předává referencí, proto je třeba zajistit, aby zůstal platný po celou dobu generování – což je zpravidla zajištěno tím, že volající typicky čeká na výsledek.

Pokud jako argument použijeme tuple nebo strukturu, můžeme generátor volat s více argumenty. Generátor pak obdrží hodnoty zabalené do zvolené struktury

Trochu se liší i vlastní běh generátoru

  1. Generátor je zkonstruován, avšak funkce není spuštěna
  2. První zavolání s argumentem funkci spustí. Funkce však nevidí první předaný argument
  3. Funkce zpravidla použije co_yield nullptr; což je pevně zvolené volání, které pouze načte argument předávaný prvním zavoláním.
  4. Funkce následně argument použije v generování hodnoty a tu předá do co_yield
  5. Volající obdrží výsledek.
  6. Při dalším zavolání je opět předán argument, tentokrát se argument objeví jako výsledek co_yield po obnovení generační funkce
  7. Dál probíhá generování normálně jako u standardního generátoru
struct RetVal {
    double sum = 0;
    int count = 0;
};


cocls::generator<RetVal,double> summary() {
    double val = co_yield nullptr;
    RetVal rv;
    while(true) {
        rv.count++;
        rv.sum+=val;
        val = co_yield rv;
    }
}

int min() {
    auto sum = summary();
    double data[] = {1,4,32,31.3,58.3,0.2, 16.3, 0.8, 7.7,4,8.5};
    for (double x: data) {
        RetVal state = *sum(x);
        std::cout << "Value=" << x
                 <<", Sum=" << state.sum
                 <<", Count=" << state.count
                 <<", Avg=" << state.sum/state.count << std::endl;
    }
    return 0;
}

Agregace generátorů

Agregace generátorů je důležitá v okamžiku, kdy mám několik asynchronních generátorů a potřebuju získat hodnotu z jednoho z nich, je libovolné z kterého, z toho prvního, který generování dokončí. 

Jako příklad uvedu generátor, který realizuje accept příchozího spojení

cocls::generator<int> accept_con = listen(mother_socket);
//....
int socket = co_await accept_con();

Funkce listen(mother_socket) zahájí naslouchání na nově příchozí spojení na otevřeném socketu. Pokud bych chtěl naslouchat na dvou socketech současně, byl by kód mnohem složitější. Naštěstí toto řeší generator_aggregator

cocls::generator<int> accept_con = cocls::generator_aggregator({
          listen(mother_socket_1),
          listen(mother_socket_2)
});
//....
int socket = co_await accept_con();

Agregátor přijímá n generátorů a sám pak funguje jako generátor s tím, že vždy vygeneruje výsledek generátoru, který jako první dokončil generování, v tomto případě vrátí první spojení, které se připojilo na libovolný otevřený port. Vnitřně funguje tak, že obsahuje frontu výsledků, které přenáší do co_yield a při návratu požádá generátor, jehož výsledek byl předán o nové generování. Poradí si i s konečnými generátory. A i když je primárně určen pro asynchronní generátory, bude fungovat i se synchronními generátory, tam akorát bude docházet k prokládání výsledků generátorů.

Závěrem

Mohl bych pokračovat v psaní, ale článek by už byl hodně dlouhý. Přitom drobných nuancí, které přinesly výrazná vylepšení by se našlo ještě hodně. Některé další informace najdete přímo v README u knihovny, kterou zatím mám napsanou v češtině – anglický překlad chystám. Zároveň chci upozornit, že již dále nebudu vyvíjet starou knihovnu „coroutines_classes“.

Samotný vývoj cocls knihovny považuju za „RC1“, tedy že výrazné změny neplánuju, maximálně se budu soustředit na hledání chyb a optimalizace přičemž se budu snažit minimálně zasahovat do programátorského rozhraní. A slibuju, že napíšu další testy.

Tuto knihovnu také používám v plánované knihovně „coroserver“ – zatím neveřejná, ale pár fragmentů z kódu jsem zde použil pro demonstraci – knihovna by měla implementovat síťový vstup a výstup pomocí korutin (TCP/HTTP server, atd)

[1] https://github.com/ondra-novak/cocls

[2] https://en.cppreferen­ce.com/w/cpp/language/copy_e­lision

Sdílet