Udostępnij za pośrednictwem


Task Parallelism in .NET 4.0 (a practical example)

I recently needed to develop some code that would read diagnostic information from a group of servers for a production application.  This included reading a count of event log entries, reading performance counter values and also performing database queries to retrieve exception log information for the application.  The idea was to provide a high level snapshot of the overall health of the application so at a glance support personnel could see if any issues were arising with the application (yes, I know, some home grown health monitoring).

In order to make the data retrieval efficient I wanted to take as many of the readings in parallel as possible .  Prior to .NET 4.0 I would have looked at using the ThreadPool for this functionality and queueing separate user work items for each request.  With .NET 4.0 however we now have use of the Task Parallel Library (TPL) which allows for more efficient use of system resources and also provides better programmatic control than is typically available when working directly with the thread pool.

In my particular scenario I have a group of servers that each have one or more of the above readings (event log count, performance counter or database query) that need to be retrieved.  The result of each reading returns a similar output (numeric in this case – either integer or double).  Since I want to execute these tasks in a concurrent manner, that’s where the TPL and the new Task class come into play.  The Task class allows me to easily execute an asynchronous operation and return a result. 

In my case I effectively want the retrieval of each data reading to be executed as a separate Task.  I can also have each task return a result by instantiating a Task<TResult> (see How to: Return a Value from a Task for more details).  I want each Task to return the value for the reading, along with the server name and reading name (this is used to group/correlate the readings after all tasks have executed, which I’ll explain shortly).  The output of each task is therefore defined by the DataResult class shown below.

    1: /// <summary>
    2: /// Class used to return results from the executing thread (task)
    3: /// </summary>
    4: class DataResult
    5: {
    6:     public string ServerName { get; set; }
    7:     public string MeasureName { get; set; }
    8:     public double MeasureValue { get; set; }
    9: }

Since I know the number of tasks I need to execute based on my inputs (the number of servers and the number of readings) I simply create a array of tasks of type DataResult.  I then iterate through the readings and servers, executing the appropriate method for each Task based on the type of reading being obtained.

    1: // Create a separate task/thread for each measurement to be taken
    2: Task<DataResult>[] tasks = new Task<DataResult>[measureList.Count() * serverList.Count()];
    3: int inx = 0;
    4:  
    5: // Retrieve the data
    6: foreach (DataModel.Measure measure in measureList)
    7: {
    8:     DataReading reading = new DataReading();
    9:     reading.DisplayName = measure.DisplayName;
   10:     reading.ServerValues = new Dictionary<string, ServerValue>();
   11:     data.Readings.Add(reading);
   12:  
   13:     foreach (DataModel.Server server in serverList)
   14:     {
   15:         ServerValue serverData = new ServerValue();
   16:         serverData.ServerName = server.HostName;
   17:         serverData.ServerType = server.ServerType.Type;
   18:         serverData.ServerId = server.Id;
   19:         reading.ServerValues.Add(serverData.ServerName, serverData);
   20:  
   21:         MeasureInfo measureInfo = new MeasureInfo(env, server, measure);
   22:         switch (measure.Type)
   23:         {
   24:             case (int)MeasureTypes.PerfCounter:
   25:                 tasks[inx] = new Task<DataResult>(GetPerfCounterValue, measureInfo);
   26:                 break;
   27:             case (int)MeasureTypes.EventLog:
   28:                 tasks[inx] = new Task<DataResult>(GetLogEntries, measureInfo);
   29:                 break;
   30:             case (int)MeasureTypes.DBErrors:
   31:                 tasks[inx] = new Task<DataResult>(GetDbErrors, measureInfo);
   32:                 break;
   33:         }
   34:         tasks[inx].Start();
   35:         inx++;
   36:     }
   37: }

Of primary concern above is the looping construct and the switch statement that instantiates each Task instance with the appropriate method target depending on the type of reading.  The other code is primarily concerned with passing the appropriate input to the target method and setting things up for the correlation.  The target methods for each type of reading take the measureInfo as input, containing information on the data to be retrieved, and return an instance of the DataResult class.  The methods for both the event log and performance counter operations are shown below.

    1: private DataResult GetLogEntries(object data)
    2: {
    3:     MeasureInfo measureInfo = data as MeasureInfo;
    4:     DataResult result = new DataResult()
    5:     {
    6:         MeasureName = measureInfo.Measure.DisplayName,
    7:         ServerName = GetServerName(measureInfo),
    8:         MeasureValue = -1
    9:     };
   10:     DateTime startTime = DateTime.Now;
   11:  
   12:     string dateTimeFilter = _queryTimeConstraint.ToString("yyyyMMddHHmmss.000000+000");
   13:     measureInfo.Measure.Data = string.Format(measureInfo.Measure.Data, dateTimeFilter);
   14:  
   15:     ManagementObjectSearcher mos = new ManagementObjectSearcher(String.Format(@"\\{0}\root\cimv2", measureInfo.Server.HostName), measureInfo.Measure.Data);
   16:  
   17:     result.MeasureValue = mos.Get().Count;
   18:  
   19:     return result;
   20: }
   21:  
   22:  
   23: private DataResult GetPerfCounterValue(object data)
   24: {
   25:     MeasureInfo measureInfo = data as MeasureInfo;
   26:     DataResult result = new DataResult()
   27:     {
   28:         MeasureName = measureInfo.Measure.DisplayName,
   29:         ServerName = GetServerName(measureInfo),
   30:         MeasureValue = -1
   31:     };
   32:  
   33:     string[] perfInfo = measureInfo.Measure.Data.Split(';');
   34:     PerformanceCounter perfCounter = new System.Diagnostics.PerformanceCounter();
   35:     perfCounter.CategoryName = perfInfo[0];
   36:     perfCounter.CounterName = perfInfo[1];
   37:     if (perfInfo.Length > 2)
   38:     {
   39:         perfCounter.InstanceName = perfInfo[2];
   40:     }
   41:     perfCounter.MachineName = measureInfo.Server.HostName;
   42:  
   43:     result.MeasureValue = perfCounter.NextValue();
   44:  
   45:     return result;
   46: }

The returned DataResult instance ends up in the task’s Result property.  Since I need to do some correlation once all the tasks have finished executing, I can simply do a Task.WaitAll on the array of tasks (NOTE: Accessing the Result property on the task instance will also block the calling thread until the task finishes).  Once they’ve finished I can then loop through the tasks and perform my correlation using the Result property as shown. 

    1: // Wait for all tasks to complete and then copy the result to the appropriate reading
    2: Task.WaitAll(tasks);
    3: foreach (Task<DataResult> task in tasks)
    4: {
    5:     DataReading result = data.Readings.Find(delegate(DataReading reading)
    6:     {
    7:         return reading.DisplayName == task.Result.MeasureName;
    8:     });
    9:  
   10:     if (result != null)
   11:     {
   12:         ServerValue serverValue = result.ServerValues[task.Result.ServerName];
   13:         serverValue.Value = task.Result.MeasureValue;
   14:     }
   15: }

That’s it.  Multi-threaded functionality added to my app with only a few lines of code and much greater programmatic control by using Tasks, especially the ability to return a complex data type as the result of the execution of each task.  This example just scratches the surface of the TPL and using it for Task Parallelism.  For additional topics please see Parallel Programming in the .NET Framework.