Asynchronní cykly v C++20

13. 4. 2024 14:14 (aktualizováno) Ondřej Novák

Pro podporu asynchroních cyklů C++20 byl navržen příkaz for co_await. Problém je, že nakonec se tento návrh do normy nedostal. Máme nějakou náhradu?

for co_await

Příkaz for asi nemusím představovat. Jeho použití pro procházení kontejnerů a rozsahů asi také ne. Kdyžtak pro zopakování

std::vector<int> items={1,2,3,4};
for (int x: items) {
   std::cout << x << std::endl;
}

Aby příkaz správně fungoval, je třeba, aby kontejner (zde vector) definoval iterátor a funkce begin() a end(). Příkaz for je víceméně zkrácený zápis následujícího kódu

for(auto iter = item.begin(),  end = item.end(); iter != end; ++iter) {
    int x = *iter;
    std::cout << x << std::endl;
}

Norma C++ má tu definici trochu složitější, můj přepis je zjednodušen pro snadnou demonstraci.

Myšlenka je taková, že mohu jakýkoliv objekt upravit tak, aby se choval jako kontejner, měl definovaný iterátor a metody begin() a end(). S nástupem podpory korutin je ideálním kandidátem generátor

coro::generator<int> fibo(int count) {
    unsigned int a = 1;
    unsigned int b = 1;

    for (int i = 0; i < count; ++i) {
        co_yield a;
        int c = a+b;
        a = b;
        b = c;
    }

}
int main() {
    for (int v: fibo(10)) {
       std::cout << v << std::endl;
    }
}

Iterátor pro generátor může být maximálně jednoduchý. Při jeho konstrukci požádáme generátor o první hodnotu, kterou iterátor vrátí při dereferenci. Operace ++ pak požádá o další hodnotu. Pokud je iterátor srovnáván s end(), tak vrací true, pokud generátor ukončil generování.

//příklad
auto gen = fibo(10);
auto iter = gen();
while (iter.has_value()) {
int v = *iter;
iter = gen();
}

Stačí tedy přiřadit

  • konstrukce = volám gen()
  • test na end() = volám .has_value()
  • dereference
  • operator ++ = volám gen()

Pokud by náš generátor byl asynchronní, musel by kód přidat na některé místa klíčové slovo co_await .

//příklad - nefunguje v libcoro!!!

auto gen = fibo(10);
auto iter =co_await gen();
while (iter.has_value()) {
     int v = *iter;
     iter = co_await gen();
}

Ekvivalentní zápis pro for by vypadal takto:

for(auto iter = co_await item.begin(),  end = item.end(); iter != end; co_await ++iter) {
    int x = *iter;
    std::cout << x << std::endl;
}

A přesně takhle by měl být definován asynchronní for

for co_await (int v: fibo(10)) {
       std::cout << v << std::endl;
}

Jak to obejít

Jak už jsem předeslal for co_await se do finální verze C++ 20 nedostal. Celou definici této funkcionality najdete zde a zdůvodnění, proč to nakonec bylo vyřazeno najdete zase zde. Za mě je to škoda, na druhou stranu, … hmm, ani já si nejsem moc jistý s implementací awaiteru nad operátorem ++ (co_await ++iter). Fakt je ten, že rozhraní generátorů není ustálené

V knihovně libcoro je na výběr několik možností jak tento cyklus napsat

varianta 1: … výjimka nakonec

Předpokládejme, že asynchronní generátor je nekonečný, ale pokud potřebuje skončit, generuje výjimku

try {
    auto gen = async_fibo(10);
    for(;;) {
        int v = co_await gen();
        std::cout << v << std::endl;
    }
} catch (const coro::await_canceled_exception &) {
    //konec
}

Rozumím tomu, že používání výjimek k tomuto účelu není vůbec optimální, ale zmínit jsem to musel. Minimálně to pomůže pochopit používání generátorů z jiného úhlu. Pokud totiž generátor budu prezentovat jako funkci, která při zavolání vygeneruje novou hodnotu, pak stačí v cyklu „generátor volat“ a získávat hodnoty. Generátor na konci vyhodí výjimku a tím detekujeme konec cyklu.

Volání generátoru je implementováno definicí operátoru () v objektu generátoru, který provede resume korutiny, která generátor implementuje. Prezentace generátoru jako funkce se s vnitřním stavem pomáhá čitelnosti kódu. Takové funkce známe už od C++11, jen s tím, že jejich implementace není realizována korutinou.

