

F# Parallel Execution and Future Tasks

In a previous post I described some wrappers that allow a collection of functions to be executed in parallel:

https://blogs.msdn.com/b/carlnol/archive/2011/07/17/f-and-running-parallel-tasks.aspx

However, there is another approach one could take.

The main challenge in running code in parallel using Tasks in F# is managing and casting objects to the correct types. If one just needs to start a future task and gather its result later in the execution path, one option is to implement a simpler Future type:

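open System
open System.Threading
open System.Threading.Tasks

type Future<'T> (func : unit -> 'T) =

    // The task is started as soon as the Future is constructed.
    let cancellationTokenSource = new CancellationTokenSource()
    let runningTask = Task.Factory.StartNew(Func<'T>(func), cancellationTokenSource.Token)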

    member this.Cancel() =
        cancellationTokenSource.Cancel()

    member this.IsDone() =
        runningTask.IsCompleted

    member this.IsCancelled() =
        runningTask.IsCanceled

    member this.IsFaulted() =
        runningTask.IsFaulted

    member this.Wait() =
        runningTask.Wait(cancellationTokenSource.Token)

    member this.Wait(timespan:TimeSpan) =
        runningTask.Wait((int timespan.TotalMilliseconds), cancellationTokenSource.Token)

    member this.Get() =
        runningTask.Result

    member this.Get(timespan:TimeSpan) =
        if (this.Wait(timespan)) then runningTask.Result
        else raise (TimeoutException("No result available in TimeSpan specified"))

This type is initialized with a function that is executed on a Task, so the constructor returns immediately. The caller can then get the execution result, when needed, by calling the Get() method, which blocks until the task has completed.

For a function that does not return a value, the Func mapping still works, because 'T is then unit and a call to Get() returns unit.

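The Future type not only simplifies starting up parallel tasks but also simplifies the process of cancelling them. All one has to do is call Cancel(), without the need to manage cancellation tokens. Note that the token is not passed to the function itself, so a function that is already running is not interrupted; cancelling prevents a task that has not yet started from running, and causes Wait() to raise an OperationCanceledException.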
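As a rough illustration of what Cancel() does from the caller's side, the sketch below uses a compact copy of the Future type (only the members needed here) and the same waiter helper as the usage example. The names slow and waitOutcome are purely illustrative. It assumes the usual Task Parallel Library behaviour that waiting with an already-cancelled token raises an OperationCanceledException.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Compact copy of the Future type, reduced to the members used in this sketch.
type Future<'T> (func : unit -> 'T) =
    let cancellationTokenSource = new CancellationTokenSource()
    let runningTask = Task.Factory.StartNew(Func<'T>(func), cancellationTokenSource.Token)
    member this.Cancel() = cancellationTokenSource.Cancel()
    member this.Wait() = runningTask.Wait(cancellationTokenSource.Token)

let waiter (ms:int) =
    Thread.Sleep(ms)
    ms

// Start a slow task and cancel it straight away.
let slow = Future<int>(fun () -> waiter 2000)
slow.Cancel()

// Waiting on a cancelled Future raises instead of blocking for the full 2 seconds.
let waitOutcome =
    try
        slow.Wait()
        "completed"
    with
    | :? OperationCanceledException -> "cancelled"

printfn "Wait outcome: %s" waitOutcome
```

Because the function passed to the Future never sees the token, a function that has already started sleeping will still run to the end in the background; only the caller stops waiting for it.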

To use this type one can write code such as:

let waiter (ms:int) =
    Thread.Sleep(ms)
    ms

let getter1 = Future<int>(fun () -> waiter 5000)
let getter2 = Future<int>(fun () -> waiter 8000)
let exec1 = Future(fun () -> Thread.Sleep(3000))

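let val1 = getter1.Get()                    // blocks for about 5 seconds
let val2 = getter2.Get(TimeSpan(0, 0, 5))   // about 3 more seconds, within the 5 second timeout
exec1.Wait()                                // already finished after 3 seconds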

It may come as no surprise that this code takes about 8 seconds to execute, the duration of the longest task, because all three tasks run in parallel.
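The timed Get overload behaves differently when the timeout is shorter than the work. The following sketch, again using a compact copy of the Future type and illustrative names (slow, firstAttempt, secondAttempt), shows a timed call giving up with a TimeoutException while a later untimed call still returns the value.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Compact copy of the Future type, reduced to the members used in this sketch.
type Future<'T> (func : unit -> 'T) =
    let cancellationTokenSource = new CancellationTokenSource()
    let runningTask = Task.Factory.StartNew(Func<'T>(func), cancellationTokenSource.Token)
    member this.Wait(timespan:TimeSpan) =
        runningTask.Wait((int timespan.TotalMilliseconds), cancellationTokenSource.Token)
    member this.Get() =
        runningTask.Result
    member this.Get(timespan:TimeSpan) =
        if (this.Wait(timespan)) then runningTask.Result
        else raise (TimeoutException("No result available in TimeSpan specified"))

let waiter (ms:int) =
    Thread.Sleep(ms)
    ms

let slow = Future<int>(fun () -> waiter 2000)

// The task needs about 2 seconds, so a 500 ms wait gives up first.
let firstAttempt =
    try Some (slow.Get(TimeSpan.FromMilliseconds(500.0)))
    with :? TimeoutException -> None

// The timeout does not stop the task; an untimed Get() blocks until it finishes.
let secondAttempt = slow.Get()

printfn "first: %A, second: %d" firstAttempt secondAttempt
```

A timeout here only ends the caller's wait. The task itself keeps running unless Cancel() is called.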

An alternative approach to the implementation is to define an object hierarchy of FutureTask and FutureTask<'T>, such as:

type FutureTask internal (task : Task, cancellationTokenSource : CancellationTokenSource) =

    let cancellation = cancellationTokenSource
    let runningTask = task

    new(action: unit -> unit) =
        let cancellationTokenSource = new CancellationTokenSource()
        let task = Task.Factory.StartNew(Action(action), cancellationTokenSource.Token)
        FutureTask(task, cancellationTokenSource)

    member this.Cancel() =
        cancellation.Cancel()

    member this.IsDone() =
        runningTask.IsCompleted

    member this.IsCancelled() =
        runningTask.IsCanceled

    member this.IsFaulted() =
        runningTask.IsFaulted

    member this.Wait() =
        runningTask.Wait(cancellation.Token)

    member this.Wait(timespan:TimeSpan) =
        runningTask.Wait((int timespan.TotalMilliseconds), cancellation.Token)

type FutureTask<'T> internal (task : Task<'T>, cancellationTokenSource : CancellationTokenSource) =
    inherit FutureTask((task :> Task), cancellationTokenSource)

    let runningTask = task

    new(func: unit -> 'T) =
        let cancellationTokenSource = new CancellationTokenSource()
        let taskFunc = Task.Factory.StartNew(Func<'T>(func), cancellationTokenSource.Token)
        FutureTask<'T>(taskFunc, cancellationTokenSource)

    member this.Get() =
        runningTask.Result

    member this.Get(timespan:TimeSpan) =
        if (this.Wait(timespan)) then runningTask.Result
        else raise (TimeoutException("No result available in TimeSpan specified"))

In this implementation a distinction is made between running a value-returning function, FutureTask<'T>, and a function that does not return a value, FutureTask. These are mapped to a Task<'T> created from a Func and a Task created from an Action, respectively.
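One possible benefit of the hierarchy is that both kinds of future can be handled through the non-generic base type. The sketch below uses compact copies of the two types (only the members needed here); the names compute, sideEffect, pending, allDone and answer are illustrative only.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Compact copies of the two types, reduced to the members used in this sketch.
type FutureTask internal (task : Task, cancellationTokenSource : CancellationTokenSource) =
    new(action: unit -> unit) =
        let cancellationTokenSource = new CancellationTokenSource()
        let task = Task.Factory.StartNew(Action(action), cancellationTokenSource.Token)
        FutureTask(task, cancellationTokenSource)
    member this.IsDone() = task.IsCompleted
    member this.Wait() = task.Wait(cancellationTokenSource.Token)

type FutureTask<'T> internal (task : Task<'T>, cancellationTokenSource : CancellationTokenSource) =
    inherit FutureTask((task :> Task), cancellationTokenSource)
    new(func: unit -> 'T) =
        let cancellationTokenSource = new CancellationTokenSource()
        let taskFunc = Task.Factory.StartNew(Func<'T>(func), cancellationTokenSource.Token)
        FutureTask<'T>(taskFunc, cancellationTokenSource)
    member this.Get() = task.Result

// One value-returning future and one that only performs work.
let compute = FutureTask<int>(fun () -> 6 * 7)
let sideEffect = FutureTask(fun () -> Thread.Sleep(200))

// Both can be waited on through the non-generic base type.
let pending : FutureTask list = [ compute :> FutureTask; sideEffect ]
pending |> List.iter (fun future -> future.Wait())

let allDone = pending |> List.forall (fun future -> future.IsDone())
let answer = compute.Get()

printfn "all done: %b, answer: %d" allDone answer
```

Get() is only available on FutureTask<'T>, so a caller that needs the result keeps a reference to the typed future, as compute does here.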