Fixing TThreadedQueue…? (Or in other words: TMonitor… again!)

[Update: fixes for TMonitor were made in XE2 update 4. Consequently, this post should be considered for historical information only.]

Probably out of masochism or something, I’ve been looking at TMonitor again. For anyone who doesn’t recall the earlier episodes, TMonitor is the ‘new’ (and rather low-level) threading device introduced in D2009. Alas, it proved buggy as hell, as well as slow. Given the publicity, however, it received some love for XE2 RTM. So, all right in the end? Er, no: Darian Miller on StackOverflow quickly tweaked one of the test projects that demonstrated its brokenness in XE and… found it still broken in XE2!

More exactly, the test project in question uses the TThreadedQueue class. Having experimented a bit more with TMonitor and found it seemingly OK, I thought I’d investigate the TThreadedQueue implementation. My initial idea was to test its logic independently of TMonitor by replacing use of TMonitor with calls to equivalent Windows APIs (a monitor combines a critical section and a condition variable, and the Windows API acquired a condition variable primitive in Vista). However, on inspecting the code, I found it using the (to me) rather tricky-to-comprehend three-parameter overload of TMonitor.Wait, whereas in my own usage I’ve stuck to the simpler two-parameter overload. This made me think: if I just rewrote TThreadedQueue similarly, would that fix the problem? I think it does, but I would be grateful if anyone else could verify:
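For comparison, here is a minimal sketch of the two-parameter pattern, assuming Delphi XE2 or later (for unit scope names and TThread.CreateAnonymousThread). A single object’s monitor serves as both the lock and the condition variable: Wait(Guard, Timeout) releases Guard while waiting and re-acquires it before returning. The three-parameter overload, Wait(AObject, ALock, Timeout), instead waits to be pulsed on one object while releasing the lock on another, which is how the original TThreadedQueue pairs FQueueNotEmpty and FQueueNotFull with FQueueLock. The names Guard and Ready are purely illustrative.

```delphi
program MonitorWaitSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

var
  Guard: TObject;   // its monitor acts as both the lock and the condition variable
  Ready: Boolean;   // the condition; only read or written while holding Guard's lock

begin
  Guard := TObject.Create;
  try
    Ready := False;
    TThread.CreateAnonymousThread(
      procedure
      begin
        TMonitor.Enter(Guard);
        try
          Ready := True;
          // Woken waiters cannot return from Wait until this thread exits the monitor
          TMonitor.PulseAll(Guard);
        finally
          TMonitor.Exit(Guard);
        end;
      end).Start;

    TMonitor.Enter(Guard);
    try
      // Always re-test the condition in a loop: a wakeup only means "check again"
      while not Ready do
        if not TMonitor.Wait(Guard, 5000) then
          raise Exception.Create('Timed out waiting for Ready');
    finally
      TMonitor.Exit(Guard);
    end;
    Writeln('Ready = ', Ready);
  finally
    Guard.Free;
  end;
end.
```

The while loop around Wait is the important habit: the same shape appears in the revised PopItem and PushItem, where one monitor signals both ‘not empty’ and ‘not full’, so every woken thread has to re-check its own condition.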

1. Download the problem project from the Windows QC client – browse to report number 101114, and get the DPR from the ‘attachments’ tab.

2. Open the project in the IDE, and add a new unit to it.

3. Copy the code for TThreadedQueue from System.Generics.Collections.pas. As it is almost completely self-contained, you need only System.SyncObjs in the new unit’s uses clause.

4. In the copied class definition, remove FQueueNotEmpty and FQueueNotFull. Everything else in the interface stays the same, however.

5. Remove the lines constructing and freeing FQueueNotEmpty and FQueueNotFull in Create and Destroy.

6. Change the implementation of the substantive PopItem variant to this:

function TThreadedQueue<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
begin
  AItem := Default(T);
  TMonitor.Enter(FQueueLock);
  try
    // Wait while the queue is empty; Wait releases FQueueLock until pulsed or timed out
    while (FQueueSize = 0) and not FShutDown do
      if not TMonitor.Wait(FQueueLock, FPopTimeout) then Exit(wrTimeout);
    if FShutDown then Exit(wrAbandoned);
    Result := wrSignaled;
    AItem := FQueue[FQueueOffset];
    // Queue was full, so pushers may be waiting: wake them all to re-check
    if FQueueSize = Length(FQueue) then
      TMonitor.PulseAll(FQueueLock);
    Dec(FQueueSize);
    Inc(FQueueOffset);
    Inc(FTotalItemsPopped);
    // Wrap the read position around the circular buffer
    if FQueueOffset = Length(FQueue) then
      FQueueOffset := 0;
  finally
    AQueueSize := FQueueSize;
    TMonitor.Exit(FQueueLock);
  end;
end;

The main thing here is that we are now using the simpler variant of Wait. As an aside, it’s OK to ‘pulse’ as early as we do: a woken thread must re-acquire the monitor before its Wait call returns, so it cannot touch the queue until we have finished updating it and exited the monitor.

7. Change the implementation of the substantive PushItem overload to this:

function TThreadedQueue<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
begin
  TMonitor.Enter(FQueueLock);
  try
    // Wait while the queue is full; Wait releases FQueueLock until pulsed or timed out
    while (Length(FQueue) = FQueueSize) and not FShutDown do
      if not TMonitor.Wait(FQueueLock, FPushTimeout) then Exit(wrTimeout);
    if FShutDown then Exit(wrAbandoned);
    Result := wrSignaled;
    // Queue was empty, so poppers may be waiting: wake them all to re-check
    if FQueueSize = 0 then TMonitor.PulseAll(FQueueLock);
    // Store in the slot just past the current tail of the circular buffer
    FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
    Inc(FQueueSize);
    Inc(FTotalItemsPushed);
  finally
    AQueueSize := FQueueSize;
    TMonitor.Exit(FQueueLock);
  end;
end;

8. Change DoShutdown to this:

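procedure TThreadedQueue<T>.DoShutDown;
begin
  TMonitor.Enter(FQueueLock);
  try
    FShutDown := True;
    // Wake every waiting pusher and popper so each re-checks FShutDown and leaves
    TMonitor.PulseAll(FQueueLock);
  finally
    TMonitor.Exit(FQueueLock);
  end;
end;

The locking around setting FShutDown is worth keeping even though no one will ever set it back to False. PushItem and PopItem test FShutDown and then call Wait while holding the lock, so taking the lock here guarantees the flag change and the pulse cannot slip into the gap between that test and the start of the wait. Without it, a thread could miss the pulse entirely and block until its timeout expires, or forever with the default INFINITE timeout.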

9. Rerun the problem project: hopefully, it will now work (meaning, it will count up to 1000 without crashing), notwithstanding the fact we haven’t actually fixed the bug in TMonitor itself…
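If you would rather not depend on the QC attachment, a minimal smoke test along the same lines might look like the following. It is a hypothetical stand-in, not the QC 101114 project: one consumer thread pops 1,000 integers through a queue of depth 10 while the main thread pushes them, using the two-var PopItem overload rewritten in step 6. As written it exercises the RTL’s own TThreadedQueue; to exercise the revised class instead, add its unit to the uses clause after System.Generics.Collections, since the last unit listed wins when identifiers clash. It assumes XE2 or later.

```delphi
program QueueSmokeTest;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs,
  System.Generics.Collections; // add the revised unit after this one to test it instead

const
  ItemCount = 1000;

var
  Queue: TThreadedQueue<Integer>;
  Consumer: TThread;
  Popped, Sum, I: Integer;

begin
  // Depth 10, default (infinite) push and pop timeouts
  Queue := TThreadedQueue<Integer>.Create(10);
  try
    Popped := 0;
    Sum := 0;
    Consumer := TThread.CreateAnonymousThread(
      procedure
      var
        QueueSize, Item: Integer;
      begin
        while Popped < ItemCount do
        begin
          // The (var AQueueSize; var AItem) overload, i.e. the one rewritten in step 6
          if Queue.PopItem(QueueSize, Item) <> wrSignaled then
            Break; // timed out or shut down
          Inc(Popped);
          Inc(Sum, Item);
        end;
      end);
    Consumer.FreeOnTerminate := False; // so we can WaitFor and free it ourselves
    Consumer.Start;

    for I := 1 to ItemCount do
      if Queue.PushItem(I) <> wrSignaled then
        raise Exception.CreateFmt('PushItem failed at item %d', [I]);

    Consumer.WaitFor;
    Consumer.Free;
    Assert(Popped = ItemCount);
    Assert(Sum = ItemCount * (ItemCount + 1) div 2);
    Writeln('Popped ', Popped, ' items');
  finally
    Queue.Free;
  end;
end.
```

With infinite timeouts, a lost wakeup shows up as a hang rather than an exception, so a run that never finishes counts as a failure too.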


10 thoughts on “Fixing TThreadedQueue…? (Or in other words: TMonitor… again!)”

    • Well it does for me – I’m asking whether any one else could confirm it does for them too (‘I would be grateful if anyone else could verify…’).

  1. There is still a bug in TMonitor that can cause failures in TThreadedQueue. The issue lies in TMonitor.RemoveWaiter. The problem is a race condition where the if FWaitQueue <> nil then test succeeds, but before the lock can be acquired, another thread sneaks past and removes the final waiter (through a Pulse or PulseAll), setting FWaitQueue to nil. The solution is to perform a test/lock/test sequence; IOW, retest the FWaitQueue field once the lock is acquired.

    Another related bug was discovered in the TMonitor support code plugged in by System.SysUtils.pas. There was a very subtle race condition that would cause the lock cache to get corrupted.

    Both of these have been addressed and will be in the next XE2 update, which should be available within the next week or two.

  2. Just reran my test with your revised TThreadedQueue. All is good (at last).
    Thanks for digging into the subject again.
    I will run some speed tests later.

    The whole story got me thinking about what kind of unit tests are used at Embarcadero.
    I mean the quality of the testing. If the test code were public, it would be easier to pinpoint the weak spots and the community could contribute to the platform quality.
    Just a thought.

  3. Thanks for looking into this as we all benefit from the gains made. I just ran your changes on my system and my initial tests passed even after letting them run for a few hours. However, being determined to kill the thing, I increased the pressure on the queue even more and I can fairly consistently receive an error on a PushItem after a few minutes of processing (EMonitorLockException: Object lock not owned).

    On my test Windows 7 64-bit 4-processor VM, I assigned 4 threads to be reading from the queue and 40 threads to writing. The queue was created with a max length of 1024 along with a Push timeout of 1 and a Pop timeout of 1. There’s a definite ‘sweet-spot’ needed based on system config in order to draw the error out…too many or too few threads and it never seems to die. (At least I don’t have the patience to wait long enough for it to die.)

    Hopefully the next update fixes this.

    • Interesting. If I can repeat it, I’ll have a go swapping out TMonitor for the API primitives. Have you tried that yourself? It will be using either TRTLCriticalSection or TCriticalSection in combination with TConditionVariableCS. (You can also do the same on the Mac, however there it should be TMutex in combination with TConditionVariableMutex.)

    • Re swapping out TMonitor for the API primitives:

      1. Start from my revised TThreadedQueue.

      2. Change FQueueLock to a TCriticalSection, and add FQueueCondVar: TConditionVariableCS; create the new object in the constructor and free it in the destructor.

      3. Change all TMonitor.Enter and TMonitor.Exit calls to FQueueLock.Enter and FQueueLock.Leave.

      4. Change all TMonitor.PulseAll calls to FQueueCondVar.ReleaseAll ones.

      5. PopItem now becomes this:

      function TThreadedQueue<T>.PopItem(var AQueueSize: Integer; var AItem: T): TWaitResult;
      begin
        AItem := Default(T);
        FQueueLock.Enter;
        try
          while (FQueueSize = 0) and not FShutDown do
          begin
            Result := FQueueCondVar.WaitFor(FQueueLock, FPopTimeout);
            if Result <> wrSignaled then Exit;
          end;
          if FShutDown then Exit(wrAbandoned);
          // Without this, Result is unassigned whenever the wait loop never runs
          Result := wrSignaled;
          AItem := FQueue[FQueueOffset];
          if FQueueSize = Length(FQueue) then
            FQueueCondVar.ReleaseAll;
          Dec(FQueueSize);
          Inc(FQueueOffset);
          Inc(FTotalItemsPopped);
          if FQueueOffset = Length(FQueue) then
            FQueueOffset := 0;
        finally
          AQueueSize := FQueueSize;
          FQueueLock.Leave;
        end;
      end;
      

      6. Amend PushItem to this:

      function TThreadedQueue<T>.PushItem(const AItem: T; var AQueueSize: Integer): TWaitResult;
      begin
        FQueueLock.Enter;
        try
          while (Length(FQueue) = FQueueSize) and not FShutDown do
          begin
            Result := FQueueCondVar.WaitFor(FQueueLock, FPushTimeout);
            if Result <> wrSignaled then Exit;
          end;
          if FShutDown then Exit(wrAbandoned);
          // Without this, Result is unassigned whenever the wait loop never runs
          Result := wrSignaled;
          if FQueueSize = 0 then FQueueCondVar.ReleaseAll;
          FQueue[(FQueueOffset + FQueueSize) mod Length(FQueue)] := AItem;
          Inc(FQueueSize);
          Inc(FTotalItemsPushed);
        finally
          AQueueSize := FQueueSize;
          FQueueLock.Leave;
        end;
      end;
      

      Using API primitives on OS X is identical but for TMutex and TConditionVariableMutex being used instead of TCriticalSection and TConditionVariableCS.
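The test/lock/test sequence described in the first comment can be sketched as follows. This is a hypothetical, simplified wait queue (TWaitQueue, TWaiter and RemoveWaiter here are illustrative names, not the RTL’s actual TMonitor code). It assumes an aligned, pointer-sized read of FHead is atomic, as it is on x86 and x64, so the unlocked first test is only a cheap hint that gets re-validated once the lock is held.

```delphi
program TestLockTestSketch;
{$APPTYPE CONSOLE}
uses System.SyncObjs;

type
  // Hypothetical wait queue for illustration; NOT the RTL's TMonitor internals
  PWaiter = ^TWaiter;
  TWaiter = record
    Next: PWaiter;
  end;

  TWaitQueue = class
  private
    FLock: TCriticalSection;
    FHead: PWaiter;
  public
    constructor Create;
    destructor Destroy; override;
    procedure AddWaiter(W: PWaiter);
    procedure RemoveWaiter(W: PWaiter);
    function IsEmpty: Boolean;
  end;

constructor TWaitQueue.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TWaitQueue.Destroy;
begin
  FLock.Free;
  inherited;
end;

procedure TWaitQueue.AddWaiter(W: PWaiter);
begin
  FLock.Enter;
  try
    W.Next := FHead;
    FHead := W;
  finally
    FLock.Leave;
  end;
end;

procedure TWaitQueue.RemoveWaiter(W: PWaiter);
var
  Prev: PWaiter;
begin
  if FHead = nil then Exit;  // test: cheap unlocked check for the common empty case
  FLock.Enter;               // lock
  try
    // test again: another thread may have dequeued the last waiter (e.g. via a
    // pulse) between the first test and our acquiring the lock
    if FHead = nil then Exit;
    if FHead = W then
      FHead := W.Next
    else
    begin
      Prev := FHead;  // safe to dereference: FHead was re-checked under the lock
      while (Prev.Next <> nil) and (Prev.Next <> W) do
        Prev := Prev.Next;
      if Prev.Next = W then
        Prev.Next := W.Next;
    end;
  finally
    FLock.Leave;
  end;
end;

function TWaitQueue.IsEmpty: Boolean;
begin
  FLock.Enter;
  try
    Result := FHead = nil;
  finally
    FLock.Leave;
  end;
end;

var
  Queue: TWaitQueue;
  A, B: TWaiter;
begin
  Queue := TWaitQueue.Create;
  try
    Queue.AddWaiter(@A);
    Queue.AddWaiter(@B);
    Queue.RemoveWaiter(@A);
    Queue.RemoveWaiter(@B);
    Assert(Queue.IsEmpty);
    Queue.RemoveWaiter(@A);  // already gone: returns without touching the queue
    Assert(Queue.IsEmpty);
  finally
    Queue.Free;
  end;
end.
```

The demo in the main block is single-threaded, so it only checks the list handling. The point of the second test is the concurrent case: without it, RemoveWaiter would dereference a nil FHead if another thread emptied the queue between the unlocked check and the lock.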
