Unable to call mapper.readValue(words, classOf[InboundLocation]) with Jackson: type mismatch in Scala


    val conf = new SparkConf().setAppName("simpleexample").setMaster("local[*]")
    val sc = new StreamingContext(conf, Seconds(5))

    val mapper = new ObjectMapper()

    val kafkaConf = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "test-consumer-group",
      "zookeeper.connection.timeout.ms" -> "5000"
    )

    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      sc,
      kafkaConf,
      Map("testtopickafka" -> 1),
      StorageLevel.MEMORY_AND_DISK
    )

    val words: DStream[String] = lines.flatMap { case (x, y) => y.split(" ") }

    val out: InboundLocation = mapper.readValue(words, classOf[InboundLocation])
    // val type12 = out.getMessage_id
    // print(type12)

    words.print()
    sc.start()
    sc.awaitTermination()

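You can try the following approach. `words` is a DStream[String], not a String, so it cannot be passed to mapper.readValue directly; work on the contents of each batch inside foreachRDD instead.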

    words.foreachRDD((rdd, time) => {
      if (rdd.count() > 0) {
        // TODO: apply business logic
      }
    })
    // ssc is the StreamingContext (named sc in the question)
    ssc.start()
    ssc.awaitTermination()
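As a rough illustration of what the business logic could look like, the sketch below parses each record with Jackson on the executors instead of passing the whole DStream to readValue. It rests on several assumptions that are not in the question: each string in the stream is one complete JSON document (so you would parse the raw message value rather than the output of split(" ")), InboundLocation is a plain bean with a message_id property (the class here is a hypothetical stand-in), and the names LocationParsing, parseRecord and parseStream are made up for the example.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.streaming.dstream.DStream
import scala.beans.BeanProperty
import scala.util.Try

// Hypothetical stand-in for the question's InboundLocation class.
// @BeanProperty generates getMessage_id / setMessage_id, which Jackson can use.
class InboundLocation extends Serializable {
  @BeanProperty var message_id: String = _
}

object LocationParsing {
  // Parse a single JSON string; returns None if the record is malformed
  // or deserializes to null.
  def parseRecord(mapper: ObjectMapper, json: String): Option[InboundLocation] =
    Try(mapper.readValue(json, classOf[InboundLocation])).toOption.filter(_ != null)

  // Turn a stream of JSON strings into a stream of InboundLocation objects.
  def parseStream(messages: DStream[String]): DStream[InboundLocation] =
    messages.mapPartitions { records =>
      // One mapper per partition, created on the executor.
      val mapper = new ObjectMapper()
      records.flatMap(json => parseRecord(mapper, json))
    }
}
```

With something like this in place, LocationParsing.parseStream(lines.map(_._2)) would give a DStream[InboundLocation] that can be printed or handled in foreachRDD. Records that fail to parse are dropped silently here; whether that is acceptable depends on your data.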

Hope this helps!

