Asynchroní parsování JSONu v C++20

4. 5. 2024 14:28 (aktualizováno) Ondřej Novák

Tento článek je ukázkou a malým cvičením na korutiny. Ukážeme si, jak napsat parser JSONu jako korutinu v C++20

Motivace

Asynchronní parsování JSONu najde uplatnění zejména u programů, které komunikují po síťovém spojení nebo používají asynchronní vstupně výstupní operace. Jako příklad si můžeme uvést HTTP server, který přijme request od klienta ve formátu JSON. 

Běžně se toto realizuje tak, že se data nejprve načtou do mezipaměti a teprve, když jsou data kompletní, provede se parsování JSONu a sestavení adekvátní datové struktury. Parsování je provedeno synchronně, což vede na výrazně jednodušší implementaci. Nevýhodou je to, že celý postup vyžaduje minimálně dvojnásobnou paměťovou náročnost. Data se v jednu chvíli nachází v mezipaměti a zároveň ve výsledné struktuře, která byla při parsování sestavena. V praxi jsem občas narazil na problémy s paměťovou náročností serverových komponent provozovaných na malých VPSkách (zcela zřejmě z důvodu úspory nákladů za pronájem VPSek)

U HTTP serveru může být situace ještě zvládnutelná, protože tělo POST se často posílá s informací o tom, jak jsou data velká. Lze tedy paměť rezervovat před načtením těla requestu. Tedy za předpokladu, že klient nepoužije chunked formát – ano, to může, je to součást HTTP/1.1. Tam pak vytváříme nekonečně velkou mezipaměť – buffer, který se musí v průběhu rozšiřovat, a toto rozšiřování stojí další paměť a kopírování dat. Podobně můžeme narazit u HTTP klienta, který musí načíst odpověď serveru, pokud například query zaslané na server vygeneruje velikou odpověď. Server možná data formátuje průběžně, jak výsledky načítá z databáze, ale klient musí nejprve odpověď celou načíst, aby ji mohl zpracovat. A to taky může stát obrovské množství paměti a zdržuje to zpracování. Klientem může být například mobilní telefon.

Asynchronní parsování umožňuje parsovat formát JSON a vytvářet jeho reprezentaci rovnou z přijímaných dat bez nutnosti data ukládat do mezipaměti. Budeme samozřejmě diskutovat, kolik nás taková úprava parsování bude stát výkonu.

Format JSON a jeho parsování

JSON formát je jednoduchý a jeho parser by měl být schopen naprogramovat průměrný programátor. Formát je přitom velice dobře zdokumentovaný na stránce json.org

