共用方式為


Co-occurrence Approach to an Item Based Recommender

For a while I thought I would tackle the problem of creating an item-based recommender. Firstly I will start with a local variant before moving onto a MapReduce version. The current version of the code can be found at:

https://code.msdn.microsoft.com/Co-occurrence-Approach-to-57027db7

The approach taken for the item-based recommender will be to define a co-occurrence matrix based on purchased items; products purchased on an order. As an example I will use the Sales data from the Adventure Works SQL Server sample database.. The algorithm/approach is best explained, and is implemented, using a matrix.

For the matrix implementation I will be using the Math.Net Numerics libraries. To install the libraries from NuGet one can use the Manage NuGet Packages browser, or run these commands in the Package Manager Console:

Install-Package MathNet.Numerics
Install-Package MathNet.Numerics.FSharp

Co-occurrence Sample

Lets start with a simple sample. Consider the following order summary:

Order ID Product ID1 Product ID2 Product ID3
500001 1001 1003 1005
500002 1001 1002 1003
500003 1002 1003  
500004 1003 1004 1005
500005 1003 1005  
500006 1003 1004  
500007 1002 1003  
500008 1001 1003 1004
500009 1003 1004 1005

If you look at the orders containing products 1002, you will see that there is 1 occurrence of product 1001 and 3 occurrences of product 1003. From this, one can deduce that if someone purchases product 1002 then it is likely that they will want to purchase 1003, and to a lesser extent product 1001. This concept forms the crux of the item-based recommender.

So if we computed the co-occurrence for every pair of products and placed them into a square matrix we would get the following:

Product ID 1001 1002 1003 1004 1005
1001   1 3 1 1
1002 1   3    
1003 3 3   4 4
1004 1   4   2
1005 1   4 2  

There are a few things we can say about such a matrix, other than it will be square. More than likely, for a large product set the matrix will be sparse, where the number of rows and columns is equal to the number of items. Also, each row (and column as the matrix is symmetric) expresses similarities between an item and all others.

Due to the diagonal symmetric nature of the matrix (axy = ayx) one can think of the rows and columns as vectors, where similarity between items X and Y is the same as the similarity between items Y and X.

The algorithm/approach I will be outlining will essentially be a means to build and query this co-occurrence matrix. Co-occurrence is like similarity, the more two items occur together (in this case in a shopping basket), the more they are probably related.

Working Data

Before talking about the code a word is warranted on how I have created the sample data.

As mentioned before I am using the Adventure Works database sales information. The approach I have taken is to export the sales detail lines, ordered by the sales order identifier, into flat files. The rationale for this being that the processing of the matrix can then occur with a low impact to the OLTP system. Also, the code will support the parallel processing of multiple files. Thus one can take the approach of just exporting more recent data and using the previously exported archived data to generate the matrix.

To support this process I have created a simple view for exporting the data:

CREATE VIEW [Sales].[vSalesSummary]
AS
    SELECT SOH.[SalesOrderID], CAST(SOH.[OrderDate] AS date) [OrderDate], SOD.[ProductID], SOD.[OrderQty]
    FROM [Sales].[SalesOrderHeader] SOH
    INNER JOIN [Sales].[SalesOrderDetail] SOD ON SOH.[SalesOrderID] = SOD.[SalesOrderID]
GO

I have then used the BCP command to export the data into a Tab delimited file:

bcp
  "SELECT [SalesOrderID], [OrderDate], [ProductID], [OrderQty] FROM [AdventureWorks2012].[Sales].[vSalesSummary] ORDER BY [SalesOrderID], [ProductID]"
  queryout SalesOrders.dat
  -T -S (local) -c

One could easily define a similar process where a file is generated for each month, with the latest month being updated on a nightly basis. One could then easily ensure only orders for the past X months are including in the metric. The only important aspect in how the data is generated is that the output must be ordered on the sales order identifier. As you will see later, this grouping is necessary to allow the co-occurrence data to be derived.

A sample output from the Adventure Works sales data is as follows, with the fields being tab delimited:

