C++20: Jak se budí korutiny

30. 10. 2022 21:57 (aktualizováno) Ondřej Novák

Minulý článek vyzněl jako kritika implementace korutin v C++20. To jsem ani nechtěl, naopak si myslím, že je to skvěle vymyšlené s ohledem na užitečnost a vyjadřovací svobodu, kterou to přináší. Problémem je spíš neexistence pravidel a doporučení jak korutiny implementovat.

Nositelem takových doporučení je často právě STL, která ve verzi 20 toho v oblasti korutin mnoho nepřináší, a nejinak je i v nových verzích. V C++23 se chystá std::generator<>, který implementuje synchronní generátor – to je generátor, který nesmí použít co_await (mimochodem v mé implementaci coclasses toto omezení není). Malý krok v rámci pokroku. Neexistence nějakých jednotících pravidel pak může vést na chaos a nekompatibilitu mezi různými implementacemi knihoven. Jen si představte, že si z githubu stáhnete tři knihovny, které používají korutiny a zjistíte, že si mezi sebou nerozumí a pro zaintegrování do vašeho kódu si budete muset napsat čtvrtou knihovnu. Dalo by se to přirovnat například k výjimkovému systému v C++. Vyhazovat jako výjimku lze cokoliv, ale kompetentní programátor si dnes nedovolí vyhazovat výjimku, která nedědí std::exception, protože když taková výjimka vypadne ven, umí se aspoň představit a oznámit, co se vlastně stalo.

O jednom takovém “zádrhelu” je i tento článek. Než se k tomu dostanu, vrátím se k dotazu, který se objevil pod předchozím článkem v komentářích. Jen si ho trošku upravíme. Představte si kód

auto data = co_await socket.read();

Zápis znamená, že kód je součástí korutiny a předpokládá se asynchronní čtení ze socketu. Abych se vyhnul detailům, nebudu se teď zabývat v jaké formě jsou předána ta data. Může to být třeba std::vector<char>. Dotaz zněl, jak je tato operace implementovaná, jestli se třeba vytváří vlákno nebo jak?

Systém korutin tohle neřeší. Operátor co_await přináší mechanismus jak uspat právě běžící kód. Je to podobně jako například zastavení vlákna na condition_variable nebo na mutex. Rozdíl je v tom, že zde není vlákno zastaveno, pouze současná korutina je přerušena a její stav zachován a současné vlákno pokračuje kódem toho, kdo korutinu původně zavolal (nebo probudil, viz dále). Může se tak věnovat něčemu jinému. Například si představte server, který takto získá volné vlákno, aby mohl obsloužit dalšího klienta.

V tomto příkladě asynchronní čtení obstará funkce read objektu socket. Jak? Jak by se to dělalo bez korutin? Typicky nejprve se zkusí socket přečíst neblokovaně (nonblocking mode). Pokud jsou data již připravená, jednoduše se přenesou a je to hotové, žádné uspávání není třeba. Problém je, když data připravená nejsou, pak operace čtení vrátí chybu EWOULDBLOCK. K monitoringu socketu máme funkce select, poll nebo epollZejména poslední vyjmenovaná se s výhodou používá k hromadnému monitorování socketů. K obsluze epollu budeme muset mít nějaký nástroj ve formě sdíleného objektu (sdílený všemi sockety). Takový má zpravidla vyhrazeno jedno nebo více vláken, nějakou mapu monitorovaných socketů a tedy náš socket-objekt nejspíš “outsourcuje” monitoring do takového nástroje. Pak, jakmile se na socketu objeví data, epoll se probere, zjistí jakého socketu se to týká a pošle mu notifikaci ve formě zavolání funkce nebo callbacku. Na základě této notifikace je pak korutina vzbuzena a pokračuje ve výkonu dalšího kódu.

