Nedenfor er svaret:
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.sparkbindings
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity")
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection")
val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
mongoConfig,
classOf[MongoInputFormat],
classOf[Object],
classOf[BSONObject]
)
val documents_Array: RDD[(String, Array[String])] = documents.map(
doc1 => (
doc1._2.get("product_id").toString(),
doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-").mkString(" "))
)
)
val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x)
val myIDs = IndexedDatasetSpark(new_doc)(mc)
val readWriteSchema = new Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"omitScore" -> false,
"elementDelim" -> " "
)
SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc)
}
build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
Jeg har brugt mongo-hadoop at hente data fra Mongo og bruge dem. Da mine data havde et array, var jeg nødt til at bruge flatMapValues til at udjævne det og derefter videregive til IDS for korrekt output.
PS:Jeg postede svaret her og ikke det linkede spørgsmål fordi denne Q&A dækker det fulde omfang af indhentning og behandling af data.