JSON formát lze popsat LL(1) gramatikou a tak nejjednoduší způsob analýzy formátu je použití rekurzivního sestupu. Poznat typ prvku lze již podle prvního znaku (pokud přeskočíme bílé znaky)

  • uvozovky(") – string
  • znak „t“ – true
  • znak „f“ – false
  • znak „n“ – null
  • hranaté závorky „[“ – pole
  • složené závorky „{“ – objekt
  • číslice, plus a mínus – číslo
  • cokoliv jiného – chyba

objekty a pole jsou kontejnery, tedy obsahují další prvky. Jejich analýza se tedy dělá rekurzivně

  • pole – [ <prvek>, <prvek>,…]
  • objekt – {„string“: < prvek > ,„string“: < prvek > }
 switch (c) {
      default:
         if ((c >= '0' && c <= '9') || c == '+' || c == '-') {
                return parse_number(c);
         } else {
                throw chyba();
         }
      case '"': return parse_string();
      case '{': return parse_object();
      case '[': return parse_array();
      case 't': return check_keyword(c,"true", true);
      case 'f': return check_keyword(c,"false", false);
      case 'n': return check_keyword(c,"null", nullptr);
}

… a teď korutina

Je vidět, že parser bude zřejmě volat různé funkce, které mohou rekurzivně volat parser, zejména u kontejnerů. Vypadá to, že korutina se tedy rozpadne na mnoho korutin a místo return budeme psát co_return co_await. Například

    case '[': co_return co_await parse_array();

Tento postup není optimální. Každé zavolání korutiny znamená alokaci paměti na heapu, protože korutiny nemohou použít zásobník právě kvůli potřebě korutinu kdykoliv přerušit. Korutiny v C++ jsou totiž stackless, a to věc komplikuje. (Jistě by šlo použít nějakou variantu stackful korutiny, například boost::context, ale o jejich problémech si povíme někdy příště).

Proto jsem si zkusil naprogramovat parser, který nepotřebuje rekurzi realizovanou pomocí zásobníku volání. Celý parser je jako jedna jediná korutina. Nedochází k rekurzivnímu volání sebe sama. Jediné volání, které vyžaduje co_await je čtení vstupu.

Požadavek na vstup

Definujme si jak budeme získávat vstupní data.

  • Asynchronní stream
  • Asynchronní generátor
  • Asynchronní iterátor
  • Funkce vracející awaitable vysledek

První tři varianty postupně vyloučíme. Asynchronní streamy ve standardu C++ neexistují, bylo by nutné svázat řešení s nějakou knihovnou, třeba s boost::asio. Asynchronní generátory sice je možné v C++20 naprogramovat, ale jejich rozhranní není ustálené, jak jsme si ukázali v minulém článku. Tam také spadají asynchronní iterátory. Optimální by tedy bylo, získávat data voláním nějaké metody nějaké třídy, která by vracela awaitable výsledek (šlo by na něj čekat přes co_await). A co přímo nějaká lambda funkce 

static_assert(coro::awaitable_r<std::invoke_result_t<decltype(source)>, std::string_view>);
std::string_view data = co_await source();

Vyžaduje se:

  1. Protože se vrací view na řetězec, vyžaduje se, aby samotný řetězec byl alokován v kontextu (closure) dané funkce.
  2. Vrácená hodnota je platná do příštího zavolání nebo destrukce funkce (jejího closure)
  3. Neexistuje horní limit na délku vráceného řetězec
  4. Každé zavolání může vrátit jiný počet znaků, minimálně však alespoň jeden znak. Pokud prozatím nejsou žádná data k dispozici, musí být blokující (ve smyslu korutin, tedy uspat korutinu po dobu čekání na data)
  5. Pokud však funkce vrátí prázdný řetězec, považuje se to za EOF (stream skončil, druhá strana uzavřela spojení) 

Tyto pravidla odpovídají způsobu, jakým probíhá čtení ze socketu ( recv).

Transformace awaiteru

V předchozím odstavci jsme si definovali vstup, kterým má být funkce generující úseky stringů, tedy JSON text je rozsekán na tyto úseky, tak jak se je povedlo přečíst ze vstupu. Nicméně pro parsování by bylo vhodnější, kdyby zdroj vracel text po znacích.

Proč jsem volil výše uvedené požadavky, když mi nevyhovují? Protože bych rád mé řešení přizpůsobil případu užití (čtení dat ze socketu), a nevyžadoval po uživateli tohoto řešení, aby si psal vlastní transformaci. Transformaci awaiteru z varianty, která vrací úseky řetězce na varaintu, která vrací jednotlivé znaky, můžeme snadno zakomponovat do našeho řešení.

Čtení vstupu se tedy změní tak, že vždycky načteme úsek, rozsekáme ho na písmenka a to vracíme parseru po jednom, kdykoliv o ně požádá. Jakmile písmenka dojdou, načteme další úsek. Tento „sekáček“ znaků musí být awaitable, protože budeme chtít parser přerušit kdykoliv dojdou znaky. Optimální by bylo, kdyby tento transformovaný awaiter bylo možné použít takto

CharacterSource character_source(source);
char c;
c = co_await character_source;
c = co_await character_source;
c = co_await character_source;
....

Zkusme si takovou třídu navrhnout.

Protože objekt CharacterSource je evidentně stavový – musí si udržovat načtený řetězec, tak se nabízí použít generátor. Ale to by byl kanón na vrabce. Mnohem lepší je vyjít ze znalosti struktury awaiteru. Protože náš zdroj (source) je funkce, která vrací awaiter, a evidentně CharacterSource musí být naprogramován také jako awaiter, tak se nabízí naimplementovat to jako proxy.

Awaiter v C++20 má 3 povinné metody: await_ready, await_suspend a await_resume. Tyto tři metody budou součastí našeho  CharacterSource

struct CharacterSource {
    std::string_view _str = {};
    std::size_t _pos = 0;
    //....
    bool await_ready()  {
       if (_pos < _str.length()) return true;
       //...
    }
    auto await_suspend(std::coroutine_handle<> h) {
       //...
    }
    char await_resume() {
        //...
        return _str[_pos++];
    }
};

Jak awaiter funguje? Pokud zapíšu co_await src, kde src je instance CharacterSource, zavolá se nejprve await_ready(). Pokud tato funkce vrátí true, zavolá se rovnou await_resume()  a výsledek této funkce je výsledkem operátoru co_await. Pokud by ale první funkce vrátila false, zavolá se await_suspend() a korutina je považována za uspanou, řízení se vrátí podle výsledku té funkce, ale zpravidla volajícímu. Po probuzení korutiny se volá await_resume() se stejnými důsledky.

Vypadá to na komplikovaný kód, ale překladač umí operaci c = co_await src skvěle optimalizovat. Výše uvedená implementace bude při zapnutých optimalizací přeložena takto:

if (_pos < _str.length()) {
     c = _str[_pos++];
} else {
     //načti další úsek
     //...
}

… a to na všech místech, kde bude co_await uveden (bude se inlineovat). Tedy přesně stejně, jak by to dopadlo v synchronní variantě, pokud by uměla načítat data z nějakého vstupu po částech s blokováním při čekání na data.

Otázkou je, jak bude vypadat zbytek kódu pro načtení dalšího úseku. Na začátek přidáme další deklarace

struct CharacterSource {
    using SourceAwaiter = std::invoke_result_t<Source>;
    Source &_src;
    std::string_view _str = {};
    std::size_t _pos = 0;
    bool _fetched = false;
    union {
        SourceAwaiter _srcawt;
    };

    CharacterSource(Source &src):_src(src) {}
    ~CharacterSource() {}
//...
};

Reference na Source v _src je zřejmá, objekt musí mít přístup ke zdroji. Příznak _fetched se nastavuje na true, pokud došlo k přečtení dat ze zdroje, čili jeho awaiter je schopen data načtená data vydat. Podivnou deklarací je union s SourceAwaiter, což je alias typu návratové hodnoty Source. Tato deklarace má svůj důvod. Způsobí, že konstruktor jej nebude konstruovat implicitně. Musíme jej aktivovat explicitně a to přesto, že tam je uvedena jen jedna varianta. To je funkce unionu. Protože kód vytváříme jako šablonu, nemáme garantované žádné vlastnosti zdrojového awaiteru.  Co když nemá default konstruktor? Co když jej nelze přesouvat? Nejspíš tam bude coro::future a tuto třídu určitě přesouvat nelze. Tahle proměnná tak slouží jen jako rezervované místo pro uložení návratové hodnoty volání source(), a protože zároveň musíme tuto hodnotu podržet mimo zásobník, je zvolen právě tento způsob. Následně si ukážeme, jak se to používá.

bool await_ready()  {
     if (_pos < _str.length()) [[likely]] return true;
     new (&_srcawt) SourceAwaiter(_src());
     _fetched = true;
     return _srcawt.await_ready();
}

Funkce await_ready tedy stále vrací true pokud má zásobu nepřečtených znaků. Atribut [[likely]] označuje, že předpokládáme, že tato situace bude častá, takže překladač navrhne kód tak, aby CPU spíš volilo tuto větev. Jakmile ale znaky dojdou, inicializuje se _srcawt návratovou hodnotou volání Source (nejde použít std::construct_at, hádejte proč?). Nastaví se příznak _fetched a vrátí se to, co vrátí zdrojový awaiter na await_ready(), tedy přesně to emuluje chování  co_await. Pokud však tato funkce vrátí false, pak zdrojový awaiter ještě data nemá, a tak dojde k volání  await_suspend()

auto await_suspend(std::coroutine_handle<> h) {
     return _srcawt.await_suspend(h);
}

Tato funkce pouze jen přepošle informaci o uspání korutiny do zdrojového awaiteru. Ten nejspíš propojí handle korutiny s požadavkem na další data a vrátí výsledek, který povede k uspání korutiny (může přepnout na jinou korutinu, která operaci vyřídí). Zdrojový awaiter nemusí vědět, že požadavek byl přeposlán. Po probuzení korutiny se bude volat await_resume() naší implementace.

char await_resume() {
     if (_fetched) [[unlikely]] {
         coro::on_leave finally=[this]{
              _fetched = false;
              _srcawt.~SourceAwaiter();
         };
         _pos = 0;
         _str = _srcawt.await_resume();
         if (_str.empty()) throw json_parse_error::unexpected_eof;
     }
     return _str[_pos++];
}

Ve funkci await_resume, která se volá na konec, ať už se uspávalo nebo ne, testujeme právě příznak _fetched. Ten je true, pokud došlo k načtení dalšího úseku dat. Je třeba ty data přenést z awaiteru, tedy zavolat jeho funkci await_resume() a resetovat čítač pozice. Nakonec musíme instanci SourceAwaiter zdestruovat.

Ve funkci se používá coro::on_leave což je objekt, který registruje lambda funkci a tuto funkci pak volá při opuštění scope, bez ohledu na to, jakým způsobem se scope opouští. Je to takove finally (kdysi jsem o tom psal zde), protože funkce await_resume může vyhodit výjimku a my potřebujeme, aby k destrukci SourceAwaiter došlo za všech okolností a při té příležitosti vynulujeme příznak  _fetched

To je všechno pro transformaci awaiteru, nyní můžeme psát

CharacterSource src(source);
char znak = co_await src;

Optimalizující překladač bude při každém čtení testovat, zda je dostatek nepřečtených znaků a pokud ano, automaticky je bude vracet z bufferu a inkrementovat čítač pozice. To je pár instrukcí. Pokud znaky dojdou, následuje mnohem složitější část kódu, která může vést k uspání korutiny, ale s tím se tak nějak počítá, protože pokud nejsou data k dispozici, proces zpracování již víc zoptimalizovat nejde

K maximálnímu pohodlí nám zbývají ještě dvě pomocné funkce

std::string_view get_unused() const {return _str.substr(_pos);}

void put_back() {--_pos;}
  • Funkce get_unused() vrací podřetězec s doposud nezpracovanými znaky, to se může hodit když za JSON textem následuje obsah, o který nechceme přijít.
  • Funkce put_back() ruší předchozí operaci čtení znaku, takže následující co_await vrátí stejný znak. Předpokládá se, že takto lze vrátit pouze jedno čtení – jen se to neověřuje, ušetříme nějaké instrukce.

Parsování JSON řetězce

Parsování JSON řetězce není zrovna triviální operace. Nestačí jen hledat ukončující uvozovky. Je potřeba také interpretovat escape sekvence a konvertovat unicode zápis ( \u12AB) na UTF-8. Ano, pokud výsledný řetězec je UTF-8, pak je třeba tyto sekvence převést do UTF-8. A nezapomenout na surrogate pairs ( \uD83D\uDE00 je 😀)

Zde nepůjdu do takových podrobností, jen se zamyslíme nad tím, jak asynchronně číst řetězec, a přitom to nenapsat jako korutinu, protože jsme si řekli, že tam bude jen jedna korutina. A řetězec musíme umět načíst hned na dvou místech, tedy jednak při čtení řetězcových hodnot, a jednak při čtení klíčů.

Parser řetězců lze napsat jako filtr/konvertor znaků. To je funkce, která má closure (lambda funkce). Při inicializaci obdrží výstupní funkci, tedy funkci zodpovědnou za zpracování konvertovaných znaků. Funkce se pak volá pro každý znak a sama provádí konverzni znaků. Musí si tedy udržovat stav, je to takový stavový automat. Toto je začátek

///converts json string to utf-8 string
template<std::invocable<char> Output>
inline auto json_string_to_utf_8(Output output) {

    enum class State {
        character,
        special,
        codepoint1,
        codepoint2,
        codepoint3,
        codepoint4,
    };

    return [output,
            state = State::character,
            codepoint = 0,
            first_codepoint = 0](char c) mutable -> bool {

        switch (state) {
            default:
                if (c == '"') return false;
                else if (c == '\\') state = State::special;
                else output(c);
                break;
            case State::special:
//.....

Náš filtr navíc vrací true, pokud řetězec pokračuje, nebo false pokud byla dodána ukončující uvozovka. Použití v korutině pak vypadá takto:

case '"': {
    auto strconv = json_string_to_utf_8([&](char c){strbuff.push_back(c);});
    while (strconv(co_await src));

    //v strbuff je výsledek
    //.... něco s tím udělej

}break;

Hlavní smyslem tohoto uspořádání je přesunou uspávací část na základní úroveň do té jediné korutiny. V tomto případě uspávání probíhá při zpracování parametru filtru uvnitř výrazu while.(děsivé co? Ale opravdu to takhle funguje, testování podmínky while lze přerušit)

Parsování čísel

Při studiu různých parserů jsem se setkal i se „závoděním“ o nejrychlejší parsing JSONu, kde největší focus se soustředí hlavně na rychlý parsování čisel. Ono se ukazuje, že funkce strtod nepatří k nejrychlejším. Časem jak jsem získával zkušenosti jsem naznal, že parsování čísel ze strany JSON parseru nepotřebuji, vlastně vůbec nechci aby něco takového parser dělal. Postačí, když JSON parser bude čísla předávat v jako řetězce.

Vyplývá to ze způsobu, jakým jsem z JSONem pracoval na serverech. Často totiž šlo o operaci načti-uprav-ulož JSON dokument v nějaké dokumentové databázi. A tady se objevily problémy s čísly, kdy docházelo ke zhoršení přesnosti čásel při této operaci, aniž by byly předmětem té úpravy. Parser totiž načetl čísla do double s nějakou přesností a serializátor je pak převedl na text s jinou přesností. Při ukládání čísel v původní řetězcové variantě k ničemu takovému nedocházelo. 

Proto i v tomto případě budeme čísla předávat v řetězcové podobě. Nicméně, parser by měl minimálně ověřit, že číslo je validní. Tady naštestí kód není složitý, stačí ho přímo vložit do korutiny do větve, která zpracovává čísla. 

static constexpr auto is_number = [](char c) {return c >= '0' && c <= '9';};
static constexpr auto is_e = [](char c) {return c == 'e' || c == 'E';};
static constexpr auto is_sign = [](char c) {return c == '+' || c == '-';};

if (is_number(c) || is_sign(c)) {

    if (is_sign(c)) {
        strbuff.push_back(c);
        c = co_await src;
    }
    if (!is_number(c)) throw json_parse_error::invalid_number;
    while (is_number(c)) {
        strbuff.push_back(c);
        c = co_await src;
    }
    try {
        if (c == '.') {
            strbuff.push_back(c);
            c = co_await src;
            if (!is_number(c)) throw json_parse_error::invalid_number;
            while (is_number(c)) {
                strbuff.push_back(c);
                c = co_await src;
            }
        }
        if (is_e(c)) {
            strbuff.push_back(c);
            c = co_await src;
            if (is_sign(c)) {
                strbuff.push_back(c);
                c = co_await src;
            }
            if (!is_number(c)) throw json_parse_error::invalid_number;
            while (is_number(c)) {
                strbuff.push_back(c);
                c = co_await src;
            }
        }
        src.put_back();
    } catch (json_parse_error::error_t e) {
        //EOF on top level is OK, otherwise rethrow
        if (e != json_parse_error::unexpected_eof
                 || !levels.empty() || !is_number(strbuff.back())) throw;
    }
    strbuff.push_back('\0'); //strtod support
    //... další zpracování strbuff ...
}

Na tomhle kódu jsem nechtěl demonstrovat „žádnou vychytávku“, snad jen použití co_await na spoustu míst, kde dochází přečtení dalšího znaku (něco jako cin.get()). U JSONu je číslo jediná sekvence, která může končit EOF pokud je na základní úrovni. Protože předčasné EOF je reportováno výjimkou. musí zde být try-catch blok, který tento případ řeší.

Znak nula na konec bufferu se dává z důvodu možné zpracování čísla pomocí strtod, který očekává řetězec zakončený nulou.

Trochu nezvyklé je deklarace pomocných funkcí jako static constexpr lambda funkce. Je to jeden ze způsobů, jak deklarovat pomocnou funkci ultra-private. Obdoba javascriptového arrow operátoru.

//javascript
const is_number = c => c>='0' && c<='9';

Výstupní reprezentace JSONu

Tohle je zamyšlení o tom, jak reprezentovat výsledek parsování aniž bych byl nucen definovat nějakou přesnou strukturu. Chtěl jsem pouze napsat parser ale ne nutit někomu interní reprezentaci. Třeba bych chtěl tímto parserem generovat nlohmann/json

Nebudeme tedy definovat strukturu, ale představíme si jen takový koncept – vlastně, nadefinujeme si concept.

Pro ukládání výsledku je třeba nadefinovat factory přesnějí concept factory. Jak má taková továrna vypadat? Ideálně by měla definovat typ hodnoty, typ klíče a jednotlivé konstruktory. Například takto:

class JsonFactory {
public:
    using value_type = /*... nejaký typ ...*/;
    using key_type = /*... nejaký typ ...*/
value_type new_string(std::string_view s);
value_type new_number(std::string_view s);
value_type new_bool(bool b);
value_type new_null();
value_type new_array(std::span<value_type> items);
value_type new_object(std::span<std::string> keys, std::span<value_type> items);
key_type new_key(std::string_view str);
};

Poznámka: Funkce new_object obdrží dvě stejně dlouhé pole, jedno obsahuje klíče a jedno hodnoty. I když se zde nabízí použití pair<key_type, value_type>, ukáže se, že toto uspořádání je výhodnější pro parser.

Tuto třídu popíšeme jako concept

template<typename T>
concept json_factory = requires(T obj,
        std::string_view s,
        bool b,
        std::span<typename T::value_type> a,
        std::span<typename T::key_type> k) {

    {obj.new_string(s)}->std::same_as<typename T::value_type>;
    {obj.new_number(s)}->std::same_as<typename T::value_type>;
    {obj.new_bool(b)}->std::same_as<typename T::value_type>;
    {obj.new_null()}->std::same_as<typename T::value_type>;
    {obj.new_array(a)}->std::same_as<typename T::value_type>;
    {obj.new_key(s)}->std::same_as<typename T::key_type>;
    {obj.new_object(k,a)}->std::same_as<typename T::value_type>;
};

Kdo vidí koncept poprvé… Tento koncept se skládá z pravidel ve tvaru 

{ výraz } -> koncept;

… a v zásadě se jedná o pravidlo, že nějaký výraz lze vyhodnotit a že jeho výsledek odpovídá nějakému jinému konceptu, zde například std::same_as je koncept, který vyžaduje, aby výsledek byl přesně daného typu.

Parametry konceptu mohou být libovolné proměnné, které zde zastupují jen hodnoty daného typu, tedy aby se nemuselo místo toho psát std::declval<Typ>(). Překladač si prostě nějaké proměnné při ověřování vytvoří, na jejich obsahu při ověřování konceptu nesejde. Pokud z nějakého důvodu nejde koncept ověřit ale ani vytvořit (například chybí T::value_type), překladač oznámí chybu při dosazování zvoleného typu.

Náš parser tak bude jako parametr přijímat 

template<json_factory Factory>

… a může se spolehnout na existenci metod, které factory musí definovat. Pak není těžké ukládat parsované výsledky kamkoliv chceme.

Korutina parseru

Korutinu parseru napíšeme za pomocí coro::async. Tuhle korutinu můžeme převést na coro::future nebo na coro::shared_future, případně můžeme ji přímo zavolat z jiné korutiny a počkat na výsledek přes co_await. V normálním kódu je třeba korutinu spustit pomocí  .run()

template<json_factory JsonFactory, std::invocable<> Source>
inline coro::async<std::pair<typename JsonFactory::value_type, std::string_view> >
                parse_json(Source &&source, JsonFactory &&fact = {}) {
//....
}

Návratovou hodnotou korutiny je dvojice, ve first najdeme výslednou JSON hodnotu, a v second nezpracovaný vstup. Pokud dojde k chybě, vylítne z funkce výjimka  json_parse_error

auto result = co_await parse_json<MyJsonFactory>(source);
process(result.first);

Je dobré ověřit, že zdroj je awaitable a vrací std::string_view

using SourceAwaiter = std::invoke_result_t<Source>;
static_assert(coro::awaitable_r<SourceAwaiter, std::string_view>, "Source must return string_view and must be awaitable");

Deklarujeme si klíč a hodnotu

using Node = typename JsonFactory::value_type;
using KeyNode = typename JsonFactory::key_type;

Jak už jsem psal výše, žádná rekurze není povolena. Ale jak tedy budeme řešit čtení kontejnerů, které se jinak řeší rekurzí? Zavedeme si vlastní zásobník. A nebude jeden

Zavedeme si zásobník stavů. Nejprve tedy stavy:

enum class State {
        detect,
        key,
        array_begin,
        array_cont,
        object_begin,
        object_cont,
        object_key
};

Rozdělení do stavů má jednoduché pravidlo které (také) souvisí s bílými znaky. Každý stav totiž začíná přeskakováním bílých znaků. 

  • detect - je část, která detekuje typ elementu, tedy obsahuje ten velký case, který byl představen na začátku článku.
  • key - v tomto stavu se čte klíč, očekává se ", pak se čte řetězec.
  • array_begin - bylo načteno [, nyní buď pokračuje ]  – prázdné pole, nebo něco jiného, pak  detect
  • array_cont - byl přidán prvek, pokračuje buď , nebo  ]
  • object_begin – bylo načteno {, nyní buď pokračuje }  – prázdný objekt, nebo "  – pak object_keykey
  • object_cont – byly načteny klíč a hodnota, pokračuje , nebo  }
  • object_key - byl načten klíč, pokračuje :, pak pokračuje další prvek  detect

Dále je realizován zásobník hodnot a zásobník klíčů

struct Level {
    State _st;
    std::size_t _count = 0;
};

std::vector<Node> items;        //stack of items
std::vector<KeyNode> keys;      //stack of keys
std::vector<Level> levels;      //stack of levels'
std::vector<char> strbuff;      //string buffer

Jako zásobník se používá vektor, protože z něj mohu generovat rozsahy ( span). Vrchol je vždy back(). Vždy, když je dokončeno parsování některé hodnoty, je umístěna na vrchol zásobníku hodnot. Pokud se přečte klíč, je uložen na zásobník klíčů

Součástí stavu je nejen hodnota stavu, ale také počet hodnot v zásobníků hodnot a klíčů v zásobníku klíčů (vždy je to stejné číslo) alokovaných pro daný stav. Po „návratu“ z daného stavu, tedy odebrání stavu ze zásobníků se předpokládá, že výsledek stavu je jako poslední hodnota na zásobníku hodnot. Pokud vyprázdníme zásobník stavů, pak by v zásobníku hodnot měla být jedna hodnota, a to je výsledek parsování.

levels.push_back(Level{State::detect});
while (!levels.empty()) {
    //.... (hodně dlouhý kód)...
}
co_return std::pair{std::move(items.back()),src.get_unused()};

Místo volání funkcí a podprogramů se tedy používá jeden velký switch-case pro všechny stavy

while (!levels.empty()) {
    do c = co_await src;  while (c>=0 && c <= 32); //eat whitespaces
    switch(levels.back()._st) {
        case State::detect:...;break;
        case State::key: ...;break;
        case State::array_begin:...;break;
        case State::array_cont:...;break;
        case State::object_begin:...;break;
        case State::object_key:...;break;
        case State::object_cont:...;break;
        default: throw json_parse_error::internal_error_invalid_state;
     }
}

Rekurzivní volání se simuluje přidáním nového stavu na vrchol zásobníku. Na zásobník stavů můžeme vložit i víc stavů, pokud chceme „programovat“ cestu návratu. Například při čtení klíče objektu vložíme object_key a key. Nejprve se parsuje klíč jako řetězec, pak se pokračuje object_key, který kontroluje přítomnost dvojtečky a ten vloží na zásobník detect, který způsobí načtení hodnoty (to může vést k další rekurzi). 

Stavy object_cont a array_cont využívají Level::_count k záznamu délky pole. Po uzavření pole nebo objektu se vytvoří span z posledních _count prvků a ty se předají do továrny. A nakonec se odstraní

case State::array_begin:
    levels.pop_back();
    if (c == ']') {
        items.push_back(fact.new_array(std::span<Node>{}));
    } else {
        src.put_back(); //tady vracím přečtený znak zpět vstupu
        levels.push_back({State::array_cont});
        levels.push_back({State::detect});
    }
 break;

case State::array_cont:
    ++levels.back()._count;
    if (c == ']') {
        auto iter = items.begin() + items.size() - levels.back()._count;
        auto result = fact.new_array(std::span<Node>(iter, items.end()));
        items.erase(iter,items.end());
        items.push_back(std::move(result));
        levels.pop_back();
    } else if (c == ',') {
        levels.push_back({State::detect});
    } else {
        throw json_parse_error::unexpected_separator;
    }
 break;

Tímto způsobem je rekurze přepsána na cyklus, který je realizován v rámci jedné korutiny. 

Celý kód

Celý kód tentokrát není v compiler exploreru, ale najdete ho v rámci knihovni libcoro jako v adresáři usecases, kam bych chtěl časem dávat nějaké užitečné použití korutin, které přímo nesouvisí s knihovnou, ale může se to hodit. Tyto soubory se také neinstalují jako součást knihovny a nepatří do testů.

Závěr

Co k tomu napsat, bylo to takové cvičení, ukázka, jistě by se to dalo napsat i jinak. 

Ještě jsem slíbil, že budeme diskutovat optimalitu řešení. Ve skutečnosti jsem ale žádné měření neprováděl, takže nemám v ruce žádné čísla a žádné argumenty. Mohu provést jen statickou analýzu. Oproti čistě synchronnímu řešení, tento parser zřejmě nebude rychlejší, spíš naopak. Už jen to, že musíme testovat konec zásobníku znaků při čtení každého znaku. Na druhou stranu, použití zásobníku hodnot zároveň jako pole hodnot při parsování kontejnerů vede podle mě na optimálnější kód než naivní rekurzivní řešení, kdy při čtení kontejnerů se na každé úrovni deklaruje prázdný vektor. Toto řešení umožňuje znovupoužití alokované paměti při čtení například obrovského pole objektů, kde všechny objekty mají pevný počet klíčů. Nicméně takto lze samozřejmě implementovat i synchronní verze, takže to není žádný benefit, který by byl exkluzivní pro asynchronní verzi. Hlavním benefitem zůstává nižší paměťová náročnost daná tím, že data lze načítat po částech a není třeba je skaldovat v mezipaměti.

Příště bych se podíval na serializaci. Je také potřeba ji dělat asynchronně? (je).

Bonus

do c = co_await src;  while (c>=0 && c <= 32);

Po přeložení v gcc -O3

00005555555569a1:   mov     %rcx,0x108(%r8)
208                           if (_pos < _str.length()) [[likely]] return true;
00005555555569a8:   cmp     0x20(%r8),%rax
00005555555569ac:   jae     0x5555555582aa
222                           if (_fetched) [[unlikely]]{
00005555555569b2:   cmpb    $0x0,0x38(%r8)
00005555555569b7:   jne     0x55555555826b
259                           return *(this->_M_str + __pos);
00005555555569bd:   mov     0x28(%r8),%rcx
238                           return _str[_pos++];
00005555555569c1:   mov     0x30(%r8),%rdx
00005555555569c5:   lea     0x1(%rdx),%rax
00005555555569c9:   mov     %rax,0x30(%r8)
00005555555569cd:   movsbl  (%rcx,%rdx,1),%ebp
290                           do c = co_await src;  while (c>=0 && c <= 32);
00005555555569d1:   mov     %bpl,0x141(%r8)
238                           return _str[_pos++];
00005555555569d8:   cmp     $0x21,%ebp
00005555555569db:   jb      0x5555555569a8

Sdílet