Funkce read() musí implementovat “awaitera”. To je jednorázově vytvořený objekt, který žije ve exekučním frame korutiny počas jejího spánku. Objekt musí implementovat tři metody, které se volají postupně:

  • await_ready - ten vrací bool a slouží k dotazu, zda je již výsledek k dispozici. V našem příkladě to je ideální místo provést pokus o neblokující čtení. Pokud obdržíme EWOULDBLOCK, vrátíme false.
  • await_suspend(h) - ten může být void nebo vracet bool nebo vracet std::coroutine_handle<>. V každém případě se volá v okamžiku, kdy await_ready vrátí false. Mezi tím jsou provedeny kroky k uspání korutiny (uloží se její stav a bod obnovy). V okamžiku zavolání téhle funkce je korutina považovaná za uspanou a tato funkce obdrží handle korutiny, které následně můžeme použít k jejímu probuzení - h.resume() - nebo ke zničení - h.destroy(). Ano, korutinu lze beztrestně “zabít” ve spánku. Pokud je kód korutiny exception safe, pak by to nemělo mít žádné následky, protože všechny destruktory všech dosud vytvořených objektů v rámci korutiny se poctivě zavolají. Návratová hodnota z této funkce může být false pak je korutina ihned zase probuzena – v případě true bude spát dál. Lze ale také vrátit handle jiné korutiny, která je probuzena místo té co právě usnula, takže aktuální vlákno může být využito k běhu jiné korutiny. Tomuto systému se říká symmetric transfer a jeho výhodou je, že nedochází k resume hell, tedy, že hloubka zásobníku se tímto nemění.
  • V tento okamžik tedy korutina spí a někdo jiný drží její handle. Tímto handle ji může probudit, jakmile nastane ten vhodný okamžik. Například, jakmile jsou na socketu připravena data.
  • await_resume() – se zavolá nakonec a nezávisle na tom, jestli mezitím byla korutina uspaná nebo to nebylo potřeba. Smyslem té funkce je vrátit výsledek operace a tedy to co tato funkce vrátí je pak vráceno z operace co_await. Je to také místo, kde lze případně potřeby bezpečně vyhodit výjimku. To je užitečné, protože máme možnost přeposlat výjimku, která mohla vzniknout při plnění asynchronní operace a byla vyhozena v rámci nějakého jiného vlákna a jiného kódu. Pokud takovou výjimku zachytil náš asynchronní nástroj, může tuhle výjimku přeposlat do čekajícího awaitera a ten ji v tomto místě vyhodí ven a tak se o výjimce dozví kód korutiny.

Možnosti awaiterů jsou široké. Norma C++20 dále definuje operator co_await kterým lze konvertovat jakýkoliv objekt na awaiter a tento může být definován jak na objektu, na který se bude čekat, tak i jako globální funkce (podobně jako jakýkoliv jiný operátor) – tam máme možnost “naučit” pracovat s co_await i objekt, který původně pro korutiny nebyl určen. Jako příklad (sice ne moc dobrý) by mohla být konverze std::future na awaitera přes:

template<typename T>
auto operator co_await(std::future<T> &);
  • je jasné, že celé chování awaitera musíme naimplementovat
  • i když si nedokážu moc dobře představit implementaci se současným ne moc dobrý stavem toto nástroje

Jak správně budit korutinu

Chtěl jsem ukázat na awiaterech, jaké široké možnosti jsou k dispozici pro uspávání korutin. Ale jak je to s jejich buzením? Standard zde nabízí jedinou funkci

h.resume(); //kde h je typu std::coroutine_handle<>

případně ve variantě callable

h();

A to je vše přátelé. Konec článku.

Dělám si legraci, tam určitě bude nějaký zádrhel. To co tato funkce udělá je, že vytvoří nový zásobníkový rámec a v něm obnoví stav vybrané korutiny a tu spustí z místa, kde naposledy byla korutina uspána. Standard neřeší, v jakém kontextu to volání probíhá a zda vůbec je vhodné na tomto místě korutinu probudit. Vraťme se k našemu příkladu se socketem. V okamžiku, kdy se na socketu objeví data, první to zjistí epoll, který je provozován v nějakém vlákně. To nakonec pošle notifikaci objektu socket a ten probudí korutinu. To vše se děje ve stejném vlákně. Běda pokud ta korutina se rozhodne na základě vrácených dat spustit nějaký komplikovaný výpočet. Tím jsme si dokonale zablokovali náš monitorovací nástroj!

