@@ -16,18 +16,20 @@ public class AsyncQueue<T>
1616 private readonly Queue < TaskCompletionSource < T > > consumerQueue = new Queue < TaskCompletionSource < T > > ( ) ;
1717 private SemaphoreSlim consumerQueueLock = new SemaphoreSlim ( 1 ) ;
1818
19+ public int Count => queue . Count ;
20+
1921 /// <summary>
2022 /// Supports multi-threaded producers.
2123 /// Time complexity: O(1).
2224 /// </summary>
23- public async Task EnqueueAsync ( T value , CancellationToken taskCancellationToken = default ( CancellationToken ) )
25+ public async Task EnqueueAsync ( T value , int millisecondsTimeout = int . MaxValue , CancellationToken taskCancellationToken = default ( CancellationToken ) )
2426 {
25- await consumerQueueLock . WaitAsync ( taskCancellationToken ) ;
27+ await consumerQueueLock . WaitAsync ( millisecondsTimeout , taskCancellationToken ) ;
2628
2729 if ( consumerQueue . Count > 0 )
2830 {
2931 var consumer = consumerQueue . Dequeue ( ) ;
30- consumer . SetResult ( value ) ;
32+ consumer . TrySetResult ( value ) ;
3133 }
3234 else
3335 {
@@ -41,22 +43,29 @@ public class AsyncQueue<T>
4143 /// Supports multi-threaded consumers.
4244 /// Time complexity: O(1).
4345 /// </summary>
44- public async Task < T > DequeueAsync ( CancellationToken taskCancellationToken = default ( CancellationToken ) )
46+ public async Task < T > DequeueAsync ( int millisecondsTimeout = int . MaxValue , CancellationToken taskCancellationToken = default ( CancellationToken ) )
4547 {
46- await consumerQueueLock . WaitAsync ( taskCancellationToken ) ;
48+ await consumerQueueLock . WaitAsync ( millisecondsTimeout , taskCancellationToken ) ;
49+
50+ TaskCompletionSource < T > consumer ;
51+
52+ try
53+ {
54+ if ( queue . Count > 0 )
55+ {
56+ var result = queue . Dequeue ( ) ;
57+ consumerQueueLock . Release ( ) ;
58+ return result ;
59+ }
4760
48- if ( queue . Count > 0 )
61+ consumer = new TaskCompletionSource < T > ( ) ;
62+ taskCancellationToken . Register ( ( ) => consumer . TrySetCanceled ( ) ) ;
63+ consumerQueue . Enqueue ( consumer ) ;
64+ }
65+ finally
4966 {
50- var result = queue . Dequeue ( ) ;
5167 consumerQueueLock . Release ( ) ;
52- return result ;
5368 }
54-
55- var consumer = new TaskCompletionSource < T > ( ) ;
56- taskCancellationToken . Register ( ( ) => consumer . TrySetCanceled ( ) ) ;
57- consumerQueue . Enqueue ( consumer ) ;
58-
59- consumerQueueLock . Release ( ) ;
6069
6170 return await consumer . Task ;
6271
0 commit comments