Compare commits


No commits in common. "actorbintree" and "wikipedia" have entirely different histories.

12 changed files with 326 additions and 338 deletions

.gitignore

@@ -14,3 +14,7 @@ target/
 # Dotty IDE
 /.dotty-ide-artifact
 /.dotty-ide.json
+
+# datasets
+stackoverflow-grading.csv
+wikipedia-grading.dat


@@ -25,7 +25,7 @@ grade:
   tags:
     - cs206
   image:
-    name: smarter3/moocs:reactive-actorbintree-2020-04-15
+    name: smarter3/moocs:bigdata-wikipedia-2020-04-30-3
     entrypoint: [""]
   allow_failure: true
   before_script:


@@ -1,4 +1,9 @@
 // Student tasks (i.e. submit, packageSubmission)
 enablePlugins(StudentTasks)
+
+courseraId := ch.epfl.lamp.CourseraId(
+  key = "EH8wby4kEeawURILfHIqjw",
+  itemId = "CfQX2",
+  premiumItemId = Some("QcWcs"),
+  partId = "5komc"
+)


@@ -1,24 +1,20 @@
-course := "reactive"
-assignment := "actorbintree"
+course := "bigdata"
+assignment := "wikipedia"
-testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-a", "-v", "-s")
 parallelExecution in Test := false
-val akkaVersion = "2.6.0"
-scalaVersion := "0.23.0-bin-20200211-5b006fb-NIGHTLY"
-scalacOptions ++= Seq(
-  "-feature",
-  "-deprecation",
-  "-encoding", "UTF-8",
-  "-unchecked",
-  "-language:implicitConversions"
-)
+scalaVersion := "0.24.0-RC1"
+scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
+libraryDependencies ++= Seq(
+  "com.novocode" % "junit-interface" % "0.11" % Test,
+  ("org.apache.spark" %% "spark-core" % "3.0.0-X1").withDottyCompat(scalaVersion.value),
+)
-libraryDependencies ++= Seq(
-  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
-  "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test,
-  "com.novocode" % "junit-interface" % "0.11" % Test
-).map(_.withDottyCompat(scalaVersion.value))
-testSuite := "actorbintree.BinaryTreeSuite"
+// Contains Spark 3 snapshot built against 2.13: https://github.com/smarter/spark/tree/scala-2.13
+resolvers += Resolver.bintrayRepo("smarter", "maven")
+testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-a", "-v", "-s")
+testSuite := "wikipedia.WikipediaSuite"
+// Without forking, ctrl-c doesn't actually fully stop Spark
+fork in run := true
+fork in Test := true

Binary file not shown.


@@ -1,189 +0,0 @@
/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package actorbintree

import akka.actor._
import scala.collection.immutable.Queue

object BinaryTreeSet {

  trait Operation {
    def requester: ActorRef
    def id: Int
    def elem: Int
  }

  trait OperationReply {
    def id: Int
  }

  /** Request with identifier `id` to insert an element `elem` into the tree.
   * The actor at reference `requester` should be notified when this operation
   * is completed.
   */
  case class Insert(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Request with identifier `id` to check whether an element `elem` is present
   * in the tree. The actor at reference `requester` should be notified when
   * this operation is completed.
   */
  case class Contains(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Request with identifier `id` to remove the element `elem` from the tree.
   * The actor at reference `requester` should be notified when this operation
   * is completed.
   */
  case class Remove(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Request to perform garbage collection. */
  case object GC

  /** Holds the answer to the Contains request with identifier `id`.
   * `result` is true if and only if the element is present in the tree.
   */
  case class ContainsResult(id: Int, result: Boolean) extends OperationReply

  /** Message to signal successful completion of an insert or remove operation. */
  case class OperationFinished(id: Int) extends OperationReply
}
class BinaryTreeSet extends Actor {
  import BinaryTreeSet._
  import BinaryTreeNode._
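  // The root is a sentinel: `initiallyRemoved = true` means its `elem` (0) is
  // never itself considered part of the set.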
  def createRoot: ActorRef = context.actorOf(BinaryTreeNode.props(0, initiallyRemoved = true))

  var root = createRoot

  // optional
  var pendingQueue = Queue.empty[Operation]

  // optional
  def receive = normal

  // optional
  /** Accepts `Operation` and `GC` messages. */
  val normal: Receive = {
    case op: Operation => root ! op
    case GC =>
      val newRoot = createRoot
      root ! CopyTo(newRoot)
      context.become(garbageCollecting(newRoot))
  }

  // optional
  /** Handles messages while garbage collection is performed.
   * `newRoot` is the root of the new binary tree where we want to copy
   * all non-removed elements into.
   */
  def garbageCollecting(newRoot: ActorRef): Receive = {
    case op: Operation => pendingQueue = pendingQueue.enqueue(op)
    case CopyFinished =>
      pendingQueue.foreach(newRoot ! _) // foreach preserves queue order (same as dequeueing)
      root ! PoisonPill // will also stop all of its children
      pendingQueue = Queue.empty
      root = newRoot
      context.become(normal)
    // GC messages are ignored here (there is deliberately no case for GC)
  }
}

object BinaryTreeNode {
  trait Position

  case object Left extends Position
  case object Right extends Position

  case class CopyTo(treeNode: ActorRef)
  case object CopyFinished

  def props(elem: Int, initiallyRemoved: Boolean) = Props(classOf[BinaryTreeNode], elem, initiallyRemoved)
}
class BinaryTreeNode(val elem: Int, initiallyRemoved: Boolean) extends Actor {
  import BinaryTreeNode._
  import BinaryTreeSet._

  var subtrees = Map[Position, ActorRef]()
  var removed = initiallyRemoved

  // optional
  def receive = normal
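  // BST invariant: elements smaller than this node's go into the Left subtree,
  // everything else (including an element equal to a removed node's) goes Right.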
  def goDownTo(elem: Int): Position = if (elem < this.elem) Left else Right

  // optional
  /** Handles `Operation` messages and `CopyTo` requests. */
  val normal: Receive = {
    case Insert(requester, id, elem) =>
      if (elem == this.elem && !removed) {
        requester ! OperationFinished(id)
      } else {
        val nextPos = goDownTo(elem)
        subtrees.get(nextPos) match {
          case Some(node) => node ! Insert(requester, id, elem)
          case None =>
            val newActorSubtree = (nextPos, context.actorOf(BinaryTreeNode.props(elem, initiallyRemoved = false)))
            subtrees = subtrees + newActorSubtree
            requester ! OperationFinished(id)
        }
      }
    case Contains(requester, id, elem) =>
      if (elem == this.elem && !removed)
        requester ! ContainsResult(id, true)
      else {
        // need to search the subtrees
        subtrees.get(goDownTo(elem)) match {
          case Some(node) => node ! Contains(requester, id, elem)
          case None => requester ! ContainsResult(id, false)
        }
      }
    case Remove(requester, id, elem) =>
      if (elem == this.elem && !removed) {
        removed = true
        requester ! OperationFinished(id)
      } else {
        subtrees.get(goDownTo(elem)) match {
          case Some(node) => node ! Remove(requester, id, elem)
          case None => requester ! OperationFinished(id) // elem isn't in the tree
        }
      }
    case CopyTo(newRoot) =>
      if (removed && subtrees.isEmpty) context.parent ! CopyFinished // already done, nothing to copy
      else {
        if (!removed) newRoot ! Insert(self, elem, elem)
        subtrees.values.foreach(_ ! CopyTo(newRoot)) // copy the subtrees' elements
        // insertConfirmed starts out true exactly when this node is removed,
        // hence we can simply pass `removed`
        context.become(copying(subtrees.values.toSet, removed))
      }
  }

  // optional
  /** `expected` is the set of ActorRefs whose replies we are waiting for,
   * `insertConfirmed` tracks whether the copy of this node to the new tree has been confirmed.
   */
  def copying(expected: Set[ActorRef], insertConfirmed: Boolean): Receive = {
    // catches the insert of this node into the new tree being finished
    case OperationFinished(_) =>
      if (expected.isEmpty) context.parent ! CopyFinished
      else context.become(copying(expected, insertConfirmed = true))
    case CopyFinished =>
      val newExpected = expected - sender()
      if (insertConfirmed && newExpected.isEmpty)
        context.parent ! CopyFinished
      else
        context.become(copying(newExpected, insertConfirmed))
  }
}


@@ -0,0 +1,21 @@
package wikipedia
import scala.io.Source
object WikipediaData {
  private[wikipedia] def lines: List[String] = {
    Option(getClass.getResourceAsStream("/wikipedia/wikipedia-grading.dat")) match {
      case None => sys.error("Please download the dataset as explained in the assignment instructions")
      case Some(resource) => Source.fromInputStream(resource).getLines().toList
    }
  }

  private[wikipedia] def parse(line: String): WikipediaArticle = {
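    // Assumed input format: each line wraps one article in fixed markup around
    // `<title>` and `<text>` tags; the offsets 14 and 16 below skip that fixed
    // leading and trailing markup.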
    val subs = "</title><text>"
    val i = line.indexOf(subs)
    val title = line.substring(14, i)
    val text = line.substring(i + subs.length, line.length - 16)
    WikipediaArticle(title, text)
  }
}


@@ -0,0 +1,115 @@
package wikipedia
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.{Logger, Level}
import org.apache.spark.rdd.RDD
case class WikipediaArticle(title: String, text: String) {
  /**
   * @param lang Language to look for (e.g. "Scala")
   * @return Whether the text of this article mentions `lang` or not
   */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
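  // Note: tokenizing on single spaces means a mention glued to punctuation
  // (e.g. "Scala,") is not counted.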
}
object WikipediaRanking extends WikipediaRankingInterface {
  // Reduce Spark logging verbosity
  Logger.getLogger("org").setLevel(Level.ERROR)

  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("My wiki app")
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.parallelize`, `WikipediaData.lines` and `WikipediaData.parse`
  val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(WikipediaData.lines).map(WikipediaData.parse)
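  // `parallelize` distributes the in-memory list across partitions; the `map`
  // is lazy, so parsing runs on the executors once an action is triggered.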

  /** Returns the number of articles in which the language `lang` occurs.
   * Hint1: consider using method `aggregate` on RDD[T].
   * Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
    def seqOp(acc: Int, art: WikipediaArticle): Int = {
      val x = if (art.mentionsLanguage(lang)) 1 else 0
      acc + x
    }
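    // 0 is the neutral element: seqOp counts matches within each partition and
    // the combine operator `_ + _` merges the per-partition counts.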
    rdd.aggregate(0)(seqOp, _ + _)
  }

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortBy(_._2).reverse

  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
    (for {
      art <- rdd
      lang <- langs if art.mentionsLanguage(lang)
    } yield (lang, art)).groupByKey()
  }

  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index.mapValues(_.size).collect().sortBy(_._2).reverse.toList
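  // `collect` only moves one (language, count) pair per language to the driver,
  // so the local sort is cheap.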

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
    val reducedRDD = (for {
      art <- rdd
      lang <- langs if art.mentionsLanguage(lang)
    } yield (lang, 1)).reduceByKey(_ + _)
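    // Unlike the groupByKey-based index, reduceByKey pre-aggregates map-side,
    // so only (lang, count) pairs are shuffled instead of full article lists.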
    reducedRDD.collect().sortBy(_._2).reverse.toList
  }

  def main(args: Array[String]): Unit = {
    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
    sc.stop()
  }

  val timing = new StringBuffer
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}


@@ -0,0 +1,20 @@
package wikipedia
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
/**
 * The interface used by the grading infrastructure. Do not change signatures
 * or your submission will fail with a NoSuchMethodError.
 */
trait WikipediaRankingInterface {
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])]
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)]
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)]
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)]
  def langs: List[String]
  def sc: SparkContext
  def wikiRdd: RDD[WikipediaArticle]
}
}


@@ -1,126 +0,0 @@
/**
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */
package actorbintree
import akka.actor.{ActorRef, ActorSystem, Props, actorRef2Scala, scala2ActorRef}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.junit.Test
import org.junit.Assert._
import scala.util.Random
import scala.concurrent.duration._
class BinaryTreeSuite extends TestKit(ActorSystem("BinaryTreeSuite")) with ImplicitSender {
  import actorbintree.BinaryTreeSet._

  def receiveN(requester: TestProbe, ops: Seq[Operation], expectedReplies: Seq[OperationReply]): Unit =
    requester.within(5.seconds) {
      val repliesUnsorted = for (i <- 1 to ops.size) yield try {
        requester.expectMsgType[OperationReply]
      } catch {
        case ex: Throwable if ops.size > 10 => sys.error(s"failure to receive confirmation $i/${ops.size}\n$ex")
        case ex: Throwable => sys.error(s"failure to receive confirmation $i/${ops.size}\nRequests:" + ops.mkString("\n ", "\n ", "") + s"\n$ex")
      }
      val replies = repliesUnsorted.sortBy(_.id)
      if (replies != expectedReplies) {
        val pairs = (replies zip expectedReplies).zipWithIndex filter (x => x._1._1 != x._1._2)
        fail("unexpected replies:" + pairs.map(x => s"at index ${x._2}: got ${x._1._1}, expected ${x._1._2}").mkString("\n ", "\n ", ""))
      }
    }

  def verify(probe: TestProbe, ops: Seq[Operation], expected: Seq[OperationReply]): Unit = {
    val topNode = system.actorOf(Props[BinaryTreeSet])
    ops foreach { op =>
      topNode ! op
    }
    receiveN(probe, ops, expected)
    // the grader also verifies that enough actors are created
  }

  @Test def `proper inserts and lookups (5pts)`(): Unit = {
    val topNode = system.actorOf(Props[BinaryTreeSet])
    topNode ! Contains(testActor, id = 1, 1)
    expectMsg(ContainsResult(1, false))
    topNode ! Insert(testActor, id = 2, 1)
    topNode ! Contains(testActor, id = 3, 1)
    expectMsg(OperationFinished(2))
    expectMsg(ContainsResult(3, true))
    ()
  }

  @Test def `instruction example (5pts)`(): Unit = {
    val requester = TestProbe()
    val requesterRef = requester.ref
    val ops = List(
      Insert(requesterRef, id = 100, 1),
      Contains(requesterRef, id = 50, 2),
      Remove(requesterRef, id = 10, 1),
      Insert(requesterRef, id = 20, 2),
      Contains(requesterRef, id = 80, 1),
      Contains(requesterRef, id = 70, 2)
    )
    val expectedReplies = List(
      OperationFinished(id = 10),
      OperationFinished(id = 20),
      ContainsResult(id = 50, false),
      ContainsResult(id = 70, true),
      ContainsResult(id = 80, false),
      OperationFinished(id = 100)
    )
    verify(requester, ops, expectedReplies)
  }

  @Test def `behave identically to built-in set (includes GC) (40pts)`(): Unit = {
    val rnd = new Random()
    def randomOperations(requester: ActorRef, count: Int): Seq[Operation] = {
      def randomElement: Int = rnd.nextInt(100)
      def randomOperation(requester: ActorRef, id: Int): Operation = rnd.nextInt(4) match {
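        // Insert fills two of the four branches, so inserts are twice as
        // likely as Contains or Remove.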
        case 0 => Insert(requester, id, randomElement)
        case 1 => Insert(requester, id, randomElement)
        case 2 => Contains(requester, id, randomElement)
        case 3 => Remove(requester, id, randomElement)
      }
      for (seq <- 0 until count) yield randomOperation(requester, seq)
    }

    def referenceReplies(operations: Seq[Operation]): Seq[OperationReply] = {
      var referenceSet = Set.empty[Int]
      def replyFor(op: Operation): OperationReply = op match {
        case Insert(_, seq, elem) =>
          referenceSet = referenceSet + elem
          OperationFinished(seq)
        case Remove(_, seq, elem) =>
          referenceSet = referenceSet - elem
          OperationFinished(seq)
        case Contains(_, seq, elem) =>
          ContainsResult(seq, referenceSet(elem))
      }
      for (op <- operations) yield replyFor(op)
    }

    val requester = TestProbe()
    val topNode = system.actorOf(Props[BinaryTreeSet])
    val count = 1000

    val ops = randomOperations(requester.ref, count)
    val expectedReplies = referenceReplies(ops)

    ops foreach { op =>
      topNode ! op
      if (rnd.nextDouble() < 0.1) topNode ! GC
    }
    receiveN(requester, ops, expectedReplies)
  }
}


@@ -0,0 +1,142 @@
package wikipedia
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.junit._
class WikipediaSuite {
  def initializeWikipediaRanking(): Boolean =
    try {
      WikipediaRanking
      true
    } catch {
      case ex: Throwable =>
        println(ex.getMessage)
        ex.printStackTrace()
        false
    }

  import WikipediaRanking._

  /**
   * Creates a truncated string representation of a list, adding ", ...)" if there
   * are too many elements to show
   * @param l The list to preview
   * @param n The number of elements to cut it at
   * @return A preview of the list, containing at most n elements.
   */
  def previewList[A](l: List[A], n: Int = 10): String =
    if (l.length <= n) l.toString
    else l.take(n).toString.dropRight(1) + ", ...)"

  /**
   * Asserts that all the elements in a given list and an expected list are the same,
   * regardless of order. For a prettier output, given and expected should be sorted
   * with the same ordering.
   * @param actual The actual list
   * @param expected The expected list
   * @tparam A Type of the list elements
   */
  def assertSameElements[A](actual: List[A], expected: List[A]): Unit = {
    val givenSet = actual.toSet
    val expectedSet = expected.toSet

    val unexpected = givenSet -- expectedSet
    val missing = expectedSet -- givenSet

    val noUnexpectedElements = unexpected.isEmpty
    val noMissingElements = missing.isEmpty

    val noMatchString =
      s"""
         |Expected: ${previewList(expected)}
         |Actual: ${previewList(actual)}""".stripMargin

    assert(noUnexpectedElements,
      s"""|$noMatchString
          |The given collection contains some unexpected elements: ${previewList(unexpected.toList, 5)}""".stripMargin)

    assert(noMissingElements,
      s"""|$noMatchString
          |The given collection is missing some expected elements: ${previewList(missing.toList, 5)}""".stripMargin)
  }

  // Conditions:
  // (1) the language stats contain the same elements
  // (2) they are ordered (and the order doesn't matter if there are several languages with the same count)
  def assertEquivalentAndOrdered(actual: List[(String, Int)], expected: List[(String, Int)]): Unit = {
    // (1)
    assertSameElements(actual, expected)
    // (2)
    assert(
      !(actual zip actual.tail).exists({ case ((_, occ1), (_, occ2)) => occ1 < occ2 }),
      "The given elements are not in descending order"
    )
  }

  @Test def `'occurrencesOfLang' should work for (specific) RDD with one element`: Unit = {
    assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
    val rdd = sc.parallelize(Seq(WikipediaArticle("title", "Java Jakarta")))
    val res = (occurrencesOfLang("Java", rdd) == 1)
    assert(res, "occurrencesOfLang given a (specific) RDD with one element should equal 1")
  }

  @Test def `'rankLangs' should work for RDD with two elements`: Unit = {
    assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
    val langs = List("Scala", "Java")
    val rdd = sc.parallelize(List(WikipediaArticle("1", "Scala is great"), WikipediaArticle("2", "Java is OK, but Scala is cooler")))
    val ranked = rankLangs(langs, rdd)
    val res = ranked.head._1 == "Scala"
    assert(res)
  }

  @Test def `'makeIndex' creates a simple index with two entries`: Unit = {
    assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
    val langs = List("Scala", "Java")
    val articles = List(
      WikipediaArticle("1", "Groovy is pretty interesting, and so is Erlang"),
      WikipediaArticle("2", "Scala and Java run on the JVM"),
      WikipediaArticle("3", "Scala is not purely functional")
    )
    val rdd = sc.parallelize(articles)
    val index = makeIndex(langs, rdd)
    val res = index.count() == 2
    assert(res)
  }

  @Test def `'rankLangsUsingIndex' should work for a simple RDD with three elements`: Unit = {
    assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
    val langs = List("Scala", "Java")
    val articles = List(
      WikipediaArticle("1", "Groovy is pretty interesting, and so is Erlang"),
      WikipediaArticle("2", "Scala and Java run on the JVM"),
      WikipediaArticle("3", "Scala is not purely functional")
    )
    val rdd = sc.parallelize(articles)
    val index = makeIndex(langs, rdd)
    val ranked = rankLangsUsingIndex(index)
    val res = (ranked.head._1 == "Scala")
    assert(res)
  }

  @Test def `'rankLangsReduceByKey' should work for a simple RDD with five elements`: Unit = {
    assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
    val langs = List("Scala", "Java", "Groovy", "Haskell", "Erlang")
    val articles = List(
      WikipediaArticle("1", "Groovy is pretty interesting, and so is Erlang"),
      WikipediaArticle("2", "Scala and Java run on the JVM"),
      WikipediaArticle("3", "Scala is not purely functional"),
      WikipediaArticle("4", "The cool kids like Haskell more than Java"),
      WikipediaArticle("5", "Java is for enterprise developers")
    )
    val rdd = sc.parallelize(articles)
    val ranked = rankLangsReduceByKey(langs, rdd)
    val res = (ranked.head._1 == "Java")
    assert(res)
  }

  @Rule def individualTestTimeout = new org.junit.rules.Timeout(100 * 1000)
}