43659    2005-07-01    776    1
43659    2005-07-01    777    3
43659    2005-07-01    778    1
43660    2005-07-01    758    1
43660    2005-07-01    762    1
43661    2005-07-01    708    5
43661    2005-07-01    711    2
43661    2005-07-01    712    4
43661    2005-07-01    715    4
43661    2005-07-01    716    2
43661    2005-07-01    741    2
43661    2005-07-01    742    2
43661    2005-07-01    743    1
43661    2005-07-01    745    1
43662    2005-07-01    730    2
43662    2005-07-01    732    1
43662    2005-07-01    733    1
43662    2005-07-01    738    1

Once the data has been exported building the matrix becomes a matter of counting the co-occurrence of products associated with each sales order.

Building the Matrix

The problem of building a co-occurrence matrix is simply one of counting. The basic process is as follows:

  1. Read the file and group each sales order along with the corresponding list of product identifiers
  2. For each sales order product listing, calculate all the corresponding pairs of products
  3. Maintain/Update the running total of each pair of products found
  4. At the end of the file, output a sparse matrix based on the running totals of product pairs

To support parallelism steps 1-3 are run in parallel where the output of these steps will be a collection of the product pairs with the co-occurrence count. These collections are then combined to create a single sparse matrix.

In performing this processing it is important that the reading of the file data is such that it can efficiently create a sequence of products for each sales order identifier; the grouping key. If you have been following my Hadoop Streaming blog entries one will see that this is the same process undertaken in processing data within a reducer step.