Jak tedy správně probudit takovou korutinu? Nelze ji probudit v původním vlákně (ve vlákně, ve kterém byla poprvé uspána), protože to vlákno již provádí jiný kód. A neexistuje způsob, jak tomu vláknu říct, aby se teď chvíli věnovalo probuzené korutině. Leda že by vlákno obsahovalo nějakého dispečera (angl. dispatcher), tak jak bývá zvykem v jiných programovacích jazycích, třeba v Javascriptu, nebo v Javě. Typicky hodně jazyků, které se chtějí vyhnout složitému programování vícevláknových aplikací převádí problém na korutiny s jedním vláknem implementující dispečera. Korutina je vložena do fronty dispečera a jakmile dispečer převezme řízení, vyzvedne korutinu z fronty a tu probudí.

V C++ standardní dispečer není, a nikdy nebude (ale je ve WinAPI, můžete si handle korutiny přeposlat přes PostMessage a vyzvednout v GetMessage).

Náš monitorovací nástroj se může bránit tím, že má k dispozici thread pool a má možnost  pro korutinu alokovat další vlákno (thread_pool má funkci alokace vlákna pro korutinu) Tohle je ještě jednoduché, ale představme si jiné programovací primitiva v rámci korutin. Například mutex nebo frontu (queue). Mutex podporující korutiny má funkci co_await lock(), který uspí korutinu dokud je mutex zamčený a probudí, jakmile je mutex odemčen. Fronta může mít funkci co_await pop(), která uspí korutinu, když je fronta prázdná a probudí ji, jakmile je do fronty vložen prvek. Obě primitiva řeší svou funkcionalitu a nejsou zodpovědné za systém buzení korutin. Kdy se vzbudí korutina, která čekala na mutex?

  • Ihned, jakmile je mutex odemčen – tady vzniká mnou oblíbený resume hell. Představte si, že mutex je odemknut jinou korutinou. A takových korutin čeká na mutex třeba 100. Každá která získá mutex, něco provede a odemkne ho, je přerušena rekurzivně další korutinou. Takto se vytvoří 100 zásobníkových framů přerušených korutin na sobě.
  • Neprobudí se hned, ale “až někdy”, třeba až ta korutina, která zámek odemkla, je uspána nebo skončí – tímto se lze zbavit resume hell – viz dále
  • Alokuje se nové vlákno a korutina poběží paralelně – rozhodně náročné na zdroje degradující smysl korutin
  • Korutina je probuzena ve svém mateřském vlákně přes dispečera - vyžaduje dispečera

Původně jsem měl v plánu napsat článek, kde obhájím, jeden z výše vyjmenovaných systému. Ale pak mi můj kolega z práce navrhl, ať si všechny případy nadefinuji jako resumption policies (inspirace v execution policy)Každá korutina přitom může chtít být buzena jinak.

Problém je, že o buzení rozhoduje někdo jiný, někdo, kdo o korutině neví zhola nic. Mutex, queue, atd…

Nejbližší vztah s korutinou má její awaiter. A ze narážíme na nedotaženost standardu, protože neexistuje žádné pravidlo, žádná směrnice, jak tomu, kdo korutinu bude budit, předat informaci o tom, jak si daná korutina přeje být buzena. A nejspíš se bude stávat, že korutiny budou z jedné knihovny a awaitable primitiva budou z jiné knihovny.

Resumption policy

V následujícím systému budu popisovat vlastní systém, tedy jde jen o jakýsi návrh nebo dočasné řešení, než se mezi programátory rozšíři nějaký jiný systém, ať už samovolně nebo to někdo nadefinuje v normě

