C++20: A Coroutine Based Stream Parser

C++20: A Coroutine Based Stream Parser

By Andreas Fertig

Overload, 30(168):6-8, April 2022


Stream parsing code can be very complicated. Andreas Fertig uses coroutines to make stream parsing code clearer.

In this article, I want to show you how C++20’s coroutines can help you when writing a stream parser. This is based on a protocol Andrew Tanenbaum describes in his book Computer Networks. The protocol consists of a Start Of Frame (SOF) and ESC to escape characters like SOF. Hence, ESC + SOF mark the start of a frame as well as the end. In today’s exercise, we will parse the string Hello World (yes, without the comma, sorry). For simplicity, I am using strings and not bytes. The steam shown in Figure 1 is what we are going to process.

Figure 1

The classic way

Before C++20, we could have quickly written a function ProcessNextByte, which would have dealt with tokenizing the stream. It looks for the ESC + SOF for the start of a frame and calls the callback frameCompleted once a frame is complete. Error cases are not treated for simplicity here. They are silently discarded. Listing 1 is a version of ProcessNextByte.

template<typename T>
void ProcessNextByte(byte b, T&& frameCompleted)
{
  static std::string frame{};
  static bool        inHeader{false};
  static bool        wasESC{false};
  static bool        lookingForSOF{false};

  if(inHeader) {
    if((ESC == b) && not wasESC) {
      wasESC = true;
      return;
    } else if(wasESC) {
      wasESC = false;

      if((SOF == b) || (ESC != b)) {
        // if b is not SOF discard the frame
        if(SOF == b) { frameCompleted(frame); }

        frame.clear();
        inHeader = false;
        return;
      }
    }
    frame += static_cast<char>(b);
  } else if((ESC == b) && !lookingForSOF) {
    lookingForSOF = true;
  } else if((SOF == b) && lookingForSOF) {
    inHeader      = true;
    lookingForSOF = false;
  } else {
    lookingForSOF = false;
  }
}
			
Listing 1

There are other ways to implement this, such as using a class and getting rid of the static variables. However, a function is essentially what we like here. This is not really a job for a class. Even with a class, tracking the state and quickly seeing where we are remains complicated. Another way, of course, is to bring in the big guns and use some kind of state pattern using virtual methods. I once had the pleasure of working with such a beast. It turned out that it was very hard to see the control flow, and in the end, all calls ended up in one single class, which contained the actual logic.

However, one upside of using a class would be that we can have multiple objects parsing multiple streams. With the given ProcessNextByte we can only parse exactly one stream.

Coroutines applied

Listing 2 is the same logic, this time as a coroutine.

FSM Parse()
{
  while(true) {
    byte        b = co_await byte{};
    std::string frame{};

    if(ESC == b) {
      b = co_await byte{};

      if(SOF != b) { continue; } 
      //  not looking at a start sequence

      while(true) {  capture the full frame
        b = co_await byte{};

        if(ESC == b) {
          //  skip this byte and look at the
          // next one
          b = co_await byte{};

          if(SOF == b) {
            co_yield frame;
            break;
          } else if(ESC != b) { // out of sync
            break;
          }
        }
        frame += static_cast<char>(b);
      }
    }
  }
}
			
Listing 2

There are various great things. First, Parse is not a template, as ProcessNextByte was. By looking at the function’s body, the states are much clearer to see. We have two infinite-loops (which is a bad thing on some embedded systems as infinite-loops can usually be avoided) where the outer is really infinite and the inner runs as long as the frame is incomplete. We can see that this code first looks for ESC and then for SOF . If ESC is found, but the next byte is not SOF, the search for ESC starts over. The logic above was way more complicated and harder to see through.

Once ESC followed by SOF is found, the inner loop starts . It adds all received bytes to the local variable frame. But before that, Parse checks whether we are looking at an escape sequence . If so, the escape character is removed and replaced by the unescaped version. In our case, this is necessary for ESC as its value is 'H'. As there are no other characters we need to escape, for now, we only check whether the byte following ESC is either SOF or ESC. Both are valid. Any other byte is a processing error, which we silently discard by throwing away the current frame and starting over from the beginning.

