Using F# agents to manage background jobs

In a previous post, I started out with some simple F# agents. As we saw there, agents process messages in a queue and perform actions in response. I created a simple console application that starts an agent and then runs a loop that reads each line of input from the console and submits it to the agent. The intention now is to build up that code into a small console app that I can use to launch and control background jobs.

The first change from the previous code is to allow multiple messages to be sent before waiting for a reply on the reply channel. This means that instead of using PostAndReply to send a message to the agent, I need to use PostAndAsyncReply. Changing the function changes the return value: instead of the reply itself, I now get an asynchronous workflow object that produces the reply (in this case Async<string>, since the reply channel carries a string). I need to start this workflow, and I have several options. To get started, let's just run the workflow by using Async.RunSynchronously. This results in code that is pretty much equivalent to PostAndReply, since the asynchronous workflow is only being used in a synchronous way. The code still blocks waiting for the reply to each job before letting you start another one.

module MailboxProcessorTest =

    open System

    // A message is the text to process plus a channel for sending the reply.
    type Message = string * AsyncReplyChannel<string>

    let formatString = "Message number {0} was received. Message contents: {1}"

    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        // n counts the messages received so far.
        let rec loop n =
            async {
                let! (message, replyChannel) = inbox.Receive()

                if message = "Stop" then
                    replyChannel.Reply("Stop")
                else
                    replyChannel.Reply(String.Format(formatString, n, message))
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop 0)

    printfn "Mailbox Processor Test"
    printfn "Type some text and press Enter to submit a message."
    printfn "Type 'Stop' to terminate."

    let rec loop() =
        printf "> "
        let input = Console.ReadLine()
        let replyAsync = agent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)
        // Block until the agent replies.
        let reply = Async.RunSynchronously(replyAsync)
        if reply <> "Stop" then
            printfn "Reply: %s" reply
            loop()
    loop()

    printfn "Press Enter to continue."
    Console.ReadLine() |> ignore

I can think of two other options. I could start the workflow as a task by using Async.StartAsTask. StartAsTask launches a .NET Framework Task and then returns control immediately, so it is asynchronous. The return value is a Task object, so I could start a number of tasks, maintain a collection of the tasks that are currently running, and check them for completion. In this version the collection is only checked after each line of input, so a reply is typically not displayed until the next message is submitted. Here is what the code looks like using tasks:

module MailboxProcessorTest =

    open System

    type Message = string * AsyncReplyChannel<string>

    let formatString = "Message number {0} was received. Message contents: {1}"

    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop n =
            async {
                let! (message, replyChannel) = inbox.Receive()

                if message = "Stop" then
                    replyChannel.Reply("Stop")
                else
                    replyChannel.Reply(String.Format(formatString, n, message))
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop 0)

    printfn "Mailbox Processor Test"
    printfn "Type some text and press Enter to submit a message."
    printfn "Type 'Stop' to terminate."

    let mutable isCompleted = false
    let mutable tasks = []
    while not isCompleted do
        printf "> "
        let input = Console.ReadLine()
        let replyAsync = agent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)
        // Start the reply workflow as a task; this returns immediately.
        let replyTask = Async.StartAsTask(replyAsync)
        // Add to task collection.
        tasks <- replyTask :: tasks
        // Check tasks for completion
        // and display the results of completed tasks.
        // A task that has not finished yet is kept and checked again after the next input.
        let mutable newTaskList = []
        for pending in tasks do
            if pending.IsCompleted then
                let result = pending.Result
                if result <> "Stop" then
                    printfn "Task result: %s" result
                else
                    isCompleted <- true
            else
                newTaskList <- pending :: newTaskList
        tasks <- newTaskList

    printfn "Press Enter to continue."
    Console.ReadLine() |> ignore
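As a side note on the task approach, here is a minimal sketch of collecting a whole batch of replies at once instead of polling after each input. It assumes a simplified stand-in agent; the names `echoAgent`, `postAll`, and `collectReplies` are hypothetical and exist only for this illustration. It relies on Task.WaitAll to block until every pending reply has arrived.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical stand-in for the agent above: replies with the message in upper case.
type Message = string * AsyncReplyChannel<string>

let echoAgent =
    MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop () =
            async {
                let! (message, replyChannel) = inbox.Receive()
                replyChannel.Reply(message.ToUpperInvariant())
                return! loop ()
            }
        loop ())

// Post every input without waiting, keeping one task per pending reply.
let postAll (inputs: string list) : Task<string> list =
    inputs
    |> List.map (fun input ->
        echoAgent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)
        |> Async.StartAsTask)

// Block until every reply has arrived, then read each task's result.
let collectReplies (tasks: Task<string> list) : string list =
    let pending = tasks |> List.map (fun t -> t :> Task) |> List.toArray
    Task.WaitAll(pending)
    tasks |> List.map (fun t -> t.Result)

let replies = [ "one"; "two"; "three" ] |> postAll |> collectReplies
replies |> List.iter (printfn "Task result: %s")
```

Each result is read from the task that was created for that input, so the replies line up with the inputs regardless of the order in which the tasks finish.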

 

 

I think it is actually simpler to just use Async.StartWithContinuations. Instead of having to manage a Task object, I can just specify the code I want to run when the reply is available. This is referred to as a continuation function, and I can specify it as a lambda expression in the arguments to Async.StartWithContinuations. I really like Async.StartWithContinuations. Being able to specify code to run later in a simple lambda expression seems pretty powerful. Async.StartWithContinuations actually accepts three different continuation functions. The first is the code to run when a result is available, the second is the code to run when an exception occurs, and the third is the code to run when the operation is canceled. In this case, we're only using the first one. Next time we'll look at using the cancellation continuation. The exception continuation won't be used in these examples, because the mailbox processor has a different mechanism for transmitting exceptions, the Error event, which I showed at the end of the previous blog post.
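To see the three continuations in isolation, here is a small sketch that is separate from the agent code. The helper `describe` and the workflow `slowJob` are hypothetical names introduced only for this illustration. The helper starts a workflow with Async.StartWithContinuations and reports which continuation fired.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Runs a workflow and reports which of the three continuations fired.
// `describe` is an illustrative helper, not part of the code in this post.
let describe (workflow: Async<int>) (token: CancellationToken) : string =
    let outcome = TaskCompletionSource<string>()
    Async.StartWithContinuations(
        workflow,
        (fun result -> outcome.SetResult(sprintf "result %d" result)),
        (fun (ex: exn) -> outcome.SetResult(sprintf "exception: %s" ex.Message)),
        (fun (_: OperationCanceledException) -> outcome.SetResult "canceled"),
        token)
    // Block until one of the continuations has run.
    outcome.Task.Result

// First continuation: the workflow produces a value.
let succeeded = describe (async { return 42 }) CancellationToken.None

// Second continuation: the workflow raises an exception.
let failed = describe (async { return failwith "boom" }) CancellationToken.None

// Third continuation: a long sleep is canceled shortly after it starts.
let slowJob =
    async {
        do! Async.Sleep 60000
        return 0
    }
let cts = new CancellationTokenSource()
cts.CancelAfter 100
let canceled = describe slowJob cts.Token

printfn "%s / %s / %s" succeeded failed canceled
```

Blocking on the outcome is only there to make the sketch easy to follow; in the agent code the point of a continuation is precisely that nothing has to wait.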

I'll also add some code to delay the replies, so that it is more obvious that this is working asynchronously, and because I expect real jobs to take some time when I write my job agent.

module MailboxProcessorTest =

    open System

    type Message = string * AsyncReplyChannel<string>

    let formatString = "Message number {0} was received. Message contents: {1}"

    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop n =
            async {
                let! (message, replyChannel) = inbox.Receive()
                // Delay each reply so that it arrives while later input is being typed.
                // The delay shrinks with each message; clamp it at 0 so it never goes negative.
                do! Async.Sleep(max 0 (5000 - 1000 * n))
                replyChannel.Reply(String.Format(formatString, n, message))
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop 0)

    printfn "Mailbox Processor Test"
    printfn "Type some text and press Enter to submit a message."

    // isCompleted is never set in this version, so the loop runs until the process is ended.
    let isCompleted = false
    while not isCompleted do
        printf "> "
        let input = Console.ReadLine()
        let messageAsync = agent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)

        // Set up a continuation function (the first argument below) that prints the reply.
        // The second argument is the exception continuation (not used).
        // The third argument is the cancellation continuation (not used).
        Async.StartWithContinuations(messageAsync,
             (fun reply -> printfn "%s" reply),
             (fun _ -> ()),
             (fun _ -> ()))

    printfn "Press Enter to continue."
    Console.ReadLine() |> ignore

Here is an example session. It looks a bit messy, because the output is interleaved with the input. Output just appears whenever a job completes.

Mailbox Processor Test
Type some text and press Enter to submit a message.
> test
> test2
> test3
> test4
> tMessage number 0 was received. Message contents: test
est5
> Message number 1 was received. Message contents: test2
Message number 2 was received. Message contents: test3
Message number 3 was received. Message contents: test4
Message number 4 was received. Message contents: test5
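Since the examples here leave exception handling to the agent's Error event, a short sketch of that mechanism may be a useful reminder. It assumes a hypothetical agent, `failingAgent`, that fails on the message "bad"; the agent is created with the constructor rather than MailboxProcessor.Start so that the handler can be attached before any message is processed.

```fsharp
open System
open System.Threading.Tasks

// Hypothetical agent that raises an exception for one particular message.
let failingAgent =
    new MailboxProcessor<string>(fun inbox ->
        let rec loop () =
            async {
                let! message = inbox.Receive()
                if message = "bad" then failwith "cannot process 'bad'"
                return! loop ()
            })

// Capture the first exception reported through the Error event.
let reported = TaskCompletionSource<exn>()
failingAgent.Error.Add(fun ex -> reported.SetResult ex)

failingAgent.Start()
failingAgent.Post "fine"
failingAgent.Post "bad"

// Block until the handler has run, then show what the agent reported.
printfn "Agent error: %s" reported.Task.Result.Message
```

Once the exception escapes the message loop, the agent stops processing messages, so a real job agent would probably want to catch failures per job instead of letting them end the loop.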

OK. So now that I have figured out the basic mechanics, I am ready to write my job agent.  I'm going to start with a mock job, which is just going to sleep for a random amount of time.  I have added a createJob function that takes a job ID and creates a simulated job.  I'm going to submit this job instead of the string as the Message, and I'm going to change the reply to include the result of the job and its ID.  To actually start the job, I'm going to use StartWithContinuations within the message loop and the reply is going to be sent in the continuation.  That way all the jobs start at once, but the replies come in a staggered fashion as the jobs finish.

module JobAgent =

    open System

    let random = System.Random()

    // Generates mock jobs using Async.Sleep.
    let createJob(id:int) =
        let job = async {
            // The sleep time is a random number of milliseconds from 0 to 9999,
            // and the mock computed result is a floating point value.
            let time = random.Next(10000)
            let result = random.NextDouble()
            do! Async.Sleep(time)
            return result
            }
        id, job

    type Result = double

    // A Job consists of a job ID and a computation that produces a single result.
    type Job = int * Async<Result>

    type Message = Job * AsyncReplyChannel<int * Result>

    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop n =
            async {
                let! (message, replyChannel) = inbox.Receive()
                let (jobID, job) = message
                printfn "Starting job #%d" jobID
                // Start the job without waiting for it; the reply is sent
                // from the first continuation when the job completes.
                Async.StartWithContinuations(job,
                   (fun result -> replyChannel.Reply(jobID, result)),
                   (fun _ -> ()), // exception continuation not used
                   (fun _ -> ())) // cancellation continuation not used
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop 0)

    printfn "Starting 10 jobs."

    // count runs from 0 through 10, so 11 jobs are actually posted.
    let rec loop(count) =
        let input = createJob(count)
        // Post the job request.
        let replyAsync = agent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)
        // Print the result when the reply arrives.
        Async.StartWithContinuations(replyAsync, (fun (id, result) ->
            printfn "Result of job %d is %f." id result),
            (fun _ -> ()), // exception continuation not used
            (fun _ -> ())) // cancellation continuation not used
        if count < 10 then
            loop(count + 1)
    loop(0)

    printfn "Press Enter to close."
    Console.ReadLine() |> ignore

Here is an example of the output of the previous code. Because the loop counts from 0 through 10, 11 jobs run even though the banner says 10:

Starting 10 jobs.
Press Enter to close.
Starting job #0
Starting job #1
Starting job #2
Starting job #3
Starting job #4
Starting job #5
Starting job #6
Starting job #7
Starting job #8
Starting job #9
Starting job #10
Result of job 5 is 0.784409.
Result of job 2 is 0.668888.
Result of job 4 is 0.747429.
Result of job 3 is 0.961764.
Result of job 1 is 0.278119.
Result of job 8 is 0.469319.
Result of job 10 is 0.495857.
Result of job 0 is 0.629329.
Result of job 7 is 0.483028.
Result of job 9 is 0.451606.
Result of job 6 is 0.514781.
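One thing to be aware of in the job agent above is that a job that throws never sends a reply, because the exception continuation does nothing. A minimal sketch of one possible variation follows, in which the reply carries either the value or the exception. It assumes F# 4.1 or later for the built-in Result type, and the names `safeAgent` and `runJob` are hypothetical.

```fsharp
open System

// A job is an ID plus a computation; the reply carries either the result or the exception.
type Job = int * Async<float>
type Message = Job * AsyncReplyChannel<int * Result<float, exn>>

// Hypothetical variation of the job agent that replies from all three continuations.
let safeAgent =
    MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop () =
            async {
                let! ((jobID, job), replyChannel) = inbox.Receive()
                Async.StartWithContinuations(
                    job,
                    (fun result -> replyChannel.Reply(jobID, Ok result)),
                    (fun ex -> replyChannel.Reply(jobID, Error ex)),
                    (fun cancel -> replyChannel.Reply(jobID, Error (cancel :> exn))))
                return! loop ()
            }
        loop ())

// Post a job and wait for its reply (synchronously, to keep the sketch short).
let runJob (job: Job) =
    safeAgent.PostAndReply(fun replyChannel -> job, replyChannel)

let ok = runJob (1, async { return 0.5 })
let failed = runJob (2, async { return failwith "job failed" })

for (id, outcome) in [ ok; failed ] do
    match outcome with
    | Ok value -> printfn "Result of job %d is %f." id value
    | Error ex -> printfn "Job %d failed: %s" id ex.Message
```

With this shape the caller always gets an answer, at the cost of having to match on the outcome before using the value.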

Now let's actually calculate something interesting. The code changes slightly, since createJob now takes a lambda for the function to run to compute a result. Also, the result is going to be a generic quantity. I want the generic type to be inferred, so I'm going to use the wildcard character (_) in one place where I would normally have to put in the exact type. In this case, the result type is going to be BigInteger, since we're computing the Nth prime number for several values of N. There are definitely more efficient ways of computing prime numbers, but the main point here is to demonstrate running a computation, not to optimize that calculation.

module JobAgent =

    open System

    // Wraps a computation and its input in an asynchronous job.
    let createJob(id:int, computation, input : int) =
        let job = async {
            let result = computation(input)
            return result
            }
        id, job

    // A Job consists of a job ID and a computation that produces a single result.
    type Job<'T> = int * Async<'T>

    type Message<'T> = Job<'T> * AsyncReplyChannel<int * 'T>

    // The wildcard lets the compiler infer the result type from how the agent is used below.
    let agent = MailboxProcessor<Message<_>>.Start(fun inbox ->
        let rec loop n =
            async {
                let! (message, replyChannel) = inbox.Receive()
                let (jobID, job) = message
                printfn "Starting job #%d" jobID
                // Start the job without waiting for it; the reply is sent
                // from the first continuation when the job completes.
                Async.StartWithContinuations(job,
                   (fun result -> replyChannel.Reply(jobID, result)),
                   (fun _ -> ()), // exception continuation not used
                   (fun _ -> ())) // cancellation continuation not used
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop 0)

    printfn "Computing the Nth prime numbers."

    // Recursive isprime function: trial division by every value up to number/2.
    let isprime number =
        let rec check count =
            count > number/2 || (number % count <> 0 && check (count + 1))
        check 2

    // The same check for values that do not fit in an int.
    let isprimeBigInt number =
        let rec check count =
            count > number/2I || (number % count <> 0I && check (count + 1I))
        check 2I

    let computeNthPrime (number) =
         if (number < 1) then
             invalidOp <| sprintf "Invalid input for nth prime: %d." number
         let mutable count = 0
         let mutable num = 1I
         while (count < number) do
             // Advance to the next prime after the current one.
             num <- num + 1I
             if (num < bigint System.Int32.MaxValue) then
                 while (not (isprime (int num))) do
                     num <- num + 1I
             else
                 while (not (isprimeBigInt num)) do
                     num <- num + 1I
             count <- count + 1
         num

    let multiplier = 1000

    let rec loop(id) =
        let input = createJob(id, (fun n -> computeNthPrime(n) ), multiplier * id )
        // Post the job request.
        let replyAsync = agent.PostAndAsyncReply(fun replyChannel -> input, replyChannel)
        // Print the result when the reply arrives.
        Async.StartWithContinuations(replyAsync, (fun (id, result) ->
            printfn "Nth Prime for N = %d is %s." (multiplier*id) (result.ToString())),
            (fun _ -> ()), // exception continuation not used
            (fun _ -> ())) // cancellation continuation not used
        if id < 10 then
            loop(id + 1)
    loop(1)

    printfn "Press Enter to close."
    Console.ReadLine() |> ignore

Here is some output. On my computer, which is running an Intel Core Duo processor, about two jobs seem to be able to run at the same time. As before, output from different threads is interleaved, so some lines are split.

Computing the Nth prime numbers.
Press Enter to close.
Starting job #1
Starting job #2
Nth Prime for N = 1000 is 7919.
StartinNth Prime for N = 2000 is 17389.
g job #3
Starting job #4
Nth Prime for N = 3000 is 27449.
Starting job #5
Nth Prime for N = 4000 is 37813.
Starting job #6
Nth Prime for N = 5000 is 48611.
StartNth Prime for N = in6000 is 59359.
g job #7
Starting job #8
Nth Prime for N = 7000 is 70657.
Starting job #9
Nth Prime for N = 8000 is 81799.
Starting job #Nth Prime for N = 9000 is 93179.
10
Nth Prime for N = 10000 is 104729.
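As an example of one of the more efficient approaches mentioned earlier, here is a minimal sketch of trial division that stops at the square root of the candidate and skips even divisors. The names `isPrimeFast` and `nthPrimeFast` are hypothetical, and the sketch assumes results that fit in an int rather than using BigInteger as the code above does.

```fsharp
// Trial division that stops at the square root of the candidate.
let isPrimeFast (n: int) =
    if n < 2 then false
    elif n < 4 then true          // 2 and 3 are prime
    elif n % 2 = 0 then false
    else
        // Only odd divisors need checking; int64 avoids overflow in d * d.
        let rec check (d: int) =
            int64 d * int64 d > int64 n || (n % d <> 0 && check (d + 2))
        check 3

// Returns the nth prime (1-based), assuming it fits in an int.
let nthPrimeFast (n: int) =
    if n < 1 then invalidArg "n" "n must be at least 1"
    Seq.initInfinite (fun i -> i + 2)
    |> Seq.filter isPrimeFast
    |> Seq.item (n - 1)

printfn "Nth Prime for N = %d is %d." 1000 (nthPrimeFast 1000)
```

A function like this could be passed to createJob in place of computeNthPrime, although then the jobs would finish so quickly that the asynchronous behavior would be harder to observe.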

I was impatient, so I set the multiplier for N to be only 1000. You could try copying this code and changing that number to a larger one.

That's all for now. Next time, instead of using the reply channel to communicate back to the thread that posts the messages, I'll use multiple agents (one for job requests, one for running jobs, and one for completion notifications), and we'll look at how to get the currently running jobs and how to cancel jobs.

Gordon