Framework for Composing and Submitting .Net Hadoop MapReduce Jobs (Archived)

An updated version of this post can be found at:

https://blogs.msdn.com/b/carlnol/archive/2012/04/29/generic-based-framework-for-net-hadoop-mapreduce-job-submission.aspx

If you have been following my blog you will have seen that I have been putting together samples for writing .Net Hadoop MapReduce jobs using Hadoop Streaming. One thing that became apparent is that the samples could be restructured into a composable framework, enabling one to submit .Net based MapReduce jobs whilst only writing Mapper and Reducer types.

To this end I have put together a framework that allows one to submit MapReduce jobs using the following command line syntax:

MSDN.Hadoop.Submission.Console.exe -input "mobile/data/debug/sampledata.txt" -output "mobile/querytimes/debug"
-mapper "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduce\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"

Here the mapper and reducer parameters are .Net types that derive from abstract Map and Reduce base classes (more on a combiner option later). The input, output, and file options are analogous to those of standard Hadoop Streaming submissions.

Under the covers standard Hadoop Streaming is being used, where controlling executables handle the StdIn and StdOut operations and activate the required .Net types. The “file” parameter is required to specify the DLL for the .Net type to be loaded at runtime, in addition to any other required files.
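To make the idea of a controlling executable more concrete, here is a minimal sketch of how one might resolve a mapper from its assembly-qualified type name and drive it from StdIn to StdOut. This is not the framework's actual code: SketchMapperBase, LineLengthMapper, and MapperDriverSketch are hypothetical names invented for the illustration, and the tab-separated output simply follows the usual Hadoop Streaming convention.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical stand-in for the framework's text mapper base class.
public abstract class SketchMapperBase
{
    public virtual void Setup() { }
    public abstract IEnumerable<Tuple<string, object>> Map(string value);
    public virtual IEnumerable<Tuple<string, object>> Cleanup()
    {
        yield break;
    }
}

// Trivial mapper used only to exercise the driver: emits (line length, line).
public class LineLengthMapper : SketchMapperBase
{
    public override IEnumerable<Tuple<string, object>> Map(string value)
    {
        yield return Tuple.Create(value.Length.ToString(), (object)value);
    }
}

public static class MapperDriverSketch
{
    // Resolve the type name, create the mapper, and stream every input line through it.
    public static void Run(string mapperTypeName, TextReader input, TextWriter output)
    {
        Type mapperType = Type.GetType(mapperTypeName, throwOnError: true);
        var mapper = (SketchMapperBase)Activator.CreateInstance(mapperType);

        mapper.Setup();
        string line;
        while ((line = input.ReadLine()) != null)
        {
            Write(output, mapper.Map(line));
        }
        // Cleanup may emit further pairs (for example, in-mapper aggregates).
        Write(output, mapper.Cleanup());
    }

    // Write one tab-separated key/value pair per line.
    private static void Write(TextWriter output, IEnumerable<Tuple<string, object>> pairs)
    {
        foreach (var pair in pairs)
        {
            output.WriteLine("{0}\t{1}", pair.Item1, pair.Item2);
        }
    }

    public static void Main(string[] args)
    {
        // args[0] is assumed to be an assembly-qualified name, like the -mapper value.
        string typeName = args.Length > 0
            ? args[0]
            : typeof(LineLengthMapper).AssemblyQualifiedName;
        Run(typeName, Console.In, Console.Out);
    }
}
```

The "Type, Assembly" form used by the mapper and reducer options is the shape that Type.GetType expects, which is presumably why the DLL must be shipped with the job through the file option.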

As an aside, the framework and base classes are all written in F#, with sample Mappers and Reducers, and the abstract base class definitions, being provided in both C# and F#. The code is based on the F# Streaming samples in my previous blog posts. I will cover more of the semantics of the code in a later post, but for now I wanted to provide some usage samples.

As always the source can be downloaded from:

https://code.msdn.microsoft.com/Framework-for-Composing-af656ef7

Mapper and Reducer Base Classes

The following definitions outline the abstract base classes from which one needs to derive.

C# Base

    // Signature outline only; member bodies are omitted.
    [AbstractClass]
    public abstract class MapReduceBase
    {
        protected MapReduceBase();

        public virtual IEnumerable<Tuple<string, object>> Cleanup();
        public virtual void Setup();
    }

C# Base Mapper

    // Signature outline only; member bodies are omitted.
    public abstract class MapperBaseText : MapReduceBase
    {
        protected MapperBaseText();

        public override IEnumerable<Tuple<string, object>> Cleanup();
        public abstract IEnumerable<Tuple<string, object>> Map(string value);
    }

C# Base Reducer

    // Signature outline only; member bodies are omitted.
    [AbstractClass]
    public abstract class ReducerBase : MapReduceBase
    {
        protected ReducerBase();

        public abstract IEnumerable<Tuple<string, object>> Reduce(string key, IEnumerable<string> values);
    }

F# Base

    [<AbstractClass>]
    type MapReduceBase() =

        abstract member Setup: unit -> unit
        default this.Setup() = ()

        abstract member Cleanup: unit -> IEnumerable<string * obj>
        default this.Cleanup() = Seq.empty

F# Base Mapper

    [<AbstractClass>]
    type MapperBaseText() =
        inherit MapReduceBase()

        abstract member Map: string -> IEnumerable<string * obj>

        abstract member Cleanup: unit -> IEnumerable<string * obj>
        default this.Cleanup() = Seq.empty

F# Base Reducer

    [<AbstractClass>]
    type ReducerBase() =
        inherit MapReduceBase()

        abstract member Reduce: key:string -> values:IEnumerable<string> -> IEnumerable<string * obj>

The objective in defining these base classes was not only to support creating .Net Mappers and Reducers, but also to provide Setup and Cleanup operations that support In-Mapper optimizations, to utilize IEnumerable and sequences for publishing data from the Mappers and Reducers, and finally to provide a simple submission mechanism analogous to submitting Java based jobs.

For each class a Setup function is provided to allow one to perform tasks related to the instantiation of each Mapper and/or Reducer. The Mapper’s Map and Cleanup functions return an IEnumerable of tuples, each holding a Key/Value pair. These tuples represent the mapper’s output. Currently the key and value types are String and Object respectively; both are converted to strings for the streaming output.

The Reducer takes in an IEnumerable of the string representations of the Objects emitted by the Mapper, and reduces this into an enumerable of Key/Object tuples. Once again Cleanup can return values, allowing for In-Reducer optimizations.
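As an illustration of the reducer side of this contract, the following sketch groups tab-separated key/value lines by key and hands each group to a reduce function as an IEnumerable of strings. It assumes, as Hadoop Streaming normally guarantees, that the reducer's input arrives sorted by key. ReducerGroupingSketch and its members are hypothetical names, and the framework's real implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class ReducerGroupingSketch
{
    // Split "key<TAB>value"; a line without a tab is treated as a key with an empty value.
    private static KeyValuePair<string, string> SplitLine(string line)
    {
        int tab = line.IndexOf('\t');
        return tab < 0
            ? new KeyValuePair<string, string>(line, string.Empty)
            : new KeyValuePair<string, string>(line.Substring(0, tab), line.Substring(tab + 1));
    }

    // Group consecutive lines sharing a key (input assumed sorted by key)
    // and pass each group to the supplied reduce function.
    public static IEnumerable<Tuple<string, object>> Run(
        TextReader input,
        Func<string, IEnumerable<string>, IEnumerable<Tuple<string, object>>> reduce)
    {
        string currentKey = null;
        var values = new List<string>();
        string line;
        while ((line = input.ReadLine()) != null)
        {
            var pair = SplitLine(line);
            if (currentKey != null && pair.Key != currentKey)
            {
                // Key changed: flush the finished group before starting the next one.
                foreach (var result in reduce(currentKey, values)) yield return result;
                values = new List<string>();
            }
            currentKey = pair.Key;
            values.Add(pair.Value);
        }
        if (currentKey != null)
        {
            foreach (var result in reduce(currentKey, values)) yield return result;
        }
    }

    public static void Main()
    {
        // Minimum TimeSpan per key, similar in spirit to a Min reducer.
        var results = Run(Console.In, (key, vals) =>
            new[] { Tuple.Create(key, (object)vals.Select(v => TimeSpan.Parse(v)).Min()) });
        foreach (var r in results)
        {
            Console.WriteLine("{0}\t{1}", r.Item1, r.Item2);
        }
    }
}
```

Because only one key's values are buffered at a time, memory use is bounded by the largest single group rather than by the whole input.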

Combiners

The support for Combiners is provided through one of two means. As is often the case, support is provided so one can reuse a Reducer as a Combiner. In addition explicit support is provided for a Combiner using the following abstract class definition:

C# Base Combiner

    // Signature outline only; member bodies are omitted.
    [AbstractClass]
    public abstract class CombinerBase : MapReduceBase
    {
        protected CombinerBase();

        public abstract IEnumerable<Tuple<string, object>> Combine(string key, IEnumerable<string> values);
    }

F# Base Combiner

    [<AbstractClass>]
    type CombinerBase() =
        inherit MapReduceBase()

        abstract member Combine: key:string -> values:IEnumerable<string> -> IEnumerable<string * obj>

Using a Combiner follows exactly the same pattern as using Mappers and Reducers, an example being:

-combiner "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinCombiner, MSDN.Hadoop.MapReduceCSharp"

The prototype for the Combiner is essentially the same as that of the Reducer except the function called for each row of data is Combine, rather than Reduce.

Binary and XML Processing

In my previous posts on Hadoop Streaming I provided samples that allowed one to perform Binary and XML based Mappers. The composable framework also provides support for submitting jobs that support Binary and XML based Mappers. To support this the following additional abstract classes have been defined:

C# Base Binary Mapper

    // Signature outline only; member bodies are omitted.
    [AbstractClass]
    public abstract class MapperBaseBinary : MapReduceBase
    {
        protected MapperBaseBinary();

        public abstract IEnumerable<Tuple<string, object>> Map(string filename, Stream document);
    }

C# Base XML Mapper

    // Signature outline only; member bodies are omitted.
    [AbstractClass]
    public abstract class MapperBaseXml : MapReduceBase
    {
        protected MapperBaseXml();

        public abstract IEnumerable<Tuple<string, object>> Map(XElement element);
    }

F# Base Binary Mapper

    [<AbstractClass>]
    type MapperBaseBinary() =
        inherit MapReduceBase()

        abstract member Map: filename:string -> document:Stream -> IEnumerable<string * obj>

F# Base XML Mapper

    [<AbstractClass>]
    type MapperBaseXml() =
        inherit MapReduceBase()

        abstract member Map: element:XElement -> IEnumerable<string * obj>

To support using Mappers and Reducers derived from these types a “format” submission parameter is required. The supported values are Text, Binary, and XML, with “Text” being the default.

To submit a binary streaming job one just has to use a Mapper derived from the MapperBaseBinary abstract class and use the binary format specification:

-format Binary

In this case the input into the Mapper will be a Stream object that represents a complete binary document instance.

To submit an XML streaming job one just has to use a Mapper derived from the MapperBaseXml abstract class and use the XML format specification, along with a node to be processed within the XML documents:

-format XML -nodename Node

In this case the input into the Mapper will be an XElement node derived from the XML document based on the nodename parameter.
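To picture what the nodename parameter means for the Mapper, the sketch below pulls every element with a given local name out of an XML document, yielding the kind of XElement values that a Map(XElement) method would receive. It is only an illustration built on System.Xml.Linq: XmlNodeSketch and SelectNodes are hypothetical names, the sample XML is made up, and the framework itself may split the documents quite differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

public static class XmlNodeSketch
{
    // Return every element whose local name equals nodeName, ignoring namespaces.
    public static IEnumerable<XElement> SelectNodes(string xml, string nodeName)
    {
        return XDocument.Parse(xml)
            .Descendants()
            .Where(e => e.Name.LocalName == nodeName);
    }

    public static void Main()
    {
        // Made-up document containing two Store nodes.
        const string xml =
            "<Stores>" +
            "<Store><Name>A</Name></Store>" +
            "<Store><Name>B</Name></Store>" +
            "</Stores>";

        // Each Store element is what a Mapper would see with "-nodename Store".
        foreach (XElement store in SelectNodes(xml, "Store"))
        {
            Console.WriteLine(store.Element("Name").Value);
        }
    }
}
```

Matching on the local name is a choice made for this sketch so that elements in a default namespace are still found; matching on a fully qualified XName would be stricter.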

Samples

To demonstrate the submission framework here are some sample Mappers and Reducers with the corresponding command line submissions:

C# Mobile Phone Range (with In-Mapper optimization)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using MSDN.Hadoop.MapReduceBase;

    namespace MSDN.Hadoop.MapReduceCSharp
    {
        public class MobilePhoneRangeMapper : MapperBaseText
        {
            private Dictionary<string, Tuple<TimeSpan, TimeSpan>> ranges;

            // Parses a tab-delimited line into (device platform, query time);
            // returns null for lines that cannot be parsed.
            private Tuple<string, TimeSpan> GetLineValue(string value)
            {
                try
                {
                    string[] splits = value.Split('\t');
                    string devicePlatform = splits[3];
                    TimeSpan queryTime = TimeSpan.Parse(splits[1]);
                    return new Tuple<string, TimeSpan>(devicePlatform, queryTime);
                }
                catch (Exception)
                {
                    return null;
                }
            }

            /// <summary>
            /// Define a Dictionary to hold the (Min, Max) tuple for each device platform.
            /// </summary>
            public override void Setup()
            {
                this.ranges = new Dictionary<string, Tuple<TimeSpan, TimeSpan>>();
            }

            /// <summary>
            /// Build the Dictionary of the (Min, Max) tuple for each device platform.
            /// </summary>
            public override IEnumerable<Tuple<string, object>> Map(string value)
            {
                var range = GetLineValue(value);
                if (range != null)
                {
                    if (ranges.ContainsKey(range.Item1))
                    {
                        var original = ranges[range.Item1];
                        if (range.Item2 < original.Item1)
                        {
                            // Update Min amount
                            ranges[range.Item1] = new Tuple<TimeSpan, TimeSpan>(range.Item2, original.Item2);
                        }
                        if (range.Item2 > original.Item2)
                        {
                            // Update Max amount
                            ranges[range.Item1] = new Tuple<TimeSpan, TimeSpan>(original.Item1, range.Item2);
                        }
                    }
                    else
                    {
                        ranges.Add(range.Item1, new Tuple<TimeSpan, TimeSpan>(range.Item2, range.Item2));
                    }
                }

                // Nothing is emitted per line; the results are published in Cleanup.
                return Enumerable.Empty<Tuple<string, object>>();
            }

            /// <summary>
            /// Return the Dictionary of the Min and Max values for each device platform.
            /// </summary>
            public override IEnumerable<Tuple<string, object>> Cleanup()
            {
                foreach (var range in ranges)
                {
                    yield return new Tuple<string, object>(range.Key, range.Value.Item1);
                    yield return new Tuple<string, object>(range.Key, range.Value.Item2);
                }
            }
        }

        public class MobilePhoneRangeReducer : ReducerBase
        {
            public override IEnumerable<Tuple<string, object>> Reduce(string key, IEnumerable<string> value)
            {
                // Fold all values for the key into a single (Min, Max) tuple.
                var baseRange = new Tuple<TimeSpan, TimeSpan>(TimeSpan.MaxValue, TimeSpan.MinValue);
                var rangeValue = value.Select(stringspan => TimeSpan.Parse(stringspan)).Aggregate(baseRange, (accSpan, timespan) =>
                        new Tuple<TimeSpan, TimeSpan>((timespan < accSpan.Item1) ? timespan : accSpan.Item1, (timespan > accSpan.Item2) ? timespan : accSpan.Item2));

                yield return new Tuple<string, object>(key, rangeValue);
            }
        }
    }

MSDN.Hadoop.Submission.Console.exe -input "mobilecsharp/data" -output "mobilecsharp/querytimes"
-mapper "MSDN.Hadoop.MapReduceCSharp.MobilePhoneRangeMapper, MSDN.Hadoop.MapReduceCSharp"
-reducer "MSDN.Hadoop.MapReduceCSharp.MobilePhoneRangeReducer, MSDN.Hadoop.MapReduceCSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceCSharp\bin\Release\MSDN.Hadoop.MapReduceCSharp.dll"

C# Mobile Min (with Mapper, Combiner, Reducer)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using MSDN.Hadoop.MapReduceBase;

    namespace MSDN.Hadoop.MapReduceCSharp
    {
        public class MobilePhoneMinMapper : MapperBaseText
        {
            // Parses a tab-delimited line into (device platform, query time);
            // returns null for lines that cannot be parsed.
            private Tuple<string, object> GetLineValue(string value)
            {
                try
                {
                    string[] splits = value.Split('\t');
                    string devicePlatform = splits[3];
                    TimeSpan queryTime = TimeSpan.Parse(splits[1]);
                    return new Tuple<string, object>(devicePlatform, queryTime);
                }
                catch (Exception)
                {
                    return null;
                }
            }

            public override IEnumerable<Tuple<string, object>> Map(string value)
            {
                var returnVal = GetLineValue(value);
                if (returnVal != null) yield return returnVal;
            }
        }

        public class MobilePhoneMinCombiner : CombinerBase
        {
            // Emits the minimum query time seen locally for the key.
            public override IEnumerable<Tuple<string, object>> Combine(string key, IEnumerable<string> value)
            {
                yield return new Tuple<string, object>(key, value.Select(timespan => TimeSpan.Parse(timespan)).Min());
            }
        }

        public class MobilePhoneMinReducer : ReducerBase
        {
            // Emits the overall minimum query time for the key.
            public override IEnumerable<Tuple<string, object>> Reduce(string key, IEnumerable<string> value)
            {
                yield return new Tuple<string, object>(key, value.Select(timespan => TimeSpan.Parse(timespan)).Min());
            }
        }
    }

MSDN.Hadoop.Submission.Console.exe -input "mobilecsharp/data" -output "mobilecsharp/querytimes"
-mapper "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinMapper, MSDN.Hadoop.MapReduceCSharp"
-reducer "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinReducer, MSDN.Hadoop.MapReduceCSharp"
-combiner "MSDN.Hadoop.MapReduceCSharp.MobilePhoneMinCombiner, MSDN.Hadoop.MapReduceCSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceCSharp\bin\Release\MSDN.Hadoop.MapReduceCSharp.dll"

F# Mobile Phone Query

    namespace MSDN.Hadoop.MapReduceFSharp

    open System
    open MSDN.Hadoop.MapReduceBase

    // Extracts the QueryTime for each Platform Device
    type MobilePhoneQueryMapper() =
        inherit MapperBaseText()

        // Performs the split into key/value
        let splitInput (value:string) =
            try
                let splits = value.Split('\t')
                let devicePlatform = splits.[3]
                let queryTime = TimeSpan.Parse(splits.[1])
                Some(devicePlatform, box queryTime)
            with
            // Skip any malformed line (short row, bad TimeSpan), as the C# samples do
            | _ -> None

        // Map the data from input name/value to output name/value
        override self.Map (value:string) =
            seq {
                let result = splitInput value
                if result.IsSome then
                    yield result.Value
            }

    // Calculates the (Min, Avg, Max) of the input stream query time (based on Platform Device)
    type MobilePhoneQueryReducer() =
        inherit ReducerBase()

        override self.Reduce (key:string) (values:seq<string>) =
            let initState = (TimeSpan.MaxValue, TimeSpan.MinValue, 0L, 0L)
            let (minValue, maxValue, totalValue, totalCount) =
                values |>
                Seq.fold (fun (minValue, maxValue, totalValue, totalCount) value ->
                    // Parse each value once rather than three times
                    let span = TimeSpan.Parse(value)
                    (min minValue span, max maxValue span, totalValue + int64 span.TotalSeconds, totalCount + 1L)) initState

            Seq.singleton (key, box (minValue, TimeSpan.FromSeconds(float (totalValue / totalCount)), maxValue))

MSDN.Hadoop.Submission.Console.exe -input "mobile/data" -output "mobile/querytimes"
-mapper "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.MobilePhoneQueryReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"

F# Store XML (XML in Samples)

    namespace MSDN.Hadoop.MapReduceFSharp

    open System
    open System.Collections.Generic
    open System.Linq
    open System.IO
    open System.Text
    open System.Xml
    open System.Xml.Linq

    open MSDN.Hadoop.MapReduceBase

    // Extracts the AnnualSales for each BusinessType
    type StoreXmlElementMapper() =
        inherit MapperBaseXml()

        override self.Map (element:XElement) =

            // XML namespace names are compared literally, so the http scheme must be kept
            let aw = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
            let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))

            seq {
                if not(demographics = null) then
                    let business = demographics.Element(XName.Get("BusinessType", aw)).Value
                    let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value) |> box
                    yield (business, sales)
                }

    // Calculates the Total Revenue of the store demographics
    type StoreXmlElementReducer() =
        inherit ReducerBase()

        override self.Reduce (key:string) (values:seq<string>) =
            // The mapper emits Decimal values, so parse and sum them as Decimal
            let totalRevenue =
                values |>
                Seq.fold (fun revenue value -> revenue + Decimal.Parse(value)) 0M

            Seq.singleton (key, box totalRevenue)

MSDN.Hadoop.Submission.Console.exe -input "stores/demographics" -output "stores/banking"
-mapper "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.StoreXmlElementReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-nodename Store -format Xml

F# Binary Document (Word and PDF Documents)

    namespace MSDN.Hadoop.MapReduceFSharp

    open System
    open System.Collections.Generic
    open System.Linq
    open System.IO
    open System.Text
    open System.Xml
    open System.Xml.Linq

    open DocumentFormat.OpenXml
    open DocumentFormat.OpenXml.Packaging
    open DocumentFormat.OpenXml.Wordprocessing

    open iTextSharp.text
    open iTextSharp.text.pdf

    open MSDN.Hadoop.MapReduceBase

    // Calculates the pages per author for a Word or PDF document
    type OfficePageMapper() =
        inherit MapperBaseBinary()

        // Classify the document by its file extension
        let (|WordDocument|PdfDocument|UnsupportedDocument|) extension =
            if String.Equals(extension, ".docx", StringComparison.InvariantCultureIgnoreCase) then
                WordDocument
            else if String.Equals(extension, ".pdf", StringComparison.InvariantCultureIgnoreCase) then
                PdfDocument
            else
                UnsupportedDocument

        // XML namespace names are compared literally, so the http scheme must be kept
        let dc = XNamespace.Get("http://purl.org/dc/elements/1.1/")
        let cp = XNamespace.Get("http://schemas.openxmlformats.org/package/2006/metadata/core-properties")
        let unknownAuthor = "unknown author"
        let authorKey = "Author"

        let getAuthorsWord (document:WordprocessingDocument) =
            let coreFilePropertiesXDoc = XElement.Load(document.CoreFilePropertiesPart.GetStream())

            // Take the first dc:creator element and split based on a ";"
            let creators = coreFilePropertiesXDoc.Elements(dc + "creator")
            if Seq.isEmpty creators then
                [| unknownAuthor |]
            else
                let creator = (Seq.head creators).Value
                if String.IsNullOrWhiteSpace(creator) then
                    [| unknownAuthor |]
                else
                    creator.Split(';')

        let getPagesWord (document:WordprocessingDocument) =
            // return page count
            Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)

        let getAuthorsPdf (document:PdfReader) =
            // For PDF documents perform the split on a ","
            if document.Info.ContainsKey(authorKey) then
                let creators = document.Info.[authorKey]
                if String.IsNullOrWhiteSpace(creators) then
                    [| unknownAuthor |]
                else
                    creators.Split(',')
            else
                [| unknownAuthor |]

        let getPagesPdf (document:PdfReader) =
            // return page count
            document.NumberOfPages

        // Map the data from input name/value to output name/value
        override self.Map (filename:string) (document:Stream) =

            let result =
                match Path.GetExtension(filename) with
                | WordDocument ->
                    // Get access to the word processing document from the input stream
                    use document = WordprocessingDocument.Open(document, false)
                    // Process the word document with the mapper
                    let pages = getPagesWord document
                    let authors = (getAuthorsWord document)
                    // close document
                    document.Close()
                    Some(pages, authors)
                | PdfDocument ->
                    // Get access to the pdf processing document from the input stream
                    let document = new PdfReader(document)
                    // Process the pdf document with the mapper
                    let pages = getPagesPdf document
                    let authors = (getAuthorsPdf document)
                    // close document
                    document.Close()
                    Some(pages, authors)
                | UnsupportedDocument ->
                    None

            // Emit one (author, page count) pair per author
            if result.IsSome then
                snd result.Value
                |> Seq.map (fun author -> (author, (box << fst) result.Value))
            else
                Seq.empty

    // Calculates the total pages per author
    type OfficePageReducer() =
        inherit ReducerBase()

        override self.Reduce (key:string) (values:seq<string>) =
            let totalPages =
                values |>
                Seq.fold (fun pages value -> pages + Int32.Parse(value)) 0

            Seq.singleton (key, box totalPages)

MSDN.Hadoop.Submission.Console.exe -input "office/documents" -output "office/authors"
-mapper "MSDN.Hadoop.MapReduceFSharp.OfficePageMapper, MSDN.Hadoop.MapReduceFSharp"
-reducer "MSDN.Hadoop.MapReduceFSharp.OfficePageReducer, MSDN.Hadoop.MapReduceFSharp"
-combiner "MSDN.Hadoop.MapReduceFSharp.OfficePageReducer, MSDN.Hadoop.MapReduceFSharp"
-file "%HOMEPATH%\MSDN.Hadoop.MapReduceFSharp\bin\Release\MSDN.Hadoop.MapReduceFSharp.dll"
-file "C:\Reference Assemblies\itextsharp.dll" -format Binary

Optional Parameters

To support some additional Hadoop Streaming options a few optional parameters are supported.

-numberReducers X

As expected this specifies the maximum number of reducers to use.

-debug

This option turns on verbose mode and specifies a job configuration to keep failed task outputs.

To view the supported options one can use a help parameter, which displays:

Command Arguments:
-input (Required=true) : Input Directory or Files
-output (Required=true) : Output Directory
-mapper (Required=true) : Mapper Class
-reducer (Required=true) : Reducer Class
-combiner (Required=false) : Combiner Class (Optional)
-format (Required=false) : Input Format |Text(Default)|Binary|Xml|
-numberReducers (Required=false) : Number of Reduce Tasks (Optional)
-file (Required=true) : Processing Files (Must include Map and Reduce Class files)
-nodename (Required=false) : XML Processing Nodename (Optional)
-debug (Required=false) : Turns on Debugging Options

UI Submission

The provided submission framework works from the command line. However, there is nothing to stop one submitting the job using a UI, albeit a command console is still opened. To this end I have put together a simple UI that supports submitting Hadoop jobs.

This simple UI supports all the necessary options for submitting jobs.

Code Download

As mentioned, the actual executables and source code can be downloaded from:

https://code.msdn.microsoft.com/Framework-for-Composing-af656ef7

The source includes not only the .Net submission framework, but also all necessary Java classes for supporting the Binary and XML job submissions. This relies on a custom Streaming JAR which should be copied to the Hadoop lib directory.

To use the code one just needs to reference the EXEs in the Release directory. This folder also contains the MSDN.Hadoop.MapReduceBase.dll that contains the abstract base class definitions.

Moving Forward

Moving forward there are a few considerations for the code that I will be looking at over time:

Currently the abstract interfaces are all based on Object return types. Moving forward it would be beneficial if the types were based on Generics. This would allow a better serialization process. Currently value serialization is based on the string representation of an object’s value, and the key is restricted to a string. Better serialization processes, such as Protocol Buffers or .Net Serialization, would improve performance.

Currently the code only supports a single key value, although multiple keys are supported by the streaming interface. Various options are available for dealing with multiple keys, which will be next on my investigation list.
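As a rough idea of what the first consideration could look like, here is a sketch of a generics-based reducer base class in which the values arrive already typed rather than as strings. It is purely hypothetical: SketchReducerBase, MinTimeSpanReducer, and GenericSketch are invented names, and the deserialize delegate merely stands in for whatever serialization mechanism might eventually be chosen.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical generic reducer base: typed values in, typed results out.
public abstract class SketchReducerBase<TValue, TResult>
{
    public abstract IEnumerable<Tuple<string, TResult>> Reduce(string key, IEnumerable<TValue> values);
}

// Minimum query time per key, written against typed values.
public class MinTimeSpanReducer : SketchReducerBase<TimeSpan, TimeSpan>
{
    public override IEnumerable<Tuple<string, TimeSpan>> Reduce(string key, IEnumerable<TimeSpan> values)
    {
        yield return Tuple.Create(key, values.Min());
    }
}

public static class GenericSketch
{
    // The deserialize delegate is a stand-in for a real serializer;
    // here the raw streamed strings are converted just before Reduce is called.
    public static IEnumerable<Tuple<string, TResult>> Run<TValue, TResult>(
        SketchReducerBase<TValue, TResult> reducer,
        string key,
        IEnumerable<string> rawValues,
        Func<string, TValue> deserialize)
    {
        return reducer.Reduce(key, rawValues.Select(deserialize));
    }

    public static void Main()
    {
        var output = Run(
            new MinTimeSpanReducer(),
            "Android",
            new[] { "00:00:07", "00:00:03" },
            s => TimeSpan.Parse(s, CultureInfo.InvariantCulture));

        foreach (var pair in output)
        {
            Console.WriteLine("{0}\t{1}", pair.Item1, pair.Item2);
        }
    }
}
```

With this shape the parsing logic lives in one place instead of being repeated inside every Reduce implementation.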

In a separate post I will cover what is actually happening under the covers.

If you find the code useful and/or use this for your MapReduce jobs, or just have some comments, please do let me know.