Add wikipedia assignment

This commit is contained in:
Guillaume Martres 2019-02-19 20:44:23 +01:00
commit e408ad50ae
16 changed files with 744 additions and 0 deletions

20
.gitignore vendored Normal file
View File

@ -0,0 +1,20 @@
# General
*.DS_Store
*.swp
*~
# Dotty
*.class
*.tasty
*.hasTasty
# sbt
target/
# Dotty IDE
/.dotty-ide-artifact
/.dotty-ide.json
# datasets
stackoverflow-grading.csv
wikipedia-grading.dat

36
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,36 @@
# DO NOT EDIT THIS FILE
stages:
- build
- grade
compile:
stage: build
image: lampepfl/moocs:dotty-2020-02-12
except:
- tags
tags:
- cs206
script:
- sbt packageSubmission
artifacts:
expire_in: 1 day
paths:
- submission.jar
grade:
stage: grade
except:
- tags
tags:
- cs206
image:
name: smarter3/moocs:bigdata-wikipedia-2020-04-30-3
entrypoint: [""]
allow_failure: true
before_script:
- mkdir -p /shared/submission/
- cp submission.jar /shared/submission/submission.jar
script:
- cd /grader
- /grader/grade | /grader/feedback-printer

8
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,8 @@
{
"dotty": {
"trace": {
"remoteTracingUrl": "wss://lamppc36.epfl.ch/dotty-remote-tracer/upload/lsp.log",
"server": { "format": "JSON", "verbosity": "verbose" }
}
}
}

9
assignment.sbt Normal file
View File

@ -0,0 +1,9 @@
// Student tasks (i.e. submit, packageSubmission)
enablePlugins(StudentTasks)
courseraId := ch.epfl.lamp.CourseraId(
key = "EH8wby4kEeawURILfHIqjw",
itemId = "CfQX2",
premiumItemId = Some("QcWcs"),
partId = "5komc"
)

20
build.sbt Normal file
View File

@ -0,0 +1,20 @@
course := "bigdata"
assignment := "wikipedia"
scalaVersion := "0.24.0-RC1"
scalacOptions ++= Seq("-language:implicitConversions", "-deprecation")
libraryDependencies ++= Seq(
"com.novocode" % "junit-interface" % "0.11" % Test,
("org.apache.spark" %% "spark-core" % "3.0.0-X1").withDottyCompat(scalaVersion.value),
)
// Contains Spark 3 snapshot built against 2.13: https://github.com/smarter/spark/tree/scala-2.13
resolvers += Resolver.bintrayRepo("smarter", "maven")
testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-a", "-v", "-s")
testSuite := "wikipedia.WikipediaSuite"
// Without forking, ctrl-c doesn't actually fully stop Spark
fork in run := true
fork in Test := true

BIN
grading-tests.jar Normal file

Binary file not shown.

View File

@ -0,0 +1,46 @@
package ch.epfl.lamp
import sbt._
import sbt.Keys._
/**
* Coursera uses two versions of each assignment. They both have the same assignment key and part id but have
* different item ids.
*
* @param key Assignment key
* @param partId Assignment partId
* @param itemId Item id of the non premium version
* @param premiumItemId Item id of the premium version (`None` if the assignment is optional)
*/
case class CourseraId(key: String, partId: String, itemId: String, premiumItemId: Option[String])
/**
* Settings shared by all assignments, reused in various tasks.
*/
object MOOCSettings extends AutoPlugin {
object autoImport {
val course = SettingKey[String]("course")
val assignment = SettingKey[String]("assignment")
val options = SettingKey[Map[String, Map[String, String]]]("options")
val courseraId = settingKey[CourseraId]("Coursera-specific information identifying the assignment")
val testSuite = settingKey[String]("Fully qualified name of the test suite of this assignment")
// Convenient alias
type CourseraId = ch.epfl.lamp.CourseraId
val CourseraId = ch.epfl.lamp.CourseraId
}
import autoImport._
override val globalSettings: Seq[Def.Setting[_]] = Seq(
// supershell is verbose, buggy and useless.
useSuperShell := false
)
override val projectSettings: Seq[Def.Setting[_]] = Seq(
parallelExecution in Test := false,
// Report test result after each test instead of waiting for every test to finish
logBuffered in Test := false,
name := s"${course.value}-${assignment.value}"
)
}

318
project/StudentTasks.scala Normal file
View File

