scala - How to fetch Kafka Stream and print it in Spark Shell?
First, I built the sbt file in a folder in the following way:
val sparkVersion = "1.6.3"

scalaVersion := "2.10.5"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)

libraryDependencies += "datastax" % "spark-cassandra-connector" % "1.6.3-s_2.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0"
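One thing worth double-checking in this build: the last line pins spark-sql to 1.1.0 while the other Spark artifacts use sparkVersion (1.6.3), and mixing Spark versions on one classpath is a frequent source of odd failures. A version-consistent sketch, assuming everything should sit on 1.6.3:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "org.apache.spark" %% "spark-sql"             % sparkVersion  // was "1.1.0"; aligned here as an assumption
)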
Later, in the same folder where build.sbt exists, I started the Spark shell in the following way:
>/usr/hdp/2.6.0.3-8/spark/bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.3-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1
These warnings were shown while the Spark shell started:
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
java.net.BindException: Address already in use
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67: java.net.BindException: Address already in use
In the Spark shell, I imported the following packages:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
Then, in the Spark shell, I created the configuration in the following way:
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver").set("spark.driver.allowMultipleContexts", "true").setMaster("local")
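Note that this line calls setMaster twice, and the last call wins, so the context actually runs with master "local" (a single thread); spark.driver.allowMultipleContexts only silences the multiple-contexts error rather than fixing anything. A minimal sketch with one master setting and the n > 1 threads that the StreamingContext warning below asks for:

// a sketch: a single master setting with at least two threads, so the
// receiver and the batch jobs can run at the same time
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("KafkaReceiver")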
After creating the configuration and assigning it, I created a new Spark streaming context in the following way:
val ssc = new StreamingContext(conf, Seconds(10))
During creation of the Spark streaming context, the warnings shown above were raised again, along with other warnings, shown below:
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
...
WARN AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@75bf9e67: java.net.BindException: Address already in use
...
WARN SparkContext: Multiple running SparkContexts detected in the same JVM!
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)
...
WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@616f1c2e
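The multiple-contexts warning is expected here: spark-shell has already created a SparkContext named sc, and new StreamingContext(conf, ...) tries to build a second one. The usual shell pattern is to build the streaming context on top of the existing sc instead:

// reuse the SparkContext the shell already created instead of making a new one
val ssc = new StreamingContext(sc, Seconds(10))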
Then, using the created Spark streaming context, I created a KafkaStream in the following way:
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 5))
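For reference, the same call with each argument annotated; this is the receiver-based createStream API, and the values are exactly the ones used above:

val kafkaStream = KafkaUtils.createStream(
  ssc,
  "localhost:2181",                  // ZooKeeper quorum the receiver registers with
  "spark-streaming-consumer-group",  // Kafka consumer group id
  Map("spark-topic" -> 5))           // topic name -> number of consumer threads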
Then I printed the stream and started ssc in the following way:
kafkaStream.print()
ssc.start
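Two things worth noting at this step: createStream yields (key, message) pairs, so printing the raw stream shows tuples rather than plain payloads, and ssc.start() returns immediately. A sketch of the common pattern:

kafkaStream.map(_._2).print()  // print just the message payloads instead of (key, message) tuples
ssc.start()
ssc.awaitTermination()         // needed in a standalone app; in the shell it blocks the prompt, so it is usually omitted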
After running the above commands in the shell, output appeared (shown in images in the original post). Later the stream turns into the following mess, repeating without ever printing the values. The output that gets printed repeatedly is shown below:
17/08/18 10:01:30 INFO JobScheduler: Starting job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Finished job streaming job 1503050490000 ms.0 from job set of time 1503050490000 ms
17/08/18 10:01:30 INFO JobScheduler: Total delay: 0.003 s for time 1503050490000 ms (execution: 0.000 s)
17/08/18 10:01:30 INFO BlockRDD: Removing RDD 3 from persistence list
17/08/18 10:01:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[3] at createStream at <console>:39 of time 1503050490000 ms
17/08/18 10:01:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1503050470000 ms)
17/08/18 10:01:30 INFO InputInfoTracker: remove old batch metadata: 1503050470000 ms
17/08/18 10:01:30 INFO BlockManager: Removing RDD 3
17/08/18 10:01:40 INFO JobScheduler: Added jobs for time 1503050500000 ms
-------------------------------------------
Time: 1503050500000 ms
-------------------------------------------
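A side note on these logs: the batches are firing but arriving empty, which is why the separator prints with nothing under it. That is consistent with the StreamingContext warning above (master "local" leaves no spare thread for processing alongside the receiver), though it would look the same if nothing were being produced to spark-topic at all. A quick sanity check, set up before ssc.start():

// count the records in each batch; a stream of zeros means no data is
// reaching the receiver, not a problem with print() itself
kafkaStream.count().print()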
WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
It means the port is already in use. As a rule, port 4040 is used by the Spark Thrift Server. Try to stop the Thrift Server using stop-thriftserver.sh from the spark/sbin folder, or check what else is using the port and free it.
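For example (the path is an assumption based on the HDP layout of the spark-shell command above):

# stop the Thrift Server shipped with this install
/usr/hdp/2.6.0.3-8/spark/sbin/stop-thriftserver.sh

# or find out which process is holding port 4040 and free it
netstat -tlnp | grep 4040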