Fixing TThreadedQueue…? (Or in other words: TMonitor… again!)

[Update: fixes for TMonitor were made in XE2 update 4. Consequently, this post should be considered for historical information only.]

Probably out of masochism or something, I’ve been looking at TMonitor again. For anyone who doesn’t recall the earlier episodes, TMonitor is the ‘new’ (and rather low-level) threading device introduced in D2009. Alas, it proved buggy as hell, as well as slow. Given the publicity however, it received some love for XE2 RTM. So, all right in the end? Er no, since Darian Miller on StackOverflow quickly tweaked one of the test projects that demonstrated its brokenness in XE and… found it still broken in XE2!

More exactly, the test project in question uses the TThreadedQueue class. Having experimented a bit more with TMonitor and found it seemingly OK, I thought I’d investigate the TThreadedQueue implementation. My initial idea was to test its logic distinct from TMonitor’s by replacing use of TMonitor with calls to equivalent Windows APIs (a monitor combines a critical section and a ‘condition variable’, and the Windows API acquired a condition variable primitive in Vista). However, on inspecting the code, I found it using the (to me) rather tricky-to-comprehend 3 parameter overload of TMonitor.Wait – and in my own usage, I’ve stuck to the simpler, 2 parameter overload. This made me think: if I just rewrote TThreadedQueue similarly, would that fix the problem? I think it has, but I would be grateful if anyone else could verify:

1. Download the problem project from the Windows QC client – browse to report number 101114, and get the DPR from the ‘attachments’ tab.

2. Open the project in the IDE, and add a new unit to it.

3. Copy the code for TThreadedQueue from System.Generics.Collections.pas. As it is almost completely self-contained, you need only System.SyncObjs in the new unit’s uses clause.

4. In the copied class definition, remove FQueueNotEmpty and FQueueNotFull. Everything else in the interface stays the same however.

5. Remove the lines constructing and freeing FQueueNotEmpty and FQueueNotFull in Create and Destroy.

6. Change the implementation of the substantive PopItem variant to this:

function TThreadedQueue<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
begin
  AItem := Default(T);
  TMonitor.Enter(FQueueLock);
  try
    while (FQueueSize = 0) and not FShutDown do
      if not TMonitor.Wait(FQueueLock, FPopTimeout) then Exit(wrTimeout);
    if FShutDown then Exit(wrAbandoned);
    Result := wrSignaled;
    AItem := FQueue[FQueueOffset];
    if FQueueSize = Length(FQueue) then
      TMonitor.PulseAll(FQueueLock);
    Dec(FQueueSize);
    Inc(FQueueOffset);
    Inc(FTotalItemsPopped);
    if FQueueOffset = Length(FQueue) then
      FQueueOffset := 0;
  finally
    AQueueSize := FQueueSize;
    TMonitor.Exit(FQueueLock);
  end;
end;

The main thing here is that we are now using the simpler variant of Wait. As an aside, it’s OK to ‘pulse’ as early as we do, since waiting threads only get notified once we’ve exited the monitor.
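
To show the pattern in isolation, here is a minimal sketch of the 2 parameter Wait used in a loop around a condition. It is not taken from TThreadedQueue: the Lock, Ready and Worker names are made up for the illustration, and the 100ms and 5000ms figures are arbitrary. It assumes a Delphi version that has TThread.CreateAnonymousThread (XE or later).

```pascal
program WaitLoopSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

var
  Lock: TObject;   // hypothetical: any object can serve as the monitor
  Ready: Boolean;  // hypothetical: the condition guarded by Lock
  Worker: TThread;
begin
  Lock := TObject.Create;
  try
    Worker := TThread.CreateAnonymousThread(
      procedure
      begin
        Sleep(100); // pretend to do some work first
        TMonitor.Enter(Lock);
        try
          Ready := True;
          // pulsing before the end of the block is fine: a waiter can only
          // continue once it has re-acquired the lock, i.e. after our Exit
          TMonitor.PulseAll(Lock);
        finally
          TMonitor.Exit(Lock);
        end;
      end);
    Worker.FreeOnTerminate := False; // we free it ourselves below
    Worker.Start;

    TMonitor.Enter(Lock);
    try
      // always re-test the condition after Wait returns
      while not Ready do
        if not TMonitor.Wait(Lock, 5000) then Break; // False means timed out
      if Ready then
        Writeln('Condition signalled')
      else
        Writeln('Timed out');
    finally
      TMonitor.Exit(Lock);
    end;

    Worker.WaitFor;
    Worker.Free;
  finally
    Lock.Free;
  end;
end.
```

The rewritten PopItem has the same shape: enter, loop on the condition with Wait, act, then exit.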

7. Change the implementation of the substantive PushItem overload to this:

function TThreadedQueue<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
begin
  TMonitor.Enter(FQueueLock);
  try
    while (Length(FQueue) = FQueueSize) and not FShutDown do
      if not TMonitor.Wait(FQueueLock, FPushTimeout) then Exit(wrTimeout);
    if FShutDown then Exit(wrAbandoned);
    Result := wrSignaled;
    if FQueueSize = 0 then TMonitor.PulseAll(FQueueLock);
    FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
    Inc(FQueueSize);
    Inc(FTotalItemsPushed);
  finally
    AQueueSize := FQueueSize;
    TMonitor.Exit(FQueueLock);
  end;
end;

8. Change DoShutdown to this:

procedure TThreadedQueue<T>.DoShutDown;
begin
  FShutDown := True;
  TMonitor.PulseAll(FQueueLock);
end;

I’ve removed the locking around setting FShutdown since I don’t see what good it does (no one will be trying to set it to False).

9. Rerun the problem project: hopefully, it will now work (meaning, it will count up to 1000 without crashing), notwithstanding the fact we haven’t actually fixed the bug in TMonitor itself…

The tyranny of simple tests

Last TMonitor post I promise (well, until the ‘hotfix’ for TMonitor.Wait gets forgotten about…): it seems TMonitor’s apparent slowness in uncontended scenarios as discussed in my previous post is less than it first appeared. In fact, by tweaking my original test, I can now get it to perform faster than TCriticalSection so long as more than one thread is created. Credit for this discovery must go to commentator Krystian over at Eric Grange’s blog – specifically, the problem with TMonitor’s apparent performance was primarily a function of it needing to dynamically allocate a small bit of memory for its own state, which, when several TMonitors are initialised in quick succession, leads them to allocate memory in the same processor cache line. Ensure the TMonitor is initialised up front though, along with something else getting allocated at the same time, and the problem goes away.

While this has TMonitor now besting TCriticalSection, TRTLCriticalSection still comes out on top: quite simply, even though TCriticalSection could barely be any lighter, the simple fact of it being a class ‘kills’ its performance. This is then aggravated by the fact its Enter and Leave methods are inlined to call the virtual Acquire and Release rather than vice versa. OK, other things being equal it should be the other way round, but really…

Anyhow, onto the revised test: since there were complaints about me using TCountdownEvent and TStopwatch, I’ve replaced them with direct API calls. While TStringList and indeed the VCL are perfectly acceptable (?), I’ve nonetheless kept the test program as a console application. However, I’ve taken memory deallocation out of the picture by freeing the thread classes explicitly (explicit memory allocation was already not tested for, given I created the thread classes before timing started). So, without further ado, here’s the new code:

program TMonitorVsTCriticalSectionV2;

{$APPTYPE CONSOLE}

uses
  Windows,
  SysUtils,
  Classes,
  SyncObjs;

type
  TTestThread = class(TThread)
  strict private
    FSomeHeapData: IInterface;
  public
    constructor Create; virtual;
  end;

  TTestThreadClass = class of TTestThread;

constructor TTestThread.Create;
begin
  inherited Create(True);
  FSomeHeapData := TInterfaceList.Create;
end;

procedure RunTest(const TestName: string; ThreadCount: Integer;
  ThreadClass: TTestThreadClass);
var
  I: Integer;
  Threads: array of TThread;
  ThreadHandles: array of THandle;
  StartCounts, EndCounts, CountsFreq: Int64;
begin
  SetLength(ThreadHandles, ThreadCount);
  SetLength(Threads, ThreadCount);
  for I := 0 to ThreadCount - 1 do
  begin
    Threads[I] := ThreadClass.Create;
    ThreadHandles[I] := Threads[I].Handle;
  end;
  QueryPerformanceCounter(StartCounts);
  for I := 0 to ThreadCount - 1 do
    Threads[I].Start;
  WaitForMultipleObjects(ThreadCount, @ThreadHandles[0], True, INFINITE);
  QueryPerformanceCounter(EndCounts);
  QueryPerformanceFrequency(CountsFreq);
  //free the threads explicitly to take memory deallocation out of the equation
  for I := 0 to ThreadCount - 1 do
    Threads[I].Free;
  Writeln(TestName, ' ', ThreadCount, ' thread(s) took ',
    Round((EndCounts - StartCounts) * 1000 / CountsFreq), 'ms');
end;

const
  CountdownFrom = $FFFFFF; //increase if necessary...
  MaxThreads = 10;

type
  TCriticalSectionThread = class(TTestThread)
  protected
    FCriticalSection: TCriticalSection;
    procedure Execute; override;
  public
    constructor Create; override;
    destructor Destroy; override;
  end;

  TCriticalSectionThreadNoVirt = class(TCriticalSectionThread)
  protected
    procedure Execute; override;
  end;

  TMonitorThread = class(TTestThread)
  protected
    procedure Execute; override;
  public
    constructor Create; override;
  end;

  TRTLCriticalSectionThread = class(TTestThread)
  strict private
    FCriticalSection: TRTLCriticalSection;
  protected
    procedure Execute; override;
  public
    constructor Create; override;
    destructor Destroy; override;
  end;

  TRTLCriticalSectionThreadDynAlloc = class(TTestThread)
  strict private
    FCriticalSection: PRTLCriticalSection;
  protected
    procedure Execute; override;
  public
    constructor Create; override;
    destructor Destroy; override;
  end;

constructor TCriticalSectionThread.Create;
begin
  inherited Create;
  FCriticalSection := TCriticalSection.Create;
end;

destructor TCriticalSectionThread.Destroy;
begin
  FCriticalSection.Free;
  inherited Destroy;
end;

procedure TCriticalSectionThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    FCriticalSection.Enter;
    try
      Dec(Counter);
    finally
      FCriticalSection.Leave;
    end;
  until (Counter <= 0);
end;

type
  TCSAccess = class(TCriticalSection);

procedure TCriticalSectionThreadNoVirt.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    TCSAccess(FCriticalSection).FSection.Enter;
    try
      Dec(Counter);
    finally
      TCSAccess(FCriticalSection).FSection.Leave;
    end;
  until (Counter <= 0);
end;

constructor TMonitorThread.Create;
begin
  inherited;
  //force our monitor to be initialised
  TMonitor.Enter(Self);
  TMonitor.Exit(Self);
end;

procedure TMonitorThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    TMonitor.Enter(Self);
    try
      Dec(Counter);
    finally
      TMonitor.Exit(Self);
    end;
  until (Counter <= 0);
end;

constructor TRTLCriticalSectionThread.Create;
begin
  inherited Create;
  InitializeCriticalSection(FCriticalSection);
end;

destructor TRTLCriticalSectionThread.Destroy;
begin
  DeleteCriticalSection(FCriticalSection);
  inherited Destroy;
end;

procedure TRTLCriticalSectionThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    FCriticalSection.Enter;
    try
      Dec(Counter);
    finally
      FCriticalSection.Leave;
    end;
  until (Counter <= 0);
end;

constructor TRTLCriticalSectionThreadDynAlloc.Create;
begin
  inherited Create;
  New(FCriticalSection);
  InitializeCriticalSection(FCriticalSection^);
end;

destructor TRTLCriticalSectionThreadDynAlloc.Destroy;
begin
  DeleteCriticalSection(FCriticalSection^);
  Dispose(FCriticalSection);
  inherited Destroy;
end;

procedure TRTLCriticalSectionThreadDynAlloc.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    FCriticalSection.Enter;
    try
      Dec(Counter);
    finally
      FCriticalSection.Leave;
    end;
  until (Counter <= 0);
end;

var
  I, J: Integer;
begin
  for I := 1 to 3 do
  begin
    Writeln('*** ROUND ', I, ' ***');
    for J := 1 to MaxThreads do
    begin
      RunTest('TMonitor                               ',
        J, TMonitorThread);
      RunTest('TCriticalSection                       ',
        J, TCriticalSectionThread);
      RunTest('TCriticalSection (avoid virtual calls) ',
        J, TCriticalSectionThreadNoVirt);
      RunTest('TRTLCriticalSection (New/Dispose)      ',
        J, TRTLCriticalSectionThreadDynAlloc);
      RunTest('TRTLCriticalSection                    ',
        J, TRTLCriticalSectionThread);
      WriteLn;
    end;
  end;
  Write('Press ENTER to exit...');
  ReadLn;
end.

As said, for two or more threads, I now consistently get TMonitor to both outperform TCriticalSection and not exhibit the weirdness it did in my original test – instead, said weirdness is transposed to TCriticalSection. However, as before, using TRTLCriticalSection directly performs best by far.

Since everyone likes definite conclusions, I guess I can only conclude by saying this: you should avoid virtual method calls at the very least, and preferably classes too. Indeed, even dynamic memory allocations should be verboten – stick to locals and opaque (or at least semi-opaque) records whose data are allocated for you by the Windows API. Sounds about right, eh?

TMonitor redux

Rather embarrassingly, it appears the wait/pulse functionality of TMonitor is hopelessly broken. That’ll teach me for trying to demo something new, eh? In my (partial) defence, I had found serious issues before, but came to believe they must have arisen from how I was originally trying to use TMonitor. Obviously not though!

Frankly, if the wait/pulse functionality can’t be relied upon, TMonitor is pretty pointless in my view given its only other function replicates what TRTLCriticalSection/TCriticalSection already provided for. Even worse though, Eric Grange has now posted claiming TMonitor doesn’t work properly even as a TCriticalSection alternative – while exceptions don’t get raised, ‘you can quickly run into situations where everything gets serialized, even when there is no need to’. In the comments he goes further, claiming that with the test code presented, ‘A single thread runs in 70 ms on the quad-core here, two threads run in 140 ms, 3 threads in 210 ms, etc.’

The statement in the comments especially I found surprising, and given the test code used looked just a little too simple to me, I thought I’d do some tests myself. Obviously, if I had the brains for it I could write a sampling profiler or something, but I dashed off the following test program instead. It does pretty much the same thing as Eric’s code — i.e., it tests for when threads are uncontended — but in just a slightly fuller form:

program TMonitorVsTCriticalSection;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  Classes,
  Diagnostics,
  SyncObjs;

type
  TTestThread = class(TThread)
  strict private
    FThreadCounter: TCountdownEvent;
  public
    constructor Create(AThreadCounter: TCountdownEvent); virtual;
    destructor Destroy; override;
  end;

  TTestThreadClass = class of TTestThread;

constructor TTestThread.Create(AThreadCounter: TCountdownEvent);
begin
  inherited Create(True);
  FThreadCounter := AThreadCounter;
  FreeOnTerminate := True;
end;

destructor TTestThread.Destroy;
begin
  FThreadCounter.Signal;
  inherited Destroy;
end;

procedure RunTest(const TestName: string; ThreadCount: Integer;
  ThreadClass: TTestThreadClass);
var
  ThreadCounter: TCountdownEvent;
  I: Integer;
  Stopwatch: TStopwatch;
  Threads: array of TThread;
begin
  SetLength(Threads, ThreadCount);
  ThreadCounter := TCountdownEvent.Create(ThreadCount);
  try
    for I := 0 to ThreadCount - 1 do
      Threads[I] := ThreadClass.Create(ThreadCounter);
    Stopwatch := TStopwatch.StartNew;
    for I := 0 to ThreadCount - 1 do
      Threads[I].Start;
    ThreadCounter.WaitFor;
    Stopwatch.Stop;
    Writeln(TestName, ': ', ThreadCount, ' thread(s) took ',
      Stopwatch.ElapsedMilliseconds, 'ms');
  finally
    ThreadCounter.Free;
  end;
end;

const
  CountdownFrom = $FFFFFF; //increase if necessary...
  MaxThreads = 10;

type
  TCriticalSectionThread = class(TTestThread)
  strict private
    FCriticalSection: TCriticalSection;
  protected
    procedure Execute; override;
  public
    constructor Create(AThreadCounter: TCountdownEvent); override;
    destructor Destroy; override;
  end;

  TMonitorThread = class(TTestThread)
  protected
    procedure Execute; override;
  end;

constructor TCriticalSectionThread.Create(AThreadCounter: TCountdownEvent);
begin
  inherited Create(AThreadCounter);
  FCriticalSection := TCriticalSection.Create;
end;

destructor TCriticalSectionThread.Destroy;
begin
  FCriticalSection.Free;
  inherited Destroy;
end;

procedure TCriticalSectionThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    FCriticalSection.Enter;
    try
      Dec(Counter);
    finally
      FCriticalSection.Leave;
    end;
  until (Counter <= 0);
end;

procedure TMonitorThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    TMonitor.Enter(Self);
    try
      Dec(Counter);
    finally
      TMonitor.Exit(Self);
    end;
  until (Counter <= 0);
end;

var
  I, J: Integer;
begin
  for I := 1 to 3 do
  begin
    Writeln('*** ROUND ', I, ' ***');
    for J := 1 to MaxThreads do
    begin
      RunTest('TCriticalSection', J, TCriticalSectionThread);
      RunTest('TMonitor', J, TMonitorThread);
    end;
    WriteLn;
  end;
  Write('Press ENTER to exit...');
  ReadLn;
end.

When run on my quad core laptop (Win7 64 bit, Delphi XE, stock memory manager, default debug build), I get results like the following:

*** ROUND 1 ***
TCriticalSection: 1 thread(s) took 1011ms
TMonitor: 1 thread(s) took 1957ms
TCriticalSection: 2 thread(s) took 2674ms
TMonitor: 2 thread(s) took 8667ms
TCriticalSection: 3 thread(s) took 5562ms
TMonitor: 3 thread(s) took 8996ms
TCriticalSection: 4 thread(s) took 5829ms
TMonitor: 4 thread(s) took 9154ms
TCriticalSection: 5 thread(s) took 4953ms
TMonitor: 5 thread(s) took 8936ms
TCriticalSection: 6 thread(s) took 3652ms
TMonitor: 6 thread(s) took 10578ms
TCriticalSection: 7 thread(s) took 3004ms
TMonitor: 7 thread(s) took 6982ms
TCriticalSection: 8 thread(s) took 8471ms
TMonitor: 8 thread(s) took 6903ms
TCriticalSection: 9 thread(s) took 5844ms
TMonitor: 9 thread(s) took 6310ms
TCriticalSection: 10 thread(s) took 3496ms
TMonitor: 10 thread(s) took 7285ms

My (initial) conclusion? There probably is some issue here, but it isn’t the case that going from (say) 2 threads to 4 doubles the problem – you take the hit at thread 2 or 3, and it stays stable after that. Moreover, TCriticalSection appears to suffer from the *same* issue. However, TCriticalSection is fundamentally faster than TMonitor, given the situation in question (i.e., where locks are being acquired on different objects). [Or is it? See the updates below…]

Note that for a slight variant, you might exchange

TMonitor.Enter(Self);

for

if not TMonitor.TryEnter(Self) then
  raise EProgrammerNotFound.Create('Where on Earth is he?');

While the exception not being raised doesn’t prove there’s no thread contention where there shouldn’t be, it does make for a verification of the explicit contract made by TMonitor’s interface.

Update 1 – using spin counts

Andreas Hausladen in the comments suggests initialising the spin count of the critical section object to something other than its default value of 0, and indeed, that finally gets the expected behaviour (at least for me), i.e. the cost of the extra threads minus the startup time coming to effectively nil [see update 3 below though!]. Here’s the TTestThread descendant to ‘show’ this — you’ll need to add Windows to the uses clause too:

  TRTLCriticalSectionThread = class(TTestThread)
  strict private
    FCriticalSection: TRTLCriticalSection;
  protected
    procedure Execute; override;
  public
    constructor Create(AThreadCounter: TCountdownEvent); override;
    destructor Destroy; override;
  end;

constructor TRTLCriticalSectionThread.Create(AThreadCounter: TCountdownEvent);
begin
  inherited Create(AThreadCounter);
  InitializeCriticalSectionAndSpinCount(FCriticalSection, 4000);
end;

destructor TRTLCriticalSectionThread.Destroy;
begin
  DeleteCriticalSection(FCriticalSection);
  inherited Destroy;
end;

procedure TRTLCriticalSectionThread.Execute;
var
  Counter: Integer;
begin
  Counter := CountdownFrom;
  repeat
    FCriticalSection.Enter;
    try
      Dec(Counter);
    finally
      FCriticalSection.Leave;
    end;
  until (Counter <= 0);
end;

You can initialise the spin count of the TMonitor too by calling TMonitor.SetSpinCount before the first call to Enter. However, that doesn’t make a difference for me. On the other hand, I wouldn’t expect it to – after all, each thread in the example has its own monitor or critical section object, so where does the possibility for needing to spin or block arise in the first place? [It doesn’t – see below.]
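
For reference, here is roughly what setting a monitor’s spin count looks like, as a stripped-down sketch rather than the thread class from the test program. The Lock object and the loop count of 1000 are made up for the illustration, and 4000 simply mirrors the value passed to InitializeCriticalSectionAndSpinCount above. I am assuming the TMonitor.SetSpinCount(AObject, ASpinCount) form here.

```pascal
program SpinCountSketch;

{$APPTYPE CONSOLE}

var
  Lock: TObject;    // hypothetical object whose monitor we configure
  Counter: Integer;
begin
  Lock := TObject.Create;
  try
    // set the spin count before the monitor is first entered
    TMonitor.SetSpinCount(Lock, 4000);
    Counter := 1000;
    repeat
      TMonitor.Enter(Lock);
      try
        Dec(Counter);
      finally
        TMonitor.Exit(Lock);
      end;
    until Counter <= 0;
    Writeln('Counter is now ', Counter);
  finally
    Lock.Free;
  end;
end.
```

Since nothing else ever competes for Lock here, the spin count should never come into play, which is the point made above.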

Update 2 – comparing with C#/.NET 4

Here’s a quick C# translation (insofar as I can write C# ‘quickly’…) – as I’m using the CountdownEvent class (like I used TCountdownEvent in the Delphi version), it requires the 4.0 Framework:

using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Threading;

namespace ConsoleApplication1
{
    class Program
    {
        const int CountdownFrom = 0xFFFFFF;
        const int MaxThreads = 10;

        static void RunTest(int ThreadCount)
        {
            CountdownEvent ThreadCounter = new CountdownEvent(ThreadCount);
            List<Thread> Threads = new List<Thread>(ThreadCount);
            for (int I = 0; I < ThreadCount; I++)
                Threads.Add(new Thread( delegate() {
                    try
                    {
                        int Counter = CountdownFrom;
                        object Key = new object();
                        do
                        {
                            lock (Key)
                            {
                                Counter--;
                            }
                        }
                        while (Counter > 0);
                    }
                    finally
                    {
                        ThreadCounter.Signal();
                    }
                }));
            Stopwatch Stopwatch = Stopwatch.StartNew();
            for (int I = 0; I < ThreadCount; I++)
            {
                Threads[I].Start();
            }
            ThreadCounter.Wait();
            Stopwatch.Stop();
            Console.WriteLine("{0} thread(s) took {1}ms", ThreadCount,
                Stopwatch.ElapsedMilliseconds);
        }
        static void Main(string[] args)
        {
            for (int I = 1; I <= 3; I++)
            {
                Console.WriteLine("*** ROUND {0} ***", I);
                for (int J = 1; J <= MaxThreads; J++)
                {
                    RunTest(J);
                }
                Console.WriteLine();
            }
            Console.Write("Press ENTER to exit...");
            Console.ReadLine();
        }
    }
}

I find this beating even the TRTLCriticalSectionThread Delphi class so long as the number of threads doesn’t exceed the number of cores. Once it does, the times progressively increase, but still nowhere near the TMonitor ones. Go figure…

Update 3 – problem found…

Eric Grange and, equally importantly, his blog commentators have been back on the case and found the issue: TMonitor allocates a small block of memory internally, and given my test code initialises a series of monitors in quick succession, all have their internal memory block allocated in the same cache line. Use a different (read: typically less good) memory manager from the stock FastMM, and the problem can dissipate; e.g., it halves for me when using a custom memory manager that wraps the WinAPI’s HeapXXX functions. To make it totally disappear, the TMonitorThread class previously presented can be modified as follows (the Execute method stays the same):

  TMonitorThread = class(TTestThread)
  protected
    FDummyIntfs: array[0..32] of IInterface;
    procedure Execute; override;
  public
    constructor Create(AThreadCounter: TCountdownEvent); override;
  end;

constructor TMonitorThread.Create(AThreadCounter: TCountdownEvent);
var
  I: Integer;
begin
  inherited;
  //force our monitor to be initialised
  TMonitor.Enter(Self);
  TMonitor.Exit(Self);
  //cause some further dynamic memory allocations
  for I := Low(FDummyIntfs) to High(FDummyIntfs) do
    FDummyIntfs[I] := TInterfaceList.Create;
end;

On my laptop, TMonitor now easily beats TCriticalSection for speed, though still not TRTLCriticalSection. I’ve also discovered that enabling spinning on the latter was a red herring – just avoiding the class wrapper (even though said wrapper is extremely simple!) was the key thing. In fact, if TRTLCriticalSectionThread allocates its TRTLCriticalSection record dynamically using New, its advantage over TCriticalSectionThread melts away, and would be completely gone were TCriticalSection to map its Acquire and Release methods onto Enter and Leave rather than vice versa. I almost feel a follow up post coming on…

Writing a simple threaded queue class using TMonitor (2)

In my previous post, I introduced TMonitor. In this post, I’ll actually put it to work by writing a simple threaded queue class with it.

‘Simple’ could mean many things of course, but in this context it will denote two things in particular: firstly, my class will simply wrap the standard generic TQueue; and secondly, there won’t be any maximum item count implemented. The latter means only the dequeuers (i.e., the item takers) may become waiters, i.e., be involved in a Wait loop. For sure, entry into the monitor on the enqueuing (item adding) side may be blocked, but once the lock is taken, it won’t be released until the end of the Enqueue method.

Nonetheless, speaking of a ‘thread-safe queue’ might imply something even simpler. In particular, it might be taken to mean using a TCriticalSection instance to guard the underlying TQueue’s Enqueue and Dequeue methods, similar to what the old TThreadList class does vis-a-vis TList. Such an approach would be a bit too simple however — if one thread (the producer thread) ‘pushes’ or adds items and another (the consumer thread) ‘pops’ or takes them out, there will need to be a notification system on top to tell the consumer when the queue has items ready to be popped. Happily though, a notification system is precisely what the wait/pulse functionality of a monitor provides.
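
To make the ‘too simple’ approach concrete, here is a sketch of a lock-only wrapper. TLockedQueue and its TryDequeue method are names invented for this illustration; they are not part of the RTL or of the class developed below.

```pascal
program LockOnlyQueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, SyncObjs, Generics.Collections;

type
  // hypothetical lock-only wrapper, in the spirit of TThreadList
  TLockedQueue<T> = class
  strict private
    FLock: TCriticalSection;
    FQueue: TQueue<T>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function TryDequeue(out Item: T): Boolean;
  end;

constructor TLockedQueue<T>.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FQueue := TQueue<T>.Create;
end;

destructor TLockedQueue<T>.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited Destroy;
end;

procedure TLockedQueue<T>.Enqueue(const Item: T);
begin
  FLock.Enter;
  try
    FQueue.Enqueue(Item);
  finally
    FLock.Leave;
  end;
end;

function TLockedQueue<T>.TryDequeue(out Item: T): Boolean;
begin
  FLock.Enter;
  try
    // the access is thread-safe, but an empty queue just reports False:
    // the caller gets no means of waiting for an item to arrive
    Result := FQueue.Count > 0;
    if Result then
      Item := FQueue.Dequeue
    else
      Item := Default(T);
  finally
    FLock.Leave;
  end;
end;

var
  Queue: TLockedQueue<string>;
  S: string;
begin
  Queue := TLockedQueue<string>.Create;
  try
    Queue.Enqueue('first');
    while Queue.TryDequeue(S) do
      Writeln('Got ', S);
    Writeln('Queue empty; a consumer thread would now have to poll');
  finally
    Queue.Free;
  end;
end.
```

A consumer thread built on this would need to spin or sleep between TryDequeue calls, which is exactly the gap the wait/pulse mechanism fills.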

So, without further ado, here’s the interface of the TSimpleThreadedQueue class:

unit SimpleThreadedQueue;

interface

uses
  SysUtils, Classes, Generics.Collections, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FClosed: Boolean;
  public
    constructor Create;
    destructor Destroy; override;
    function Enqueue(const Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    function Dequeue(var Item: T; Timeout: LongWord = INFINITE): TWaitResult;
    procedure Close;
    property Closed: Boolean read FClosed;
  end;

Here, the Enqueue and Dequeue methods are named after the methods of TQueue they’ll wrap. Their signatures are a bit different though given the ability to specify a timeout. For the result type, I’m using the standard TWaitResult enumeration; specifically, wrSignaled will denote success, wrTimeout the function timing out, and wrAbandoned for when the queue has been closed.

With regards to the last of these, the Close method will both close the queue for potential enqueuers and drain it for potential dequeuers (the Closed property will then report whether Close has been called). While we will call Close in the destructor, Delphi’s manual memory management means it will need to be called explicitly if there’s a chance any consumer thread is waiting on Dequeue. This is because we need an object to lock against, yet this same object will be freed in the destructor. While an extra layer of blocking code could be added to prevent this object (which will be the internal TQueue instance) being freed too early, I don’t think the added complication is worth it.

So, here’s what the implementation of Create and Destroy look like — nothing you wouldn’t expect:

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  FQueue := TQueue<T>.Create;
end;

destructor TSimpleThreadedQueue<T>.Destroy;
begin
  Close;
  FQueue.Free;
  inherited Destroy;
end;

The Close method is pretty trivial too:

procedure TSimpleThreadedQueue<T>.Close;
begin
  if FClosed then Exit;
  FClosed := True;
  TMonitor.Enter(FQueue);
  try
    FQueue.Clear;
    TMonitor.PulseAll(FQueue); //notify any waiters Closed is now True
  finally
    TMonitor.Exit(FQueue);
  end;
end;

Since we’re not implementing a maximum queue size, Enqueue is nice and easy as well: first, the lock is requested, and if we don’t time out, the item is then ‘pushed’ before we ‘pulse’ the change of state to the head of the monitor’s waiter list:

function TSimpleThreadedQueue<T>.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
begin
  if Closed then Exit(wrAbandoned);
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    if Closed then Exit(wrAbandoned);
    FQueue.Enqueue(Item);
    TMonitor.Pulse(FQueue);
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;

Lastly, we have Dequeue’s implementation. The only complication here is the fact we have a timeout value to implement across two separate calls, meaning we should decrement the timeout amount each time. For this, I’ve used TStopwatch – the GetTickCount API function might be used instead of course, but doing it this way keeps us 100% in the Delphi RTL:

function TSimpleThreadedQueue<T>.Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
var
  Stopwatch: TStopwatch;
  TimeoutLeft: Int64;
begin
  if Closed then Exit(wrAbandoned);
  Stopwatch := TStopwatch.StartNew;
  if not TMonitor.Enter(FQueue, Timeout) then Exit(wrTimeout);
  try
    while not Closed and (FQueue.Count = 0) do
    begin
      TimeoutLeft := Timeout - Stopwatch.ElapsedMilliseconds;
      if TimeoutLeft < 0 then TimeoutLeft := 0;
      if not TMonitor.Wait(FQueue, LongWord(TimeoutLeft)) then Exit(wrTimeout);
    end;
    if Closed then Exit(wrAbandoned);
    Item := FQueue.Dequeue;
    Result := wrSignaled;
  finally
    TMonitor.Exit(FQueue);
  end;
end;

To compile, Diagnostics needs to be added to the implementation section’s uses clause:

uses Diagnostics;
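
As for the GetTickCount alternative mentioned above for Dequeue’s timeout handling, a sketch of the arithmetic might look like the following. RemainingTimeout is a helper name I have made up, not an RTL routine, and the 250ms budget is arbitrary. Unlike the TStopwatch version, this sketch passes INFINITE through untouched.

```pascal
program RemainingTimeoutSketch;

{$APPTYPE CONSOLE}

uses
  Windows;

// the subtraction below relies on unsigned wrap-around when the tick
// counter rolls over, so overflow and range checking must be off
{$OVERFLOWCHECKS OFF}
{$RANGECHECKS OFF}

// hypothetical helper: how much of Timeout is left, given the tick count
// recorded when the operation started
function RemainingTimeout(Timeout, StartTicks: LongWord): LongWord;
var
  Elapsed: LongWord;
begin
  if Timeout = INFINITE then Exit(INFINITE); // never count down from 'forever'
  Elapsed := GetTickCount - StartTicks;
  if Elapsed >= Timeout then
    Result := 0
  else
    Result := Timeout - Elapsed;
end;

var
  StartTicks: LongWord;
begin
  StartTicks := GetTickCount;
  Sleep(100);
  Writeln('Left of a 250ms budget: ', RemainingTimeout(250, StartTicks), 'ms');
  Writeln('INFINITE is preserved: ',
    RemainingTimeout(INFINITE, StartTicks) = INFINITE);
end.
```

The trade-off is GetTickCount’s coarser resolution, which hardly matters for timeouts measured in tens of milliseconds or more.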

This done, TSimpleThreadedQueue is complete.
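
Before moving on to the VCL demo, here is a small console sketch of the class in use. It assumes TSimpleThreadedQueue<T> is declared as a generic class in the SimpleThreadedQueue unit; the Consumed and LastResult variables and the 500ms timeout are made up for the illustration.

```pascal
program SimpleQueueConsoleSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, SyncObjs, SimpleThreadedQueue;

var
  Queue: TSimpleThreadedQueue<string>;
  Consumer: TThread;
  Consumed: Integer;        // written by the consumer, read after WaitFor
  LastResult: TWaitResult;  // ditto
  I: Integer;
begin
  Queue := TSimpleThreadedQueue<string>.Create;
  try
    Consumer := TThread.CreateAnonymousThread(
      procedure
      var
        S: string;
      begin
        // keep taking items until Dequeue reports a timeout or a closed queue
        repeat
          LastResult := Queue.Dequeue(S, 500);
          if LastResult = wrSignaled then Inc(Consumed);
        until LastResult <> wrSignaled;
      end);
    Consumer.FreeOnTerminate := False; // we free it ourselves below
    Consumer.Start;

    for I := 1 to 3 do
      Queue.Enqueue(Format('work item %d', [I]));

    // the consumer stops by itself once no work arrives for 500ms
    Consumer.WaitFor;
    Consumer.Free;

    Writeln('Consumed ', Consumed, ' item(s)');
    if LastResult = wrTimeout then
      Writeln('Consumer gave up after waiting 500ms for more work');
  finally
    Queue.Free;
  end;
end.
```

Note the consumer is left to time out rather than being stopped with Close, since Close also clears any items not yet taken.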

Next up is to create a simple demo app. For this, we’ll design a basic form with a button, the pressing of which will generate some work for a worker thread to get on with. This work will be put in an ‘in’ queue (there will be more than one worker thread available), and when completed, it will be put in an ‘out’ queue for the main thread to pick up. For simplicity, each item of ‘work’ will be a string value; to simulate processing, a worker thread will merely sleep for a bit before pushing the value just popped into an ‘out’ queue. While plainly in the realm of demo code, this setup will still involve both a single producer, multiple consumer situation (the ‘in’ queue’) and a multiple producer, single consumer one (the ‘out’ queue).

So, create a new VCL forms application. In the main form’s unit, add both SyncObjs and SimpleThreadedQueue (i.e., the unit just created) to the uses clause, and the following fields to the form class itself:

    FCurWorkItem: Integer;
    FInQueue, FOutQueue: TSimpleThreadedQueue<string>;
    FThreadsActive: TCountdownEvent;

In the form designer, set up the form to look like this:

[Screenshot: the demo form, with a memo on the left and a panel of buttons on the right]

The buttons on the right hand side are on a right-aligned TPanel, the TMemo to the left is client aligned (AlignWithMargins is True too), and in the middle is a TTimer — set its Interval property to something like 250. Name the components memOutput, panBtns, btnEnqueueWorkItem, btnFinish and tmrWorkDone, before handling the form’s OnCreate event like this:

procedure TfrmSTQTest.FormCreate(Sender: TObject);
const
  MaxDelay = 2000;       //tweak as you please
  WorkerThreadCount = 4; //ditto
var
  I: Integer;
begin
  FThreadsActive := TCountdownEvent.Create(1); //has to be at least 1
  FInQueue := TSimpleThreadedQueue<string>.Create;
  FOutQueue := TSimpleThreadedQueue<string>.Create;
  for I := 1 to WorkerThreadCount do
    TThread.CreateAnonymousThread(
      procedure
      var
        Delay: Integer;
        S: string;
        ThreadID: TThreadID;
      begin
        FThreadsActive.AddCount;
        try
          ThreadID := TThread.CurrentThread.ThreadID;
          FOutQueue.Enqueue(Format('Thread %d is starting', [ThreadID]));
          while FInQueue.Dequeue(S) = wrSignaled do
          begin
            Delay := Random(MaxDelay);
            FOutQueue.Enqueue(Format('    Dequeued %s (thread %d); will now sleep for ' +
              '%d msecs to simulate processing', [S, ThreadID, Delay]));
            Sleep(Delay);
            FOutQueue.Enqueue(Format('      Finished processing %s (thread %d)',
              [S, ThreadID]));
          end;
          FOutQueue.Enqueue(Format('        Thread %d is terminating', [ThreadID]));
        finally
          FThreadsActive.Signal;
        end;
      end).Start;
end;

Here, we initialise and start running a number of worker threads, along with a TCountdownEvent to keep track of the number of threads that are currently running. (As a TCountdownEvent needs to start off with a positive counter value, we’ll say the initial 1 represents the main thread.) Next, handle the form’s OnDestroy event like so:

procedure TfrmSTQTest.FormDestroy(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  FInQueue.Close;
  FThreadsActive.Signal;  //release our 1
  FThreadsActive.WaitFor; //wait for the worker threads to terminate
  FThreadsActive.Free;
  FInQueue.Free;
  FOutQueue.Free;
end;

Note the explicit call to the ‘in’ queue’s Close method. Doing this will cause the Dequeue call in the waiting worker threads to break off, returning wrAbandoned. After signalling the countdown event itself, the main thread then waits on the event being completely signalled (in other words, for the TCountdownEvent’s internal counter to fall to zero), which will happen once every worker thread has finished. Only then can the queue objects safely be destroyed.
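
To see the countdown pattern in isolation, here is a minimal console sketch of it, assuming XE2-style scoped unit names. The program name and constants are made up for illustration. One deliberate difference from the form code above: each worker is counted in the main thread before it is started, which sidesteps the case in which the counter hits zero before a slow-starting thread has got round to calling AddCount (as far as I can tell, AddCount raises an exception once the counter is already at zero).

```pascal
program CountdownSketch; // hypothetical name, for illustration only

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

const
  WorkerCount = 3; // tweak as you please

var
  ThreadsActive: TCountdownEvent;
  I: Integer;
begin
  ThreadsActive := TCountdownEvent.Create(1); // the initial 1 stands for the main thread
  try
    for I := 1 to WorkerCount do
    begin
      ThreadsActive.AddCount; // count the worker before it starts running
      TThread.CreateAnonymousThread(
        procedure
        begin
          try
            Sleep(200); // simulate some processing
          finally
            ThreadsActive.Signal; // this worker is done
          end;
        end).Start;
    end;
    ThreadsActive.Signal;  // release the main thread's 1
    ThreadsActive.WaitFor; // blocks until the counter falls to zero
    WriteLn('All workers have finished');
  finally
    ThreadsActive.Free;
  end;
end.
```

The order of the last two calls matters: without the main thread first releasing its own 1, the counter could never reach zero and WaitFor would block forever.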

To give the worker threads something to do, handle the first button’s OnClick event like this:

procedure TfrmSTQTest.btnEnqueueWorkItemClick(Sender: TObject);
var
  S: string;
begin
  Inc(FCurWorkItem);
  S := Format('work item %d', [FCurWorkItem]);
  memOutput.Lines.Add('  Queuing ' + S + '...');
  FInQueue.Enqueue(S);
end;

The ‘finish’ button is just to be able to see the worker threads shutting down cleanly (not that they shouldn’t!) – you don’t actually need to click it before closing the form:

procedure TfrmSTQTest.btnFinishClick(Sender: TObject);
begin
  btnEnqueueWorkItem.Enabled := False;
  btnFinish.Enabled := False;
  FInQueue.Close;
end;

Finally, the timer is used to periodically poll the ‘out’ queue for data to display. Note that 0 is passed as the timeout value, meaning we will neither block on the lock already being held nor wait for new data to arrive:

procedure TfrmSTQTest.tmrWorkDoneTimer(Sender: TObject);
var
  S: string;
begin
  while FOutQueue.Dequeue(S, 0) = wrSignaled do //drain the out queue but don't wait on it
    memOutput.Lines.Add(S);
end;

That’s about it really. If you compile and run the resulting application, the most instructive approach is probably the most jejune: with the enqueue button focussed, just press and hold down ENTER for a bit. That should result in several work items being enqueued, interspersed with a few being completed as the four worker threads get to work. Obviously, tweaking the values used in the OnCreate handler will affect this – have the threads sleep for at most 200ms rather than 2000ms, for example, and the ‘in’ queue will be processed almost as quickly as you can fill it up.

[Update: merely a few hours after I posted, a question on StackOverflow pops up suggesting I should have used PulseAll rather than Pulse on the grounds PulseAll is surely always needed ‘except in the trivial cases when there are only two threads in total’. The number of threads is a red herring however – it’s the nature of the state change being signalled that matters. If the monitor’s signalling functionality were to be used for more than one thing (e.g., to establish a maximum item count too), PulseAll would be needed. As it is, Wait is only being called in one place, meaning any waiter is a legitimate taker of any Pulse (so to speak).]
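
To make the distinction concrete, here is a rough sketch of the other case, i.e. a queue with a maximum item count, in which the one monitor is waited on for two different reasons. The class name is hypothetical, locking on the inner TQueue is just a choice made for this sketch, and none of this is the TSimpleThreadedQueue code itself. Because producers waiting for ‘not full’ and consumers waiting for ‘not empty’ share a single wait queue, a lone Pulse might wake a thread whose condition still doesn’t hold; that thread would go straight back to waiting, and the thread that could have made progress would never hear of the change. Hence PulseAll, with every waiter rechecking its own condition in a loop.

```pascal
unit BoundedQueueSketch; // hypothetical unit, for illustration only

interface

uses
  System.Generics.Collections;

type
  TBoundedQueueSketch<T> = class
  strict private
    FMaxCount: Integer;
    FQueue: TQueue<T>;
  public
    constructor Create(AMaxCount: Integer);
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue: T;
  end;

implementation

constructor TBoundedQueueSketch<T>.Create(AMaxCount: Integer);
begin
  inherited Create;
  FMaxCount := AMaxCount;
  FQueue := TQueue<T>.Create;
end;

destructor TBoundedQueueSketch<T>.Destroy;
begin
  FQueue.Free;
  inherited;
end;

procedure TBoundedQueueSketch<T>.Enqueue(const Item: T);
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count >= FMaxCount do // wait for 'not full'
      TMonitor.Wait(FQueue, INFINITE);
    FQueue.Enqueue(Item);
    TMonitor.PulseAll(FQueue); // waiters may be producers or consumers, so wake them all
  finally
    TMonitor.Exit(FQueue);
  end;
end;

function TBoundedQueueSketch<T>.Dequeue: T;
begin
  TMonitor.Enter(FQueue);
  try
    while FQueue.Count = 0 do // wait for 'not empty'
      TMonitor.Wait(FQueue, INFINITE);
    Result := FQueue.Dequeue;
    TMonitor.PulseAll(FQueue); // ditto: a blocked producer may now have room
  finally
    TMonitor.Exit(FQueue);
  end;
end;

end.
```

The price of PulseAll is that threads get woken only to find nothing for them to do, which is why the unbounded queue, having just the one thing to wait for, can get away with the cheaper Pulse.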

Writing a simple threaded queue class using TMonitor (1)

While it’s been in the product for a few versions now, I’ve come across very little example code for TMonitor, despite its implementation having been heavily blogged about by its author (Allen Bauer) while he actually was writing it. In this post and its sequel, I’ll be looking to help rectify that, if only in a very small way.

Naturally, your first question may well be to ask what on earth TMonitor is. One answer is that it is a Delphi implementation of the Monitor class in .NET, having a very similar interface to the latter and essentially the same functionality. Of course, .NET didn’t invent the concept of a ‘monitor’, and you can read more about it on Wikipedia. In short, a monitor is the combination of a critical section and a ‘condition variable’. If you’re like me, the second part of this sounds very jargon-y, with the first leading to the thought ‘what’s the point?’ – after all, Delphi has a TCriticalSection class, or if you prefer, direct access to the Windows critical section API. In contrast, in .NET, Monitor is the critical section class, which – naturally enough – makes it quite essential over there (the fact the critical section aspect is also given some syntactical sugar in C# with the lock keyword can’t hurt either). So, what is the point back in Delphi-land?

In essence, what TMonitor adds over a regular critical section object is a signalling capability – a very primitive signalling capability admittedly, but a signalling capability nonetheless. To give a flavour, here’s a more or less direct Delphi translation of a C# example of Joe Albahari’s (source):

program SimpleWaitPulse;

{$APPTYPE CONSOLE}

uses
  Classes;

var
  Go: Boolean;
  Thread: TThread;
begin
  Thread := TThread.CreateAnonymousThread(
    procedure
    begin
      TMonitor.Enter(Thread);
      try
        while not Go do
          TMonitor.Wait(Thread, INFINITE);
        WriteLn('Woken!!!');
      finally
        TMonitor.Exit(Thread);
      end;
    end);
  Thread.Start;
  ReadLn;
  TMonitor.Enter(Thread);
  try
    Go := True;
    TMonitor.Pulse(Thread);
  finally
    TMonitor.Exit(Thread);
  end;
  ReadLn;
end.

This starts a new thread that, on taking the lock, immediately releases it to wait on the Go variable becoming True. The latter happens when the user presses ENTER, upon which the main thread takes the lock, sets Go to True, ‘pulses’ the change in state, before finally releasing the lock.

Even though this is a very simple example – so simple in fact it fails to show up WriteLn’s lack of thread safety (ahem)! – there are a few things to note from it:

  • As always with locking primitives, use of try/finally is very much necessary.
  • The sort of state waited on is entirely up to the discretion of the code that uses TMonitor.
  • The Wait and Pulse calls are both made while holding the lock, i.e. within an Enter/Exit pair. The call to Wait releases the lock for the duration of the wait (reacquiring it before returning), which is what allows another thread to get in and call Pulse. The Pulse call itself then wakes the thread at the head of the wait queue. In the present example there’s only one waiter, but if there were more than one and you wished to wake all of them, you should call PulseAll rather than Pulse several times.
  • Regarding the Wait call again, observe how it is made in the context of a while loop. In this case that isn’t strictly necessary, since there’s only going to be one waiter waiting on a single boolean condition. Matters would be different, though, whenever there is more than one consumer — between being released and acting on the pulsed state, that state may have been changed by another newly-released thread.
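
By way of illustrating the last two points, here is a sketch of the same program with several waiters instead of one, again assuming XE2-style unit names. The Lock object, the Done countdown event and the WaiterCount constant are all inventions of this sketch rather than part of Joe Albahari’s original. Every waiter rechecks Go in a loop, and a single PulseAll wakes the lot of them.

```pascal
program MultiWaitPulseAll; // hypothetical name, for illustration only

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

const
  WaiterCount = 3;

var
  Go: Boolean;
  Lock: TObject;         // any object can serve as the monitor
  Done: TCountdownEvent; // lets the main thread wait for the waiters to finish
  I: Integer;
begin
  Lock := TObject.Create;
  Done := TCountdownEvent.Create(WaiterCount);
  try
    for I := 1 to WaiterCount do
      TThread.CreateAnonymousThread(
        procedure
        begin
          TMonitor.Enter(Lock);
          try
            while not Go do // always recheck the state after waking
              TMonitor.Wait(Lock, INFINITE);
            // the WriteLn calls are serialised here only because the lock is held
            WriteLn('Thread ', TThread.CurrentThread.ThreadID, ' woken');
          finally
            TMonitor.Exit(Lock);
          end;
          Done.Signal;
        end).Start;
    WriteLn('Press ENTER to wake the waiters');
    ReadLn;
    TMonitor.Enter(Lock);
    try
      Go := True;
      TMonitor.PulseAll(Lock); // one call wakes every waiter
    finally
      TMonitor.Exit(Lock);
    end;
    Done.WaitFor; // don't free Lock while a waiter may still be using it
  finally
    Done.Free;
    Lock.Free;
  end;
end.
```

Note that the waiters don’t all run at once on being pulsed: each has to reacquire the lock before its Wait call returns, so they come out one at a time.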

With this knowledge, we can use TMonitor to implement a simple threaded queue class, wrapping the standard TQueue. As for the actual implementation, you’ll have to wait for my next post though.