Rozšiřme si definici třídy task<>

template<typename T, typename resumption_policy = void>
class task;

Korutina s takovou deklarací si přes resumption_policy může definovat vlastní pravidla pro svoje buzení. Například

cocls::task<int, cocls::resumption_policy::dispatcher> example(int x) {
         co_await … neco…
         co_return 42;
}

Výše uvedená korutina chce být buzena přes dispečera vlákna ve kterém byla vytvořena. V knihovně coclasses najdete třídu cocls::dispatcher, kterou můžete instanciovat v libovolném vlákně

Nadefinoval jsem několik policies, zde je jejich výčet, všechny se nachází v namespace cocls::resumption_policy

  • immediate - korutina je vzbuzena okamžitě jakmile to je možné. Tohle je přímé volání funkce .resume().
  • queued - pokud v aktuálním vlákně bylo spuštěno více takových korutin, je v rámci thread_local založena  fronta. Probouzení pak probíhá přes frontu tak, že korutina je vložena do fronty a je probuzena, jakmile současná korutina je ukončena nebo uspána. Pokud kód není v korutině, je korutina probuzena okamžitě, ale současně s ní vytvoří fronta a další buzení se provádí přes frontu
  • parallel - probuzení korutiny je vždy realizováno v nově alokovaném vlákně
  • thread_pool - korutina je probuzena v předem nastaveném thread poolu, tedy je jí alokováno volné vlákno v tomto poolu. Jak se takový thread pool nastaví viz dále
  • dispatcher - korutina je probuzena ve vlákně ve kterém byla vytvořena, pokud to bylo vlákno s dispečerem. V opačném případě je třeba ji dispečera přiřadit
  • void - představuje výchozí globální policy, která je nastavená na queued ale lze ji změnit (pomocí částečné specializace šablony unspecified<void>)

Jako programátor máte možnost si  definovat vlastní policies, ale tím se teď nebudu zabývat

Implementace resumption policies ale není žádný standard, zde jsem si musel rozšířit pravidla o další body. Například pravidla kolem awaiterů. Normálně totiž neexistuje způsob, jak by korutina předala awaiterovi informaci o tom, jakou policy chce pro své probouzení použít. Dovolil jsem si tedy vedle tří povinných funkcí awaitera (viz nahoře) přidat čtvrtou funkci

template<typename Awt, typename Policy>
static auto set_resumption_policy(Awt &&awt, Policy &&policy)

I když musí být funkce deklarovaná na awaiteru, je deklarovaná jako static a instanci awaitera obdrží jako parametr. To umožňuje dědit tuto funkci do potomků awaitera a přitom předat jeho posledního potomka. Druhým parametrem je instance vlastní policy. Funkce musí vytvořit kopii awaitera, který má nastavenou policy a slibuje tak, že při buzení bude tuto policy respektovat. Výsledný awaiter se použije pro funkci co_await. Jak má ale korutina řídit awaitery na které chce čekat, aniž by to musel řešit programátor ručně? Norma nám dává do ruky nástroj ve formě funkce deklarované ve své promise třídě.

template<typename X>
auto await_transform(X &&awt);

Tato funkce, pokud je definovaná, je volána jako první v případě, že kód narazí na co_await. A teprve to co tahle funkce vrátí se považuje za awaitera a na něho se “čeká”.

A teď si jen představte ten template hell, který tam vznikne. Pokud jde jen o zavolání set_resumption_policy, pak je třeba zjistit, jestli předaný objekt vůbec tuhle funkci deklaruje. Do toho je třeba simulovat standardní chování, například zkusit zavolat operator co_await, protože tohle volání se jinak automaticky volá jen v případě, že await_transform není definován. Atd. Tohle peklo jsem se pokusil vyřešit v coclasses.

