MailboxProcessor.Scan<'Msg,'T> Method (F#)
Scans for a message by looking through messages in arrival order until a provided function returns a Some value. Other messages remain in the queue.
Namespace/Module Path: Microsoft.FSharp.Control
Assembly: FSharp.Core (in FSharp.Core.dll)
// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)
Parameters
scanner
Type: 'Msg -> Async<'T> optionA function that returns None if the message is to be skipped, or Some if the message is to be processed and removed from the queue.
timeout
Type: intAn optional timeout in milliseconds. Defaults to -1 which corresponds to Infinite.
Exceptions
Exception |
Condition |
---|---|
Thrown when the timeout is exceeded. |
Return Value
An asynchronous computation (Async object) that scanner built off the read message.
Remarks
This method is for use within the body of the agent. For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active. The body of the scanner function is locked during its execution, but the lock is released before the execution of the asynchronous workflow.
Example
The following example shows how to use the Scan method. In this code, mailbox processor agents manage a series of simulated jobs that run and compute a result.
open System
open System.Threading
let random = System.Random()
// Generates mock jobs by using Async.Sleep.
let createJob(id:int, source:CancellationTokenSource) =
let job = async {
// Let the time be a random number between 1 and 10000.
// The mock computed result is a floating point value.
let time = random.Next(10000)
let result = random.NextDouble()
let count = ref 1
while (!count <= 100 && not source.IsCancellationRequested) do
do! Async.Sleep(time / 100)
count := !count + 1
return result
}
id, job, source
type Result = double
// A Job consists of a job ID and a computation that produces a single result.
type Job = int * Async<Result> * CancellationTokenSource
type Message = int * Result
let context = System.Threading.SynchronizationContext.Current
// This agent processes when jobs are completed.
let completeAgent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop n =
async {
let! (id, result) = inbox.Receive()
printfn "The result of job #%d is %f" id result
do! loop (n + 1)
}
loop (0))
// inprogressAgent maintains a queue of in-progress jobs that can be
// scanned to remove canceled jobs. It never runs its processor function,
// so we set it to do nothing.
let inprogressAgent = new MailboxProcessor<Job>(fun _ -> async { () })
// This agent starts each job in the order in which it is received.
let runAgent = MailboxProcessor<Job>.Start(fun inbox ->
let rec loop n =
async {
let! (id, job, source) = inbox.Receive()
printfn "Starting job #%d" id
// Post to the in-progress queue.
inprogressAgent.Post(id, job, source)
// Start the job.
Async.StartWithContinuations(job,
(fun result -> completeAgent.Post(id, result)),
(fun _ -> ()),
(fun cancelException -> printfn "Canceled job #%d" id),
source.Token)
do! loop (n + 1)
}
loop (0))
for id in 1 .. 10 do
let source = new CancellationTokenSource()
runAgent.Post(createJob(id, source))
let cancelJob(cancelId) =
Async.RunSynchronously(
inprogressAgent.Scan(fun (jobId, result, source) ->
let action =
async {
printfn "Canceling job #%d" cancelId
source.Cancel()
}
// Return Some(async) if the job ID matches.
if (jobId = cancelId) then
Some(action)
else
None))
printfn "Specify a job by number to cancel it, then press Enter."
let mutable finished = false
while not finished do
let input = System.Console.ReadLine()
let a = ref 0
if (Int32.TryParse(input, a) = true) then
cancelJob(!a)
else
printfn "Terminating."
finished <- true
A sample session follows.
Number Of Logical Processors: 2 Requesting job #1 Job #1 submitted. Job #1 started on procId 0. Requesting job #2 Job #2 submitted. Job #2 started on procId 1. Requesting job #3 Requesting job #4 Requesting job #5 Requesting job #6 Requesting job #7 Requesting job #8 Requesting job #9 Requesting job #10 Job #1 completed. Nth Prime for N = 5000 is 48611. Job #3 submitted. Job #3 started on procId 0. Done submitting jobs. Press Enter to exit when ready. Job #2 completed. Nth Prime for N = 10000 is 104729. Job #4 submitted. Job #4 started on procId 1. Job #3 completed. Nth Prime for N = 15000 is 163841. Job #5 submitted. Job #5 started on procId 0. Job #4 completed. Nth Prime for N = 20000 is 224737. Job #6 submitted. Job #6 started on procId 1. Job #5 completed. Nth Prime for N = 25000 is 287117.
Platforms
Windows 8, Windows 7, Windows Server 2012, Windows Server 2008 R2
Version Information
F# Core Library Versions
Supported in: 2.0, 4.0, Portable