scala - How to fetch Kafka Stream and print it in Spark Shell? -


first built sbt in folder in following way

val sparkversion = "1.6.3"                                                                                                                            scalaversion := "2.10.5"                                                                                                                              resolvers += "spark packages repo" @ "https://dl.bintray.com/spark-packages/maven"                                                                   librarydependencies ++= seq(                                                                                                                          "org.apache.spark" %% "spark-streaming" % sparkversion,                                                                                               "org.apache.spark" %% "spark-streaming-kafka" % sparkversion                                                                                           )                                                                                                                                                    librarydependencies +="datastax" % "spark-cassandra-connector" % "1.6.3-s_2.10"                                                                       librarydependencies +="org.apache.spark" %% "spark-sql" % "1.1.0" 

later in same folder "build.sbt" exists started spark shell in following way

 >/usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1 

these warnings shown while spark shell started:


warn abstractlifecycle: failed selectchannelconnector@0.0.0.0:4040: java.net.bind java.net.bindexception: address in use warn abstractlifecycle: failed org.spark-project.jetty.server.server@75bf9e67: java.net.bindexception: address in use 

in spark shell importing following packages

import org.apache.spark.sparkconf; import org.apache.spark.streaming.streamingcontext; import org.apache.spark.streaming.seconds; import org.apache.spark.streaming.kafka.kafkautils; import com.datastax.spark.connector._ ; import org.apache.spark.sql.cassandra._ ; 

then in spark shell creating configuration in below way:

val conf = new sparkconf().setmaster("local[*]").setappname("kafkareceiver").set("spark.driver.allowmultiplecontexts", "true").setmaster("local"); 

after creating configuration assigning it, created new spark streaming context in below way:

val ssc = new streamingcontext(conf, seconds(10)) 

during creation of spark streaming context few warnings shown above raised again along other warning, shown below


warn abstractlifecycle: failed selectchannelconnector@0.0.0.0:4040: java.net.bind java.net.bindexception: address in use . . . warn abstractlifecycle: failed org.spark-project.jetty.server.server@75bf9e67: java.net.bindexception: address in use . . . warn sparkcontext: multiple running sparkcontexts detected in same jvm!                                                         org.apache.spark.sparkexception: 1 sparkcontext may running in jvm (see spark-2243). ignore error, set spark.driver.allowmulti plecontexts = true. running sparkcontext created at:                                                                                org.apache.spark.sparkcontext.<init>(sparkcontext.scala:82)                                                                                           org.apache.spark.repl.sparkiloop.createsparkcontext(sparkiloop.scala:1017) . . . warn streamingcontext: spark.master should set local[n], n > 1 in local mode if have receivers data, otherwise spa rk jobs not resources process received data.                                                                                          ssc: org.apache.spark.streaming.streamingcontext = org.apache.spark.streaming.streamingcontext@616f1c2e 

then using created spark streaming context created kafkastream in below way

val kafkastream = kafkautils.createstream(ssc, "localhost:2181","spark-streaming-consumer-group", map("spark-topic" -> 5)) 

then printing stream , starting ssc in below way

kafkastream.print() ssc.start 

after use of above command in shell output shown in below images

image1- eariler part of output

image2 - after fetching jars & connecting zookeeper

issues came across in zookeeper

later starts following mess ! of stream without printing values information shown in below image final output

output that's getting printed repeatedly here shown below !


17/08/18 10:01:30 info jobscheduler: starting job streaming job 1503050490000 ms.0 job set of time 1503050490000 ms                              17/08/18 10:01:30 info jobscheduler: finished job streaming job 1503050490000 ms.0 job set of time 1503050490000 ms                              17/08/18 10:01:30 info jobscheduler: total delay: 0.003 s time 1503050490000 ms (execution: 0.000 s)                                              17/08/18 10:01:30 info blockrdd: removing rdd 3 persistence list                                                                                 17/08/18 10:01:30 info kafkainputdstream: removing blocks of rdd blockrdd[3] @ createstream @ <console>:39 of time 1503050490000 ms                 17/08/18 10:01:30 info receivedblocktracker: deleting batches arraybuffer(1503050470000 ms)                                                           17/08/18 10:01:30 info inputinfotracker: remove old batch metadata: 1503050470000 ms                                                                  17/08/18 10:01:30 info blockmanager: removing rdd 3                                                                                                   17/08/18 10:01:40 info jobscheduler: added jobs time 1503050500000 ms                                                                             -------------------------------------------                                                                                                           time: 1503050500000 ms                                                                                                                                ------------------------------------------- 

warn abstractlifecycle: failed selectchannelconnector@0.0.0.0:4040: java.net.bind    java.net.bindexception: address in use 

it means port needs in use. rule, port 4040 used spark-thrifteserver. try stop thriftserver using stop-thriftserver.sh spark/sbin folder. or check else use port , free it.


Comments

Popular posts from this blog

Is there a better way to structure post methods in Class Based Views -

performance - Why is XCHG reg, reg a 3 micro-op instruction on modern Intel architectures? -

c# - Asp.net web api : redirect unauthorized requst to forbidden page -