Takže pokud korutina musí co_await na něco, co funkci set_resumption_policy nepodporuje, pak nemá šanci ovlivnit své probuzení. Je třeba s tím počítat při návrhu kódu. Policy v tomto případě není povinné dodržet, je to spíš takové “přání”. Pokud tedy korutina bude čekat na něco, co nemá nic společného s mým systémem, tak má smůlu. Ale naštěstí by neměl být problém takový kód aspoň přeložit.

Nicméně je třeba říct, že v některých případech nechceme policy dodržet. Třeba co_await nad thread_poolem je interpretováno jako přesun korutiny do thread poolu. To se realizuje jako uspání korutiny a její probuzení v jiném vlákně. V takovém případě asi nechceme aby nám do toho resumption policy mluvila. Stačí prostě výše zmíněnou funkci nedeklarovat.

System resumption policy je definován jen pro task<> a nepoužívám jej u generátorů. U synchronních generátorů vůbec není potřeba, protože tam se generátor spouští v kontextu vlákna, který ho volá. U asynchronních generátorů by to asi smysl mělo, nicméně tam se předpokládá, že generátor nakonec udělá co_yield, který ho přeruší a uvolní vypůjčené vlákno. Pokud se na asynchronní generátor čeká synchronně, volající po celou dobu blokuje své vlákno, a pouze v případě, že asynchronní generátor je volán z korutiny, která na něho čeká přes co_await, tam se resumption policy uplatňuje, protože generátor poskytuje kompatibilní awaiter.

Spuštění korutiny pod danou policy

Resumption policy má vliv i na spouštění korutiny. Na to lze hledět také jako na vytvoření korutiny a jeji probuzení – pak je probuzena na svém začátku. I tady se na to může aplikovat resumption policy. Tento případ se ale v rámci implementace policy řeší odděleně.

  • immediate - žádná změna, korutina je spuštěna okamžitě
  • queue - zde korutina je spuštěna okamžitě, fronta se používá jen při probuzení uspané korutiny
  • parallel - korutina je okamžitě spuštěna v nově alokovaném vlákně
  • thread_pool - korutina je po zavolání uspána, je třeba provést další krok k jejímu probuzení
  • dispatcher - korutina je naplánována v aktuálním dispečeru, pokud aktuální vlákno nemá dispečera, je po zavolání uspána a je potřeba provést další krok

Problém nastává u resumption policy, která vyžaduje další parametry. Korutina neumožňuje jakkoliv definovat parametry policy při svém zavolání. Tam prostě není žádný prostor, jak parametry předat, předávají se pouze parametry vlastní korutiny.

K tomuto účelu jsem zavedl další metodu přímo ve třídě task<>

template<typename ... Args>
void initialize_policy(Args && ... args)

Příklad

cocls::task<void, cocls::resumption_policy::thread_pool> example(int)
{
}

auto pool = std::make_shared<cocls::thread_pool>(8);
auto mytask = example(42); //korutina startuje uspaná
mytask.initialize_policy(pool);  //teď se korutina vzbudí v thread poolu

Závěr

Ano, vypadá to velice složitě, taková “raketová věda”. Ale složité je to spíš pro tvůrce knihoven, nemělo by to být složité pro uživatele. Já jsem chtěl jen ukázat problémy, které vidím a navrhnout své řešení. To řešení naleznete v mé veřejně dostupné knihovně (coclasses). Můžete to používat (licence MIT), nemusíte, můžete se jen inspirovat.

Netvrdím, že se systém resumption policy dostane do nějakého budoucího standardu. Ale nějaké sjednocení pravidel nejspíš bude muset přijít. Do té doby si programátor bude muset vybrat vhodnou knihovnu pro práci s korutinami a výsledkem bude nejspíš chaos a zmatky kolem kompatibility. Ale což, nic na co bychom nebyli u C++ zvyklí léta. Vyjadřovací svoboda má své stinné stránky.

V nějakém dalším článku bych se podíval na praktické zkušenosti s korutinami – tam už bude méně toho “temného chaosu” pod povrchem. Na povrchu je vše hezky “sluníčkové”.

Sdílet