How to implement LDA in Spark and get the topic distributions of new documents ?
```scalaimport org.apache.spark.rdd._import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}import org.apache.spark.mllib.linalg.{Vector, Vectors}import scala.collection.mutable//create training document setval input = Seq("this is a document","this could be another document","these are training, not tests", "here is the final file (document)")val corpus: RDD[Array[String]] = sc.parallelize(input.map{doc => doc.split("\\s")})val termCounts: Array[(String, Long)] = corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap// Convert training documents into term count vectorsval documents: RDD[(Long, Vector)] =corpus.zipWithIndex.map { case (tokens, id) =>val counts = new mutable.HashMap[Int, Double]()tokens.foreach { term =>if (vocab.contains(term)) {val idx = vocab(term)counts(idx) = counts.getOrElse(idx, 0.0) + 1.0}}(id, Vectors.sparse(vocab.size, counts.toSeq))}// Set LDA parameters and create modelval numTopics = 10val ldaModel: DistributedLDAModel = new LDA().setK(numTopics).setMaxIterations(20).run(documents).asInstanceOf[DistributedLDAModel]val localLDAModel: LocalLDAModel = ldaModel.toLocal//create test input, convert to term count, and get its topic distributionval test_input = Seq("this is my test document")val test_document:RDD[(Long,Vector)] = sc.parallelize(test_input.map(doc=>doc.split("\\s"))).zipWithIndex.map{ case (tokens, id) =>val counts = new mutable.HashMap[Int, Double]()tokens.foreach { term =>if (vocab.contains(term)) {val idx = vocab(term)counts(idx) = counts.getOrElse(idx, 0.0) + 1.0}}(id, Vectors.sparse(vocab.size, counts.toSeq))}val topicDistributions = localLDAModel.topicDistributions(test_document)println("first topic distribution:"+topicDistributions.first._2.toArray.mkString(", "))```