Optimální by bylo vyhnout se vyhození výjimky. Můžeme to řešit třeba tak, že generátor bude generovat speciální hodnotu představující konec. Tenhle způsob taky není neznámý. Například při čtení vstupního streamu po znacích můžeme obdržet hodnoty - 1, což není platný ASCII znak, ale je to konstanta označená jako EOF. 

Generátor by také mohl vracet hodnotu přes objekt std::optional<>. Pokud by vrátil prázdnou hodnotu, znamenalo by to konec.

varianta 2: prázdná hodnota nakonec

V prvních verzích libcoro (tedy cocls) jsem skutečně z generátorů vracel std::optional<> abych mohl snadno detekovat konec generování. Po mnoha přepisech jsem došel k řešení, že coro::future může fungovat i jako optional v případě, že umožním označit futuru za resolved, avšak bez nastavení hodnoty. Teprve až pokus načíst hodnotu vyhodí výjimku await_canceled_exception. Předpokládá se, že tento stav se použije v situacích, kdy ten kdo generuje výsledek bude chtít oznámit čekající korutině, že operace byla zrušena, typicky proto, že třeba sám tento objekt ukončuje činnost. Například pokud se svázaná promise drží v nějakém kontejneru a tento kontejner si projde destrukcí, pak si destrukcí projdou i tam uložené promise a všechny čekající korutiny se dozví, že operace byla zrušena.

Celé se to točí kolem oznámení konce nějaké činnost, takže proč to nerozvinou?

Rozhraní pro coro::future obsahuje dvě operace, které lze na to využít .wait().has_value()

auto gen = async_fibo(10);
for(;;) {
     auto fut = gen();             //získej hodnotu jako future
     co_await fut.wait();          //uspi korutinu, dokud není hodnota ready
     if (!fut.has_value()) break;  //nemá hodnotu? konec
     int v = fut.get();            //vyzvedni hodnotu
     std::cout << v << std::endl;
 }

Funkce wait() pouze počká na vyřešení futury, ale protože se nedotazuje na výsledek, nevyhazuje výjimku. Jakmile futura má hodnotu, můžeme v klidu použít jakékoliv funkce s jejího synchronního API bez nebezpečí, že by došlo k blokování.

Kombinace wait() a has_value() je tak častá, že vznikla „zkratka“ v podobě awaitable operátoru ! (vykřičník). Ten vrací true, pokud, pokud futura hodnotu nemá. A to že je awaitable znamená, že lze použít s klíčovým slovem  co_await

auto gen = async_fibo(10);
for(;;) {
     auto fut = gen();             //získej hodnotu jako future
     if (co_await !fut) break;     //uspi korutinu a pokud po probuzení nemá
                                   //hodnotu, tak konec
     int v = fut.get();            //vyzvedni hodnotu
     std::cout << v << std::endl;
 }

K tomu je třeba dodat, že futura by mohla mít celé své synchronní API též v podobě awaitable funkcí, ale to by znamenalo naprogramovat mnoho awaiterů, ne všude to přináší nějaký větší benefit. 

Šlo by to nějak zapsat jako operaci for? Co třeba for v klasickém formátu? 

varianta 3: klasický cyklus for 

Klasický cyklus for má zápis:

for(inicializace; podmínka; inkrement)

Následující kód bude fungovat:

auto gen = async_fibo(10);
for(auto fut = gen();!co_await !fut; fut = gen()) {
     int v = fut.get();            //vyzvedni hodnotu
     std::cout << v << std::endl;
 }

Zápis !co_await !fut je divný, i když dává logiku. Protože ! vrací true, pokud nemá hodnotu, obrácením podmínky se pak vrací true, pokud má hodnotu. Možná by stálo za uvážení, zda neudělat funkci has_value jako awaitable, takže by se psalo co_await fut.has_value(). – a skutečně jsem to tak kdysi měl. Mně se ale zalíbila jiná zkratka, kterou používám a to !!. Totiž to napravo je operátor, který se používá na detekci chyby a to nalevo je operátor, který to neguje. Například !stream vrací true, pokud ve streamu je chyba (třeba eof) a !!stream vrací true, pokud ve streamu není chyba. Stejný mechanismu by se mi líbil tady, je třeba udělat operaci !! jako awaitable – a tak to skutečně je i u  coro::future (ve skutečnost nejde o nový awaiter, ale o možnost dopředu specifikovat v jakém případě má awaiter vrátit true)

