-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathColloborativeFilter.py
More file actions
73 lines (57 loc) · 2.71 KB
/
ColloborativeFilter.py
File metadata and controls
73 lines (57 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from pyspark import SparkContext
# Create the Spark context that every RDD in this script is built from.
sc = SparkContext()
# Load user_artist_data.txt from the S3 bucket as an RDD of text lines.
rawUserArtistData = sc.textFile('s3n://spark-bucket-am4810/audio_data/user_artist_data.txt')
# NOTE(review): the partition count is computed here but the result is discarded.
rawUserArtistData.getNumPartitions()
# First and second whitespace-separated fields of each line, parsed as floats.
# NOTE(review): neither RDD is referenced again in the visible part of this file.
userIDs = rawUserArtistData.map(lambda line: float(line.split()[0]))
artistIDs = rawUserArtistData.map(lambda line: float(line.split()[1]))
# Load artist_data.txt from the S3 bucket as an RDD of text lines.
rawArtistData = sc.textFile('s3n://spark-bucket-am4810/audio_data/artist_data.txt')
def getArtistIDAndName(line):
    """Parse one line of artist_data.txt into an ``[artist_id, name]`` pair.

    The line is split on tab characters. The first field is converted to
    ``int`` and the second field is stripped of surrounding whitespace; any
    further tab-separated fields are ignored.

    Args:
        line: A single text line, expected as ``<id><TAB><name>``.

    Returns:
        ``[int, str]`` on success, or ``None`` when the id is not a valid
        integer or the line has no second tab-separated field.
    """
    tokens = line.split('\t')
    try:
        return [int(tokens[0]), tokens[1].strip()]
    except (ValueError, IndexError):
        # ValueError: the id field is not an integer.
        # IndexError: there is no tab, so the name field is missing.
        return None
def MapAliasToCanonical(line):
    """Parse one line of artist_alias.txt into an ``[alias_id, canonical_id]`` pair.

    The line is split on tab characters and the first two fields are each
    converted to ``int``; any further tab-separated fields are ignored.

    Args:
        line: A single text line, expected as ``<alias id><TAB><canonical id>``.

    Returns:
        ``[int, int]`` on success, or ``None`` when either id is not a valid
        integer or the line has no second tab-separated field.
    """
    tokens = line.split('\t')
    try:
        return [int(tokens[0]), int(tokens[1])]
    except (ValueError, IndexError):
        # ValueError: one of the ids is empty or non-numeric.
        # IndexError: there is no tab, so the second field is missing.
        return None
# [artist id, name] pairs; lines the parser rejected (None) are dropped.
id2name = rawArtistData.map(getArtistIDAndName).filter(lambda pair: pair is not None)

# [alias id, canonical id] pairs, loaded from S3 and cleaned the same way.
rawArtistAlias = sc.textFile('s3n://spark-bucket-am4810/audio_data/artist_alias.txt')
artistAlias = rawArtistAlias.map(MapAliasToCanonical).filter(lambda pair: pair is not None)
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Collect the alias -> canonical id pairs into a dict and broadcast it, so that
# replaceAlias can look ids up through bArtistAlias.value.
bArtistAlias = sc.broadcast(artistAlias.collectAsMap())
def replaceAlias(line):
    """Turn one user/artist/play-count line into a Rating with the alias resolved.

    The line must contain exactly three whitespace-separated integers:
    user id, artist id and play count. If the artist id has an entry in the
    broadcast alias map, the mapped canonical id is used instead; otherwise
    the id is taken to be canonical already.

    Args:
        line: A single text line of three integer fields.

    Returns:
        ``Rating(user id, canonical artist id, play count)``.

    Raises:
        ValueError: If a field is not an integer or the line does not have
            exactly three fields.
    """
    userID, artistID, play_count = list(map(int, line.split()))
    canonical = bArtistAlias.value.get(artistID)
    # A missing entry means this id is not an alias.
    resolvedArtistID = artistID if canonical is None else canonical
    return Rating(userID, resolvedArtistID, play_count)
# Build the training set: one Rating per input line, with alias ids resolved.
# The RDD is cached so it is kept after first being computed.
trainData = rawUserArtistData.map(replaceAlias).cache()

# Fit an implicit-feedback ALS model on the ratings.
model = ALS.trainImplicit(
    ratings=trainData,
    rank=10,
    iterations=5,
    lambda_=0.01,
    alpha=1.0,
)

# Top 10 recommendations for user 2093760 (the user id given in the book).
# This goes through the public MatrixFactorizationModel.recommendProducts API
# rather than the internal model.call("recommendProducts", ...) wrapper.
recommendations = model.recommendProducts(2093760, 10)

# Distinct recommended artist ids. This is a set, so it has no defined order
# and must not be zipped positionally against the artist names below.
uniqueRecAristIDs = {rec.product for rec in recommendations}
# RDD of the names of the recommended artists; no action is run on it here.
uniqueRecArtistNames = (
    id2name
    .filter(lambda artist: artist[0] in uniqueRecAristIDs)
    .map(lambda artist: artist[1])
)

print("=================================== RESULTS =====================================")
print(uniqueRecAristIDs)
print("=================================================================================")
### write artistID, artistname to a file
##with open('s3n://spark-bucket-am4810/results.txt', 'w+') as f:
## results = zip(uniqueRecAristIDs, uniqueRecArtistNames.collect())
## for result in results:
## f.write('%s\n' % str(result))