An MWSR Queue with Minimalist Locking

Overload Journal #143 - February 2018 + Design of applications and programs   Author: Sergey Ignatchenko
Multithreaded queues come in many flavours. Sergey Ignatchenko describes his implementation of a multiple writer single reader queue.

Disclaimer: as usual, the opinions within this article are those of ‘No Bugs’ Hare, and do not necessarily coincide with the opinions of the translators and Overload editors; also, please keep in mind that translation difficulties from Lapine (like those described in [Loganberry04]) might have prevented an exact translation. In addition, the translator and Overload expressly disclaim all responsibility from any action or inaction resulting from reading this article.

In [NoBugs17], we discussed the theory behind using CAS (Re)Actors to build multithreaded non-blocking primitives. A very brief recap:

  • Whenever we want non-blocking processing, we have to use CAS (Compare and Swap) operations
  • The idea of CAS (Re)Actors is to treat a CAS block (up to 128 bits in size on modern CPUs) as a state of the (Re)Actor; in other words, all we’re doing within one specific (Re)Actor always fits into the following pattern:
    • We read the state
    • We modify it if necessary
    • We write it back
  • In the context of CAS (Re)Actors, writing the state back is tricky: it requires CAS operations, which can fail due to some other thread interfering with us. If we fail, we simply drop the whole result and start anew; actually, this is a very typical pattern in CAS-based primitives. Another way to look at it is to consider it an incarnation of optimistic concurrency control.
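
The read/modify/write-back pattern from the recap can be sketched with std::atomic as follows. This is only a minimal illustration: CounterReactor and reserve() are hypothetical names, and a single 64-bit counter stands in for the (up to 128-bit) state of a real CAS (Re)Actor.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical CAS-sized state: one 64-bit counter stands in for the
// (up to 128-bit) state of a real CAS (Re)Actor.
class CounterReactor {
  std::atomic<uint64_t> state{0};

public:
  // Atomically reserves n consecutive IDs and returns the first of them.
  uint64_t reserve(uint64_t n) {
    // 1. Read the state.
    uint64_t oldState = state.load(std::memory_order_relaxed);
    while (true) {
      // 2. Modify a local copy.
      uint64_t newState = oldState + n;
      // 3. Try to write it back. If another thread interfered, the CAS
      //    fails, oldState is refreshed with the current value, and we
      //    drop our result and start anew.
      if (state.compare_exchange_weak(oldState, newState,
                                      std::memory_order_acq_rel,
                                      std::memory_order_relaxed))
        return oldState;
    }
  }

  uint64_t current() const {
    return state.load(std::memory_order_acquire);
  }
};
```

Nothing outside the CAS itself is shared between threads here: all the 'thinking' happens on local copies, which is what makes the optimistic retry safe.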

As was mentioned in [NoBugs17], the benefit provided by CAS (Re)Actors is not about the sequence of CPU operations we’re issuing; in theory, exactly the same thing can be produced without (Re)Actors at all. The key benefit is about how we’re thinking about our multithreaded primitive, which tends to provide significant benefits in the complexity that we can handle. Today we’ll demonstrate a practical use of CAS (Re)Actors using one very specific example.

Optimistic concurrency control

OCC assumes that multiple transactions can frequently complete without interfering with each other. While running, transactions use data resources without acquiring locks on those resources. Before committing, each transaction verifies that no other transaction has modified the data it has read. [Wikipedia]

The task at hand

In quite a few rather serious real-world interactive distributed systems (such as stock exchanges and games), I found myself in need of a really fast queue, which had to have the following characteristics:

  • It should be an MWSR queue (allowing Multiple Writers, but only a Single Reader).
  • It should have flow control. In other words – if queue writers are doing better than queue readers – the queue must not grow infinitely. Instead, at some point (when the queue grows over a certain pre-defined limit), our queue must start blocking writers.
  • This means that our queue cannot possibly be 100% non-blocking. However, it should block only when it is necessary. In other words, unless the queue is full, write should be non-blocking, and unless the queue is empty, read should be non-blocking too. NB: as this is a performance requirement, complying with it a mere 99.9% of the time is good enough.
  • As a side – but occasionally rather important – property, the queue should be at least almost-fair. 100% fair queues (the ones which guarantee first come, first served behaviour) are rather difficult to achieve, but an almost-fair property (~=‘there won’t be too much reordering in most of the scenarios’) is much easier to obtain. In particular, if we’re speaking about re-orderings caused by CAS failures (followed by an immediate retry along the lines of CAS (Re)Actors), then in practice, in 99.(9)% of the cases, unfairness will be limited to single-digit microseconds, which is OK for the vast majority of the use cases out there. In fact, most of the time there are much more severe and unpredictable delays than those single-digit microseconds caused by CAS reorderings, so overall system behaviour will be pretty much indistinguishable from the behaviour of a perfectly fair system.

As much as I was in need of such a queue, it turned out to be an extremely difficult task, so that I wasn’t able to devise a system which avoids all the races. I did try to do it three or four times, but each time I found myself going into a vicious cycle of solving one race merely to create another one, and going into this ‘robbing Peter to pay Paul’ mode until I realized the futility of my efforts in that direction :-(. It was even worse as I was sure that a solution did exist – it is just that I wasn’t able to find it.

Enter CAS (Re)Actors

After several attempts at it, writing this queue-with-flow-control became a kind of personal obsession of mine, so there is no surprise that last year I took another shot at it. And as by that time I was spending quite a bit of time formalizing and generalizing my experiences with (Re)Actors, the idea of CAS-size (Re)Actors fortunately came to my mind. Let’s see how the CAS (Re)Actors did allow me to write that elusive MWSR queue with flow-control (we have to be sketchy here, but the whole supposedly-working implementation is available on GitHub [NoBugs18]).

(Re)Actors

Let’s say that our queue is based on two CAS (Re)Actors, EntranceReactor and ExitReactor:

  • The primary goal of EntranceReactor is to handle writers, providing slots in the queue and instructing them to block when no such slots are available.

    EntranceReactor’s state consists of:

    • firstIDToWrite – the first ID in the queue which is available for writing. NB: we consider all IDs as non-wrappable because (as was discussed in [NoBugs17]) it would take hundreds of years to wrap around a 64-bit counter. The position of the first ID in the queue buffer can be calculated as a simple ID%QueueSize.
    • lastIDToWrite – the last ID which is available for writing (of course, we do want to allow more than one concurrent write to our fast-performing queue).
    • lockedThreadCount – the number of writers which are currently locked (because the queue is full).

    In accordance with CAS (Re)Actor doctrine, all operations over (Re)Actor are inherently atomic. For EntranceReactor, we define the following atomic operations:

    • allocateNextID() – allocates the next ID to the caller, and indicates whether the caller should lock for a while.
    • unlock() – indicates that the thread is unlocked.
    • moveLastToWrite(lastW) – here we’re telling our EntranceReactor that reader has already read everything up to lastW, so that it can allow more writes. moveLastToWrite() returns whether we should unlock one or more writers (which is an expensive operation so we want to avoid it as long as possible).
  • Our second (Re)Actor is ExitReactor, which handles our only reader; in particular, it maintains information about completed writes, so it can tell when information is available for reading.

    The state of ExitReactor consists of:

    • firstIDToRead – the first ID which is not read yet.
    • completedWritesMask – as there can be several concurrent writes, they can finish in an arbitrary order, so we have to account for them with a mask.
    • readerIsLocked – a simple flag, with semantics similar to lockedThreadCount (but as we have only one reader, a simple boolean flag is sufficient here).

    As for ExitReactor’s atomic operations, we define them as follows:

    • writeCompleted(ID) – indicates that the write of specific ID is completed.
    • startRead() – called by reader to start read, and either returns an ID to read, or indicates that we should lock instead.
    • readCompleted(ID) – indicates that the reader is done with reading; as it usually frees some space in the buffer, it returns a new ID where the write can be done.
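
To make the role of completedWritesMask more tangible, here is a small sketch of how such a mask could be interpreted. The bit layout is an assumption made for illustration (bit i set means that the write of ID firstIDToRead + i has completed), and readableCount() and shiftAfterRead() are hypothetical helpers, not functions from [NoBugs18].

```cpp
#include <cstdint>

// Assumption (not taken from the article's code): bit i of the mask is set
// once the write of ID firstIDToRead + i has completed.
// Returns how many consecutive IDs, starting at firstIDToRead, are readable.
inline unsigned readableCount(uint64_t completedWritesMask) {
  unsigned n = 0;
  while (n < 64 && ((completedWritesMask >> n) & 1u))
    ++n;
  return n;
}

// After the reader has consumed n items, firstIDToRead advances by n, and
// the mask is shifted so that bit 0 corresponds to the new firstIDToRead.
inline uint64_t shiftAfterRead(uint64_t completedWritesMask, unsigned n) {
  return n >= 64 ? 0 : completedWritesMask >> n;
}
```

For example, with a mask of 0xB (binary 1011) the writes at offsets 0, 1 and 3 are done, but only the first two items are readable, because the write at offset 2 is still in flight.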

For sizes of the fields, please refer to [NoBugs18]; however, it should be noted that, under the CAS (Re)Actors paradigm, we can easily use bit fields, so the only thing we care about is the combined bit size of the fields, which shouldn’t exceed the magic number of 128 (that is, for modern x64 CPUs which support the CMPXCHG16B instruction – and that is pretty much any Intel/AMD CPU produced over the last 10 years or so).
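
As an illustration of the 128-bit budget, the state of ExitReactor could be laid out along the following lines. The field widths here are assumptions chosen so that the total comes to exactly 128 bits (the real ones are in [NoBugs18]), and the exact packing of bit fields is compiler-specific, hence the static_assert.

```cpp
#include <cstdint>
#include <type_traits>

// Illustrative layout only; the field widths are assumptions.
struct ExitReactorData {
  uint64_t firstIDToRead : 63;   // 2^63 IDs: still hundreds of years at 1 ID/ns
  uint64_t readerIsLocked : 1;   // single reader, so one bit is enough
  uint64_t completedWritesMask;  // one bit per slot, assuming QueueSize == 64
};

// The whole state has to fit into a single CAS block...
static_assert(sizeof(ExitReactorData) * 8 <= 128,
              "state does not fit into a 128-bit CAS block");
// ...and has to be copyable as raw bytes to be usable with a hardware CAS.
static_assert(std::is_trivially_copyable<ExitReactorData>::value,
              "state must be trivially copyable");
```

Whether a 16-byte std::atomic over such a struct is actually lock-free depends on the compiler, its flags and the target CPU, so this needs to be checked for the platform at hand.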

On the ABA problem

As is well-known in MT programming, and was briefly discussed in [NoBugs17], the so-called ABA problem is one of those things which can easily kill the correctness of a multithreaded primitive. However, it seems that our (Re)Actors are free from ABA-related issues; very briefly:

  • As our IDs are monotonically increased and wraparound-free, all the writes which update at least one of the IDs are free from ABA problems (see also discussion on it in [NoBugs17]).
  • The fields lockedThreadCount and readerIsLocked have semantics with the property ‘it is only the current value which matters, and no history is relevant’, which also means that their updates are ABA-problem free.
  • This leaves completedWritesMask as the only potentially ABA-dangerous field. However, we can observe that if we consider the tuple (firstIDToRead,completedWritesMask) and take into account the logic behind these fields, this whole tuple is monotonically increased and wraparound-free; this, in turn, means that there is no potential for ABA problems here either <phew />.

Overall, from what I can see, our (Re)Actors are ABA-problem free; still, as an ABA problem is one of those bugs which can sit there for ages before manifesting itself, I would certainly appreciate somebody more skilled than me having another look at it.

Locking primitives

In addition to (Re)Actors, we have two locking primitives, both built more or less along the lines of Listing 1.

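#include <cassert>
#include <condition_variable>
#include <mutex>

class LockedSingleThread {
private:
  int lockCount = 0;//MAY be both >0 and <0
  std::mutex mx;
  std::condition_variable cv;
public:
  // Blocks the caller until a matching unlock() has been called
  void lockAndWait() {
    std::unique_lock<std::mutex> lock(mx);
    // -1 means that unlock() has already arrived
    assert(lockCount == -1 || lockCount == 0);
    lockCount++;
    while (lockCount > 0) {//loop guards against spurious wakeups
      cv.wait(lock);
    }
  }
  // MAY be called before the other thread reaches lockAndWait()
  void unlock() {
    std::unique_lock<std::mutex> lock(mx);
    lockCount--;
    lock.unlock();
    cv.notify_one();
  }
};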
			
Listing 1

It is a rather simple (but quite interesting) primitive: whenever one of our (Re)Actors indicates that we should lock, the corresponding thread calls lockAndWait() and waits on a condition variable until some other thread calls unlock(). It is important to note that our locking primitives must unlock properly regardless of potential races between a thread being locked and unlock(). In other words, they should work regardless of whether unlock() comes before or after the thread scheduled to be locked reaches lockAndWait().
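
The two possible orderings can be exercised with a small sketch like the one below. The class is a compact copy of Listing 1 so that the example is self-contained; unlockFirst() and waitThenUnlock() are hypothetical helper names used only for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Compact copy of Listing 1, repeated to keep the sketch self-contained.
class LockedSingleThread {
  int lockCount = 0;
  std::mutex mx;
  std::condition_variable cv;
public:
  void lockAndWait() {
    std::unique_lock<std::mutex> lock(mx);
    assert(lockCount == -1 || lockCount == 0);
    lockCount++;
    while (lockCount > 0) cv.wait(lock);
  }
  void unlock() {
    std::unique_lock<std::mutex> lock(mx);
    lockCount--;
    lock.unlock();
    cv.notify_one();
  }
};

// Ordering 1: unlock() arrives before the thread reaches lockAndWait().
// lockCount goes 0 -> -1 -> 0, so lockAndWait() returns without waiting.
inline bool unlockFirst() {
  LockedSingleThread l;
  l.unlock();
  l.lockAndWait();
  return true;
}

// Ordering 2 (or 1, depending on scheduling): a second thread waits while
// this thread unlocks; either way the waiter must eventually resume.
inline bool waitThenUnlock() {
  LockedSingleThread l;
  bool resumed = false;
  std::thread waiter([&] { l.lockAndWait(); resumed = true; });
  l.unlock();
  waiter.join();  // join() makes the write to resumed visible here
  return resumed;
}
```

The signed lockCount is what lets an early unlock() be 'remembered' until the matching lockAndWait() arrives.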

Putting it all together

Having all four building blocks (two (Re)Actors and two locking primitives), we can write our MWSRQueue (see Listing 2).

template<class QueueItem>
class MWSRQueue {
  static constexpr size_t QueueSize = 64;

private:
  QueueItem items[QueueSize];
  MT_CAS entrance;
  MWSRQueueFC_helpers::LockedThreadsList
    lockedWriters;
  MT_CAS exit;
  MWSRQueueFC_helpers::LockedSingleThread
    lockedReader;

public:
  MWSRQueue();
  void push(QueueItem&& item) {
    EntranceReactorHandle ent(entrance);
    std::pair<bool, uint64_t> ok_id =
      ent.allocateNextID();
    if (ok_id.first) {
      lockedWriters.lockAndWait(ok_id.second);
      ent.unlock();
    }
    size_t idx = index(ok_id.second);
    items[idx] = std::move(item);
    ExitReactorHandle ex(exit);
    bool unlock =
      ex.writeCompleted(ok_id.second);
    if (unlock)
      lockedReader.unlock();
  }
  QueueItem pop() {
    while (true) {
      ExitReactorHandle ex(exit);
      std::pair<size_t, uint64_t> sz_id =
        ex.startRead();
      size_t sz = sz_id.first;
      assert(sz <= QueueSize);
      if (!sz) {
        lockedReader.lockAndWait();
        // unlocking ex is done by
        // ex.writeCompleted()
        continue;//while(true)
      }
      uint64_t id = sz_id.second;
      size_t idx = index(id);
      QueueItem ret = std::move(items[idx]);
      uint64_t newLastW =
        ex.readCompleted(sz,id);
      EntranceReactorHandle ent(entrance);
      bool shouldUnlock =
        ent.moveLastToWrite(newLastW);
      if (shouldUnlock)
        lockedWriters.unlockAllUpTo(id +
          sz - 1 + QueueSize);
      return ret;
    } //while(true)
  }
private:
  size_t index(uint64_t i) {
    return i % QueueSize; //should be fast as
             // long as QueueSize is power of 2
  }
};
			
Listing 2
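
The comment in index() relies on QueueSize being a power of 2; in that case the modulo is equivalent to a bit mask. A minimal sketch of that equivalence follows (indexMod() and indexMask() are hypothetical names introduced for the comparison, and compilers typically perform this transformation themselves when QueueSize is a compile-time constant).

```cpp
#include <cstddef>
#include <cstdint>

constexpr std::size_t QueueSize = 64;

// A power of 2 has exactly one bit set, so (x & (x - 1)) clears it to 0.
static_assert(QueueSize != 0 && (QueueSize & (QueueSize - 1)) == 0,
              "QueueSize must be a power of 2");

// Slot index computed with a modulo, as in Listing 2.
constexpr std::size_t indexMod(uint64_t id) {
  return static_cast<std::size_t>(id % QueueSize);
}

// Equivalent slot index computed with a bit mask.
constexpr std::size_t indexMask(uint64_t id) {
  return static_cast<std::size_t>(id & (QueueSize - 1));
}

static_assert(indexMod(0) == indexMask(0), "mismatch at 0");
static_assert(indexMod(63) == indexMask(63), "mismatch at 63");
static_assert(indexMod(64) == indexMask(64), "mismatch at 64");
```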

As we can see, after we have defined our (Re)Actors (including their operations), the whole thing is fairly simple. Within our push() function, we merely:

  • request an ID from EntranceReactor (and lockAndWait() if we’re told to do so)
  • write to the slot which corresponds to the ID. Note that while we’re actually writing, we’re not holding any locks, which is certainly a Good Thing™ concurrency-wise.
  • inform ExitReactor that the write is completed (so reader can start reading the ID we just wrote)

As for our pop() function, it is only marginally more complicated:

  • We ask ExitReactor whether it is OK to read; if not, we’re locking (and re-trying from scratch later)
  • We read the data (again, at this point we’re not holding any kind of locks(!)).
  • We’re telling our ExitReactor that we’re done reading – and in response it may want to inform EntranceReactor that there is some room available. (This is implemented via a newLastW variable, but actually corresponds to sending a message – containing this one variable – from ExitReactor to EntranceReactor.)

That’s it! We’ve got our MWSRQueue, and with all the desired properties too. In particular, it is an MWSR queue, it does provide flow control, it locks only when it is necessary (on the queue being empty or full), and it is almost-fair (as IDs are assigned in the very first call to the allocateNextID(), the most unfairness which can possibly happen is limited to CAS retries, which are never long in practice).

However, IMNSHO the most important property of the queue is that it was observed to be easily debuggable. After I finished writing the code (which is around 700 LoC of heavily-multithreaded code, and is next-to-impossible to test until the whole thing is completed) and ran the simplistic tests found in the /test/ folder within [NoBugs18], there were, of course, bugs (like a dozen of them). And for multithreaded programs in general, debugging is a well-known nightmare (in particular, because (a) a bug manifests itself in a different place on different runs, and (b) adding tracing can easily change things too much so the bug won’t manifest itself anymore (!)). However, this specific queue happened to be debuggable very easily:

I was able to debug it within half a day.

I contend that anybody who has tried to debug multithreaded programs of comparable complexity will realize how fast half-a-day is for this kind of not-so-trivial multithreading.

Maintainability

After it started to work, I ran some experiments, and found that with one single writer, it performed great: with real 128-bit CAS, I measured an upper bound performance of this queue at about 130 nanoseconds per push()+pop() pair. However, with more than one writer, performance was observed to degrade very severely (around 50×(!)).

After thinking about it for a few hours, I realized that, actually, the code used in the examples above can be improved a lot – in particular, we can (and should) avoid going into thread-sync stuff on each and every call to pop(). Indeed, as our queue can handle up to 64 slots at a time, we can read all of them into some kind of a ‘read cache’ (with proper synchronization), but then in subsequent calls to pop() we can easily read all the cached values without any thread sync involved. This optimization allowed me to improve performance in tests with two writers by over 50× (so that performance with two writers became about the same as performance with one single writer). BTW, if you want to see the code with this ‘read cache’ optimization, it is a part of current implementation in [NoBugs18].
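
The idea behind the ‘read cache’ can be sketched independently of the (Re)Actors. In the illustration below, a mutex-protected std::deque stands in for the synchronized part of the queue; this is an assumption made purely to keep the example short, and CachedReader, tryPop() and refills() are hypothetical names rather than the implementation from [NoBugs18].

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

template <class T>
class CachedReader {
  static constexpr std::size_t MaxBatch = 64;  // matches the 64 slots above
  std::mutex mx;
  std::deque<T> shared;      // touched by the writers and by the reader
  std::vector<T> cache;      // touched by the single reader only
  std::size_t cachePos = 0;  // next cached item to hand out
  std::size_t refillCount = 0;

public:
  void push(T item) {
    std::lock_guard<std::mutex> lock(mx);
    shared.push_back(std::move(item));
  }

  // Returns false if nothing is available (a real queue would block instead).
  bool tryPop(T& out) {
    if (cachePos == cache.size()) {
      // Cache exhausted: grab up to MaxBatch items in ONE synchronized step.
      cache.clear();
      cachePos = 0;
      ++refillCount;
      std::lock_guard<std::mutex> lock(mx);
      while (!shared.empty() && cache.size() < MaxBatch) {
        cache.push_back(std::move(shared.front()));
        shared.pop_front();
      }
      if (cache.empty()) return false;
    }
    // Fast path: no thread synchronization at all.
    out = std::move(cache[cachePos++]);
    return true;
  }

  // Number of synchronized refill attempts made so far (reader thread only).
  std::size_t refills() const { return refillCount; }
};
```

With five items already queued, five consecutive tryPop() calls cost a single synchronized refill; the remaining four calls are served from the reader-private cache.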

However, my main point here is not about the performance of this particular queue. What I want to emphasize is that:

  • In spite of the queue being rather complicated, it was easy to reason about it
  • After I realized what I wanted to do, implementing the whole thing (writing + debugging) took less than two hours(!). Once again, this is extremely fast for writing/debugging a significant change for reliably-working multithreaded programs.

Of course, a lot of further optimizations are possible for this queue (in particular, I am thinking of introducing ‘write caches’ along the lines of the ‘read cache’ above); still, even the current (not perfectly optimized) version in [NoBugs18] seems to perform pretty well under close-to-real-world usage patterns. On the other hand, please treat the code in [NoBugs18] as highly experimental, and be sure to test it very thoroughly before using it in any kind of production; multithreading bugs are sneaky, and there is always a chance that one of them did manage to hide within, in spite of all the reliability improvements provided by CAS (Re)Actors.

Conclusions

We have demonstrated how the real-world task of ‘creating an MWSR queue with flow control and minimal locking’ can be implemented using the concept of CAS (Re)Actors (which was discussed in detail in [NoBugs17]).

In the process, it was also observed that

Not only do CAS (Re)Actors allow us to write multithreaded programs very easily (by the standards of multithreaded programs, that is), but also CAS (Re)Actor-based programs are easily maintainable and easily optimizable.

As a nice side effect ;-), we also wrote a practically-usable MWSR queue with flow control and minimalistic locking, which can take as little as 120 nanoseconds per push()+pop() pair :-).

References

[Loganberry04] David ‘Loganberry’, Frithaes! – an Introduction to Colloquial Lapine!, http://bitsnbobstones.watershipdown.org/lapine/overview.html

[NoBugs17] ‘No Bugs’ Hare, CAS (Re)Actor for Non-Blocking Multithreaded Primitives, Overload #142, December 2017

[NoBugs18] ‘No Bugs’ Hare, mtprimitives, https://github.com/ITHare/mtprimitives/tree/master/src

[Wikipedia] Optimistic concurrency control (OCC), https://en.wikipedia.org/wiki/Optimistic_concurrency_control

Acknowledgement

Cartoon by Sergey Gordeev from Gordeev Animation Graphics, Prague
