Writing a simple threaded queue class using TMonitor (2)

In my previous post, I introduced TMonitor. In this post, I’ll actually put it to work by writing a simple threaded queue class with it.

‘Simple’ could mean many things of course, but in this context it will denote two things in particular: firstly, my class will simply wrap the standard generic TQueue; and secondly, there won’t be any maximum item count implemented. The latter means only the dequeuers (i.e., the item takers) may become waiters, i.e., be involved in a Wait loop. For sure, entry into the monitor on the enqueuing (item adding) side may be blocked, but once the lock is taken, it won’t be released until the end of the Enqueue method.

Nonetheless, speaking of a ‘thread-safe queue’ might imply something even simpler. In particular, it might be taken to mean using a TCriticalSection instance to guard the underlying TQueue’s Enqueue and Dequeue methods, similar to what the old TThreadList class does vis-a-vis TList. Such an approach would be a bit too simple however — if one thread (the producer thread) ‘pushes’ or adds items and another (the consumer thread) ‘pops’ or takes them out, there will need to be a notification system on top to tell the consumer when the queue has items ready to be popped. Happily though, a notification system is precisely what the wait/pulse functionality of a monitor provides.
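
To make the ‘too simple’ approach concrete, here’s a rough sketch of what it might look like. The class and method names (TNaiveLockedQueue, TryDequeue) are made up purely for illustration and aren’t part of the class developed in this post:

```delphi
unit NaiveQueueSketch;

interface

uses
  SyncObjs, Generics.Collections;

type
  //hypothetical class for illustration only: a lock, but no notification
  TNaiveLockedQueue<T> = class
  strict private
    FLock: TCriticalSection;
    FQueue: TQueue<T>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function TryDequeue(out Item: T): Boolean;
  end;

implementation

constructor TNaiveLockedQueue<T>.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FQueue := TQueue<T>.Create;
end;

destructor TNaiveLockedQueue<T>.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited Destroy;
end;

procedure TNaiveLockedQueue<T>.Enqueue(const Item: T);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Item);
  finally
    FLock.Release;
  end;
end;

function TNaiveLockedQueue<T>.TryDequeue(out Item: T): Boolean;
begin
  FLock.Acquire;
  try
    //nothing to wait on here, so all we can do is report failure
    Result := FQueue.Count > 0;
    if Result then
      Item := FQueue.Dequeue
    else
      Item := Default(T);
  finally
    FLock.Release;
  end;
end;

end.
```

With something like this, a consumer thread has little choice but to poll, e.g. by calling TryDequeue in a loop with a Sleep call between attempts, which is exactly what the wait/pulse functionality lets us avoid.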

So, without further ado, here’s the interface of the TSimpleThreadedQueue class:

unit SimpleThreadedQueue;

interface

uses
  SysUtils, Classes, Generics.Collections, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FClosed: Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    function Enqueue(const Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    function Dequeue(var Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    procedure Close;
    property Closed: Boolean read FClosed;
  end;

Here, the Enqueue and Dequeue methods are named after the methods of TQueue they’ll wrap. Their signatures are a bit different though given the ability to specify a timeout. For the result type, I’m using the standard TWaitResult enumeration; specifically, wrSignaled will denote success, wrTimeout the function timing out, and wrAbandoned for when the queue has been closed.
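
As a rough sketch of how calling code might act on the three results, assuming the class as completed later in this post (the program name, the Report helper, the 500ms timeout and the messages are all arbitrary choices of mine for illustration):

```delphi
program WaitResultSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, SyncObjs, SimpleThreadedQueue;

//hypothetical helper that turns a result into a message
procedure Report(Res: TWaitResult; const S: string);
begin
  case Res of
    wrSignaled: WriteLn('Got ', S);
    wrTimeout: WriteLn('Timed out: nothing arrived in time');
    wrAbandoned: WriteLn('Queue has been closed');
  end;
end;

var
  Queue: TSimpleThreadedQueue<string>;
  Res: TWaitResult;
  S: string;
begin
  Queue := TSimpleThreadedQueue<string>.Create;
  try
    Queue.Enqueue('hello');
    //one item is waiting, so this should succeed straight away
    Res := Queue.Dequeue(S, 500);
    Report(Res, S);
    //the queue is now empty and nobody is adding to it, so this
    //should give up after roughly half a second
    Res := Queue.Dequeue(S, 500);
    Report(Res, S);
    //once closed, further calls should return without waiting
    Queue.Close;
    Res := Queue.Dequeue(S, 500);
    Report(Res, S);
  finally
    Queue.Free;
  end;
  ReadLn;
end.
```

Note the result is stored in a variable before being passed on together with S, rather than nesting the Dequeue call inside the call to Report. This avoids relying on the order in which the compiler evaluates the arguments.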

With regards to the last of these, the Close method will both close the queue for potential enqueuers and drain it for potential dequeuers (the Closed property will then report whether Close has been called). While we will call Close in the destructor, Delphi’s manual memory management means it will need to be called explicitly if there’s a chance any consumer thread is waiting on Dequeue. This is because we need an object to lock against, yet this same object will be freed in the destructor. While an extra layer of blocking code could be added to prevent this object (which will be the internal TQueue instance) being freed too early, I don’t think the added complication is worth it.

So, here’s what the implementations of Create and Destroy look like (nothing you wouldn’t expect):

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  FQueue := TQueue<T>.Create;
end;

destructor TSimpleThreadedQueue<T>.Destroy;
begin
  Close;
  FQueue.Free;
  inherited Destroy;
end;

The Close method is pretty trivial too:

procedure TSimpleThreadedQueue<T>.Close;
begin
  if FClosed then Exit;
  FClosed := True;
  TMonitor.Enter(FQueue);
  try
    FQueue.Clear;
    TMonitor.PulseAll(FQueue); //notify any waiters Closed is now True
  finally
    TMonitor.Exit(FQueue);
  end;
end;

Since we’re not implementing a maximum queue size, Enqueue is nice and easy as well: first, the lock is requested, and if we don’t time out, the item is then ‘pushed’ before we ‘pulse’ the change of state to the head of the monitor’s waiter list:

function TSimpleThreadedQueue<T>.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
begin
  if Closed then Exit(wrAbandoned);
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    if Closed then Exit(wrAbandoned);
    FQueue.Enqueue(Item);
    TMonitor.Pulse(FQueue);
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;

Lastly, we have Dequeue’s implementation. The only complication here is the fact we have a timeout value to implement across two separate calls, meaning we should decrement the timeout amount each time. For this, I’ve used TStopwatch – the GetTickCount API function might be used instead of course, but doing it this way keeps us 100% in the Delphi RTL:

function TSimpleThreadedQueue<T>.Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
var
  Stopwatch: TStopwatch;
  TimeoutLeft: Int64;
begin
  if Closed then Exit(wrAbandoned);
  Stopwatch := TStopwatch.StartNew;
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    while not Closed and (FQueue.Count = 0) do
    begin
      TimeoutLeft := Timeout - Stopwatch.ElapsedMilliseconds;
      if TimeoutLeft < 0 then TimeoutLeft := 0;
      if not TMonitor.Wait(FQueue, LongWord(TimeoutLeft)) then Exit(wrTimeout);
    end;
    if Closed then Exit(wrAbandoned);
    Item := FQueue.Dequeue;
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;

To compile, Diagnostics needs to be added to the implementation section’s uses clause:

uses Diagnostics;

This done, TSimpleThreadedQueue is complete.

Next up is to create a simple demo app. For this, we’ll design a basic form with a button, the pressing of which will generate some work for a worker thread to get on with. This work will be put in an ‘in’ queue (there will be more than one worker thread available), and when completed, it will be put in an ‘out’ queue for the main thread to pick up. For simplicity, each item of ‘work’ will be a string value; to simulate processing, a worker thread will merely sleep for a bit before pushing the value just popped into an ‘out’ queue. While plainly in the realm of demo code, this setup will still involve both a single producer, multiple consumer situation (the ‘in’ queue) and a multiple producer, single consumer one (the ‘out’ queue).

So, create a new VCL forms application. In the main form’s unit, add both SyncObjs and SimpleThreadedQueue (i.e., the unit just created) to the uses clause, and the following fields to the form class itself:

    FCurWorkItem: Integer;
    FInQueue, FOutQueue: TSimpleThreadedQueue<string>;
    FThreadsActive: TCountdownEvent;

In the form designer, set up the form to look like this:

[Screenshot of the form: memo on the left, timer in the middle, panel of buttons on the right]

The buttons on the right hand side are on a right-aligned TPanel, the TMemo to the left is client aligned (AlignWithMargins is True too), and in the middle is a TTimer — set its Interval property to something like 250. Name the components memOutput, panBtns, btnEnqueueWorkItem, btnFinish and tmrWorkDone, before handling the form’s OnCreate event like this:

procedure TfrmSTQTest.FormCreate(Sender: TObject);
const
  MaxDelay = 2000;       //tweak as you please
  WorkerThreadCount = 4; //ditto
var
  I: Integer;
begin
  FThreadsActive := TCountdownEvent.Create(1); //has to be at least 1
  FInQueue := TSimpleThreadedQueue<string>.Create;
  FOutQueue := TSimpleThreadedQueue<string>.Create;
  for I := 1 to WorkerThreadCount do
    TThread.CreateAnonymousThread(
      procedure
      var
        Delay: Integer;
        S: string;
        ThreadID: TThreadID;
      begin
        FThreadsActive.AddCount;
        try
          ThreadID := TThread.CurrentThread.ThreadID;
          FOutQueue.Enqueue(Format('Thread %d is starting', [ThreadID]));
          while FInQueue.Dequeue(S) = wrSignaled do
          begin
            Delay := Random(MaxDelay);
            FOutQueue.Enqueue(Format('    Dequeued %s (thread %d); will now sleep for ' +
              '%d msecs to simulate processing', [S, ThreadID, Delay]));
            Sleep(Delay);
            FOutQueue.Enqueue(Format('      Finished processing %s (thread %d)',
              [S, ThreadID]));
          end;
          FOutQueue.Enqueue(Format('        Thread %d is terminating', [ThreadID]));
        finally
          FThreadsActive.Signal;
        end;
      end).Start;
end;

Here, we initialise and start running a number of worker threads, along with a TCountdownEvent to keep track of the number of threads that are currently running. (As a TCountdownEvent needs to start off with a positive counter value, we’ll say the initial 1 represents the main thread.) Next, handle the form’s OnDestroy event like so:

procedure TfrmSTQTest.FormDestroy(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  FInQueue.Close;
  FThreadsActive.Signal;  //release our 1
  FThreadsActive.WaitFor; //wait for the worker threads to terminate
  FThreadsActive.Free;
  FInQueue.Free;
  FOutQueue.Free;
end;

Note the explicit call to the ‘in’ queue’s Close method. Doing this will cause the Dequeue call in the waiting worker threads to break off, returning wrAbandoned. After signalling the countdown event itself, the main thread then waits for the event to become fully signalled (in other words, for the TCountdownEvent’s internal counter to fall to zero), which will happen once every worker thread has finished. Finally, the queue objects can actually be destroyed.

To give the worker threads something to do, handle the first button’s OnClick event like this:

procedure TfrmSTQTest.btnEnqueueWorkItemClick(Sender: TObject);
var
  S: string;
begin
  Inc(FCurWorkItem);
  S := Format('work item %d', [FCurWorkItem]);
  memOutput.Lines.Add('  Queuing ' + S + '...');
  FInQueue.Enqueue(S);
end;

The ‘finish’ button is just to be able to see the worker threads shutting down cleanly (not that they shouldn’t!) – you don’t actually need to click it before closing the form:

procedure TfrmSTQTest.btnFinishClick(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  btnFinish.Enabled := False;
  FInQueue.Close;
end;

Finally, the timer is used to periodically poll the ‘out’ queue for data to display. Note that 0 is passed as the timeout value, meaning we will neither block on the lock already being held nor wait for new data to arrive:

procedure TfrmSTQTest.tmrWorkDoneTimer(Sender: TObject);
var
  S: string;
begin
  while FOutQueue.Dequeue(S, 0) = wrSignaled do //drain the out queue but don't wait on it
    memOutput.Lines.Add(S);
end;

That’s about it really. If you compile and run the resulting application, the most instructive approach is probably the most jejune: with the enqueue button focussed, just press and hold down ENTER for a bit. That should result in several work items being enqueued, interspersed with a few being completed as the four worker threads get to work. Obviously, tweaking the values used in the OnCreate handler will affect this – have the threads sleep for at most 200ms rather than 2000ms, for example, and the ‘in’ queue will be processed almost as quickly as you can fill it up.

[Update: merely a few hours after I posted, a question on StackOverflow pops up suggesting I should have used PulseAll rather than Pulse on the grounds PulseAll is surely always needed ‘except in the trivial cases when there are only two threads in total’. The number of threads is a red herring however – it’s the nature of the state change being signalled that matters. If the monitor’s signalling functionality were to be used for more than one thing (e.g., to establish a maximum item count too), PulseAll would be needed. As it is, Wait is only being called in one place, meaning any waiter is a legitimate taker of any Pulse (so to speak).]
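
To make that concrete, here’s a rough sketch of the sort of class where PulseAll would be needed: a queue with a maximum item count, so that enqueuers and dequeuers both wait on the same monitor. TBoundedQueue and its MaxCount parameter are hypothetical names of my own, and I’ve left out timeouts and closing to keep things short:

```delphi
unit BoundedQueueSketch;

interface

uses
  Classes, Generics.Collections;

type
  //hypothetical variant for illustration: no timeouts, no Close method
  TBoundedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FMaxCount: Integer;
  public
    constructor Create(MaxCount: Integer);
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue: T;
  end;

implementation

constructor TBoundedQueue<T>.Create(MaxCount: Integer);
begin
  inherited Create;
  FMaxCount := MaxCount;
  FQueue := TQueue<T>.Create;
end;

destructor TBoundedQueue<T>.Destroy;
begin
  FQueue.Free;
  inherited Destroy;
end;

procedure TBoundedQueue<T>.Enqueue(const Item: T);
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count >= FMaxCount do //wait for room to appear
      TMonitor.Wait(FQueue, INFINITE);
    FQueue.Enqueue(Item);
    //a waiter may now be an enqueuer *or* a dequeuer, so wake them
    //all and let each one recheck its own condition
    TMonitor.PulseAll(FQueue);
  finally
    TMonitor.Exit(FQueue);
  end;
end;

function TBoundedQueue<T>.Dequeue: T;
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count = 0 do //wait for an item to appear
      TMonitor.Wait(FQueue, INFINITE);
    Result := FQueue.Dequeue;
    TMonitor.PulseAll(FQueue);
  finally
    TMonitor.Exit(FQueue);
  end;
end;

end.
```

Were Pulse used here instead, a pulse sent by Enqueue might be taken by another blocked enqueuer, which would find the queue still full and go straight back to waiting while a dequeuer that could have acted stays asleep.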


Writing a simple threaded queue class using TMonitor (1)

While it’s been in the product for a few versions now, I’ve come across very little example code for TMonitor, despite its implementation having been heavily blogged about by its author (Allen Bauer) while he actually was writing it. In this post and its sequel, I’ll be looking to help rectify that, if only in a very small way.

Naturally, your first question may well be to ask what on earth is TMonitor. One answer is that it is a Delphi implementation of the Monitor class in .NET, having a very similar interface to the latter and essentially the same functionality. Of course, .NET didn’t invent the concept of a ‘monitor’, and you can read more about it on Wikipedia. In short, a monitor is the combination of a critical section and a ‘condition variable’. If you’re like me, the second part of this sounds very jargon-y, with the first leading to the thought ‘what’s the point?’ – after all, Delphi has a TCriticalSection class, or if you prefer, direct access to the Windows critical section API. In contrast, in .NET, Monitor is the critical section class, which – naturally enough – makes it quite essential over there (the fact the critical section aspect is also given some syntactical sugar in C# with the lock keyword can’t not help either). So, what is the point back in Delphi-land?

In essence, what TMonitor adds over a regular critical section object is a signalling capability: a very primitive one admittedly, but a signalling capability nonetheless. To give a flavour, here’s a more or less direct Delphi translation of a C# example of Joe Albahari’s (source):

program SimpleWaitPulse;

{$APPTYPE CONSOLE}

uses
  Classes;

var
  Go: Boolean;
  Thread: TThread;
begin
  Thread := TThread.CreateAnonymousThread(
    procedure
    begin
      TMonitor.Enter(Thread);
      try
        while not Go do
          TMonitor.Wait(Thread, INFINITE);
        WriteLn('Woken!!!');
      finally
        TMonitor.Exit(Thread);
      end;
    end);
  Thread.Start;
  ReadLn;
  TMonitor.Enter(Thread);
  try
    Go := True;
    TMonitor.Pulse(Thread);
  finally
    TMonitor.Exit(Thread);
  end;
  ReadLn;
end.

This starts a new thread that, on taking the lock, immediately releases it to wait on the Go variable becoming True. The latter happens when the user presses ENTER, upon which the main thread takes the lock, sets Go to True, ‘pulses’ the change in state, before finally releasing the lock.

Even though this is a very simple example – so simple in fact it fails to show up WriteLn’s lack of thread safety (ahem)! – there are a few things to note from it:

  • As always with locking primitives, use of try/finally is very much necessary.
  • The sort of state waited on is entirely up to the discretion of the code that uses TMonitor.
  • The Wait and Pulse calls are made within an Enter/Exit pair. The call to Wait therefore releases the lock, allowing another thread to get in and call Pulse. The Pulse call itself then wakes the thread at the top of the Wait queue. In the present example there’s only one waiter, but if there were more than one and you wished to wake all of them, you should call PulseAll rather than Pulse several times.
  • Regarding the Wait call again, observe how it is made in the context of a while loop. In this case that isn’t strictly necessary, since there’s only going to be one waiter waiting on a single boolean condition. Matters would be different, though, whenever there is more than one consumer — between being released and acting on the pulsed state, that state may have been changed by another newly-released thread.
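
As a sketch of the last two points, here’s a variant of the example with several waiters woken by a single PulseAll call. The Lock object, the WaiterCount constant and the program name are my own additions for illustration; the rest follows the pattern of the program above:

```delphi
program PulseAllSketch;

{$APPTYPE CONSOLE}

uses
  Classes;

const
  WaiterCount = 3; //arbitrary

var
  Go: Boolean;
  Lock: TObject;
  I: Integer;
begin
  //a plain object to lock against, shared by every thread
  Lock := TObject.Create;
  for I := 1 to WaiterCount do
    TThread.CreateAnonymousThread(
      procedure
      begin
        TMonitor.Enter(Lock);
        try
          while not Go do //recheck the state on every wake-up
            TMonitor.Wait(Lock, INFINITE);
          //the lock is still held here, so the WriteLn calls of the
          //different waiters can't overlap
          WriteLn('Woken!!!');
        finally
          TMonitor.Exit(Lock);
        end;
      end).Start;
  ReadLn;
  TMonitor.Enter(Lock);
  try
    Go := True;
    TMonitor.PulseAll(Lock); //one call wakes every waiter
  finally
    TMonitor.Exit(Lock);
  end;
  ReadLn;
  //Lock is deliberately not freed: a waiter could in principle still be
  //using it, and the process is about to end anyway
end.
```

Each woken thread has to retake the lock before returning from Wait, so the waiters proceed one at a time rather than all at once.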

With this knowledge, we can use TMonitor to implement a simple threaded queue class, wrapping the standard TQueue. As for the actual implementation, you’ll have to wait for my next post though.