The observer pattern is over two decades old. Alan Griffiths fits a venerable design pattern into a contemporary context.
Two decades ago the ‘Gang of Four’ popularised a pattern form and described twenty-three patterns (one of which is nowadays often derided as an anti-pattern). One of these, the Observer Pattern, is the subject of this article.
I’ve been working on an open source project for the past couple of years and, like many projects, we found a use for Observers. In order for you to appreciate the problems we encountered I first need to explain a little about the project. (I will be brief.)
The project is a library that allows the code that uses it to handle graphics and input devices in a manner that is portable across the Linux device drivers found on desktop and Android devices. One area in which we used Observer was to monitor changes to the ‘surfaces’ representing things that appear on the screen(s). These need to be monitored by other components, such as the ones that composite the surfaces onto the screens and the one that routes input.
There are a number of threads running in the application, as this makes it easy to partition the work of handling applications updating their surfaces, processing input events and interacting with display devices such as monitors. As I will describe, this leads to a need to address some synchronization issues.
It is hopefully evident that the ‘consumers’ of observations (e.g. the one representing a monitor) generally outlive the surfaces being observed. Mostly a consumer is part of the application infrastructure and not part of the dynamic state of applications being launched, opening and closing windows and exiting.
For this case it is simplest to create an ‘observer’ object (that calls notification methods on the consumer) and pass its ownership to the ‘subject’. The consumer can then forget about everything except handling the notifications.
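As a minimal sketch of that arrangement (the names Surface, SurfaceObserver and Compositor here are illustrative, not Mir’s actual interfaces), the consumer might wire itself up like this. Note that, as written, nothing stops the subject notifying an observer whose consumer has already gone away – which is exactly the problem the rest of this article is about:

#include <iostream>
#include <memory>
#include <vector>

// The 'observer' interface the subject notifies
struct SurfaceObserver
{
  virtual ~SurfaceObserver() = default;
  virtual void surface_changed() = 0;
};

// The 'subject': takes ownership of the observers passed to it
class Surface
{
public:
  void add_observer(std::shared_ptr<SurfaceObserver> observer)
  { observers.push_back(std::move(observer)); }

  void notify_change()
  { for (auto const& o : observers) o->surface_changed(); }

private:
  std::vector<std::shared_ptr<SurfaceObserver>> observers;
};

// The 'consumer': creates an observer that forwards notifications
// to it, hands ownership to the subject and then only handles the
// callbacks
class Compositor
{
public:
  void attach_to(Surface& surface)
  {
    struct Forwarder : SurfaceObserver
    {
      explicit Forwarder(Compositor* c) : compositor{c} {}
      void surface_changed() override { compositor->recompose(); }
      Compositor* compositor;  // nothing keeps this alive!
    };

    surface.add_observer(std::make_shared<Forwarder>(this));
  }

  void recompose() { std::cout << "recompositing\n"; }
};

int main()
{
  Compositor compositor;
  Surface surface;
  compositor.attach_to(surface);
  surface.notify_change();
}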
To summarise the differences from the classical Observer Pattern context:
- We’ve split the Observer role into Consumer and Observer
- We’re not using garbage collection and so need to explicitly address the lifetime of the objects
- We have multiple active threads
The subject maintains a collection of listeners and the naïve approach to synchronization is for the collection to be locked when being updated and when sending notifications. (In fact that is how we first implemented it.)
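A minimal sketch of that naive approach, assuming a straightforward mutex-guarded vector (illustrative only, not the code we shipped):

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

template<class Observer>
class NaiveObservers
{
public:
  void add(std::shared_ptr<Observer> const& observer)
  {
    std::lock_guard<std::mutex> lock{mutex};
    observers.push_back(observer);
  }

  void remove(std::shared_ptr<Observer> const& observer)
  {
    std::lock_guard<std::mutex> lock{mutex};
    observers.erase(
      std::remove(observers.begin(), observers.end(), observer),
      observers.end());
  }

  void for_each(
    std::function<void(std::shared_ptr<Observer> const&)> const& f)
  {
    // Holding the lock here is what causes the trouble below: if f()
    // re-enters add() or remove() on the same thread we deadlock.
    std::lock_guard<std::mutex> lock{mutex};
    for (auto const& observer : observers) f(observer);
  }

private:
  std::mutex mutex;
  std::vector<std::shared_ptr<Observer>> observers;
};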
This works well until we hit one of two cases:
- The consumer takes some action that generates a new notification
- The consumer is destroyed (e.g. a monitor is unplugged) and needs to prevent further notifications to a dead object.
In case 1, if we hold an exclusive lock during notifications then we’ll get a deadlock.
In case 2, we expect the consumer to remove the observer from the subject’s collection, after which notifications cease. To guarantee that ‘after’ we need an ordering between removal and notifications, and that requires something akin to the lock that causes deadlock in case 1.
One failed solution we tried was to copy the collection before propagating a notification and to release the lock before calling each of the observers. That doesn’t work with the above approach to case 2: after releasing the lock, nothing prevents an observer being ‘removed’ on another thread and still being notified through the copy.
Another solution we rejected was for the subject’s collection to be formed of weak_ptr<>s and for the consumer to manage a collection of shared_ptr<>s to the observers it creates. Having an additional collection to manage in the consumer didn’t lead to any simplification. This might be different in other contexts, but many of our consumers only need to register an observer and handle a few events without tracking either the subject or the observer.
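For illustration, the rejected scheme looks something like this sketch (synchronization omitted; none of these names are Mir’s):

#include <functional>
#include <memory>
#include <vector>

template<class Observer>
class WeakObservers   // held by the subject
{
public:
  void add(std::shared_ptr<Observer> const& observer)
  { observers.push_back(observer); }

  void for_each(
    std::function<void(std::shared_ptr<Observer> const&)> const& f)
  {
    for (auto const& weak : observers)
      if (auto const observer = weak.lock())  // skip observers that have gone away
        f(observer);
  }

private:
  std::vector<std::weak_ptr<Observer>> observers;
};

// ...and each consumer must keep its own collection of
// shared_ptr<Observer>s alive for as long as it wants notifications
// - the extra bookkeeping that didn't pay its way for us.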
Another (working) solution we tried was to hold a recursive lock during notifications and updates to the collection. That allowed other notifications and changes to the collection to take place on the same thread. Because the collection could change, we took a copy of the collection and traversed that (to avoid iterator invalidation), but before invoking each observer we also checked that it still existed in the ‘true’ collection. The disadvantage of this approach is that copying the collection for every notification is a lot of work for a collection that very rarely changes (your application might be updating a ‘surface’ at 60fps, but adding and removing monitors might happen once a month).
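A rough sketch of that ‘copy under a recursive lock’ approach (again illustrative rather than the code we shipped):

#include <algorithm>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

template<class Observer>
class CopyingObservers
{
public:
  void add(std::shared_ptr<Observer> const& observer)
  {
    std::lock_guard<std::recursive_mutex> lock{mutex};
    observers.push_back(observer);
  }

  void remove(std::shared_ptr<Observer> const& observer)
  {
    std::lock_guard<std::recursive_mutex> lock{mutex};
    observers.erase(
      std::remove(observers.begin(), observers.end(), observer),
      observers.end());
  }

  void for_each(
    std::function<void(std::shared_ptr<Observer> const&)> const& f)
  {
    std::lock_guard<std::recursive_mutex> lock{mutex};

    // Copy so that iterators stay valid if f() adds or removes...
    auto const copy = observers;

    for (auto const& observer : copy)
    {
      // ...but skip any observer removed since the copy was taken
      if (std::find(observers.begin(), observers.end(), observer)
          != observers.end())
      {
        f(observer);
      }
    }
  }

private:
  std::recursive_mutex mutex;
  std::vector<std::shared_ptr<Observer>> observers;
};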
You might think that we could avoid this trouble by sharing ownership of the consumer with the observer (so that the consumer could not ‘die’ while the observer exists). After all, that is exactly what would happen ‘automatically’ in a garbage collected language. The trouble is that it becomes extremely difficult to ensure that the consumer dies when one needs it to.
Eventually we came up with an ‘observer collection’ implementation that works for our specific circumstances. If you have massively parallel code or rapidly changing collections this is probably not going to work for you.
The code makes use of a RecursiveReadWriteMutex and associated RecursiveReadLock and RecursiveWriteLock. These work pretty much as one might expect – a read lock can be acquired provided no other thread holds a write lock, and a write lock can be acquired unless another thread holds any lock. (This isn’t a component of C++11, so we rolled our own – one day we will have shared and exclusive locks available to us in the standard library.)
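For readers who want to experiment with the listings, here is one possible sketch of such a mutex and its RAII lock types built on C++11 primitives. It is not Mir’s implementation and makes no attempt at fairness, but it has the semantics just described (including letting a thread that already holds a read lock go on to acquire the write lock):

#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>

class RecursiveReadWriteMutex
{
public:
  void read_lock()
  {
    std::unique_lock<std::mutex> lock{state_mutex};
    auto const me = std::this_thread::get_id();
    // A read lock can be acquired provided no other thread holds a write lock
    cv.wait(lock, [&]{ return write_count == 0 || writer == me; });
    ++readers[me];
  }

  void read_unlock()
  {
    std::lock_guard<std::mutex> lock{state_mutex};
    auto const me = std::this_thread::get_id();
    if (--readers[me] == 0) readers.erase(me);
    cv.notify_all();
  }

  void write_lock()
  {
    std::unique_lock<std::mutex> lock{state_mutex};
    auto const me = std::this_thread::get_id();
    // A write lock can be acquired unless another thread holds any lock
    cv.wait(lock, [&]
    {
      if (write_count != 0 && writer != me) return false;
      for (auto const& r : readers)
        if (r.first != me) return false;
      return true;
    });
    writer = me;
    ++write_count;
  }

  void write_unlock()
  {
    std::lock_guard<std::mutex> lock{state_mutex};
    --write_count;
    cv.notify_all();
  }

private:
  std::mutex state_mutex;
  std::condition_variable cv;
  std::map<std::thread::id, int> readers;  // per-thread recursive read count
  std::thread::id writer;                  // only meaningful while write_count != 0
  int write_count{0};
};

struct RecursiveReadLock
{
  explicit RecursiveReadLock(RecursiveReadWriteMutex& m) : m{m} { m.read_lock(); }
  ~RecursiveReadLock() { m.read_unlock(); }
  RecursiveReadWriteMutex& m;
};

struct RecursiveWriteLock
{
  explicit RecursiveWriteLock(RecursiveReadWriteMutex& m) : m{m} { m.write_lock(); }
  ~RecursiveWriteLock() { m.write_unlock(); }
  RecursiveReadWriteMutex& m;
};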
Information about the observers is held in a singly linked list with atomic forward pointers that allow lock free traversal and expansion (Listing 1). (We don’t need contraction for our use case – we might end up with a few ‘free’ items in the list but not enough to be of concern.)
struct ListItem
{
  ListItem() {}

  RecursiveReadWriteMutex mutex;
  shared_ptr<Observer> observer;
  atomic<ListItem*> next{nullptr};

  ~ListItem() { delete next.load(); }
};

Listing 1
There are three member functions to go along with this. The easy one is for_each(), which is used to traverse the collection and send notifications. Each node is read locked in turn and the supplied functor invoked. Note that we need to keep the node locked until the notification completes, to prevent a race with another thread removing the observer and deleting the consumer. No lock is needed for the traversal itself, as we assume that no node is ever removed from the list. (Listing 2)
template<class Observer>
void BasicObservers<Observer>::for_each(
  function<void(shared_ptr<Observer> const& observer)> const& f)
{
  ListItem* current_item = &head;

  while (current_item)
  {
    RecursiveReadLock lock{current_item->mutex};

    // We need to take a copy in case we recursively
    // remove during call
    if (auto const copy_of_observer = current_item->observer)
      f(copy_of_observer);

    current_item = current_item->next;
  }
}

Listing 2
The second function adds an item. This searches for a free node; if it finds one it tries to upgrade to a write lock and, if the node is still free, uses it to store the supplied observer. Otherwise a new node is appended to the end of the list using the C++11 atomic ‘compare exchange’. This code also assumes that the list never shrinks, as current_item has to remain valid. (Listing 3) One ‘gotcha’ here (spotted by the Overload review team) is that compare_exchange_weak() can ‘fail spuriously’ [1], so it is necessary to test that expected has changed before assigning it to current_item.
template<class Observer>
void BasicObservers<Observer>::add(
  shared_ptr<Observer> const& observer)
{
  ListItem* current_item = &head;

  do
  {
    // Note: we release the read lock to avoid two
    // threads calling add at the same time
    // mutually blocking the other's upgrade to
    // write lock.
    {
      RecursiveReadLock lock{current_item->mutex};
      if (current_item->observer) continue;
    }

    RecursiveWriteLock lock{current_item->mutex};
    if (!current_item->observer)
    {
      current_item->observer = observer;
      return;
    }
  }
  while (current_item->next &&
         (current_item = current_item->next));

  // No empty Items so append a new one
  auto new_item = new ListItem;
  new_item->observer = observer;

  for (ListItem* expected{nullptr};
       !current_item->next.compare_exchange_weak(expected, new_item);
       expected = nullptr)
  {
    if (expected) current_item = expected;
  }
}

Listing 3
The final member function removes an observer by searching the list. The logic is very similar to that in add(). (Listing 4)
template<class Observer>
void BasicObservers<Observer>::remove(
  shared_ptr<Observer> const& observer)
{
  ListItem* current_item = &head;

  do
  {
    {
      RecursiveReadLock lock{current_item->mutex};
      if (current_item->observer != observer) continue;
    }

    RecursiveWriteLock lock{current_item->mutex};
    if (current_item->observer == observer)
    {
      current_item->observer.reset();
      return;
    }
  }
  while ((current_item = current_item->next));
}

Listing 4
It is a deceptively simple solution to a problem that for a while seemed intractable. I hope you enjoy it.
References
The project is called ‘Mir’ and can be found (including the full version of the code) at http://unity.ubuntu.com/mir/
Acknowledgements
The Mir team (https://launchpad.net/~mir-team/+members), especially Alberto Aguirre and Robert Carr, for encountering this design context and working through a series of proposed resolutions.
The Overload team for seeing the code and text presented here with a fresh eye and spotting a few problems with correctness and clarity that had been overlooked.
[1] http://www.cplusplus.com/reference/atomic/atomic/compare_exchange_weak/ : “Unlike compare_exchange_strong, this weak version is allowed to fail spuriously by returning false even when expected indeed compares equal to the contained object. This may be acceptable behavior for certain looping algorithms, and may lead to significantly better performance on some platforms. On these spurious failures, the function returns false while not modifying expected.”