Queue with Position Reservation

Overload Journal #101 - February 2011 + Programming Topics   Author: Eugene Surman
Multiple threads can make processing a message queue faster. Eugene Surman needs the right data structure.

For the past five years I have mostly been developing multi-threaded messaging applications. While they were all quite different, there was one particular situation that kept recurring: sometimes it was required to maintain the sequential order of incoming and outgoing messages, even though they were being handled by multiple threads concurrently, and not necessarily in the same exact order they were received. I searched for a solution in many ready-made messaging libraries, but did not find anything satisfactory. So, I had to resort to developing a solution of my own: the PRQueue – a Queue with Position Reservation (or ‘seat reservation’).

PRQueue is implemented in C++ using two STL deques and the pthread library. Two simple classes, Mutex and Lock, are used in the example to demonstrate the logic. A sample message is represented by the StringMsg class, and the QueueTest class is used as a test-bed application.
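The Mutex and Lock classes are not reproduced in this article. The following is a minimal sketch of what such a pair could look like on top of pthread, assuming a plain (non-recursive) mutex; the member names and the try_lock helper are illustrative assumptions, not the code from the zip file.

```cpp
#include <cassert>
#include <pthread.h>

// Hypothetical minimal Mutex: owns a pthread_mutex_t with default attributes.
class Mutex {
public:
    Mutex()  { pthread_mutex_init(&m_, nullptr); }
    ~Mutex() { pthread_mutex_destroy(&m_); }
    Mutex(const Mutex&) = delete;
    Mutex& operator=(const Mutex&) = delete;

    void lock()   { pthread_mutex_lock(&m_); }
    void unlock() { pthread_mutex_unlock(&m_); }
    // Returns true if the mutex was free and is now held by the caller.
    bool try_lock() { return pthread_mutex_trylock(&m_) == 0; }

private:
    pthread_mutex_t m_;
};

// Hypothetical scoped Lock: locks in the constructor, unlocks in the
// destructor, so every exit path from a method releases the mutex.
class Lock {
public:
    explicit Lock(Mutex& m) : m_(m) { m_.lock(); }
    ~Lock() { m_.unlock(); }
    Lock(const Lock&) = delete;
    Lock& operator=(const Lock&) = delete;

private:
    Mutex& m_;
};

// Returns true if the mutex is held inside the scope of a Lock
// and free again once the Lock has been destroyed.
bool lock_is_scoped() {
    Mutex mux;
    bool held_inside = false;
    {
        Lock lk(mux);
        held_inside = !mux.try_lock();  // already locked, so try_lock fails
    }
    bool free_after = mux.try_lock();   // released by ~Lock, so this succeeds
    if (free_after) mux.unlock();
    return held_inside && free_after;
}
```

With this shape, a statement such as `Lock lk( m_mux);` at the top of a method is all the locking code the method needs.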

I chose deque as the main building block of the design because it has all the operations needed to implement PRQueue (including operator[]). In particular, it’s important that the push_back() and pop_front() operations do not invalidate pointers and references to other elements of the deque, even though push_back() does invalidate iterators.
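The pointer-stability property can be checked with a small experiment. This sketch (the function name is made up for illustration) takes the address of one element, performs many push_back() calls and a few pop_front() calls, and then confirms that the address still refers to the same element.

```cpp
#include <cassert>
#include <deque>

// Returns true if a pointer to an element of a deque still refers to that
// element after pushes at the back and pops at the front.
bool pointer_survives_end_operations() {
    std::deque<int> q;
    for (int i = 0; i < 4; ++i) q.push_back(i);

    int* p = &q[2];                       // address of the element holding 2

    // Enough pushes to make the deque allocate additional storage blocks.
    for (int i = 4; i < 10000; ++i) q.push_back(i);

    q.pop_front();                        // removes 0
    q.pop_front();                        // removes 1; element 2 is now the front

    return p == &q.front() && *p == 2;
}
```

The same experiment with std::vector would not be safe, since a vector may relocate all of its elements when it grows.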

Here is a simple example of how PRQueue can be utilized. Let’s say we need to log a stream of large multi-field messages. Converting numeric fields to text strings is a slow process that is not mission-critical, so we decided to offload this task to dedicated threads that will generate the log.

Initially, the processing diagram may look like figure 1.

Figure 1

Since the core processing of the messages takes place in multiple threads, the messages may become ready in an order that differs from the original input queue order. For example, one thread may take a message off the input queue and go to sleep, while another thread takes the next message, runs to completion and places its processed message in the output queue ahead of the first thread’s. As a result, the log entries may appear out of order. We assume that logging must be done after the messages are processed by the core routines.

Listing 1 is an example illustrating this point. I use the standard STL queue and 3 threads. This generates the output shown in Figure 2.

...
QueueTest quetest(3);
int i1 =0;
for( int i =10000; i; i--) {
  quetest.push( "| %d", i1++ );
  quetest.push( "- %d", i1++ );
}
...
			
Listing 1
Th# Time-stamp                  Msg#
1:  101108 15:04:49.576167  -   5243
3:  101108 15:04:49.576170  |   5244
1:  101108 15:04:49.576174  -   5245
3:  101108 15:04:49.576177  |   5246
3:  101108 15:04:49.576182  |   5248   // out
2:  101108 15:04:49.571945  |   4338   // of
1:  101108 15:04:49.576179  -   5247   // order
3:  101108 15:04:49.576188  -   5249
2:  101108 15:04:49.576189  |   5250
1:  101108 15:04:49.576191  -   5251
			
Figure 2

PRQueue avoids this scenario. It makes sure the order of messages in the output queue matches the order that existed in the input queue, regardless of the order in which the core routines finish processing the messages.

The basic logic behind PRQueue is simple: when the next message is taken off the input queue, still inside the lock, the next push-back position, or ‘seat’, for the output queue is acquired. The lock is then released and the processing continues. After a message is fully processed the previously acquired position is used to place the message into the output queue.
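The reservation idea can be illustrated without any threads. The sketch below is a simplified, single-threaded model and not the PRQueue code itself: SeatQueue, Slot and the function name are hypothetical, and a single deque of slots stands in for the two deques used by the real implementation. Seats are reserved in arrival order, filled in a different order, and still come out in the order they were reserved.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// One reserved position: the payload plus a flag saying whether it is ready.
struct Slot {
    std::string text;
    bool filled;
};

// Hypothetical single-threaded model of a queue with seat reservation.
class SeatQueue {
public:
    // Reserve the next back position and hand out a pointer to it.
    Slot* reserve() {
        slots_.push_back(Slot{std::string(), false});
        return &slots_.back();
    }
    // Place data into a previously reserved seat.
    static void fill(Slot* seat, const std::string& text) {
        seat->text = text;
        seat->filled = true;
    }
    // Pop the front element only if its seat has been filled.
    bool pop(std::string& out) {
        if (slots_.empty() || !slots_.front().filled) return false;
        out = slots_.front().text;
        slots_.pop_front();
        return true;
    }

private:
    std::deque<Slot> slots_;
};

// Reserve three seats, fill them in reverse order, drain the queue.
std::vector<std::string> drain_in_reserved_order() {
    SeatQueue out;
    Slot* a = out.reserve();
    Slot* b = out.reserve();
    Slot* c = out.reserve();

    std::string s;
    SeatQueue::fill(c, "third");
    SeatQueue::fill(b, "second");
    bool popped_early = out.pop(s);   // false: seat 'a' at the front is empty
    assert(!popped_early);

    SeatQueue::fill(a, "first");
    std::vector<std::string> popped;
    while (out.pop(s)) popped.push_back(s);
    return popped;                    // "first", "second", "third"
}
```

The late message at the front holds back the ones behind it, which is exactly what keeps the output in order.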

Figure 3 shows the output of the previous example re-run with PRQueue. The order of the messages in the log is now perfectly preserved.

Th# Time-stamp                  Msg#
2:  101108 15:04:49.571945  |   4338
...
...
1:  101108 15:04:49.576167  -   5243
3:  101108 15:04:49.576170  |   5244
1:  101108 15:04:49.576174  -   5245
3:  101108 15:04:49.576177  |   5246
1:  101108 15:04:49.576179  -   5247
3:  101108 15:04:49.576182  |   5248
3:  101108 15:04:49.576188  -   5249
2:  101108 15:04:49.576189  |   5250
1:  101108 15:04:49.576191  -   5251
			
Figure 3

PRQueue is constructed using two deques: ‘data’ and ‘filled’.

Each element of the ‘filled’ deque is an indicator showing whether the corresponding position is filled with data and can be popped from PRQueue. A wrapper class, DataQueue, holds the ‘data’ and ‘filled’ deques. The PRQueue methods are for the most part ‘mutexed’ wrappers of DataQueue methods.

This design separates and hides the thread-safety code from the actual implementation, so the user need not write any locking/unlocking logic.

Let’s discuss PRQueue’s functionality in a bit more detail.

The PRQueue pop method does two things: it pops data from the input queue and reserves a push position in the output queue. The push method uses the previously reserved position to save data into the output queue.

For testing PRQueue with multiple threads a function process_msg is executed by every spawned thread. It pops a StringMsg from the input queue, processes the message by calling the StringMsg::process() method, and pushes the message out. (See Listing 2.)

// The function 'process_msg' is executed by every
// spawned input thread. The signature corresponds
// to the pthread_create 'start_routine' 
// File prqueue.cpp

void* process_msg( void* arg)
{
  int thidx = ++Thidx;
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;
  PRQueue< Msg*>::position pos;
  
  cout << "Input thread=" << thidx <<
     " started" << endl;

  for(;;)
  {
    // Wait for the next available message in the
    // input queue, pop it, and reserve the next
    // push position in the output queue
    quetest->input_que.pop(
       msg, quetest->output_que, pos);

    // Process message
    msg->process( thidx);

    // Push processed message into output queue
    // using reserved position
    quetest->output_que.push( msg, pos);
  }
  return NULL;
}
			
Listing 2

The pop method does not only wait for the next message to arrive in the input queue; it also checks whether that message is ready to be popped by looking at the front element of the ‘filled’ deque. If the front position has been reserved but not yet filled, pop goes back to sleep and waits.

Pop logic (Listing 3):

  • Lock input queue
  • If input queue is not empty and its front element is filled with data, pop it (otherwise release the lock and go to sleep)
  • Lock output queue
  • Reserve the back position in output queue
  • Unlock output queue
  • Unlock input queue

// Pop data from input queue and reserve position
// in output queue (prqueue.hpp)
void PRQueue::pop( DATA& data, PRQueue& outque,
   PRQueue::position& pos)
{
  Lock lk( m_mux);
  
  // Waiting for the message in input queue - pop
  // message
  while( true) {
    if( m_que.pop( data))
      break;
    // either message has not arrived or position
    // is not filled
    wait_while_empty();
  }
  // Reserve position in output queue
  outque.reserve_pos( pos);
}

// Reserve the back position of this queue
void PRQueue::reserve_pos(
   PRQueue::position& pos) {
  Lock lk( m_mux);
  m_que.reserve( pos);
}
			
Listing 3

The push method copies data to the reserved position of the output queue and sets the ‘filled’ indicator to true. It then wakes the threads waiting on the queue’s condition variable by sending a notification signal (see Listing 4).

// Push data using reserved position into output
// queue (prqueue.hpp)
void PRQueue::push( const DATA& data,
   const PRQueue::position& pos)
{
  Lock lk( m_mux);
  m_que.fill( data, pos);
  notify_not_empty();
}
			
Listing 4
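To see pop, reserve and push working together, here is a compact end-to-end sketch. It is not the PRQueue code: it swaps pthread for the C++11 facilities std::mutex, std::condition_variable and std::thread, handles plain ints, and uses made-up names (OrderedQueue, run_pipeline) and negative values as stop markers. Several workers finish in an arbitrary order, yet a single consumer still reads the results in input order.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified stand-in for PRQueue<int>.
class OrderedQueue {
public:
    struct Slot { int value; bool filled; };

    // Simple push: append an element that is already filled.
    void push(int v) {
        std::lock_guard<std::mutex> lk(mux_);
        slots_.push_back(Slot{v, true});
        cv_.notify_all();
    }
    // Fill a previously reserved slot and wake any waiting threads.
    void fill(Slot* pos, int v) {
        std::lock_guard<std::mutex> lk(mux_);
        pos->value = v;
        pos->filled = true;
        cv_.notify_all();
    }
    // Simple pop: block until the front slot is filled, then remove it.
    int pop() {
        std::unique_lock<std::mutex> lk(mux_);
        return wait_and_pop(lk);
    }
    // Pop from this queue and, still holding this queue's lock,
    // reserve the back position of 'out'.
    int pop(OrderedQueue& out, Slot*& pos) {
        std::unique_lock<std::mutex> lk(mux_);
        int v = wait_and_pop(lk);
        pos = out.reserve();
        return v;
    }

private:
    Slot* reserve() {
        std::lock_guard<std::mutex> lk(mux_);
        slots_.push_back(Slot{0, false});
        return &slots_.back();
    }
    int wait_and_pop(std::unique_lock<std::mutex>& lk) {
        cv_.wait(lk, [this] {
            return !slots_.empty() && slots_.front().filled;
        });
        int v = slots_.front().value;
        slots_.pop_front();
        return v;
    }
    std::mutex mux_;
    std::condition_variable cv_;
    std::deque<Slot> slots_;
};

// Sends 0..n_msgs-1 through n_workers threads, each multiplying by 10.
std::vector<int> run_pipeline(int n_msgs, int n_workers) {
    OrderedQueue in, out;
    std::vector<std::thread> workers;
    for (int w = 0; w < n_workers; ++w) {
        workers.emplace_back([&in, &out] {
            for (;;) {
                OrderedQueue::Slot* pos = nullptr;
                int v = in.pop(out, pos);
                if (v < 0) { out.fill(pos, v); break; }  // stop marker
                // Uneven delays so that workers finish out of order.
                std::this_thread::sleep_for(
                    std::chrono::microseconds((v % 3) * 200));
                out.fill(pos, v * 10);
            }
        });
    }
    for (int i = 0; i < n_msgs; ++i) in.push(i);
    for (int w = 0; w < n_workers; ++w) in.push(-1);  // one marker per worker

    std::vector<int> result;
    for (int i = 0; i < n_msgs; ++i) result.push_back(out.pop());
    for (std::thread& t : workers) t.join();
    return result;
}
```

Calling run_pipeline(60, 3) should return 0, 10, 20, and so on in ascending order, however the workers happen to be scheduled. Locks are always taken in the order input queue, then output queue, which keeps the nested locking free of cycles.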

Now the messages arrive in the output queue in order. If we want to extend the chain of our processing conveyor further, another PRQueue can be added to the end. In the test case above we don’t do that: a single output thread simply reads processed messages from the output queue and prints them. That final step uses the ‘simple pop’ method, which takes only the message and omits the second and third arguments (the reference to the output queue and the position value). See Listing 5.

// The function 'print_msg' is executed by the
// single output thread. File prqueue.cpp
void* print_msg( void* arg)
{
  QueueTest* quetest =(QueueTest*)arg;
  Msg* msg;

  cout << "Output thread started" << endl;
  for(;;)
  {
    // pop message from output queue and print it
    quetest->output_que.pop( msg);
    msg->print();
    delete msg;
  }
  return NULL;
}
			
Listing 5

Now, let’s take a look at the auxiliary class DataQueue.

As mentioned before, DataQueue holds two STL deques: ‘data’ and ‘filled’. It also defines the position structure and the methods in which the key steps of position reservation and data popping happen.

The DataQueue is included in PRQueue as a data-member m_que (see Listing 6).

// An auxiliary class DataQueue - holder of 'data'
// and 'filled' deques
template< typename DATA> class DataQueue
{

public:
  typedef typename
     deque< DATA>::pointer data_pointer;
  typedef typename
     deque< bool>::pointer filled_pointer;

  // Structure to hold pointers of reserved
  // position
  struct position {
    position() : data_pnt(0), filled_pnt(0) {} 
    data_pointer data_pnt;
    filled_pointer filled_pnt;
  };

  // Check that the data deque is not empty and
  // that the front element is 'filled'.
  // Copy the front data out, then pop the front
  // elements of both deques
  bool pop( DATA& out) {
    if( m_data_que.empty() || 
       ! m_filled_que.front())
      return false;
    out = m_data_que.front();
    m_data_que.pop_front();
    m_filled_que.pop_front();
    return true;
  }
  // Add dummy elements to the back of both
  // deques.
  // Save pointers of both elements to the output
  // position
  void reserve( position& pos) {
    m_data_que.push_back( m_dummy);
    m_filled_que.push_back( false);
    pos.data_pnt =
       &m_data_que[ m_data_que.size() -1];
    pos.filled_pnt =
       &m_filled_que[ m_filled_que.size() -1];
  }
  // Copy data and set 'filled' indicator by
  // position
  void fill( const DATA& data,
     const position& pos) {
    *pos.data_pnt = data;
    *pos.filled_pnt = true;
  }
  void push( const DATA& data) {
    m_data_que.push_back( data);
    m_filled_que.push_back( true);
  }

private : 
  deque<DATA> m_data_que;
  deque<bool> m_filled_que;
  DATA m_dummy;
};//DataQueue
			
Listing 6
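One detail in Listing 6 worth spelling out is that the ‘filled’ flags are kept in a deque<bool> and addressed through ordinary pointers. Unlike std::vector<bool>, std::deque has no packed specialisation for bool, so each flag is a real bool object whose address can be taken. A small check, with a function name invented for illustration:

```cpp
#include <cassert>
#include <deque>
#include <type_traits>

// std::deque<bool> stores real bool objects, so its pointer type is bool*.
static_assert(std::is_same<std::deque<bool>::pointer, bool*>::value,
              "deque<bool>::pointer is expected to be bool*");

// Mirrors reserve() and fill() for the 'filled' deque only: take the address
// of the back flag, push another flag, then set the first one via the pointer.
bool set_flag_through_pointer() {
    std::deque<bool> filled;
    filled.push_back(false);
    bool* flag = &filled[filled.size() - 1];  // address of the first flag
    filled.push_back(false);                  // a later reservation
    *flag = true;                             // fill the earlier position
    return filled.front() && !filled.back();
}
```

Replacing the deque with std::vector<bool> here would not compile in the same way, because its operator[] returns a proxy object rather than a bool&.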

To compile and run the PRQueue test, use the commands in Figure 4.

   c++ -I. prqueue.cpp -lpthread                # PRQueue test
   c++ -I. prqueue.cpp -lpthread -DSIMPLE_QUE   # SimpleQueue test
   
   # Try long message
   c++ -I. prqueue.cpp -lpthread -DLONG_MSG
   c++ -I. prqueue.cpp -lpthread -DLONG_MSG -DSIMPLE_QUE

   ./a.out [number-of-messages]
			
Figure 4

Conclusion

The queue with Position Reservation (PRQueue) presented here could be useful in multi-threaded applications when the order of streaming messages should be preserved. PRQueue will make sure that the order of messages in the output queue exactly matches the order that existed in the input queue, because the next push-back position in the output queue is reserved synchronously with taking the message off the input queue. The reserved spot is later filled with data when the message is done processing and ready.

Reference

A zip file containing the code is available at: http://accu.org/content/journals/ol101/prqueue.zip