@ -0,0 +1,318 @@
package ch.epfl.lamp
import sbt._
import Keys._
// import scalaj.http._
import java.io.{File, FileInputStream, IOException}
import org.apache.commons.codec.binary.Base64
// import play.api.libs.json.{Json, JsObject, JsPath}
import scala.util.{Failure, Success, Try}
/**
* Provides tasks for submitting the assignment
*/
object StudentTasks extends AutoPlugin {
override def requires = super.requires && MOOCSettings
object autoImport {
val packageSourcesOnly = TaskKey[File]("packageSourcesOnly", "Package the sources of the project")
val packageBinWithoutResources = TaskKey[File]("packageBinWithoutResources", "Like packageBin, but without the resources")
val packageSubmissionZip = TaskKey[File]("packageSubmissionZip")
val packageSubmission = inputKey[Unit]("package solution as an archive file")
val runGradingTests = taskKey[Unit]("run black-box tests used for final grading")
}
import autoImport._
import MOOCSettings.autoImport._
override lazy val projectSettings = Seq(
packageSubmissionSetting,
// submitSetting,
runGradingTestsSettings,
fork := true,
connectInput in run := true,
outputStrategy := Some(StdoutOutput),
) ++ packageSubmissionZipSettings
lazy val runGradingTestsSettings = runGradingTests := {
val testSuiteJar = "grading-tests.jar"
if (!new File(testSuiteJar).exists) {
throw new MessageOnlyException(s"Could not find tests JarFile: $testSuiteJar")
}
val classPath = s"${(Test / dependencyClasspath).value.map(_.data).mkString(File.pathSeparator)}${File.pathSeparator}$testSuiteJar"
val junitProcess =
Fork.java.fork(
ForkOptions(),
"-cp" :: classPath ::
"org.junit.runner.JUnitCore" ::
(Test / testSuite).value ::
Nil
)
// Wait for tests to complete.
junitProcess.exitValue()
}
/** **********************************************************
* SUBMITTING A SOLUTION TO COURSERA
*/
val packageSubmissionZipSettings = Seq(
packageSubmissionZip := {
val submission = crossTarget.value / "submission.zip"
val sources = (packageSourcesOnly in Compile).value
val binaries = (packageBinWithoutResources in Compile).value
IO.zip(Seq(sources -> "sources.zip", binaries -> "binaries.jar"), submission)
submission
},
artifactClassifier in packageSourcesOnly := Some("sources"),
artifact in (Compile, packageBinWithoutResources) ~= (art => art.withName(art.name + "-without-resources"))
) ++
inConfig(Compile)(
Defaults.packageTaskSettings(packageSourcesOnly, Defaults.sourceMappings) ++
Defaults.packageTaskSettings(packageBinWithoutResources, Def.task {
val relativePaths =
(unmanagedResources in Compile).value.flatMap(Path.relativeTo((unmanagedResourceDirectories in Compile).value)(_))
(mappings in (Compile, packageBin)).value.filterNot { case (_, path) => relativePaths.contains(path) }
})
)
val maxSubmitFileSize = {
val mb = 1024 * 1024
10 * mb
}
/** Check that the jar exists, isn't empty, isn't crazy big, and can be read
* If so, encode jar as base64 so we can send it to Coursera
*/
def prepareJar(jar: File, s: TaskStreams): String = {
val errPrefix = "Error submitting assignment jar: "
val fileLength = jar.length()
if (!jar.exists()) {
s.log.error(errPrefix + "jar archive does not exist\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength == 0L) {
s.log.error(errPrefix + "jar archive is empty\n" + jar.getAbsolutePath)
failSubmit()
} else if (fileLength > maxSubmitFileSize) {
s.log.error(errPrefix + "jar archive is too big. Allowed size: " +
maxSubmitFileSize + " bytes, found " + fileLength + " bytes.\n" +
jar.getAbsolutePath)
failSubmit()
} else {
val bytes = new Array[Byte](fileLength.toInt)
val sizeRead = try {
val is = new FileInputStream(jar)
val read = is.read(bytes)
is.close()
read
} catch {
case ex: IOException =>
s.log.error(errPrefix + "failed to read sources jar archive\n" + ex.toString)
failSubmit()
}
if (sizeRead != bytes.length) {
s.log.error(errPrefix + "failed to read the sources jar archive, size read: " + sizeRead)
failSubmit()
} else encodeBase64(bytes)
}
}
/** Task to package solution to a given file path */
lazy val packageSubmissionSetting = packageSubmission := {
val args: Seq[String] = Def.spaceDelimited("[path]").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val base64Jar = prepareJar(jar, s)
val path = args.headOption.getOrElse((baseDirectory.value / "submission.jar").absolutePath)
scala.tools.nsc.io.File(path).writeAll(base64Jar)
}
/*
/** Task to submit a solution to coursera */
val submit = inputKey[Unit]("submit solution to Coursera")
lazy val submitSetting = submit := {
// Fail if scalafix linting does not pass.
scalafixLinting.value
val args: Seq[String] = Def.spaceDelimited("<arg>").parsed
val s: TaskStreams = streams.value // for logging
val jar = (packageSubmissionZip in Compile).value
val assignmentDetails =
courseraId.?.value.getOrElse(throw new MessageOnlyException("This assignment can not be submitted to Coursera because the `courseraId` setting is undefined"))
val assignmentKey = assignmentDetails.key
val courseName =
course.value match {
case "capstone" => "scala-capstone"
case "bigdata" => "scala-spark-big-data"
case other => other
}
val partId = assignmentDetails.partId
val itemId = assignmentDetails.itemId
val premiumItemId = assignmentDetails.premiumItemId
val (email, secret) = args match {
case email :: secret :: Nil =>
(email, secret)
case _ =>
val inputErr =
s"""|Invalid input to `submit`. The required syntax for `submit` is:
|submit <email-address> <submit-token>
|
|The submit token is NOT YOUR LOGIN PASSWORD.
|It can be obtained from the assignment page:
|https://www.coursera.org/learn/$courseName/programming/$itemId
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
""".stripMargin
s.log.error(inputErr)
failSubmit()
}
val base64Jar = prepareJar(jar, s)
val json =
s"""|{
| "assignmentKey":"$assignmentKey",
| "submitterEmail":"$email",
| "secret":"$secret",
| "parts":{
| "$partId":{
| "output":"$base64Jar"
| }
| }
|}""".stripMargin
def postSubmission[T](data: String): Try[HttpResponse[String]] = {
val http = Http("https://www.coursera.org/api/onDemandProgrammingScriptSubmissions.v1")
val hs = List(
("Cache-Control", "no-cache"),
("Content-Type", "application/json")
)
s.log.info("Connecting to Coursera...")
val response = Try(http.postData(data)
.headers(hs)
.option(HttpOptions.connTimeout(10000)) // scalaj default timeout is only 100ms, changing that to 10s
.asString) // kick off HTTP POST
response
}
val connectMsg =
s"""|Attempting to submit "${assignment.value}" assignment in "$courseName" course
|Using:
|- email: $email
|- submit token: $secret""".stripMargin
s.log.info(connectMsg)
def reportCourseraResponse(response: HttpResponse[String]): Unit = {
val code = response.code
val respBody = response.body
/* Sample JSON response from Coursera
{
"message": "Invalid email or token.",
"details": {
"learnerMessage": "Invalid email or token."
}
}
*/
// Success, Coursera responds with 2xx HTTP status code
if (response.is2xx) {
val successfulSubmitMsg =
s"""|Successfully connected to Coursera. (Status $code)
|
|Assignment submitted successfully!
|
|You can see how you scored by going to:
|https://www.coursera.org/learn/$courseName/programming/$itemId/
|${
premiumItemId.fold("") { id =>
s"""or (for premium learners):
|https://www.coursera.org/learn/$courseName/programming/$id
""".stripMargin
}
}
|and clicking on "My Submission".""".stripMargin
s.log.info(successfulSubmitMsg)
}
// Failure, Coursera responds with 4xx HTTP status code (client-side failure)
else if (response.is4xx) {
val result = Try(Json.parse(respBody)).toOption
val learnerMsg = result match {
case Some(resp: JsObject) =>
(JsPath \ "details" \ "learnerMessage").read[String].reads(resp).get
case Some(x) => // shouldn't happen
"Could not parse Coursera's response:\n" + x
case None =>
"Could not parse Coursera's response:\n" + respBody
}
val failedSubmitMsg =
s"""|Submission failed.
|There was something wrong while attempting to submit.
|Coursera says:
|$learnerMsg (Status $code)""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera responds with 5xx HTTP status code (server-side failure)
else if (response.is5xx) {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera seems to be unavailable at the moment (Status $code)
|Check https://status.coursera.org/ and try again in a few minutes.
""".stripMargin
s.log.error(failedSubmitMsg)
}
// Failure, Coursera repsonds with an unexpected status code
else {
val failedSubmitMsg =
s"""|Submission failed.
|Coursera replied with an unexpected code (Status $code)
""".stripMargin
s.log.error(failedSubmitMsg)
}
}
// kick it all off, actually make request
postSubmission(json) match {
case Success(resp) => reportCourseraResponse(resp)
case Failure(e) =>
val failedConnectMsg =
s"""|Connection to Coursera failed.
|There was something wrong while attempting to connect to Coursera.
|Check your internet connection.
|${e.toString}""".stripMargin
s.log.error(failedConnectMsg)
}
}
*/
def failSubmit(): Nothing = {
sys.error("Submission failed")
}
/**
* *****************
* DEALING WITH JARS
*/
def encodeBase64(bytes: Array[Byte]): String =
new String(Base64.encodeBase64(bytes))
}

1
project/build.properties Normal file
View File

@ -0,0 +1 @@
sbt.version=1.3.8

View File

@ -0,0 +1,5 @@
// Used for Coursera submission (StudentPlugin)
// libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
// libraryDependencies += "com.typesafe.play" %% "play-json" % "2.7.4"
// Used for Base64 (StudentPlugin)
libraryDependencies += "commons-codec" % "commons-codec" % "1.10"

2
project/plugins.sbt Normal file
View File

@ -0,0 +1,2 @@
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.28")
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.4.0")

View File

View File

@ -0,0 +1,21 @@
package wikipedia
import scala.io.Source
object WikipediaData {
private[wikipedia] def lines: List[String] = {
Option(getClass.getResourceAsStream("/wikipedia/wikipedia-grading.dat")) match {
case None => sys.error("Please download the dataset as explained in the assignment instructions")
case Some(resource) => Source.fromInputStream(resource).getLines().toList
}
}
private[wikipedia] def parse(line: String): WikipediaArticle = {
val subs = "</title><text>"
val i = line.indexOf(subs)
val title = line.substring(14, i)
val text = line.substring(i + subs.length, line.length-16)
WikipediaArticle(title, text)
}
}

View File

@ -0,0 +1,96 @@
package wikipedia
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.{Logger, Level}
import org.apache.spark.rdd.RDD
case class WikipediaArticle(title: String, text: String) {
/**
* @return Whether the text of this article mentions `lang` or not
* @param lang Language to look for (e.g. "Scala")
*/
def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}
object WikipediaRanking extends WikipediaRankingInterface {
// Reduce Spark logging verbosity
Logger.getLogger("org").setLevel(Level.ERROR)
val langs = List(
"JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
"Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")
val conf: SparkConf = ???
val sc: SparkContext = ???
// Hint: use a combination of `sc.parallelize`, `WikipediaData.lines` and `WikipediaData.parse`
val wikiRdd: RDD[WikipediaArticle] = ???
/** Returns the number of articles on which the language `lang` occurs.
* Hint1: consider using method `aggregate` on RDD[T].
* Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
*/
def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = ???
/* (1) Use `occurrencesOfLang` to compute the ranking of the languages
* (`val langs`) by determining the number of Wikipedia articles that
* mention each language at least once. Don't forget to sort the
* languages by their occurrence, in decreasing order!
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = ???
/* Compute an inverted index of the set of articles, mapping each language
* to the Wikipedia pages in which it occurs.
*/
def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = ???
/* (2) Compute the language ranking again, but now using the inverted index. Can you notice
* a performance improvement?
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = ???
/* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
* Can you notice an improvement in performance compared to measuring *both* the computation of the index
* and the computation of the ranking? If so, can you think of a reason?
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = ???
def main(args: Array[String]): Unit = {
/* Languages ranked according to (1) */
val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))
/* An inverted index mapping languages to wikipedia pages on which they appear */
def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
/* Languages ranked according to (2), using the inverted index */
val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))
/* Languages ranked according to (3) */
val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))
/* Output the speed of each ranking */
println(timing)
sc.stop()
}
val timing = new StringBuffer
def timed[T](label: String, code: => T): T = {
val start = System.currentTimeMillis()
val result = code
val stop = System.currentTimeMillis()
timing.append(s"Processing $label took ${stop - start} ms.\n")
result
}
}