I claim that this code is fairly straightforward to read, without knowing what co_await and co_yield do. I’m sorry about the co_, but coroutines were late to the party, and keywords like await or yield were already used in code-bases, hence the prefix.

We can also see something new. Parse returns FSM, my name for this coroutine type.

While I like most about the version I showed you above, there is something I don’t like as much. We need to write a huge amount of boilerplate code to make all the coroutine elements come alive. Sadly, C++20 does not ship with a coroutine library. Everything in the coroutine header is a very low-level construct, the building blocks required for a great coroutine library. Lewiss Baker maintains cppcoro in GitHub [Baker] and is the author of various proposals for a great coroutines library in C++23. If you are looking for a promising shortcut that has a good chance of ending up in the STL, cppcoro is the choice.

Coroutines (an explanation from Fran Buontempo)

If you’re not familiar with coroutines, have a look at ‘Concurrency, Parallelism and Coroutines’ by Anthony Williams from ACCU 2017 [Williams17]. Coroutines are just functions, which can be stopped and restarted or resumed.

Coroutines are stackless: they suspend execution by returning to the caller and the data that is required to resume execution is stored separately from the stack. This allows for sequential code that executes asynchronously. [cpp]

They hinge on three terms: co_await, which suspends the function and returns control to the caller until it’s resumed; co_yield, which yields a value to the caller (and then picks up where it left off when called again); and co_return, which completes execution and returns a value.

BYOC (Bring your own coroutine)

Let’s look at how we can implement the missing pieces without any other library, only the STL.

The type FSM

First, what is FSM?

It is a using alias to async_generator<std::string, byte>:

  using FSM = async_generator<std::string, byte>;

async_generator is another type we have to write, one way or the other, to satisfy the coroutine interface. This type consists of a special type, or in this case, a using alias promise_type. This is a name the compiler looks for to determine whether async_generator is a valid coroutine type. Precisely with this spelling! In Listing 3, we see the implementation of async_generator.

template<typename T, typename U>
struct [[nodiscard]] async_generator
{
  using promise_type = 
    promise_type_base<T, async_generator, 
    awaitable_promise_type_base<U>>;
  using PromiseTypeHandle = 
    std::coroutine_handle<promise_type>;
  T operator()()
  {
    //  the move also clears the mValue of the
    // promise
    auto
      tmp{std::move(mCoroHdl.promise().mValue)};
    //  but we have to set it to a defined state
    mCoroHdl.promise().mValue.clear();
    return tmp;
  }
  void SendSignal(U signal)
  {
    mCoroHdl.promise().mRecentSignal = signal;
    if(not mCoroHdl.done()) { mCoroHdl.resume(); }
  }
  async_generator(async_generator const&) = 
    delete;
  async_generator(async_generator && rhs)
  : mCoroHdl{std::exchange(rhs.mCoroHdl, nullptr)}
  {}
  ~async_generator()
  {
    if(mCoroHdl) { mCoroHdl.destroy(); }
  }
private:
  friend promise_type;  //  As the default ctor
            // is private G needs to be a friend
  explicit async_generator(promise_type * p)
  : mCoroHdl(PromiseTypeHandle::from_promise(*p))
  {}
  PromiseTypeHandle mCoroHdl;
};
			
Listing 3

With the call operator, we get access to the value from inside the coroutine. This is the receiving end of something that can be seen as a pipe. The pipe is filled on the other end, inside the coroutine, by co_yield.

We can put data into the pipe with SendSignal. This is received inside the coroutine by a co_await.

Because async_generator holds a communication channel with our coroutine, we only allow this type to be moveable but not copyable. An instance of this type is created for us by the compiler when instantiating an object of type promise_type. This is why I chose to make the constructor of async_generator private and say that promise_type is a friend. This is just to prevent misuse or false assumptions.

PromiseTypeHandle is a handle to the current coroutine. With it, we can transfer data between normal and coroutine code (e.g. co_yield and co_await).

Next up is the promise_type. The using alias is directing to promise_type_base, which is composed of T, async_generator, awaitable_promise_type_base<U>. So, two more new types.

The promise_type_base

First, the reason for the suffix _base is that the entire example uses two generators. One for the parsing logic we are looking at and another one for simulating a data stream. Listing 4 is the implementation.