The matrix building code, in full, is as follows:

  1. module MatrixBuilder =   
  2.  
  3.     // Configuration values
  4.     let qtyMaximum = 5.0                            // Maximum rating contribution for an item
  5.     let entryThreshold = 20.0                       // Minimum correlations for matrix inclusion
  6.  
  7.     let recentFactor = 2.0                          // Quantity increase factor for recent items
  8.     let baseDate = DateTime.Today.AddMonths(-3)     // Date for a recent item
  9.  
  10.     let matrixSize = 10000*500                      // Products*Correlations for Hash Table init
  11.  
  12.     // Gets the base data for building the sparse matrix
  13.     let private getMatrixData filename =
  14.        
  15.         let minItem = ref Int32.MaxValue
  16.         let maxItem = ref 0
  17.         let rcTable = Dictionary<(int*int), float>(matrixSize)
  18.  
  19.         // Define file reader properties
  20.         use reader = new StreamReader(Path.GetFullPath(filename))
  21.         let getInput() =
  22.             if reader.EndOfStream then
  23.                 None
  24.             else
  25.                 let input = reader.ReadLine()
  26.                 if not (String.IsNullOrEmpty(input)) then
  27.                     Some(Helpers.ParseInputData input)
  28.                 else
  29.                     None
  30.  
  31.         // calculates the pairs for an order
  32.         let rec pairs items = seq {   
  33.             match items with  
  34.             | head::tail -> for item in tail do yield head, item
  35.                             yield! pairs tail
  36.             | _ -> ()
  37.             }
  38.  
  39.         // Add a factor based on order properties - example recent orders
  40.         let orderFactor (order:OrderHeader) =
  41.             if DateTime.Compare(order.OrderDate, baseDate) > 0 then
  42.                 recentFactor
  43.             else
  44.                 1.0
  45.  
  46.         // Adds to the table
  47.         let addRowColumn idx1 idx2 qty =
  48.             minItem := min (min idx1 idx2) !minItem
  49.             maxItem := max (max idx1 idx2) !maxItem
  50.             if rcTable.ContainsKey (idx1, idx2) then
  51.                 rcTable.[(idx1, idx2)] <- rcTable.[(idx1, idx2)] + qty
  52.             else
  53.                 rcTable.[(idx1, idx2)] <- qty
  54.             ()
  55.  
  56.         // The main processor for the sequence for an order
  57.         let processOrder (header:OrderHeader) (orders:OrderDetail seq) =
  58.             let orderList = Seq.toList orders
  59.             // if order only has one product then no correlation can be determined
  60.             if (orderList.Length > 1) then                
  61.                 pairs orderList
  62.                 |> Seq.iter (fun (order1, order2) ->
  63.                     let qty = (min (float (max order1.OrderQty order2.OrderQty)) qtyMaximum) * (orderFactor header)
  64.                     addRowColumn order1.ProductId order2.ProductId qty
  65.                     addRowColumn order2.ProductId order1.ProductId qty)
  66.  
  67.         // Creates a sequence of the input based on the provided orderId
  68.         let lastInput = ref None
  69.         let continueDo = ref false
  70.         let inputsByKey key firstValue = seq {
  71.             // Yield any value from previous read
  72.             yield firstValue
  73.  
  74.             continueDo := true
  75.             while !continueDo do                
  76.                 match getInput() with
  77.                 | Some(orderDetail) when orderDetail.OrderId = key ->
  78.                     // Yield found value and remainder of sequence
  79.                     yield orderDetail   
  80.                 | Some(orderDetail) ->
  81.                     // Have a value but different key
  82.                     lastInput := Some(orderDetail)
  83.                     continueDo := false                                     
  84.                 | None ->
  85.                     // Have no more entries
  86.                     lastInput := None
  87.                     continueDo := false
  88.         }
  89.  
  90.         // Controls the calling of the matrix maker
  91.         let rec processInput (input:OrderDetail option) =
  92.             match input with
  93.             | Some(orderDetail) ->
  94.                 let header = {OrderHeader.OrderId = orderDetail.OrderId; OrderDate = orderDetail.OrderDate}
  95.                 processOrder header (inputsByKey orderDetail.OrderId orderDetail)
  96.                 processInput lastInput.contents
  97.             | _ -> ()
  98.  
  99.         // Build the matrix/table from the input data
  100.         processInput (getInput())
  101.  
  102.         // return the table defintion along with the size
  103.         (!minItem, !maxItem, rcTable)
  104.  
  105.     /// Build a Sparse matrix from the file data
  106.     let GetMatrixParallel (filenames:string array) =       
  107.  
  108.         // In Parallel gets the RC tables
  109.         let results =
  110.             filenames
  111.             |> Array.Parallel.map (fun filename -> getMatrixData filename)           
  112.         
  113.         // define the max sparse array size
  114.         let (minItem, maxItem) =
  115.             results
  116.             |> Array.fold (fun acc (idxMin, idxMax, _) ->
  117.                 (min idxMin (fst acc), max idxMax (snd acc))) (0, 0)
  118.  
  119.         let offset = minItem
  120.         let size = maxItem + 1 - minItem
  121.  
  122.         // convert to a sparse matrix and return
  123.         let items = seq {
  124.             for (_, _, rcTable) in results do
  125.                 for item in rcTable do
  126.                     if item.Value > entryThreshold then
  127.                         yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
  128.             }
  129.  
  130.         (offset, SparseMatrix.ofSeq size size items)
  131.  
  132.     /// Interface for a single file
  133.     let GetMatrix (filename:string) =
  134.  
  135.         let (minItem, maxItem, rcTable) = getMatrixData filename
  136.  
  137.         let offset = minItem
  138.         let size = maxItem + 1 - minItem
  139.  
  140.         // convert to a sparse matrix and return
  141.         let items = seq {
  142.             for item in rcTable do
  143.                 if item.Value > entryThreshold then
  144.                     yield ((fst item.Key) - offset, (snd item.Key) - offset, item.Value)
  145.             }                    
  146.  
  147.         (offset, SparseMatrix.ofSeq size size items)

In this instance the file data is processed such that order data is grouped and exposed as an OrderHeader type, along with a collection of OrderDetail types. From this the pairs of products are easily calculated; using the pairs function.

To maintain a co-occurrence count for each product pair a Dictionary is used. The key for the Dictionary is a tuple of the pair of products, with the value being the co-occurrence count. The rationale for using a Dictionary is that it supports O(1) lookups; whereas maintaining an Array will incur an O(n) lookup.

The final decision to be made is of what quantity should be added into the running total. One could use a value of 1 for all co-occurrences, but other options are available. The first is the order quantity. If one has a co-occurrence for products quantities x and y, I have defined the product quantity as (max x y). The rationale for this is that having product quantities is logically equivalent to having multiple product lines in the same order. If this was the case then the co-occurrence counting would arrive at the same value. However I have treated only the maximum amount as the contributing factor. I have also limited the maximum quantity amount such that small products ordered in large volumes do not skew the numbers; such as restricting a rating from 1-5.

