TSimpleThreadedQueue variants

1. Variant of the TMonitor version that at least works (uses TCriticalSection and TSemaphore instead of TMonitor). Functionality lost here is the timeout in the Enqueue method:

unit SimpleThreadedQueueSem;

interface

uses
  SysUtils, Classes, Generics.Collections, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private
    FQueue: TQueue<T>;
    FCriticalSection: TCriticalSection;
    FSemaphore: TSemaphore;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue(var Item: T; Timeout: LongWord = INFINITE): TWaitResult;
  end;

implementation

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  FCriticalSection := TCriticalSection.Create;
  FQueue := TQueue<T>.Create;
  FSemaphore := TSemaphore.Create(nil, 0, MaxInt, '');
end;

destructor TSimpleThreadedQueue<T>.Destroy;
begin
  FreeAndNil(FQueue);
  FCriticalSection.Free;
  FSemaphore.Free;
  inherited Destroy;
end;

procedure TSimpleThreadedQueue<T>.Enqueue(const Item: T);
begin
  FCriticalSection.Enter;
  try
    FQueue.Enqueue(Item);
  finally
    FCriticalSection.Leave;
  end;
  FSemaphore.Release;
end;

function TSimpleThreadedQueue<T>.Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
begin
  Result := FSemaphore.WaitFor(Timeout);
  if Result <> wrSignaled then Exit;
  FCriticalSection.Enter;
  try
    Item := FQueue.Dequeue;
  finally
    FCriticalSection.Leave;
  end;
end;

end.

2. The above with a simple linked list instead of TQueue, and spinning enabled on the critical section (both improve performance in the Gabr test). Since TCriticalSection.Create doesn’t take an overload with a spin count, uses TRTLCriticalSection directly, forgoing the previous variants’ hypothetical cross-platformness in the process:

unit SimpleThreadedQueueLL;

interface

uses
  Windows, SysUtils, Classes, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private type
    TNode<T> = record
      Next: ^TNode<T>;
      Data: T;
    end;
  strict private
    FCriticalSection: TRTLCriticalSection;
    FHeadNode, FTailNode: ^TNode<T>;
    FSemaphore: TSemaphore;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue(var Item: T; Timeout: LongWord = INFINITE): TWaitResult;
  end;

implementation

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  InitializeCriticalSectionAndSpinCount(FCriticalSection, 4000);
  FSemaphore := TSemaphore.Create(nil, 0, MaxInt, '');
end;

destructor TSimpleThreadedQueue<T>.Destroy;
var
  Node: Pointer;
begin
  FCriticalSection.Enter;
  try
    while FHeadNode <> nil do
    begin
      Node := FHeadNode.Next;
      Finalize(FHeadNode.Data); //can't just call Dispose due to a compiler bug (QC 79818)
      FreeMem(FHeadNode);
      FHeadNode := Node;
    end;
  finally
    FCriticalSection.Leave;
  end;
  FCriticalSection.Free;
  FSemaphore.Free;
  inherited Destroy;
end;

procedure TSimpleThreadedQueue<T>.Enqueue(const Item: T);
var
  NewNode: ^TNode<T>;
begin
  NewNode := AllocMem(SizeOf(TNode<T>)); //QC 79818 again -> don't call New
  NewNode.Data := Item;
  FCriticalSection.Enter;
  try
    if FHeadNode = nil then
      FHeadNode := Pointer(NewNode)
    else
      FTailNode.Next := Pointer(NewNode);
    FTailNode := Pointer(NewNode);
  finally
    FCriticalSection.Leave;
  end;
  FSemaphore.Release;
end;

function TSimpleThreadedQueue<T>.Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
var
  OldNode: ^TNode<T>;
begin
  Result := FSemaphore.WaitFor(Timeout);
  if Result <> wrSignaled then Exit;
  FCriticalSection.Enter;
  try
    OldNode := Pointer(FHeadNode);
    Item := OldNode.Data;
    FHeadNode := Pointer(OldNode.Next);
  finally
    FCriticalSection.Leave;
  end;
  Finalize(OldNode.Data);
  FreeMem(OldNode);
end;

end.

3. Semaphore removed. This achieves optimal performance (for TSimpleThreadedQueue!) in the Gabr test, but at the significant loss of forgoing the Dequeue method’s waiting functionality:

unit SimpleThreadedQueueNoWait;

interface

uses
  Windows, SysUtils, Classes, SyncObjs;

type
  TSimpleThreadedQueue<T> = class
  strict private type
    TNode<T> = record
      Next: ^TNode<T>;
      Data: T;
    end;
  strict private
    FHeadNode, FTailNode: ^TNode<T>;
    FCriticalSection: TRTLCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Item: T);
    function Dequeue(var Item: T): Boolean;
  end;

implementation

constructor TSimpleThreadedQueue<T>.Create;
begin
  inherited Create;
  InitializeCriticalSectionAndSpinCount(FCriticalSection, 4000);
end;

destructor TSimpleThreadedQueue<T>.Destroy;
var
  Node: Pointer;
begin
  FCriticalSection.Enter;
  try
    while FHeadNode <> nil do
    begin
      Node := FHeadNode.Next;
      Finalize(FHeadNode.Data); //can't just call Dispose due to a compiler bug (QC 79818)
      FreeMem(FHeadNode);
      FHeadNode := Node;
    end;
  finally
    FCriticalSection.Leave;
  end;
  FCriticalSection.Free;
  inherited Destroy;
end;

procedure TSimpleThreadedQueue<T>.Enqueue(const Item: T);
var
  NewNode: ^TNode<T>;
begin
  NewNode := AllocMem(SizeOf(TNode<T>)); //QC 79818 again -> don't call New
  NewNode.Data := Item;
  FCriticalSection.Enter;
  try
    if FHeadNode = nil then
      FHeadNode := Pointer(NewNode)
    else
      FTailNode.Next := Pointer(NewNode);
    FTailNode := Pointer(NewNode);
  finally
    FCriticalSection.Leave;
  end;
end;

function TSimpleThreadedQueue<T>.Dequeue(var Item: T): Boolean;
var
  OldNode: ^TNode<T>;
begin
  FCriticalSection.Enter;
  try
    Result := (FHeadNode <> nil);
    if not Result then Exit;
    OldNode := Pointer(FHeadNode);
    Item := OldNode.Data;
    FHeadNode := Pointer(OldNode.Next);
  finally
    FCriticalSection.Leave;
  end;
  Finalize(OldNode.Data);
  FreeMem(OldNode);
end;

end.

One thought on “TSimpleThreadedQueue variants

  1. Pingback: Delphi TCriticalSection: Einmal global oder immer lokal erstellen? - Delphi-PRAXiS

Leave a comment