In my previous post, I introduced TMonitor. In this post, I’ll actually put it to work by writing a simple threaded queue class with it.
‘Simple’ could mean many things of course, but in this context it will denote two things in particular: firstly, my class will simply wrap the standard generic TQueue; and secondly, there won’t be any maximum item count implemented. The latter means only the dequeuers (i.e., the item takers) may become waiters, i.e., be involved in a Wait loop. For sure, entry into the monitor on the enqueuing (item adding) side may be blocked, but once the lock is taken, it won’t be released until the end of the Enqueue method.
Nonetheless, speaking of a ‘thread-safe queue’ might imply something even simpler. In particular, it might be taken to mean using a TCriticalSection instance to guard the underlying TQueue’s Enqueue and Dequeue methods, similar to what the old TThreadList class does vis-a-vis TList. Such an approach would be a bit too simple however — if one thread (the producer thread) ‘pushes’ or adds items and another (the consumer thread) ‘pops’ or takes them out, there will need to be a notification system on top to tell the consumer when the queue has items ready to be popped. Happily though, a notification system is precisely what the wait/pulse functionality of a monitor provides.
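To make the limitation concrete, here is a minimal sketch of the lock-only approach. TLockedQueue and TryDequeue are hypothetical names made up for illustration, and the unit names follow the unscoped style used throughout this post. Because nothing tells the consumer when an item arrives, all it can do is poll:

```pascal
program LockOnlyQueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Generics.Collections, SyncObjs;

type
  // Hypothetical lock-only wrapper, for illustration; not part of the RTL.
  TLockedQueue<T> = class
  strict private
    FLock: TCriticalSection;
    FQueue: TQueue<T>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function TryDequeue(out Item: T): Boolean;
  end;

constructor TLockedQueue<T>.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FQueue := TQueue<T>.Create;
end;

destructor TLockedQueue<T>.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited Destroy;
end;

procedure TLockedQueue<T>.Enqueue(const Item: T);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Item);
  finally
    FLock.Release;
  end;
end;

function TLockedQueue<T>.TryDequeue(out Item: T): Boolean;
begin
  FLock.Acquire;
  try
    // The lock makes the check-then-take atomic, but nothing more.
    Result := FQueue.Count > 0;
    if Result then
      Item := FQueue.Dequeue
    else
      Item := Default(T);
  finally
    FLock.Release;
  end;
end;

var
  Queue: TLockedQueue<string>;
  S: string;
begin
  Queue := TLockedQueue<string>.Create;
  try
    Queue.Enqueue('hello');
    // A consumer cannot sleep until data arrives; it has to keep asking.
    while not Queue.TryDequeue(S) do
      Sleep(10);
    WriteLn(S);
  finally
    Queue.Free;
  end;
end.
```

The Sleep(10) in the loop is the problem in miniature: make it shorter and the consumer burns CPU, make it longer and items sit in the queue unnoticed.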
So, without further ado, here’s the interface of the TSimpleThreadedQueue class:
unit SimpleThreadedQueue;

interface

uses
  SysUtils, Classes, Generics.Collections, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FClosed: Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    function Enqueue(const Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    function Dequeue(var Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    procedure Close;
    property Closed: Boolean read FClosed;
  end;

Here, the Enqueue and Dequeue methods are named after the methods of TQueue they’ll wrap. Their signatures are a bit different though, given the ability to specify a timeout. For the result type, I’m using the standard TWaitResult enumeration; specifically, wrSignaled will denote success, wrTimeout the function timing out, and wrAbandoned the queue having been closed.
With regard to the last of these, the Close method will both close the queue for potential enqueuers and drain it for potential dequeuers (the Closed property will then report whether Close has been called). While we will call Close in the destructor, Delphi’s manual memory management means it will need to be called explicitly if there’s a chance any consumer thread is waiting on Dequeue. This is because we need an object to lock against, yet this same object will be freed in the destructor. While an extra layer of blocking code could be added to prevent this object (which will be the internal TQueue instance) being freed too early, I don’t think the added complication is worth it.
So, here’s what the implementations of Create and Destroy look like; nothing you wouldn’t expect:

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  FQueue := TQueue<T>.Create;
end;

destructor TSimpleThreadedQueue<T>.Destroy;
begin
  Close;
  FQueue.Free;
  inherited Destroy;
end;
The Close method is pretty trivial too:
procedure TSimpleThreadedQueue<T>.Close;
begin
  if FClosed then Exit;
  FClosed := True;
  TMonitor.Enter(FQueue);
  try
    FQueue.Clear;
    TMonitor.PulseAll(FQueue); //notify any waiters Closed is now True
  finally
    TMonitor.Exit(FQueue);
  end;
end;
Since we’re not implementing a maximum queue size, Enqueue is nice and easy as well: first, the lock is requested, and if we don’t time out, the item is then ‘pushed’ before we ‘pulse’ the change of state to the head of the monitor’s waiter list:
function TSimpleThreadedQueue<T>.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
begin
  if Closed then Exit(wrAbandoned);
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    if Closed then Exit(wrAbandoned); //may have been closed while we waited for the lock
    FQueue.Enqueue(Item);
    TMonitor.Pulse(FQueue);
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;
Lastly, we have Dequeue’s implementation. The only complication here is the fact we have a timeout value to implement across two separate calls, meaning we should decrement the timeout amount each time. For this, I’ve used TStopwatch – the GetTickCount API function might be used instead of course, but doing it this way keeps us 100% in the Delphi RTL:
function TSimpleThreadedQueue<T>.Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
var
  Stopwatch: TStopwatch;
  TimeoutLeft: Int64;
begin
  if Closed then Exit(wrAbandoned);
  Stopwatch := TStopwatch.StartNew;
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    while not Closed and (FQueue.Count = 0) do
    begin
      if Timeout = INFINITE then
        TimeoutLeft := INFINITE //never shorten an infinite wait
      else
      begin
        //deduct the time already spent acquiring the lock and waiting
        TimeoutLeft := Int64(Timeout) - Stopwatch.ElapsedMilliseconds;
        if TimeoutLeft < 0 then TimeoutLeft := 0;
      end;
      if not TMonitor.Wait(FQueue, LongWord(TimeoutLeft)) then Exit(wrTimeout);
    end;
    if Closed then Exit(wrAbandoned);
    Item := FQueue.Dequeue;
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;
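If the remaining-time arithmetic looks fiddly inline, it can be pulled out into a small pure function. The following is only a sketch: RemainingTimeout and InfiniteTimeout are hypothetical names of my own, with the constant defined locally so that the example does not depend on which unit declares INFINITE.

```pascal
program RemainingTimeoutSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Diagnostics;

const
  // Local stand-in with the same value as the RTL's INFINITE.
  InfiniteTimeout = LongWord($FFFFFFFF);

// Hypothetical helper: how much of Timeout is left once ElapsedMSecs
// milliseconds have gone by. An infinite timeout stays infinite, and
// the result never drops below zero.
function RemainingTimeout(Timeout: LongWord; ElapsedMSecs: Int64): LongWord;
begin
  if Timeout = InfiniteTimeout then
    Exit(InfiniteTimeout);
  if ElapsedMSecs >= Int64(Timeout) then
    Exit(0);
  Result := LongWord(Int64(Timeout) - ElapsedMSecs);
end;

var
  Stopwatch: TStopwatch;
begin
  WriteLn(RemainingTimeout(1000, 250));  // 750
  WriteLn(RemainingTimeout(100, 250));   // 0
  WriteLn(RemainingTimeout(InfiniteTimeout, 250) = InfiniteTimeout); // TRUE

  // With a real stopwatch, the figure depends on how long Sleep really took.
  Stopwatch := TStopwatch.StartNew;
  Sleep(50);
  WriteLn(RemainingTimeout(200, Stopwatch.ElapsedMilliseconds));
end.
```

Keeping the calculation separate from the locking code also makes it easy to check the edge cases (already expired, infinite) without any threads being involved.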
To compile, Diagnostics needs to be added to the implementation section’s uses clause:
uses Diagnostics;
This done, TSimpleThreadedQueue is complete.
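Before moving on to a GUI, a quick console test shows the three result values in action. This is just a sketch: it assumes the SimpleThreadedQueue unit above is on the search path, and the Report procedure is a throwaway helper of my own.

```pascal
program WaitResultSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, SyncObjs,
  SimpleThreadedQueue; // the unit developed in this post

// Throwaway helper to print what a Dequeue call returned.
procedure Report(const Step: string; Res: TWaitResult; Value: Integer);
begin
  case Res of
    wrSignaled: WriteLn(Step, ': got ', Value);
    wrTimeout: WriteLn(Step, ': timed out');
    wrAbandoned: WriteLn(Step, ': queue is closed');
  else
    WriteLn(Step, ': unexpected result');
  end;
end;

var
  Queue: TSimpleThreadedQueue<Integer>;
  Res: TWaitResult;
  Value: Integer;
begin
  Value := 0;
  Queue := TSimpleThreadedQueue<Integer>.Create;
  try
    Queue.Enqueue(42);

    // An item is waiting, so this returns wrSignaled straight away.
    Res := Queue.Dequeue(Value, 100);
    Report('Item waiting', Res, Value);

    // The queue is now empty and nobody is adding to it: wrTimeout.
    Res := Queue.Dequeue(Value, 100);
    Report('Empty queue', Res, Value);

    // Once closed, every call returns wrAbandoned.
    Queue.Close;
    Res := Queue.Dequeue(Value, 100);
    Report('After Close', Res, Value);
  finally
    Queue.Free;
  end;
end.
```

Note that the result of Dequeue is stored in a variable before Value is read, since Value is only meaningful once the call has returned wrSignaled.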
Next up is to create a simple demo app. For this, we’ll design a basic form with a button, the pressing of which will generate some work for a worker thread to get on with. This work will be put in an ‘in’ queue (there will be more than one worker thread available), and when completed, it will be put in an ‘out’ queue for the main thread to pick up. For simplicity, each item of ‘work’ will be a string value; to simulate processing, a worker thread will merely sleep for a bit before pushing the value just popped into the ‘out’ queue. While plainly in the realm of demo code, this setup will still involve both a single producer, multiple consumer situation (the ‘in’ queue) and a multiple producer, single consumer one (the ‘out’ queue).
So, create a new VCL forms application. In the main form’s unit, add both SyncObjs and SimpleThreadedQueue (i.e., the unit just created) to the uses clause, and the following fields to the form class itself:
    FCurWorkItem: Integer;
    FInQueue, FOutQueue: TSimpleThreadedQueue<string>;
    FThreadsActive: TCountdownEvent;

In the form designer, set up the form as follows:
The buttons on the right hand side are on a right-aligned TPanel, the TMemo to the left is client aligned (AlignWithMargins is True too), and in the middle is a TTimer — set its Interval property to something like 250. Name the components memOutput, panBtns, btnEnqueueWorkItem, btnFinish and tmrWorkDone, before handling the form’s OnCreate event like this:
procedure TfrmSTQTest.FormCreate(Sender: TObject);
const
  MaxDelay = 2000; //tweak as you please
  WorkerThreadCount = 4; //ditto
var
  I: Integer;
begin
  FThreadsActive := TCountdownEvent.Create(1); //has to be at least 1
  FInQueue := TSimpleThreadedQueue<string>.Create;
  FOutQueue := TSimpleThreadedQueue<string>.Create;
  for I := 1 to WorkerThreadCount do
    TThread.CreateAnonymousThread(
      procedure
      var
        Delay: Integer;
        S: string;
        ThreadID: TThreadID;
      begin
        FThreadsActive.AddCount;
        try
          ThreadID := TThread.CurrentThread.ThreadID;
          FOutQueue.Enqueue(Format('Thread %d is starting', [ThreadID]));
          while FInQueue.Dequeue(S) = wrSignaled do
          begin
            Delay := Random(MaxDelay);
            FOutQueue.Enqueue(Format(' Dequeued %s (thread %d); will now sleep for ' +
              '%d msecs to simulate processing', [S, ThreadID, Delay]));
            Sleep(Delay);
            FOutQueue.Enqueue(Format(' Finished processing %s (thread %d)', [S, ThreadID]));
          end;
          FOutQueue.Enqueue(Format(' Thread %d is terminating', [ThreadID]));
        finally
          FThreadsActive.Signal;
        end;
      end).Start;
end;

Here, we initialise and start running a number of worker threads, along with a TCountdownEvent to keep track of the number of threads that are currently running. (As a TCountdownEvent needs to start off with a positive counter value, we’ll say the initial 1 represents the main thread.) Next, handle the form’s OnDestroy event like so:

procedure TfrmSTQTest.FormDestroy(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  FInQueue.Close;
  FThreadsActive.Signal; //release our 1
  FThreadsActive.WaitFor; //wait for the worker threads to terminate
  FThreadsActive.Free;
  FInQueue.Free;
  FOutQueue.Free;
end;
Note the explicit call to the ‘in’ queue’s Close method. Doing this will cause the Dequeue call in the waiting worker threads to break off, returning wrAbandoned. After signalling the countdown event itself, the main thread then waits on the event being completely signalled (for the TCountdownEvent’s internal counter falling to zero in other words), which will happen once every worker thread has finished. Finally, the queue objects can actually be destroyed.
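Stripped of the form and the queues, the countdown handshake looks roughly like this. It is a console sketch rather than the demo’s actual code, and it makes one deliberate change that is my own assumption about what is tidier: AddCount is called from the creating thread before each worker starts, so the counter cannot possibly hit zero before a slow-starting worker has registered itself.

```pascal
program CountdownShutdownSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, SyncObjs;

const
  WorkerThreadCount = 3;

var
  ThreadsActive: TCountdownEvent;
  I: Integer;
begin
  // The initial 1 stands for the main thread, as in the demo.
  ThreadsActive := TCountdownEvent.Create(1);
  try
    for I := 1 to WorkerThreadCount do
    begin
      // Register the worker before it starts running.
      ThreadsActive.AddCount;
      TThread.CreateAnonymousThread(
        procedure
        begin
          try
            Sleep(100); // stand-in for real work
          finally
            ThreadsActive.Signal; // this worker is done
          end;
        end).Start;
    end;

    ThreadsActive.Signal;  // release the main thread's own 1
    ThreadsActive.WaitFor; // returns once every worker has signalled
    WriteLn('All workers have finished');
  finally
    ThreadsActive.Free;
  end;
end.
```

The order of the last two calls matters: without the main thread’s own Signal, the counter could never fall below 1 and WaitFor would block forever.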
To give the worker threads something to do, handle the first button’s OnClick event like this:
procedure TfrmSTQTest.btnEnqueueWorkItemClick(Sender: TObject);
var
  S: string;
begin
  Inc(FCurWorkItem);
  S := Format('work item %d', [FCurWorkItem]);
  memOutput.Lines.Add(' Queuing ' + S + '...');
  FInQueue.Enqueue(S);
end;
The ‘finish’ button is just to be able to see the worker threads shutting down cleanly (not that they shouldn’t!) – you don’t actually need to click it before closing the form:
procedure TfrmSTQTest.btnFinishClick(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  btnFinish.Enabled := False;
  FInQueue.Close;
end;
Finally, the timer is used to periodically poll the ‘out’ queue for data to display. Note that 0 is passed as the timeout value, meaning we will neither block on the lock already being held nor wait for new data to arrive:
procedure TfrmSTQTest.tmrWorkDoneTimer(Sender: TObject);
var
  S: string;
begin
  //drain the out queue but don't wait on it
  while FOutQueue.Dequeue(S, 0) = wrSignaled do
    memOutput.Lines.Add(S);
end;
That’s about it really. If you compile and run the resulting application, the most instructive approach is probably the most jejune: with the enqueue button focussed, just press and hold down ENTER for a bit. That should result in several work items being enqueued, interspersed with a few being completed as the four worker threads get to work. Obviously, tweaking the values used in the OnCreate handler will affect this – have the threads sleep for at most 200ms rather than 2000ms, for example, and the ‘in’ queue will be processed almost as quickly as you can fill it up.
[Update: merely a few hours after I posted, a question popped up on Stack Overflow suggesting I should have used PulseAll rather than Pulse, on the grounds that PulseAll is surely always needed ‘except in the trivial cases when there are only two threads in total’. The number of threads is a red herring however – it’s the nature of the state change being signalled that matters. If the monitor’s signalling functionality were to be used for more than one thing (e.g., to establish a maximum item count too), PulseAll would be needed. As it is, Wait is only being called in one place, meaning any waiter is a legitimate taker of any Pulse (so to speak).]
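For comparison, here is roughly what the ‘more than one thing’ case might look like. TBoundedQueue is a hypothetical class of my own, with no timeouts or Close support so as to keep it short. With a maximum item count, enqueuers wait for ‘not full’ and dequeuers wait for ‘not empty’ on the same monitor, so a single Pulse could wake a thread waiting for the wrong condition; PulseAll wakes everyone and lets each waiter recheck its own condition.

```pascal
program BoundedQueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, Generics.Collections, SyncObjs;

type
  // Hypothetical bounded variant, for illustration only.
  TBoundedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FMaxCount: Integer;
  public
    constructor Create(AMaxCount: Integer);
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue: T;
  end;

constructor TBoundedQueue<T>.Create(AMaxCount: Integer);
begin
  inherited Create;
  FMaxCount := AMaxCount;
  FQueue := TQueue<T>.Create;
end;

destructor TBoundedQueue<T>.Destroy;
begin
  FQueue.Free;
  inherited Destroy;
end;

procedure TBoundedQueue<T>.Enqueue(const Item: T);
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count >= FMaxCount do // wait for 'not full'
      TMonitor.Wait(FQueue, INFINITE);
    FQueue.Enqueue(Item);
    TMonitor.PulseAll(FQueue); // waiters may be enqueuers or dequeuers
  finally
    TMonitor.Exit(FQueue);
  end;
end;

function TBoundedQueue<T>.Dequeue: T;
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count = 0 do // wait for 'not empty'
      TMonitor.Wait(FQueue, INFINITE);
    Result := FQueue.Dequeue;
    TMonitor.PulseAll(FQueue); // ditto
  finally
    TMonitor.Exit(FQueue);
  end;
end;

var
  Queue: TBoundedQueue<Integer>;
  Producer: TThread;
  I: Integer;
begin
  Queue := TBoundedQueue<Integer>.Create(2);
  try
    Producer := TThread.CreateAnonymousThread(
      procedure
      var
        N: Integer;
      begin
        for N := 1 to 5 do
          Queue.Enqueue(N); // blocks whenever two items are already queued
      end);
    Producer.FreeOnTerminate := False; // so we can wait on it and free it
    Producer.Start;
    for I := 1 to 5 do
      WriteLn(Queue.Dequeue); // prints 1 to 5 in order
    Producer.WaitFor;
    Producer.Free;
  finally
    Queue.Free;
  end;
end.
```

In TSimpleThreadedQueue there is no ‘not full’ condition, which is why a plain Pulse in Enqueue is enough there.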