auto gen = async_fibo(10);
for(auto fut = gen();co_await !!fut; fut = gen()) {
   int v = fut.get(); //vyzvedni hodnotu
   std::cout << v << std::endl;
}

varianta 4: nová myšlenka, iterujeme futury

Pokud by iterátor asynchronního generátoru vracel místo hodnot futury, pak by bylo možno použít standardní range-for a na hodnotu počkat až v těle cyklu

for(auto &fut: async_fibo(10)) {
   int v = co_await fut;
   std::cout << v << std::endl;
}

Tahle varianta je zatím nejdál, kam jsem se dostal, při hledání náhrady neexistující for co_await. Celé řešení má ale jeden zásadní háček. Vyžaduje, aby generátor svůj konec oznamoval synchronně. Vyplývá to z toho, jak je implementována detekce konce.

generator_iterator &operator++() {
     _stor = _src();            //vyzvedni futuru další hodnotu a uloži ji
     _stor.start();             //spusť generátor (deferred_future::start())
                                //pokud není pending a nemá hodnotu

     if (!_stor.is_pending() && !_stor.has_value()) {
         _is_end = true;        //je to konec
     }
     return *this;
}

Tato varianta využívá toho, že asynchronní generátor je spuštěn nejprve v aktuálním kontextu (vlákně) a teprve když narazí na co_await ve svém kódu a je uspán, může později pokračovat v rámci jiného kontextu. Z hlediska průběhu exekuce kódu to vypadá, jako kdyby generátor ukončil generování hodnoty. To že běží asynchronně poznáme jedině tak, že vrácená futura bude stále ve stavu pending. Tam právě zafunguje co_await tím, že také uspí volající korutinu a její probuzení se provede ve stejném kontextu jako probuzení generátoru – ten po vygenerování hodnoty použije symetrický transfer k přepnutí do čekající korutiny.

Nicméně, pokud generátor ukončí svou činnost synchronně, tak se to dozvíme ještě před co_await právě výše zmíněným testem na pending a na has_value a tedy můžeme cyklus ukončit.

Co když ale generátor svoji činnost ukončí asynchronně? Může vůbec taková situace nastat? Samozřejmě může, jako příklad uvedu TCP stream, který čeká na data a najednou obdrží EOF. Takový generátor nemá co vrátit a tak oznámí konec – asynchronně.

Co v takovém případě nastane? Nastane výjimka na co_await, jako v první variantě. 

Výše uvedená varianta tak není plnohodnotnou náhradou za zrušenou for co_await ale i tak může pokrýt hodně případů,  kdy lze ranged-for použít i pro asynchronní generátory. 

Task list

Tento způsob procházení asynchronních výsledků vznikl prvně pro task list. To je kontejner v libcoro, do kterého si lze ukládat spuštěné asynchronní korutiny

coro::task_list<coro::future<int> > list;
for (int i = 0; i < 100; ++i) {
     list.push_back(run_async_task(i));
}
for (auto &fut: coro::when_each(list)) {
     int r = co_await fut;
     std::cout << r << std::endl;
}

(možná někoho napadne, proč speciální objekt namísto použití vector. No protože coro::future nemá povolený move constructor a proto se nedá použít s vektorem. Ve skutečnosti se pod tím ukrývá deque s upraveným API)

V tomto případě třída coro::when_each prochází seznam zpracovaných úloh v pořadí, v jakém byly dokončené, přičemž k vlastnímu čekání dochází až v co_await na dalším řádku. Na první pohled to vypadá trochu nelogicky, jak mohu v příkazu for získat další úlohu v pořadí, když netuším, která to bude? Přece se do dozvím až na další řádce? Ve skutečnosti si nevyzvedávám úlohu, ale jen index, tedy ona zmíněná fut obsahuje jen referenci na N-tou úlohu v pořadí a pokud ta ještě není dokončena, bude co_await čekat na její dokončení. Futura tak nereferencuje výsledek budoucí úlohy (nevíme, která to bude), ale jen převezme výsledek N-té úlohy po jejím dokončení. Do té doby pouze čeká na dokončení libovolné úlohy na daném pořadí.

( coro::when_each je inspirován funkcí Task.WhenEach z .NET)

Závěr

To je asi vše co bych měl k for co_await a k tomu, jak jej nahradit. Třeba se jednou do normy dostane (není tam ani v C++23). A pokud mě napadne ještě nějaké jiné řešení, rád o tom zase něco napíšu.

Knihovna libcoro.

Sdílet