Uplynul nějaký čas kdy jsem se musel věnovat jiným úkolům, ale ke korutinám jsem se vrátil. Často se vyplatí nechat téma „uležet“ a vrátit se později s úplně jiným pohledem na celou věc. Zvlášť, když člověk za to dobu spoustu věcí zapomene a pak když se k tomu vrátí, má najednou pocit, že je všechno až zbytečně komplexní (překomplikované). Takže jsem se rozhodl svou knihovnu na korutiny lehce přepracovat.
Novou knihovnu najdete na nové adrese a jmenuje se jednoduše CoCls [1]. Stejný je i namespace, ve kterém jsou všechny třídy deklarované. Celou knihovnu tvoří jen hlavičkové soubory, které lze do projektu přímo includovat bez nutnosti kompilovat a připojovat libku. Do projektu jsem také kromě příkladů použití přidal i adresář obsahující testy. Testy tedy zdaleka nepokrývají veškerou funkcionalitu, to se snad v budoucnu zlepší.
Snažil jsem se výrazně redukovat funkcionalitu a komplexitu mé původní knihovny, kterou jsem zde v předchozích postech popisoval. Původní projekt totiž začal bobtnat a komplexita rostla, a já mám radši, když je projekt co nejvíc KISS (Keep it simple stupid)
Namísto explicitní specifikaci resumption policy jsem se rozhodl ponechat pouze jedinou policy a to (v původním označení) „queued“. Tato policy je implementovaná v podobě singleton třídy coro_queue. Ta pracuje s kontextem aktuálního vlákna (thread_local) tak, že eviduje zda vlákno exekuuje korutinu, nebo normální kód. V tzv „coro-mode“ stačí, aby v zásobníku vlákna byl aspoň jediný frame patřící běžící korutině a nezáleží na to, v jak hluboko v zásobníku se ten frame nachází.
Samozřejmě jsem zvažoval i možnost plánování korutin napříč vlákny, tak jak se to děje v jiných jazycích. Mohl bych se inspirovat gorutinami z jazyka Go. Avšak tohle řešení má nevýhody. Vyžadovalo by to zavedení globálního thread poolu. Já se však snažím naprogramovat knihovnu, nikoliv framework, takže s globálními objekty bych v tomto případě byl velice opatrný. Představte si, že by každý malý program používající mou knihovnu startoval thread pool s desítkou běžících vláken! Také by to znamenalo, že všechny korutiny by musely automaticky předpokládat, že poběží paralelně, což by vedlo na nutnost používat zámky na všech úrovních. Ten hlavní „tahák“ (selling point) pro korutiny je ale v možnosti kooperativního multitaskingu, který často postačuje a kde naopak využíváme toho, že korutiny nepoběží paralelně a zámky nepotřebujeme.
Oba požadavky mi ani neseděly do konceptu KISS.
To ovšem neznamená, že by to v rámci nabízených nástrojů nešlo zařídit. Jde jen o to, že je to ve výchozím stavu řešeno pouze v rámci aktuálního vlákna (viz dále)
Tzv. režim „coro-mode“ byl zaveden hlavně kvůli kooperativního multitaskingu. Pokud nějaký kód je implementován jako korutina, lze předpokládat, že právě kvůli její schopnosti se uspat na nějaké asynchronní operaci. Takže pokud v rámci tohoto kódu se nějaká jiná korutina má probudit, nebudu jí budit okamžitě, ale nachystám si tu korutinu k případnému probuzení, jakmile současná korutina se uspí. Mám tak k dispozici frontu korutin čekajícíchna svou příležitost. Zároveň tím dochází k optimalizaci používání zásobníku. Pokud by tento mechanismus neexistoval, každá probuzená korutina by si alokovala nový rámce v zásobníku a najdou se i algoritmy pro korutiny, které by v takovém případě vyčerpaly celý zásobník.
Aby připravená korutina mohla běžet, musí jiná korutina být uspána. Může se ale stát, že běžící korutina nemá nic, na čem by se mohla uspat. Proto vznikl příkaz pause(), který voláme spolu s co_await. Aktuálně běžící korutina se uspí, čímž uvolní místo korutině připravené ve frontě, zároveň sama sebe zařadí na konec fronty.
co_await pause();
Jednoduchý příklad kooperativního multitaskingu
#include <iostream> #include <cocls/future.h> cocls::async<void> test_task(int id) { for (int j = 0; j < 10; j++) { for (int i = 0; i < id; i++) std::cout << "\t"; std::cout << j << std::endl; co_await cocls::pause(); } } cocls::future<void> test_cooperative() { for (int i = 0; i < 5; i++) { test_task(i).detach(); } co_return; } int main(int, char **) { test_cooperative().join(); }
Pro zahájení kooperativního multitaskingu je třeba nejprve spustit korutinu, která následně spustí všechny korutiny, které se mají ve vlákně střídat. Pak skončí. V rámci „coro-mode“ vlákna nedojde k ukončení běhu korutiny dokud existují připravené korutiny. Následně tedy běží 5 korutin které se přes pause() střídají v běhu. Teprve ukončením poslední korutiny dojde k návratu do main() a k ukončení programu.
V knihovně existuje mnoho míst, kde zavoláním funkce nebo metody dojde k probuzení korutiny. Pokud je to v rámci normálního vlákna, dojde k okamžitému spuštění dané korutiny, pokud je to v rámci „coro-mode“, taková korutina se pouze připraví k běhu zařazením do fronty v aktuálním vlákně. Ani jedno řešení nemusí být v danou chvíli vhodné. V normálním vlákně může dojít ke spuštění korutinu uvnitř nějakého zámku a zablokovat tím daný prostředek na delší dobu. V „coro-mode“ by zase mohlo dojít k vyhladovění dané korutiny. Řešení nabízí tzv. suspend_point, který je vracen právě v situaci, kdy nějaká funkce připravila korutinu (nebo i množinu korutin) k běhu, avšak tyto korutiny ještě nebyly probuzeny. Objekt slouží volajícímu, aby naplánoval vhodný bod ke svému uspání a ke spuštění připravených korutin, přičemž tento objekt zároveň nese seznam korutin, které je třeba spustit. Příklad:
Mějme korutinu, která čeká na future a k ní existuje svázaná promise. Pokud některá funkce nastaví hodnotu promise, stane se korutina čekající na future „připravena k běhu“. Funkce nastavující promise vrátí suspend_point volajícímu. Ten může provést co_await na tomto objektu a tím je uspán a namísto toho je spuštěna připravená korutina a ta si může hodnotu vyzvednou. Je to taková operace ručního přepnutí.
cocls::promise<int> p = ... // nějaká promise co_await p(42); //nastav svázanou future na 42 a přepni do čekající korutiny
Pokud se toto stane ve funkci, která není korutinou, může být suspend_point poslán volajícímu, který už může být korutinou a kde lze provést uspání.
cocls::suspend_point<bool> resolve() { cocls::promise<int> p = ... // nějaká promise return p(42); } cocls::async<void> coro() { //... co_await resolve() //... }
Objekt suspend_point lze libovolně přenášet kódem i mezi vlákny a dokud není provedeno co_await, nejsou připravené korutiny spuštěny. Pokud však suspend_point skončí v normálním vlákně, pak jeho zahozením (destrukcí) dojde k přerušení běhu aktuálního vlákna a k probuzení všech čekajících korutin v tomto vlákně (v rámci jeho destruktoru), takže připravené korutiny se nakonec probudí. Vhodnou správou objektu suspend_point však můžeme vybrat místo, kde nám přerušení vadí nejméně.
Reakce na suspend_point je nepovinná. Pokud je objekt při návratu z funkce zahozen, připravené korutiny jsou spuštěny podle pravidel v aktuálním vlákně. I tak má smysl objekt používat, například umožňuje přenést odpovědnost za spuštění korutin mimo tělo volaného, tedy vně případných zámků a vně kontextu funkce, který by jinak mohly ovlivnit.
Objekt suspend_point je optimalizován pro 3 připravené korutiny, kdy nepotřebuje alokovat paměť na haldě. Pro víc jak 3 připravené korutiny pak alokuje pole na haldě. Pokud se mi v kódu sejde víc suspend pointů, pak se dají slučovat pomocí operátoru <<
//připrav první korutinu suspend_point<void> p1 (coro1.detach()); //připrav druhou korutinu suspend_point<void> p2 (coro2.detach()); //sloučený bod suspend_point<void> c; //sluč do c c << p1 << p2; //přidej tam ještě třetí korutinu c << coro3.detach(); //a teď se uspi a nechť ostatní běží co_await c;
Jen krátce se zmíním o možnost nechat suspend point aktivovat v thread poolu. Pak skutečně dojde k tomu, že se čekající korutiny spustí v tomto thread poolu a to tak, že každá dostane přidělené jedno vlákno (případně se zařadí do fronty na volné vlákno).
cocls::thread_pool pool; //sloučený bod suspend_point<void> c = ... // viz předchozí příklad pool.resume(c); //spusť všechny tři korutiny ve třech vláknech
Protože suspend_point<> zabere návratovou hodnotu funkce, nebylo by možné předat výsledek funkce, proto suspend_point umožňuje svázat objekt s libovolnou hodnotou, jejíž typ se specifikuje parametrem šablony
Tato hodnota je vrácena jako výsledek operace co_await po tom, co je aktuální korutina opět probuzena. Mimo korutinu je výsledek dostupný přímo přiřazením objektu do proměnné patřičného typu
Název jsem si nechal poradit přes ChatGPT :-)
Ve skutečnosti hlavním smyslem je čitelnost u rozhraní. Pokud máte rozhraní
suspend_point<bool> cancel_event(ident id);
…pak na první pohled je vidět, že funkce by ráda, aby se volající uspal (ona sama to udělat nemůže, protože není korutinou), tím by mohla dokončit požadovanou operaci, která zřejmě probíhá v korutině. Předává mu tedy bod uspání, a očekává, že volající se skutečně uspí. Vstupuje do toho i ona „nepovinnost“, volající to nemusí udělat, a pak se vše řídí pravidly popsanými výše. Zároveň z popisu rozhraní vidíme, že výsledkem operace je i nějaký boolean, který by mohl nést nějakou informaci.
Pojďme se nyní podívat na vlastní korutiny, nejprve tedy na async<T>. Parametrem šablony je návratová hodnota z korutiny, přičemž povolen je i typ void: async<void>
async<int> korutina(int arg) { co_return arg+42; }
Pokud takovou korutinu zavoláme, dojde pouze ke konstrukci objektu async<int> ale samotná korutina se nespustí. Chová se to jako konstruktor, který vytvoří objekt korutiny s předanými parametry. Pro spuštění musíme udělat ještě jeden krok, máme přitom na výběr:
korutina(0).detach(); //----------- promise<int> p = ... //získej promise někde korutina(1).start(promise); //----------- future<int> f = korutina(2).start(); int result = co_await f; //----------- int result = co_await korutina(3); //----------- int result = korutina(4).join();
Objekt async<> lze před spuštěním korutiny přesouvat pomocí std::move(). Jakmile je však korutina spuštěna, pak se objekt stane neplatným a nemůže být použit k přístupu k dané korutině. Spuštěním se totiž stav korutiny přesune „do útrob“ knihovny, kde je potřeba mít k němu výhradní přístup.
Knihovna umožňuje použít zkratku pro převod async<T> na future<T>. Objekt future<T> často budeme používat jako návratovou hodnotu z mnoha asynchronních funkcí – spíš než samotný async<T>, který má mít jen lokální použití. Pokud však naše korutina je vždy spouštěna přes start() a vrací future<T> pak lze návratovou future<T> použít místo async<T> a přesto psát kód pro korutinu. To není zavedení nového typu korutiny, jde jen o zkratku, protože takto deklarovanou korutinu stále implementuje async<T>, byť to tam není uvedeno.
cocls::future<int> korutina2(int x) { co_return x+42; }
Výše uvedený kód je ekvivalentní zápisu:
cocls::async<int> korutina2_coro(int x) { co_return x+42; } cocls::future<int> korutina2(int x) { return korutina2_coro(x).start(); }
Skutečným důvodem k přepsání a zjednodušení knihovny cocls byl větší důraz na návrhový vzor future-promise. Na začátek bych chtěl upozornit, že zde nepopisuju std::future a std::promise ale objekty knihovny cocls. Tedy cocls::future a cocls::promise. Jejich účel je sice podobný, ale významně se liší rozhraním i způsobem použitím a také tím, že cocls::future podporuje operaci co_await
Tento vzor umožňuje vzdálenou synchronizaci mezi kódem, který produkuje nějakou hodnotu a kódem, který tu hodnotu konzumuje. Jedná se však o jednorázovou synchronizaci, tedy jakmile je hodnota produkována, předá se čekajícímu kódu a tím je vztah mezi těmito části kódu ukončen. Pokud by měl vztah pokračovat, například jde o stream hodnot, pak je lepší použít frontu (queue). V praxi ale jednorázovou synchronizaci používáme častěji, protože je jednodušší, často žádáme a čekáme na jednu hodnotu, více hodnot zpravidla požadujeme v cyklu po jedné a odpadá například problém s „rušením vztahu“. Je to podobné jako request-response. Ke každému requestu (volání funkce) jedna odpověď (produkování jedné hodnoty). Jednorázové použití též zjednodušuje vlastní implementaci, kterou lze realizovat plně lock-free (tedy bez použití zámků)
Pokud navrhujeme nějaké abstraktní rozhraní, používání typů představující korutiny jako návratové hodnoty je velmi nepraktické. Narušuje to celý smysl abstrakce, protože předepisuje, aby daná funkce v patřičném rozhraní byla implementována jako korutina. U rozhraní, která nabízí více implementací však zpravidla nechceme vynucovat způsob implementace. Objekt future<T> tak představuje vyšší úroveň abstrakce, pouze oznamuje, že výsledek nějakého volání bude dostupný později. To jestli na druhé straně rozhraní bude spuštěna korutina, nebo se výsledek spočítá jinak není pro použití rozhraní rozhodující. Možností je i synchronní výpočet, protože objekt future<T> lze vytvořit již s nastavenou hodnotou.
Příklad jednoho rozhraní pro asynchronní streamy
class IStream { public: virtual ~IStream() = default; virtual cocls::future<std::string_view> read() = 0; virtual void put_back(std::string_view buff) = 0; virtual cocls::future<bool> write(std::string_view buffer) = 0; virtual cocls::future<bool> write_eof() = 0; virtual cocls::suspend_point<void> shutdown() = 0; };
Při implementaci cocls::future<T> jsem se snažil brát maximální ohled na výkon. Hlavním požadavkem bylo, aby objekt nealokoval žádnou paměť na haldě. Objekt často „sedí“ ve frame korutiny a s výhodou využijeme faktu, že nikam neodejde, dokud není slib splněn a promise zavolána. Pro fungující synchronizaci potřebujeme pevný bod v paměti. Z toho důvodu jsem objektu future<T> zakázal přesun i kopírování. Je zřejmé, že nemůžeme potřebovat, aby objekt měnil adresu, to by velice komplikovalo synchronizaci. V knihovně STL se tento problém řeší právě tím, že se sdílený prostor alokuje na haldě. Alokace však stojí drahocenný výkon procesoru.
Tato „drobnost“ způsobuje, že práce s objektem není v některých situacích intuitivní a postupně si rozebereme problémy, které to přináší i nastíníme řešení. Chtěl jsem jen na začátek odůvodnit mé strategické rozhodnutí. Z vlastní zkušenosti dodám, že v drtivé většině případů tato omezení nejsou překážkou. A opět i tady platí, že programátor má možnost se z tohoto omezení vymanit a vybrat si. Pokud potřebuje aby se s objektem dalo hýbat, musí si jej alokovat na heapu ručně.
Jak už jsem psal, vždy vzniká dvojice future a promise a zpravidla promise zůstává na straně implementace dané funkce, zatímco future je vrácena z funkce. I když má future<T> zakázané kopírování i přesun, nebrání to vracet future<T> jako hodnotu a to díky garantovanému copy elision [2], které bylo zavedeno už v C++17. Podmínkou je, že objekt konstruujeme ve výrazu return. A toho se využívá při psaní funkcí, které future<T> vrací.
cocls::future<int> calc_async(int x) { //------------ vytvoř future a promise return [&](cocls::promise<int> promise) { //------------- práce s promise ----------- std::thread t([promise = std::move(promise), x] () mutable { promise(x+42); }); t.detach(); }; }
Pro konstrukci future<T> použijeme lambda funkci, která obdrží svázanou promise. Tu můžeme přesunout například do vlákna, kde se odehraje příslušný výpočet. Objekt promise je přesouvatelný ale není kopírovatelný, proto musíme použít std::move(). Vlastní funkce vrátí objekt future<T> bez hodnoty a v okamžiku dokončení výpočtu ve vlákně se hodnota nastaví skrze právě svázanou promise()
Kromě výsledku lze přes promise předat i výjimku. Třeba následujícím způsobem
cocls::future<int> calc_async(int x) { return [&](cocls::promise<int> promise) { std::thread t([promise = std::move(promise), x] () mutable { try { promise(run_long_calculation(x)); } catch (...) { promise(std::current_exception()); } }); t.detach(); }; }
Pokud by ve funkci run_long_calculation došlo k výjimce, vlákno, ve kterém výpočet běží, nemá jak výjimku zpracovat. Zde můžeme výjimku zachytit funkcí přes std::current_exception() a s výsledkem zavolat promisu. Tato výjimka je pak předana do future<int> místo výsledku a při pokusu získat výsledek se vyhodí v čekající funkci a ta na ní může patřičně reagovat.
Co se stane, když je promise zničena, aniž by došlo k jejímu zavolání? I to je ošetřeno. Došlo k porušení slibu a objekt future na to zareaguje. Přepne do stavu, kdy sice signalizuje nastavení hodnoty, ale hodnotu nemá, a ani výjimku. V případě pokusu načíst hodnotu se vyhazuje výjimka cocls::await_canceled_exception(). V rámci rozhraní objektu future<T> lze testovat tento případ i explicitně. Je zde funkce has_value(), která vrací true, pokud future<T> má hodnotu nebo výjimku, a false pokud je to ten druhý případ. Objekt future se v tomto případě chová jako optional<>. V korutině můžeme použít co_await future.has_value() se stejným významem s tím, že korutina se uspí, pokud future<T> ještě stále na svou hodnotu čeká.
Dropnutí promisy lze provést i ručně
promise(cocls::drop);
… se stejným výsledkem. Tohoto se hojně využívá u generátorů, které na konci svého běhu vrací future<T> bez hodnoty čímž oznamují své dokončení (konec generování)
Pokud objekt future<T> propadne do korutiny, není nic jednoduššího, než použít co_await k získání výsledku. Dokonce ani není nutné proměnnou deklarovat
int result = co_await calc_async(123)
Tento zápis způsobí, že korutina se uspí dokud není znám výsledek výpočtu. Toto je asi nejčastější použití.
Pokud objekt future<T> musíme řešit mimo korutinu, je zde k dispozici funkce wait() která zablokuje aktuální vlákno, dokud se hodnota future<T> nenastaví. Výsledek je pak vrácen. Stejně se chová i operátor * (dereference)
int result = *calc_async(123) // int result = calc_async(123).wait()
Pozor na to, že synchronní čekání bude fungovat i v korutině, kde je ale kontraproduktivní, proto je třeba kód navrhnout tak, aby v případě, že kód běží v korutině bylo na čekání použit co_await namísto dereference nebo funkce wait(). Žádný interní mechanismus nám to neohlídá. Všude tam, kde chceme použít wait() se ujistěte, jestli náhodou nejde o korutinu.
Na druhou stranu, dereferenci (*) lze použít i v korutině na místě, kde už bezpečně víme, že future hodnotu má.
cocls::future<int> f = calc_async(); if (co_await f.has_value()) { //čekáme na hodnotu std::cout << *f << std::endl; //tady už ji máme
Protože objekt future<T> není kopírovatelný ani přesouvatelný, špatně se s ním pracuje na mnoha místech. Ona nepřesouvatelnost znamená, že není možné použít například std::make_unique<> čímž by se dalo zajistit jeho přesouvatelnost. Vlastně jediná možnost je použít staré dobré new
std::unique_ptr fut(new auto(calc_async(123))); //nyní je fut přesouvatelná
Jiné možnosti se vlastně nenabízí a viditelnost new v kódu nebudí důvěru (z hlediska code review). Proto bylo rozhraní future<T> obohaceno o některé zkratky (nechci říkat hacky, i když se to slovo tady nabízí), které práci usnadňují. Jsou zpravidla založené na použití new (v placement verzi)
Jedna verze už byla představena a to konstrukce pomocí lambda funkce, která obdrží promise objekt. Další možností je konstrukce pomocí lambda funkce, která vrací future<T>. To oceníme tam, kde potřebujeme jakoby „přesunout“ objekt do nějakého kontejneru, ačkoliv o přesun ve skutečnosti nejde.
auto movable = std::make_unique<cocls::future<int> >([&]{return calc_async(123);}); std::optional<future<T> > opt_fut; opt_fut.emplace([&]{return calc_async(123);});
Důvod, proč nefunguje přiřazení je ten, že přiřazení vyžaduje přesouvatelný objekt. Proto byl zaveden operátor << který umožňuje přiřadit do proměnné future<T> pomocí lambda funkce
cocls::future<int> result; int i; do { result << [&]{return calc_async(i++);}; std::cout << (co_await result) << std::endl; } while (*result != 0);
Aby bylo možné do future<T> takto přiřadit, je nutné, aby proměnná nebyla ve stavu, kdy čeká na hodnotu, tedy nikde „nelítá“ reference v podobě svázané promise.
Příklad: Awaiter, který čte ze streamu a vrací znaky – ty načítáme přes co_await, ačkoliv by se mohlo zdát, že používání objektu bude značně neefektivní, opak je pravdou, překladače jsou fakt dobré a pochopí, co to má dělat. Používá se přiřazení do future<T> (odebral jsem nepodstatné věci).
class CharacterReader: public cocls::awaiter { public: CharacterReader(Stream s):_s(s) {} bool await_ready() const { return _ptr != _end; } bool await_suspend(std::coroutine_handle<> h) { set_handle(h); _fut << [&]{return _s.read();}; //přiřazení do future return _fut.subscribe(this); } int await_resume() { if (_ptr == _end) { std::string_view &text = _fut.value(); if (text.empty()) return -1; _ptr = text.data(); _end = _ptr+text.size(); } return *(_ptr++); } ~CharacterReader() { _s.put_back(std::string_view(_ptr, _end-_ptr)); } protected: Stream _s; cocls::future<std::string_view> _fut; const char *_ptr = nullptr; const char *_end = nullptr; }; //použití CharacterReader rd(stream); char a = co_await rd; char b = co_await rd; ....
Vrácena future<T> se musí vždy zpracovat, počkat na hodnotu a teprve pak se může zničit. Předčasné zničení je chyba a vede na UB (a debugu na assert). Co když potřebujeme výsledek zahodit, a tedy nečekat zbytečně na dokončení výpočtu. Pokud nemůžeme problém obejít tím, že zavolat async<T> přímo v režimu detach(), pak zbývá cocls::discard(). I v tomto případě použijeme lambdu.
cocls::discard([&]{return calc_async(123);});
Funkce discard funguje tak, že pro future<T> alokuje místo na heapu a jakmile je nastavena hodnota, tak paměť automaticky dealokuje. Tímto se můžeme zbavit závazku čekat na výsledek future<T>
Někdy potřebujeme future sdílet mezi mnoha vlastníky, a k tomu použijeme shared_future<T>. Protože se zpravidla tento objekt nevyskytuje jako návratová hodnota (používáme future<T>), je potřeba opět použít trik s lambdou pro konstrukci objektu
cocls::shared_future<T> fut([&]{return calc_async(123);});
Sdílená futura již může být kopírována, čímž dochází k jejímu sdílení. Je zřejmé, že v tomto případě se prostor pro future alokuje na heapu. Sdílená futura má také tu vlastnost, že během čekání na výsledek je započtena jedna reference navíc, takže není chybou zahození všech referencí před nastavením výsledku. Prostě jakmile je nastaven výsledek a v ten okamžik neexistuje žádná reference, pak je zároveň futura zahozena. Sdílené futury mohou najít uplatnění u cache, kdy můžeme slot v cache alokovat před naplněním hodnotou a každý, kdo do cache sáhne později bude vědět, zda musí na naplnění cache čekat, nebo už je výsledek k dispozici. Každopádně bude vidět, že záznam v cache už existuje a není třeba jej zakládat.
Poslední co zmíním a co doznalo změny, je generator. Ten nabízí synchronní ale i asynchronní generátor, a také generátor s argumentem (v obou variantách).
Generátor může být s parametrem nebo bez parametru (klasický)
Generátory lze dělit i z hlediska počtu cyklů
Oba typy generátorů lze kdykoliv přerušit. Je třeba to udělat v době, kdy generátor čeká v co_yield. Stačí pouze generátor zdestruovat – automaticky se zavolají destruktory všech objektů vytvořených během generování, ale je třeba dát pozor, aby kód generátoru byl „exception safe“ – minimálně kolem co_yield – protože se to chová jako by nastala nezachytitelná výjimka.
Z hlediska API lze použít synchronní a asynchronní přístup
Základní API nabízí funkce .next() a .value()
cocls::generator<int> fibo(int count) { int a = 0; int b = 1; for(int i = 0;i < count; i++) { int c = a+b; co_yield c; a = b; b = c; } } int main() { cocls::generator<int> gen = fibo(15); while (gen.next()) { std::cout << gen.value() << std::endl; } return 0; }
V asynchronním režimu lze funkci .next() zavolat přes co_await
cocls::generator<int> gen = fibo(15); while (co_await gen.next()) { std::cout << gen.value() << std::endl; }
Generátor nabízí i přístup přes iterátor. Ten je dostupný jen v synchronním režimu. Iterátory pak otevírají cestu k použití ranged-for. Pozor na to, jestli je generátor nekonečný, pak zde dojde k nekonečné smyčce
cocls::generator<int> gen = fibo(20); for (auto value: gen) std::cout<<value<<std::endl;
Generátor se však může chovat i jako funkce, kterou voláme vždy, když potřebujeme aby generátor vygeneroval další hodnotu. V takovém případě generátor vrací future<T>
cocls::generator<int> gen = fibo(); for (int i = 0; i< 10; i++) { cocls::future<int> res = gen(); std::cout << res.wait() << std::endl; }
U konečných generátorů musíme detekovat konec.
cocls::generator<int> gen = fibo(); while(true) { cocls::future<int> res = gen(); if (res) { //res.has_value() std::cout << *res << std::endl; } else { break; } }
V asynchronním přístupu je třeba použít co_await na has_value()
cocls::generator<int> gen = fibo(); while(true) { cocls::future<int> res = gen(); if (co_await res.has_value()) { std::cout << *res << std::endl; } else { break; } }
Nebo to řešit přes výjimku
cocls::generator<int> gen = fibo(); try { for (int i = 0; i< 10; i++) { int res = co_await gen(); std::cout << res << std::endl; } } catch (const cocls::await_canceled_exception &) { std::cout << "Done" << std::endl; }
Generátor s argumentem se volá s argumentem buď přes .next(arg) nebo prostým zavoláním gen(arg). Předat lze pouze jeden argument, pokud chcete předat víc argumentů, pak použijte strukturu. Argument se předává referencí, proto je třeba zajistit, aby zůstal platný po celou dobu generování – což je zpravidla zajištěno tím, že volající typicky čeká na výsledek.
Pokud jako argument použijeme tuple nebo strukturu, můžeme generátor volat s více argumenty. Generátor pak obdrží hodnoty zabalené do zvolené struktury
Trochu se liší i vlastní běh generátoru
struct RetVal { double sum = 0; int count = 0; }; cocls::generator<RetVal,double> summary() { double val = co_yield nullptr; RetVal rv; while(true) { rv.count++; rv.sum+=val; val = co_yield rv; } } int min() { auto sum = summary(); double data[] = {1,4,32,31.3,58.3,0.2, 16.3, 0.8, 7.7,4,8.5}; for (double x: data) { RetVal state = *sum(x); std::cout << "Value=" << x <<", Sum=" << state.sum <<", Count=" << state.count <<", Avg=" << state.sum/state.count << std::endl; } return 0; }
Agregace generátorů je důležitá v okamžiku, kdy mám několik asynchronních generátorů a potřebuju získat hodnotu z jednoho z nich, je libovolné z kterého, z toho prvního, který generování dokončí.
Jako příklad uvedu generátor, který realizuje accept příchozího spojení
cocls::generator<int> accept_con = listen(mother_socket); //.... int socket = co_await accept_con();
Funkce listen(mother_socket) zahájí naslouchání na nově příchozí spojení na otevřeném socketu. Pokud bych chtěl naslouchat na dvou socketech současně, byl by kód mnohem složitější. Naštěstí toto řeší generator_aggregator
cocls::generator<int> accept_con = cocls::generator_aggregator({ listen(mother_socket_1), listen(mother_socket_2) }); //.... int socket = co_await accept_con();
Agregátor přijímá n generátorů a sám pak funguje jako generátor s tím, že vždy vygeneruje výsledek generátoru, který jako první dokončil generování, v tomto případě vrátí první spojení, které se připojilo na libovolný otevřený port. Vnitřně funguje tak, že obsahuje frontu výsledků, které přenáší do co_yield a při návratu požádá generátor, jehož výsledek byl předán o nové generování. Poradí si i s konečnými generátory. A i když je primárně určen pro asynchronní generátory, bude fungovat i se synchronními generátory, tam akorát bude docházet k prokládání výsledků generátorů.
Mohl bych pokračovat v psaní, ale článek by už byl hodně dlouhý. Přitom drobných nuancí, které přinesly výrazná vylepšení by se našlo ještě hodně. Některé další informace najdete přímo v README u knihovny, kterou zatím mám napsanou v češtině – anglický překlad chystám. Zároveň chci upozornit, že již dále nebudu vyvíjet starou knihovnu „coroutines_classes“.
Samotný vývoj cocls knihovny považuju za „RC1“, tedy že výrazné změny neplánuju, maximálně se budu soustředit na hledání chyb a optimalizace přičemž se budu snažit minimálně zasahovat do programátorského rozhraní. A slibuju, že napíšu další testy.
Tuto knihovnu také používám v plánované knihovně „coroserver“ – zatím neveřejná, ale pár fragmentů z kódu jsem zde použil pro demonstraci – knihovna by měla implementovat síťový vstup a výstup pomocí korutin (TCP/HTTP server, atd)
[1] https://github.com/ondra-novak/cocls
[2] https://en.cppreference.com/w/cpp/language/copy_elision
Intenzivně se zabývám programováním zejména v jazyce C++. Vyvíjím vlastní knihovny, vzory, techniky, používám šablony, to vše proto, aby se mi usnadnil život při návrhu aplikací. Pracoval jsem jako programátor ve společnosti Seznam.cz. Nyní jsem se usadil v jednom startupu, kde vyvíjím serverové komponenty a informační systémy v C++
Přečteno 51 633×
Přečteno 24 295×
Přečteno 23 090×
Přečteno 21 393×
Přečteno 18 055×