One optional quantity factor I have included is one based on the order header. The sample code applies a quantity scaling factor for recent orders. Again the rationale for this is such that recent orders have a greater affect on the co-occurrence values over older ones. All these scaling factors are optional and should be configured to give your desired results.

As mentioned, to achieve parallelism in the matrix building code the collection of input files can be processed in parallel with each parallel step independently outing its collection of product pairs and co-occurrence counts. This is achieved using an Array.Parallel.map function call. This maps each input file into the Dictionary for the specified file. Once all the Dictionary elements have been created they are then used to create a sequence of elements to create the sparse matrix.

One other critical element returned from defining the Dictionary is the maximum product identifier. It is assumed that the product identifier is an integer which is used as an index into the matrix; for both row and column values. Thus the maximum value is needed to define row and columns dimensions for the sparse matrix.

Whereas this approach works well for products ranges from 1 onwards, what if product identifiers are defined from a large integer base, such as 100,000, or are Guid’s. In the former case one has the option of calculating an offset such that the index for the matrix is the product identifier minus the starting offset; this being the approach I have taken. The other option is that the exported data is mapped such that the product numbers are defined from 1 onwards; again this can be a simple offset calculation. In the latter case for Guid’s, one would need to do a mapping from the Guid keys to a sequential integer.

Querying for Item Similarity

So once the sparse matrix has been built how does one make recommendations. There are basically two options, either recommendations based on a single product selection, or recommendations for multiple products, say based on the contents of a shopping basket.

The basic process for performing the recommendations, in either case, is as follows:

  1. Select the sparse row vectors that correspond to the products for which a recommendation is required
  2. Place the row values into a PriorityQueue where the key is the co-occurrence count and the value the product identifier
  3. Dequeue and return, as a sequence, the required number of recommendations from the PriorityQueue

For the case of a single product the recommendations are just a case of selecting the correct single vector and returning the products with the highest co-occurrence count. The process for a selection of products is almost the same, except that multiple vectors are added into the PriorityQueue. One just needs to ensure that products that are already in the selection on which the recommendation is being made are excluded from the returned values. This is easily achieved with a HashSet lookup.

So the full code that wraps building the recommendation is as follows:

  1. type MatrixQuery (filenames:string array) =
  2.  
  3.     let defaultRecommendations = 12
  4.  
  5.     let (offset, coMatrix) =
  6.         match filenames with
  7.         | [||] -> invalidArg "filename" "Filename cannot be an empty Array"
  8.         | [| filename |] -> MatrixBuilder.GetMatrix(filename)
  9.         | _ -> MatrixBuilder.GetMatrixParallel(filenames)
  10.  
  11.     let getQueueSM (products:int array) =         
  12.         // Define the priority queue and lookup table
  13.         let queue = PriorityQueue(coMatrix.ColumnCount)
  14.         let lookup = HashSet(products)
  15.  
  16.         // Add the items into a priority queue
  17.         products
  18.         |> Array.iter (fun item ->
  19.             let itemIdx = item - offset
  20.             if itemIdx >= 0 && itemIdx < coMatrix.ColumnCount then
  21.                 seq {
  22.                     for idx = 0 to (coMatrix.ColumnCount - 1) do
  23.                         let productIdx = idx + offset
  24.                         if (not (lookup.Contains(productIdx))) && (coMatrix.[itemIdx, idx] > 0.0) then
  25.                             yield KeyValuePair(coMatrix.[itemIdx, idx], productIdx)
  26.                 }
  27.                 |> queue.Merge)
  28.         // Return the queue
  29.         queue
  30.  
  31.     let getItems (queue:PriorityQueue<float, int>) (items:int) =
  32.         let toDequeue = min items queue.Count
  33.         seq { for i in 1 .. toDequeue do yield queue.Dequeue().Value }
  34.         
  35.     new(filename:string) =
  36.         MatrixQuery([| filename |])
  37.  
  38.     /// Get the requested number of Recommendations for a Product
  39.     member self.GetRecommendations(product:int, items:int) =
  40.         let queue = getQueueSM([| product |])
  41.         getItems queue items
  42.  
  43.     /// Get the requested number of Recommendations for a Product Array
  44.     member self.GetRecommendations(products:int array, items:int) =
  45.         let queue = getQueueSM(products)
  46.         getItems queue items
  47.  
  48.     /// Get the default number of Recommendations for a Product
  49.     member self.GetRecommendations(product:int) =
  50.         self.GetRecommendations(product, defaultRecommendations)
  51.  
  52.     /// Get the default number of Recommendations for a Product Array
  53.     member self.GetRecommendations(products:int array) =
  54.         self.GetRecommendations(products, defaultRecommendations)