template<typename T, typename G, 
  typename... Bases>  
  //  Allow an optional base class
struct promise_type_base : public Bases... {
  T mValue;
  auto yield_value(T value)
  {
    mValue = value;
    return std::suspend_always{};
  }
  G get_return_object() { return G{this}; };
  std::suspend_always initial_suspend() 
    { return {}; }
  std::suspend_always final_suspend() noexcept 
    { return {}; }
  void return_void() {}
  void unhandled_exception() { std::terminate(); }
};
			
Listing 4

This generator satisfies the co_yield interface of a coroutine. A very rough view is that the call co_yield is replaced by the compiler calling yield_value. So promise_type_base serves as a container for the value coming from the coroutine into our normal code. All the other methods are just to satisfy the coroutine interface. As we can see, coroutines are highly customizable, but that is a different story.

We can also see that promise_type_base derives from the third parameter passed to it. In our case, this is awaitable_promise_type_base<U>. This is a variadic argument, simply to allow creating a promise_type_base object without a base class.

The awaitable_promise_type_base

Next, we need to write the glue code for co_await, which waits for the next byte. This is the part of awaitable_promise_type_base, creating a so-called Awaitable-type. In Listing 5, you can see an implementation.

template<typename T>
struct awaitable_promise_type_base {
  std::optional<T> mRecentSignal;
  struct awaiter {
    std::optional<T>& mRecentSignal;
    bool await_ready() {
      return mRecentSignal.has_value(); }
    void await_suspend(std::coroutine_handle<>) {}
    T await_resume()
    {
      assert(mRecentSignal.has_value());
      auto tmp = *mRecentSignal;
      mRecentSignal.reset();
      return tmp;
    }
  };
  [[nodiscard]] awaiter await_transform(T) {
    return awaiter{mRecentSignal}; }
};
			
Listing 5

With these pieces, we have the FSM coroutine type and can use Parse. Figure 2 shows which element in our code interacts with which part of the async_generator.

Figure 2

Summary

I hope I could demonstrate that coroutines are a great tool when it comes to implementing parsers. The boiler-plate code aside, they are easy to write and highly readable. You can find the complete version of this example on godbolt.org [Fertig].

Until we have C++23 and hopefully much better coroutines support in the STL, we either have to write code as above or use cppcoro [Baker].

As always, I hope you learned something from this article. Feel free to reach out to me on Twitter to give feedback (https://twitter.com/Andreas__Fertig).

References

[Baker] Lewiss Baker, cppcoro on Github: https://github.com/lewissbaker/cppcoro

[cpp]‘Coroutines (C++20)’, available at: https://en.cppreference.com/w/cpp/language/coroutines

[Fertig] An electronic version of the original blog post and all examples is available on godbolt.org: https://godbolt.org/z/8hEffT

[Williams17] Anthony Williams, ‘Concurrency, Parallelism and Coroutines’ from ACCU 2017 available at: https://www.youtube.com/watch?v=UhrIKqDADX8

This article was first published on Andreas Fertig’s blog (https://andreasfertig.blog/2021/02/cpp20-a-coroutine-based-stream-parser/) on 2 February 2021.

It is a short version of ‘Chapter 2: Coroutines’, from his latest book Programming with C++20. The book contains a more detailed explanation and more information about this topic.

Andreas Fertig is a trainer and lecturer on C++11 to C++20, who presents at international conferences. Involved in the C++ standardization committee, he has published articles (for example, in iX) and several textbooks, most recently Programming with C++20. His tool – C++ Insights (https://cppinsights.io) – enables people to look behind the scenes of C++, and better understand constructs.






Your Privacy

By clicking "Accept Non-Essential Cookies" you agree ACCU can store non-essential cookies on your device and disclose information in accordance with our Privacy Policy and Cookie Policy.

Current Setting: Non-Essential Cookies REJECTED


By clicking "Include Third Party Content" you agree ACCU can forward your IP address to third-party sites (such as YouTube) to enhance the information presented on this site, and that third-party sites may store cookies on your device.

Current Setting: Third Party Content EXCLUDED



Settings can be changed at any time from the Cookie Policy page.