
Introduction to Recommendations with Map-Reduce and mrjob

Thursday, August 23, 2012


Hi all,

In this post I will show how we can use the map-reduce programming model to make recommendations. Recommender systems are quite popular among shopping sites and social networks these days. How do they do it? Generally, the user interaction data available for the items and products in shopping sites and social networks is enough to build a recommendation engine using classic techniques such as collaborative filtering.

Why Map-Reduce?

MapReduce is a framework originally developed at Google that enables large-scale distributed computing across a number of domains. Apache Hadoop is an open-source implementation of it. It scales well to many thousands of nodes and can handle petabytes of data. For recommendations, where we have to find the products similar to a product you are interested in, we must calculate how similar pairs of items are. For instance, if someone watches the movie The Matrix, the recommender would suggest the film Blade Runner. So we need to compute the similarity between two movies. One way is to find the correlation between pairs of items. But if you own a shopping site with 500,000 products, you would potentially face over 250 billion computations (500,000 squared ordered pairs). Besides the sheer amount of computation, the correlation data will be sparse, because it's unlikely that every pair of items has some user interested in both. So we have a large and sparse dataset. We also have to deal with the temporal aspect: user interest in products changes with time, so the correlation calculation must be re-run periodically to keep the results up to date. For these reasons, the best way to handle this scenario is a divide-and-conquer pattern, and MapReduce is a powerful framework for implementing data mining algorithms this way. You can take a look at this post about MapReduce or go to these video classes about Hadoop.

Map-Reduce Architecture
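
In brief, a MapReduce job splits the input across many machines: each mapper turns its share of the records into intermediate (key, value) pairs; the framework then shuffles and sorts those pairs so that all values sharing a key arrive at the same reducer; and each reducer aggregates its group into the final output.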



Meeting mrjob


mrjob is a Python package that helps you write and run Hadoop Streaming jobs. It supports Amazon's Elastic MapReduce (EMR) and it also works with your own Hadoop cluster. It has been released as an open-source framework by Yelp, and we will use it as our interface to Hadoop because of its readability and how easy it makes handling MapReduce tasks. Check this link to see how to download and use it.
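
To get a feel for the API, here's the classic word count written as an mrjob job. Running python wordcount.py input.txt executes it locally, and the same script can be pointed at a cluster with the -r hadoop or -r emr runners:

from mrjob.job import MRJob


class MRWordCount(MRJob):

    def mapper(self, key, line):
        # emit one (word, 1) pair per word in the line
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        # sum the counts emitted for each word
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCount.run()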


Movie Similarities


Imagine that you own an online movie business and you want to offer movie recommendations to your clients. Your system runs a rating system, that is, people can rate movies with 1 to 5 stars, and we will assume for simplicity that all of the ratings are stored in a csv file somewhere.
Our goal is to calculate how similar pairs of movies are, so that we can recommend movies similar to the ones you liked. Using the correlation we can:

  • For every pair of movies A and B, find all the people who rated both A and B.
  • Use these ratings to form a Movie A vector and a Movie B vector.
  • Calculate the correlation between those two vectors.
  • When someone watches a movie, recommend the movies most correlated with it.

So the first step is to get our ratings file, which has three columns: (user, movie, rating). For this task we will use the MovieLens dataset of movie ratings, with 100,000 ratings from 1,000 users on 1,700 movies (you can download it at this link).
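
The raw MovieLens file (u.data) is tab-separated and uses numeric movie ids, so it has to be normalized first into the pipe-delimited user|title|rating format the job expects. Here's a minimal sketch of that step, assuming the MovieLens 100k file layout (u.item maps movie ids to titles):

# build an id -> title lookup from u.item (pipe-delimited)
titles = {}
for line in open('u.item'):
    fields = line.split('|')
    titles[fields[0]] = fields[1]

# rewrite u.data (tab-separated) as user|title|rating
output = open('ratings.csv', 'w')
for line in open('u.data'):
    user_id, item_id, rating, timestamp = line.strip().split('\t')
    output.write('%s|%s|%s\n' % (user_id, titles[item_id], rating))
output.close()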

Here is a sample of the dataset file after normalization.

196|Kolya (1996)|3
186|L.A. Confidential (1997)|3
22|Heavyweights (1994)|1
244|Legends of the Fall (1994)|2
166|Jackie Brown (1997)|1
298|Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)|4
115|Hunt for Red October, The (1990)|2
253|Jungle Book, The (1994)|5
305|Grease (1978)|3



So let's start by reading the ratings into the MovieSimilarities job.

$ python moviesSimilarities.py ratings.csv > output.csv

You want to compute how similar pairs of movies are, so that if someone watches The Matrix, you can recommend movies like Blade Runner. So how should you define the similarity between two movies?

One possibility is to compute their correlation. The basic idea is: for every pair of movies A and B, find all the people who rated both A and B. Use these ratings to form a Movie A vector and a Movie B vector. Then calculate the correlation between these two vectors. When someone watches a movie, you can then recommend the movies most correlated with it.

So let's divide and conquer. Our first task is, for each user, to emit a row containing their 'postings' (item, rating pairs). The reducer then also emits the user's rating sum and count for use in later steps.

def group_by_user_rating(self, key, line):
    """
    Emit the user_id and group by their ratings (item and rating)

    17    70,3
    35    21,1
    49    19,2
    49    21,1
    49    70,4
    87    19,1
    87    21,2
    98    19,2
    """
    user_id, item_id, rating = line.split('|')
    yield user_id, (item_id, float(rating))

def count_ratings_users_freq(self, user_id, values):
    """
    For each user, emit a row containing their "postings"
    (item, rating pairs).
    Also emit the user's rating sum and count for use in later steps.

    17    1,3,(70,3)
    35    1,1,(21,1)
    49    3,7,(19,2 21,1 70,4)
    87    2,3,(19,1 21,2)
    98    1,2,(19,2)
    """
    item_count = 0
    item_sum = 0
    final = []
    for item_id, rating in values:
        item_count += 1
        item_sum += rating
        final.append((item_id, rating))
    yield user_id, (item_count, item_sum, final)



Before using these rating pairs to calculate the correlation, let's see how we can compute it. The co-ratings can be viewed as vectors, so we can use linear algebra: dot products and norms, as well as the sum over all elements of each vector and its squared length. By representing the ratings as vectors, we can derive every quantity the correlation needs.
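
As a quick aside, here's a small numpy sketch (with made-up vectors, not part of the job itself) of those quantities; they are exactly the running sums the reducer below accumulates:

import numpy as np

# hypothetical co-ratings: users who rated both movie A and movie B
ratings_a = np.array([3.0, 4.0, 2.0])
ratings_b = np.array([3.0, 5.0, 1.0])

sum_xy = np.dot(ratings_a, ratings_b)            # dot product
sum_x, sum_y = ratings_a.sum(), ratings_b.sum()  # sums over all elements
sum_xx = np.dot(ratings_a, ratings_a)            # squared length of A
sum_yy = np.dot(ratings_b, ratings_b)            # squared length of B
n = len(ratings_a)                               # number of co-raters

The pairwise_items mapper below generates the co-rating pairs that feed these sums.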
def pairwise_items(self, user_id, values):
    '''
    The output drops the user from the key entirely; instead it emits
    the pair of items as the key:

    19,21    2,1
    19,70    2,4
    21,70    1,4
    19,21    1,2

    This mapper is the main performance bottleneck. One improvement
    would be to create a Java Combiner to aggregate the outputs by
    key before writing to HDFS; another would be to use a vector
    format and SequenceFiles instead of streaming text for the
    matrix data.
    '''
    item_count, item_sum, ratings = values
    # the combinations call is the bottleneck
    for item1, item2 in combinations(ratings, 2):
        yield (item1[0], item2[0]), (item1[1], item2[1])
def calculate_similarity(self, pair_key, lines):
    '''
    Sum components of each co-rating pair across all users who rated
    both item x and item y, then calculate the pairwise Pearson
    similarity and co-rating counts. The similarities are normalized
    to the [0, 1] scale because we do a numerical sort.

    19,21    0.4,2
    21,19    0.4,2
    19,70    0.6,1
    70,19    0.6,1
    21,70    0.1,1
    70,21    0.1,1
    '''
    sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
    item_xname, item_yname = pair_key
    for item_x, item_y in lines:
        sum_xx += item_x * item_x
        sum_yy += item_y * item_y
        sum_xy += item_x * item_y
        sum_y += item_y
        sum_x += item_x
        n += 1
    similarity = normalized_correlation(n, sum_xy, sum_x, sum_y,
                                        sum_xx, sum_yy)
    yield (item_xname, item_yname), (similarity, n)
To summarize, each call to calculate_similarity computes, for a pair of movies, the number of people who rated both of them (n), the sum over all elements of each ratings vector (sum_x, sum_y) and the squared sum of each vector (sum_xx, sum_yy). With these quantities we can now calculate the correlation between the movies. The correlation can be expressed as:
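
correlation(X, Y) = (n * sum_xy - sum_x * sum_y) /
                    (sqrt(n * sum_xx - sum_x^2) * sqrt(n * sum_yy - sum_y^2))

The correlation and normalized_correlation helpers live in a metrics module that isn't shown inline (the full module is in the repository linked below). A minimal sketch consistent with the formula above, and with the [0, 1] normalization the docstring mentions, could be:

from math import sqrt


def correlation(size, dot_product, rating_sum, rating2sum,
                rating_norm_squared, rating2_norm_squared):
    '''Pearson correlation computed from the accumulated sums.'''
    numerator = size * dot_product - rating_sum * rating2sum
    denominator = sqrt(size * rating_norm_squared - rating_sum ** 2) * \
        sqrt(size * rating2_norm_squared - rating2sum ** 2)
    return (numerator / denominator) if denominator else 0.0


def normalized_correlation(size, dot_product, rating_sum, rating2sum,
                           rating_norm_squared, rating2_norm_squared):
    '''Map the correlation from [-1, 1] onto [0, 1] for numerical sorting.'''
    similarity = correlation(size, dot_product, rating_sum, rating2sum,
                             rating_norm_squared, rating2_norm_squared)
    return (similarity + 1.0) / 2.0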



So that's it! Now for the last step of the job, which sorts the top-correlated items for each item and prints them to the output.

def calculate_ranking(self, item_keys, values):
    '''
    Emit items with the similarity in the key for ranking:

    19,0.4    70,1
    19,0.6    21,2
    21,0.6    19,2
    21,0.9    70,1
    70,0.4    19,1
    70,0.9    21,1
    '''
    similarity, n = values
    item_x, item_y = item_keys
    if int(n) > 0:
        yield (item_x, similarity), (item_y, n)

def top_similar_items(self, key_sim, similar_ns):
    '''
    For each item, emit the K closest items as a semicolon-separated line:

    De La Soul;A Tribe Called Quest;0.6;1
    De La Soul;2Pac;0.4;2
    '''
    item_x, similarity = key_sim
    for item_y, n in similar_ns:
        print '%s;%s;%f;%d' % (item_x, item_y, similarity, n)

So let's see the output. Here's a sample of the top output I got:


MovieA;MovieB;Correlation
Return of the Jedi (1983);Empire Strikes Back, The (1980);0.787655
Star Trek: The Motion Picture (1979);Star Trek III: The Search for Spock (1984);0.758751
Star Trek: Generations (1994);Star Trek V: The Final Frontier (1989);0.72042
Star Wars (1977);Return of the Jedi (1983);0.687749
Star Trek VI: The Undiscovered Country (1991);Star Trek III: The Search for Spock (1984);0.635803
Star Trek V: The Final Frontier (1989);Star Trek III: The Search for Spock (1984);0.632764
Star Trek: Generations (1994);Star Trek: First Contact (1996);0.602729
Star Trek: The Motion Picture (1979);Star Trek: First Contact (1996);0.593454
Star Trek: First Contact (1996);Star Trek VI: The Undiscovered Country (1991);0.546233
Star Trek V: The Final Frontier (1989);Star Trek: Generations (1994);0.4693
Star Trek: Generations (1994);Star Trek: The Wrath of Khan (1982);0.424847
Star Trek IV: The Voyage Home (1986);Empire Strikes Back, The (1980);0.38947
Star Trek III: The Search for Spock (1984);Empire Strikes Back, The (1980);0.371294
Star Trek IV: The Voyage Home (1986);Star Trek VI: The Undiscovered Country (1991);0.360103
Star Trek: The Wrath of Khan (1982);Empire Strikes Back, The (1980);0.35366
Stargate (1994);Star Trek: Generations (1994);0.347169
Star Trek VI: The Undiscovered Country (1991);Empire Strikes Back, The (1980);0.340193
Star Trek V: The Final Frontier (1989);Stargate (1994);0.315828
Star Trek: The Wrath of Khan (1982);Star Trek VI: The Undiscovered Country (1991);0.222516
Star Wars (1977);Star Trek: Generations (1994);0.219273
Star Trek V: The Final Frontier (1989);Star Trek: The Wrath of Khan (1982);0.180544
Stargate (1994);Star Wars (1977);0.153285
Star Trek V: The Final Frontier (1989);Empire Strikes Back, The (1980);0.084117



As we would expect, we can notice that:
  • Star Trek movies are similar to other Star Trek movies.
  • People who like Star Trek movies are not such big fans of Star Wars, and vice versa.
  • Star Wars fans will always be fans! :D
  • The sci-fi movies are quite similar to each other.
  • Star Trek III: The Search for Spock (1984) is one of the best-liked Star Trek movies (it has several positive correlations).

To see the full code, check out the GitHub repository here.


Book Similarities


Let's see another dataset. What about book ratings? This time we'll use a dataset of one million book ratings. Here's again a sample of it:

272263;Harry Potter and the Goblet of Fire (Book 4);8
272786;Harry Potter and the Chamber of Secrets (Book 2);8
272786;Harry Potter and the Prisoner of Azkaban (Book 3);9
272786;Harry Potter and the Goblet of Fire (Book 4);9
272786;Harry Potter and the Sorcerer's Stone (Harry Potter (Paperback));9
272810;Harry Potter and the Chamber of Secrets (Book 2);8
272810;Harry Potter and the Prisoner of Azkaban (Book 3);9
272810;Harry Potter and the Goblet of Fire (Book 4);10
272810;Harry Potter and the Order of the Phoenix (Book 5);10
272810;Harry Potter and the Sorcerer's Stone (Book 1);8
273089;Harry Potter and the Prisoner of Azkaban (Book 3);10
273089;Harry Potter and the Goblet of Fire (Book 4);10
273934;Harry Potter and the Sorcerer's Stone (Harry Potter (Paperback));10
273976;Harry Potter and the Sorcerer's Stone (Book 1);7
274105;Harry Potter and the Prisoner of Azkaban (Book 3);9



But now we want to compute other similarity measures besides the correlation. Let's take a look at them.

Cosine Similarity
def cosine(dot_product, rating_norm, rating2_norm):
    '''
    The cosine between two vectors A, B:
        dotProduct(A, B) / (norm(A) * norm(B))
    Note that the callers pass the norms (already square-rooted),
    not the squared norms.
    '''
    numerator = dot_product
    denominator = rating_norm * rating2_norm
    return (numerator / float(denominator)) if denominator else 0.0

Another common vector-based similarity measure.


Regularized Correlation
def regularized_correlation(size, dot_product, rating_sum,
                            rating2sum, rating_norm_squared,
                            rating2_norm_squared, virtual_count,
                            prior_correlation):
    '''
    The regularized correlation between two vectors A, B:
        RegularizedCorrelation = w * ActualCorrelation
                                 + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).
    '''
    unregularized_correlation = correlation(size, dot_product, rating_sum,
                                            rating2sum, rating_norm_squared,
                                            rating2_norm_squared)
    w = size / float(size + virtual_count)
    return w * unregularized_correlation + (1.0 - w) * prior_correlation

We can regularize the correlation by adding N virtual movie pairs that have zero correlation. This helps avoid noise when some movie pairs have very few raters in common: with 5 actual pairs and 10 virtual pairs, for instance, w = 5/15 ≈ 0.33, so the measured correlation contributes only a third of the final score.

Jaccard Similarity
def jaccard(users_in_common, total_users1, total_users2):
    '''
    The Jaccard similarity between two sets of raters:
        |Intersection(A, B)| / |Union(A, B)|
    '''
    union = total_users1 + total_users2 - users_in_common
    return (users_in_common / float(union)) if union else 0.0
Implicit data can be useful, too. Sometimes, just because you rated Toy Story, even if you rated it quite horribly, you may still be interested in similar animation movies. So we can ignore the value of each rating and use a set-based similarity measure such as the Jaccard similarity: if movie A was rated by 10 users, movie B by 8, and 4 users rated both, the Jaccard similarity is 4 / (10 + 8 - 4) ≈ 0.29.


Now let's add all those similarities to our MapReduce job, and make one adjustment: a new job that counts the number of raters for each movie, which the Jaccard similarity requires.

# -*- coding: utf-8 -*-
'''
This module computes the number of users who rated each movie,
and appends that count to every rating row.
'''
__author__ = 'Marcel Caraciolo <caraciol@gmail.com>'

from mrjob.job import MRJob


class MoviesCount(MRJob):

    def mapper(self, key, line):
        """
        Mapper: emit each rating keyed by its movie.
        """
        user_id, item_id, rating = line.split('|')
        yield item_id, (user_id, float(rating))

    def reducer(self, movie, values):
        """
        Reducer: count the raters of each movie and re-emit every
        rating row with the count appended.
        """
        total = 0
        final = []
        for user_id, rating in values:
            total += 1
            final.append((user_id, rating))
        for user_id, rating in final:
            yield '%s|%s|%.2f|%d' % (user_id, movie, rating, total), None


if __name__ == '__main__':
    MoviesCount.run()
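Note that the similarities job below now splits each line into four pipe-separated fields (user|movie|rating|raters_count), which is exactly what MoviesCount emits. So the two jobs presumably run as a small pipeline, something like this (the file names are placeholders):

$ python movies_count.py ratings.csv > ratings_count.csv
$ python moviesSimilarities.py ratings_count.csv > output.csv
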
# -*- coding: utf-8 -*-
'''
Given a dataset of movies and their ratings by different
users, how can we compute the similarity between pairs of
movies?

This module computes similarities between movies by
representing each movie as a vector of ratings and
computing similarity scores over these vectors.
'''
__author__ = 'Marcel Caraciolo <caraciol@gmail.com>'

from mrjob.job import MRJob
from metrics import correlation
from metrics import jaccard, cosine, regularized_correlation
from math import sqrt

try:
    from itertools import combinations
except ImportError:
    from metrics import combinations

PRIOR_COUNT = 10
PRIOR_CORRELATION = 0


class MoviesSimilarities(MRJob):

    def steps(self):
        return [self.mr(self.group_by_user_rating,
                        self.count_ratings_users_freq),
                self.mr(self.pairwise_items, self.calculate_similarity),
                self.mr(self.calculate_ranking, self.top_similar_items)]

    def group_by_user_rating(self, key, line):
        """
        Emit the user_id and group by their ratings (item and rating)

        17    70,3
        35    21,1
        49    19,2
        49    21,1
        49    70,4
        87    19,1
        87    21,2
        98    19,2
        """
        user_id, item_id, rating, ratings_count = line.split('|')
        yield user_id, (item_id, float(rating), ratings_count)

    def count_ratings_users_freq(self, user_id, values):
        """
        For each user, emit a row containing their "postings"
        (item, rating pairs).
        Also emit the user's rating sum and count for use in later steps.

        17    1,3,(70,3)
        35    1,1,(21,1)
        49    3,7,(19,2 21,1 70,4)
        87    2,3,(19,1 21,2)
        98    1,2,(19,2)
        """
        item_count = 0
        item_sum = 0
        final = []
        for item_id, rating, ratings_count in values:
            item_count += 1
            item_sum += rating
            final.append((item_id, rating, ratings_count))
        yield user_id, (item_count, item_sum, final)

    def pairwise_items(self, user_id, values):
        '''
        The output drops the user from the key entirely; instead it
        emits the pair of items as the key:

        19,21    2,1
        19,70    2,4
        21,70    1,4
        19,21    1,2

        This mapper is the main performance bottleneck. One improvement
        would be to create a Java Combiner to aggregate the outputs by
        key before writing to HDFS; another would be to use a vector
        format and SequenceFiles instead of streaming text for the
        matrix data.
        '''
        item_count, item_sum, ratings = values
        # the combinations call is the bottleneck
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), \
                (item1[1], item2[1], item1[2], item2[2])

    def calculate_similarity(self, pair_key, lines):
        '''
        Sum components of each co-rating pair across all users who
        rated both item x and item y, then calculate the pairwise
        similarities and co-rating counts. The similarities are
        normalized to the [0, 1] scale because we do a numerical sort.

        19,21    0.4,2
        21,19    0.4,2
        19,70    0.6,1
        70,19    0.6,1
        21,70    0.1,1
        70,21    0.1,1
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0,
                                                   0.0, 0.0, 0)
        n_x, n_y = 0, 0
        item_xname, item_yname = pair_key
        for item_x, item_y, nx_count, ny_count in lines:
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            n += 1
            n_x = int(nx_count)
            n_y = int(ny_count)
        corr_sim = correlation(n, sum_xy, sum_x,
                               sum_y, sum_xx, sum_yy)
        reg_corr_sim = regularized_correlation(n, sum_xy, sum_x,
                                               sum_y, sum_xx, sum_yy,
                                               PRIOR_COUNT,
                                               PRIOR_CORRELATION)
        cos_sim = cosine(sum_xy, sqrt(sum_xx), sqrt(sum_yy))
        jaccard_sim = jaccard(n, n_x, n_y)
        yield (item_xname, item_yname), (corr_sim, cos_sim,
                                         reg_corr_sim, jaccard_sim, n)

    def calculate_ranking(self, item_keys, values):
        '''
        Emit items with the similarities in the key for ranking:

        19,0.4    70,1
        19,0.6    21,2
        21,0.6    19,2
        21,0.9    70,1
        70,0.4    19,1
        70,0.9    21,1
        '''
        corr_sim, cos_sim, reg_corr_sim, jaccard_sim, n = values
        item_x, item_y = item_keys
        if int(n) > 0:
            yield (item_x, corr_sim, cos_sim, reg_corr_sim,
                   jaccard_sim), (item_y, n)

    def top_similar_items(self, key_sim, similar_ns):
        '''
        For each item, emit the K closest items as a
        semicolon-separated line:

        De La Soul;A Tribe Called Quest;0.6;1
        De La Soul;2Pac;0.4;2
        '''
        item_x, corr_sim, cos_sim, reg_corr_sim, jaccard_sim = key_sim
        for item_y, n in similar_ns:
            yield '%s;%s;%f;%f;%f;%f;%d' % (item_x, item_y, corr_sim,
                                            cos_sim, reg_corr_sim,
                                            jaccard_sim, n), None


if __name__ == '__main__':
    MoviesSimilarities.run()
OK, let's take a look at the book similarities now, with those new fields.



BookA;BookB;Correlation;Cosine;Regularized Correlation;Jaccard;Mutual Raters
The Return of the King (The Lord of The Rings, Part 3);The Voyage of the Dawn Treader (rack) (Narnia);0;0.998274;0;0.068966;2
The Return of the King (The Lord of the Rings, Part 3);The Man in the Black Suit : 4 Dark Tales;0;1;0;0.058824;6
The Fellowship of the Ring (The Lord of the Rings, Part 1);The Hobbit : The Enchanting Prelude to The Lord of the Rings;0.796478;0.997001;0.49014;0.045714;16
The Two Towers (The Lord of the Rings, Part 2);Harry Potter and the Prisoner of Azkaban (Book 3);-0.184302;0.992536;-0.087301;0.022277;9
Disney's 101 Dalmatians (Golden Look-Look Books);Walt Disney's Lady and the Tramp (Little Golden Book);0.88383;1;0.45999;0.166667;5
Disney's 101 Dalmatians (Golden Look-Look Books);Disney's Beauty and the Beast (Golden Look-Look Book);0.76444;1;0.2339;0.166667;7
Disney's Pocahontas (Little Golden Book);Disney's the Lion King (Little Golden Book);0.54595;1;0.6777;0.1;4
Disney's the Lion King (Disney Classic Series);Walt Disney Pictures presents The rescuers downunder (A Little golden book);0.34949;1;0.83833;0.142857;3
Harry Potter and the Order of the Phoenix (Book 5);Harry Potter and the Goblet of Fire (Book 4);0.673429;0.994688;0.559288;0.119804;49
Harry Potter and the Chamber of Secrets (Book 2);Harry Potter and the Goblet of Fire (Book 4);0.555423;0.993299;0.496957;0.17418;85
The Return of the King (The Lord of The Rings, Part 3);Harry Potter and the Goblet of Fire (Book 4);-0.2343;0.02022;-0.08383;0.015444;4




  • Lord of the Rings books are similar to other Lord of the Rings books.
  • Walt Disney books are similar to other Walt Disney books.
  • Lord of the Rings books don't cluster together with Harry Potter books.

The possibilities are endless.

But is it possible to generalize our input and make our code generate similarities for different inputs? Yes, it is. Let's abstract our input. For this, we will create a VectorSimilarities class that consumes input data in the following way:

class VectorSimilarities(MRJob):

    def steps(self):
        return [self.mr(self.input, self.group_by_user_rating),
                self.mr(None, self.count_ratings_users_freq),
                self.mr(self.pairwise_items, self.calculate_similarity),
                self.mr(self.calculate_ranking, self.top_similar_items)]

    def configure_options(self):
        super(VectorSimilarities, self).configure_options()
        self.add_passthrough_option(
            '--priorcount', dest='prior_count', default=10, type='int',
            help='PRIOR_COUNT: Parameter to regularize correlation')
        self.add_passthrough_option(
            '--priorcorrelation', dest='prior_correlation', default=0,
            type='int',
            help='PRIOR_CORRELATION: Parameter to regularize correlation')
        self.add_passthrough_option(
            '--minraters', dest='min_num_raters', default=3, type='int',
            help='the minimum number of raters')
        self.add_passthrough_option(
            '--maxraters', dest='max_num_raters', default=10000, type='int',
            help='the maximum number of raters')
        self.add_passthrough_option(
            '--minintersec', dest='min_intersection', default=0, type='int',
            help='the minimum intersection')

    def input(self, key, line):
        '''
        Subclasses should override this to define their own input.
        '''
        raise NotImplementedError('Implement this in the subclass')

    ...

So if we want to define a new input format, just subclass the VectorSimilarities class and implement the method input.

So here's the class for the book recommendations using our new VectorSimilarities.

from vectorSimilarities import VectorSimilarities


class BookSimilarities(VectorSimilarities):

    def input(self, key, line):
        user_id, item_id, rating = line.split(';')
        yield item_id, (user_id, float(rating))


if __name__ == '__main__':
    BookSimilarities.run()
And here's the class for the movie recommendations. It simply reads from a data file and lets the VectorSimilarities superclass do the work.
from vectorSimilarities import VectorSimilarities


class MovieSimilarities(VectorSimilarities):

    def input(self, key, line):
        user_id, item_id, rating = line.split('|')
        yield item_id, (user_id, float(rating))


if __name__ == '__main__':
    MovieSimilarities.run()
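
With the passthrough options defined in configure_options, the thresholds can be tuned from the command line, assuming the elided parts of VectorSimilarities apply them when filtering pairs. For example (the file names are placeholders):

$ python movieSimilarities.py --priorcount 10 --minraters 5 ratings.csv > output.csv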


Conclusions

As you noticed, map-reduce is a powerful technique for numerical computation, especially when you have to process large datasets. There are several optimizations I could make in these scripts, such as numpy vectorization when computing the similarities. I will explore these features in the next posts: one on recommender systems and popular social networks, as well as one on how you can use the Amazon EMR infrastructure to run your jobs!

I'd like to thank Edwin Chen, whose post exploring these examples in Scala inspired me to explore them in Python.

All code for those examples above can be downloaded at my github repository.

Stay tuned,

I hope you enjoyed this article,

Best regards,

Marcel Caraciolo