View File

@ -0,0 +1,20 @@
package wikipedia
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
/**
* The interface used by the grading infrastructure. Do not change signatures
* or your submission will fail with a NoSuchMethodError.
*/
trait WikipediaRankingInterface {
def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])]
def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int
def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)]
def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)]
def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)]
def langs: List[String]
def sc: SparkContext
def wikiRdd: RDD[WikipediaArticle]
}

View File

@ -0,0 +1,142 @@
package wikipedia
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.junit._
class WikipediaSuite {
def initializeWikipediaRanking(): Boolean =
try {
WikipediaRanking
true
} catch {
case ex: Throwable =>
println(ex.getMessage)
ex.printStackTrace()
false
}
import WikipediaRanking._
/**
* Creates a truncated string representation of a list, adding ", ...)" if there
* are too many elements to show
* @param l The list to preview
* @param n The number of elements to cut it at
* @return A preview of the list, containing at most n elements.
*/
def previewList[A](l: List[A], n: Int = 10): String =
if (l.length <= n) l.toString
else l.take(n).toString.dropRight(1) + ", ...)"
/**
* Asserts that all the elements in a given list and an expected list are the same,
* regardless of order. For a prettier output, given and expected should be sorted
* with the same ordering.
* @param actual The actual list
* @param expected The expected list
* @tparam A Type of the list elements
*/
def assertSameElements[A](actual: List[A], expected: List[A]): Unit = {
val givenSet = actual.toSet
val expectedSet = expected.toSet
val unexpected = givenSet -- expectedSet
val missing = expectedSet -- givenSet
val noUnexpectedElements = unexpected.isEmpty
val noMissingElements = missing.isEmpty
val noMatchString =
s"""
|Expected: ${previewList(expected)}
|Actual: ${previewList(actual)}""".stripMargin
assert(noUnexpectedElements,
s"""|$noMatchString
|The given collection contains some unexpected elements: ${previewList(unexpected.toList, 5)}""".stripMargin)
assert(noMissingElements,
s"""|$noMatchString
|The given collection is missing some expected elements: ${previewList(missing.toList, 5)}""".stripMargin)
}
// Conditions:
// (1) the language stats contain the same elements
// (2) they are ordered (and the order doesn't matter if there are several languages with the same count)
def assertEquivalentAndOrdered(actual: List[(String, Int)], expected: List[(String, Int)]): Unit = {
// (1)
assertSameElements(actual, expected)
// (2)
assert(
!(actual zip actual.tail).exists({ case ((_, occ1), (_, occ2)) => occ1 < occ2 }),
"The given elements are not in descending order"
)
}
@Test def `'occurrencesOfLang' should work for (specific) RDD with one element`: Unit = {
assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
val rdd = sc.parallelize(Seq(WikipediaArticle("title", "Java Jakarta")))
val res = (occurrencesOfLang("Java", rdd) == 1)
assert(res, "occurrencesOfLang given (specific) RDD with one element should equal to 1")
}
@Test def `'rankLangs' should work for RDD with two elements`: Unit = {
assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
val langs = List("Scala", "Java")
val rdd = sc.parallelize(List(WikipediaArticle("1", "Scala is great"), WikipediaArticle("2", "Java is OK, but Scala is cooler")))
val ranked = rankLangs(langs, rdd)
val res = ranked.head._1 == "Scala"
assert(res)
}
@Test def `'makeIndex' creates a simple index with two entries`: Unit = {
assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
val langs = List("Scala", "Java")
val articles = List(
WikipediaArticle("1","Groovy is pretty interesting, and so is Erlang"),
WikipediaArticle("2","Scala and Java run on the JVM"),
WikipediaArticle("3","Scala is not purely functional")
)
val rdd = sc.parallelize(articles)
val index = makeIndex(langs, rdd)
val res = index.count() == 2
assert(res)
}
@Test def `'rankLangsUsingIndex' should work for a simple RDD with three elements`: Unit = {
assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
val langs = List("Scala", "Java")
val articles = List(
WikipediaArticle("1","Groovy is pretty interesting, and so is Erlang"),
WikipediaArticle("2","Scala and Java run on the JVM"),
WikipediaArticle("3","Scala is not purely functional")
)
val rdd = sc.parallelize(articles)
val index = makeIndex(langs, rdd)
val ranked = rankLangsUsingIndex(index)
val res = (ranked.head._1 == "Scala")
assert(res)
}
@Test def `'rankLangsReduceByKey' should work for a simple RDD with five elements`: Unit = {
assert(initializeWikipediaRanking(), " -- did you fill in all the values in WikipediaRanking (conf, sc, wikiRdd)?")
val langs = List("Scala", "Java", "Groovy", "Haskell", "Erlang")
val articles = List(
WikipediaArticle("1","Groovy is pretty interesting, and so is Erlang"),
WikipediaArticle("2","Scala and Java run on the JVM"),
WikipediaArticle("3","Scala is not purely functional"),
WikipediaArticle("4","The cool kids like Haskell more than Java"),
WikipediaArticle("5","Java is for enterprise developers")
)
val rdd = sc.parallelize(articles)
val ranked = rankLangsReduceByKey(langs, rdd)
val res = (ranked.head._1 == "Java")
assert(res)
}
@Rule def individualTestTimeout = new org.junit.rules.Timeout(100 * 1000)
}