Share via


IReliableConcurrentQueue<T>.EnqueueAsync Method

Definition

Stage the enqueue of a value into the queue.

public System.Threading.Tasks.Task EnqueueAsync (Microsoft.ServiceFabric.Data.ITransaction tx, T value, System.Threading.CancellationToken cancellationToken = default, TimeSpan? timeout = default);
abstract member EnqueueAsync : Microsoft.ServiceFabric.Data.ITransaction * 'T * System.Threading.CancellationToken * Nullable<TimeSpan> -> System.Threading.Tasks.Task
Public Function EnqueueAsync (tx As ITransaction, value As T, Optional cancellationToken As CancellationToken = Nothing, Optional timeout As Nullable(Of TimeSpan) = Nothing) As Task

Parameters

tx
ITransaction

Transaction to associate this operation with.

value
T

The value to add to the end of the queue. The value can be null for reference types.

cancellationToken
CancellationToken

The token to monitor for cancellation requests. The default is None.

timeout
Nullable<TimeSpan>

The amount of time to wait for the operation to complete before throwing a TimeoutException. The default is null. If null is passed, a default timeout will be used.

Returns

Task that represents the asynchronous enqueue operation.

Exceptions

The replica is no longer in .

The replica is currently not readable.

The replica saw a transient failure. Retry the operation on a new transaction

The replica saw a non retriable failure other than the types defined above. Cleanup and rethrow the exception

The operation was unable to be completed within the given timeout. The transaction should be aborted and a new transaction should be created to retry.

tx is null.

The operation was canceled via cancellationToken.

The transaction has been internally faulted by the system. Retry the operation on a new transaction

Thrown when a method call is invalid for the object's current state. Example, transaction used is already terminated: committed or aborted by the user. If this exception is thrown, it is highly likely that there is a bug in the service code of the use of transactions.

Examples

This example shows how to use EnqueueAsync(ITransaction, T, CancellationToken, Nullable<TimeSpan>) to enqueue a value with retry.

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var concurrentQueue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>(new Uri("fabric:/concurrentQueue"));

    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            using (var tx = this.StateManager.CreateTransaction())
            {
                await concurrentQueue.EnqueueAsync(tx, 12L, cancellationToken);
                await tx.CommitAsync();

                return;
            }
        }
        catch (TransactionFaultedException e)
        {
            // This indicates that the transaction was internally faulted by the system. One possible cause for this is that the transaction was long running
            // and blocked a checkpoint. Increasing the "ReliableStateManagerReplicatorSettings.CheckpointThresholdInMB" will help reduce the chances of running into this exception
            Console.WriteLine("Transaction was internally faulted, retrying the transaction: " + e);
        }
        catch (FabricNotPrimaryException e)
        {
            // Gracefully exit RunAsync as the new primary should have RunAsync invoked on it and continue work.
            // If instead enqueue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is not primary, exiting RunAsync: " + e);
            return;
        }
        catch (FabricNotReadableException e)
        {
            // Retry until the queue is readable or a different exception is thrown.
            Console.WriteLine("Queue is not readable, retrying the transaction: " + e);
        }
        catch (FabricObjectClosedException e)
        {
            // Gracefully exit RunAsync as this is happening due to replica close.
            // If instead enqueue was being executed as part of a client request, the client would be signaled to re-resolve.
            Console.WriteLine("Replica is closing, exiting RunAsync: " + e);
            return;
        }
        catch (TimeoutException e)
        {
            Console.WriteLine("Encountered TimeoutException during EnqueueAsync, retrying the transaction: " + e);
        }
        catch (FabricTransientException e)
        {
            // Retry until the queue is writable or a different exception is thrown.
            Console.WriteLine("Queue is currently not writable, retrying the transaction: " + e);
        }

        // Delay and retry.
        await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
    }
}

Remarks

A TryDequeueAsync(ITransaction, CancellationToken, Nullable<TimeSpan>) operation cannot return any value for which its EnqueueAsync(ITransaction, T, CancellationToken, Nullable<TimeSpan>) has not yet been committed. This includes the transaction in which the value was enqueued; as a consequence, IReliableConcurrentQueue<T> does not support Read-Your-Writes.

Applies to