Not able to set mapper.readValuemapper.readValue(words, classOf[InboundLocation]) in jackson as showing type mismatch in scala -
val conf = new sparkconf().setappname("simpleexample").setmaster("local[*]") val sc = new streamingcontext(conf,seconds(5)) val mapper = new objectmapper() val kafkaconf = map( "zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group", "zookeeper.connection.timeout.ms" -> "5000" ) val lines = kafkautils.createstream[array[byte], string , defaultdecoder , stringdecoder]( sc , kafkaconf, map("testtopickafka" -> 1) , storagelevel.memory_and_disk ) val words : dstream[string] = lines.flatmap { case (x, y) => y.split(" ") } val out : inboundlocation = mapper.readvalue(words, classof[inboundlocation]) // val type12 = out.getmessage_id // print(type12) words.print() sc.start() sc.awaittermination()
you can try following approach.
words.foreachrdd((rdd, time) => { if (rdd.count() > 0) { //todo: apply business logic } }) ssc.start() ssc.awaittermination()
hope helps!
Comments
Post a Comment