diff --git a/src/main/scala/wikipedia/WikipediaRanking.scala b/src/main/scala/wikipedia/WikipediaRanking.scala
index 1306b61..a035391 100644
--- a/src/main/scala/wikipedia/WikipediaRanking.scala
+++ b/src/main/scala/wikipedia/WikipediaRanking.scala
@@ -23,16 +23,22 @@ object WikipediaRanking extends WikipediaRankingInterface {
     "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
     "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")
 
-  val conf: SparkConf = ???
-  val sc: SparkContext = ???
+  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("My wiki app")
+  val sc: SparkContext = new SparkContext(conf)
   // Hint: use a combination of `sc.parallelize`, `WikipediaData.lines` and `WikipediaData.parse`
-  val wikiRdd: RDD[WikipediaArticle] = ???
+  val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(WikipediaData.lines).map(WikipediaData.parse)
 
   /** Returns the number of articles on which the language `lang` occurs.
    *  Hint1: consider using method `aggregate` on RDD[T].
    *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
    */
-  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = ???
+  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
+    def seqOp(acc: Int, art: WikipediaArticle): Int = {
+      val x = if (art.mentionsLanguage(lang)) 1 else 0
+      acc + x
+    }
+    rdd.aggregate(0)(seqOp, _ + _)
+  }
 
   /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
    *     (`val langs`) by determining the number of Wikipedia articles that
    *     mention each language at least once. Don't forget to sort the
    *     languages by their occurrence, in decreasing order!
    *
@@ -42,12 +48,18 @@ object WikipediaRanking extends WikipediaRankingInterface {
    *   Note: this operation is long-running. It can potentially run for
    *   several seconds.
    */
-  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = ???
+  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
+    langs.map((lang: String) => (lang, occurrencesOfLang(lang, rdd))).sortBy(_._2).reverse
 
   /* Compute an inverted index of the set of articles, mapping each language
    * to the Wikipedia pages in which it occurs.
    */
-  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = ???
+  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
+    (for {
+      art <- rdd
+      lang <- langs if art.mentionsLanguage(lang)
+    } yield (lang, art)).groupByKey
+  }
 
   /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
    *     a performance improvement?
@@ -55,7 +67,8 @@ object WikipediaRanking extends WikipediaRankingInterface {
    *   Note: this operation is long-running. It can potentially run for
    *   several seconds.
    */
-  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = ???
+  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
+    index.mapValues(_.size).collect.sortBy(_._2).reverse.toList
 
   /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
    *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
    *     and the computation of the ranking? If so, can you think of a reason?
@@ -64,7 +77,13 @@ object WikipediaRanking extends WikipediaRankingInterface {
    *   Note: this operation is long-running. It can potentially run for
    *   several seconds.
    */
-  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = ???
+  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
+    val reducedRDD = (for {
+      art <- rdd
+      lang <- langs if art.mentionsLanguage(lang)
+    } yield (lang, 1)).reduceByKey(_ + _)
+    reducedRDD.collect.sortBy(_._2).reverse.toList
+  }
 
   def main(args: Array[String]): Unit = {
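A note on the question in part (3): `rankLangsReduceByKey` tends to be faster than building the `groupByKey` index and then ranking from it, because `groupByKey` shuffles every (language, article) pair across the network before anything is counted, whereas `reduceByKey` sums the 1s on each partition first, so only one partial count per language and partition is shuffled. The sketch below contrasts the two strategies on a generic pair RDD; `ShuffleComparison`, `countViaGroupByKey` and `countViaReduceByKey` are illustrative names, not part of the assignment.

import org.apache.spark.rdd.RDD

object ShuffleComparison {
  // groupByKey: ships every (key, value) pair to the reducers, then counts each group.
  def countViaGroupByKey(pairs: RDD[(String, String)]): Map[String, Int] =
    pairs.groupByKey().mapValues(_.size).collect().toMap

  // reduceByKey: pre-sums the 1s on each partition, so only one partial count
  // per key and partition crosses the network.
  def countViaReduceByKey(pairs: RDD[(String, String)]): Map[String, Int] =
    pairs.mapValues(_ => 1).reduceByKey(_ + _).collect().toMap
}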
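The diff ends at the signature of `main`, whose body is unchanged by this patch and therefore not shown. For orientation, a driver that exercises the three rankings could look roughly like the sketch below; the `timedResult` helper and this `main` body are assumptions for illustration, using only the members defined above (`langs`, `wikiRdd`, `sc` and the ranking methods), not the handout's actual `main`.

  def timedResult[T](label: String)(body: => T): T = {
    // Crude wall-clock timing, enough to compare the three approaches.
    val start = System.nanoTime()
    val result = body
    println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
    result
  }

  def main(args: Array[String]): Unit = {
    // (1) one full pass over wikiRdd per language
    val ranked1 = timedResult("Part 1: rankLangs")(rankLangs(langs, wikiRdd))

    // (2) build the inverted index once, then rank from it
    val index = makeIndex(langs, wikiRdd)
    val ranked2 = timedResult("Part 2: rankLangsUsingIndex")(rankLangsUsingIndex(index))

    // (3) index and ranking fused into a single reduceByKey pass
    val ranked3 = timedResult("Part 3: rankLangsReduceByKey")(rankLangsReduceByKey(langs, wikiRdd))

    List(("naive", ranked1), ("index", ranked2), ("reduceByKey", ranked3)).foreach {
      case (name, ranking) => println(s"$name: ${ranking.take(3)}")
    }
    sc.stop()
  }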