Using the code is a simply a matter of creating the MatrixQuery type, with the files to load, and then calling the GetRecommendations() operator for the required products (shopping basket):

let filenames = [|
    @"C:\DataExport\SalesOrders201203.dat";
    @"C:\DataExport\SalesOrders201204.dat";
    @"C:\DataExport\SalesOrders201205.dat";
    @"C:\DataExport\SalesOrders201206.dat" |]

let itemQuery = MatrixQuery(filenames)
let products = itemQuery.GetRecommendations([| 860; 870; 873 |], 25)

In extracting the row vector associated with the required product one could just have used coMatrix.Row(item); but this creates a copy of the vector. To avoid this the code just does an enumeration of the required matrix row. Internally the sparse vector maintains three separate arrays for the column and row offsets and sparse value. Using these internal arrays and the associated element based operations would speed up obtaining recommendations; but currently these properties are not exposed. If one uses the sparse matrix from the F# power Pack then one can operate in this fashion using the following code:

  1. let getQueueSM (products:int array) =         
  2.     // Define the priority queue and lookup table
  3.     let queue = PriorityQueue(coMatrix.NumCols)
  4.     let lookup = HashSet(products)
  5.  
  6.     // Add the items into a priority queue
  7.     products
  8.     |> Array.iter (fun item ->
  9.         let last = coMatrix.InternalSparseRowOffsets.Length - 1
  10.         if item >= 0 && item <= last then
  11.             let (startI, endI) =
  12.                 if item = last then (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseRowOffsets.[item])
  13.                 else (coMatrix.InternalSparseRowOffsets.[item], coMatrix.InternalSparseRowOffsets.[item + 1] - 1)
  14.             seq {
  15.                 for idx = startI to endI do                    
  16.                     if not (lookup.Contains(idx)) then
  17.                         let product = coMatrix.InternalSparseColumnValues.[idx]
  18.                         yield KeyValuePair(-coMatrix.InternalSparseValues.[idx], product)
  19.             }
  20.             |> queue.Merge
  21.         )
  22.     // Return the queue
  23.     queue

In considering a shopping basket I have assumed that each product has been selected only once. You could have the situation where one wanted to take into consideration the quantity of each product selected. In this instance one would take the approach of performing a scalar multiplication of the corresponding product vectors. What this would achieve is, for recommendations, prioritizing products for which a user has purchased multiple items.

Although this code does a lot of element-wise operations, as mentioned earlier, one can think of the operations in pure matrix terms. In this case one would just multiply the sparse matrix by a sparse column vector representing the items from which to make the recommendations.

Consider the previous sparse matrix example, and say one had a basket consisting of 2 products 1002 and 1 product 1004:

Product ID 1001 1002 1003 1004 1005   Product ID Qty   Product ID Rec
1001   1 3 1 1   1001     1001 3
1002 1   3       1002 2   1002  
1003 3 3   4 4 × 1003   = 1003 10
1004 1   4   2   1004 1   1004  
1005 1   4 2     1005     1005 2

In this case it should be easy to see the recommendation for products should be 1003, 1001, and lastly 1005.

Conclusion

The approach mentioned here will probably work well for the cases where one has 100,000’s products with similarities between 1000’s of items, with 10’s millions of orders to be considered. For most situations this should suffice. However, if you are not in this bracket, in my next post, I will show how this approach can be mapped over to Hadoop and MapReduce, allowing for even greater scale.

Also, in a future post I will port the code to use the Cloud Numerics implementation of matrices.