Failed to construct kafka consumer
There are quite a few answers on this topic, but nothing is working.
I am trying to execute the following streams processor.
    import java.util.Properties

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.kstream.KStreamBuilder

    object SimpleStream extends App {
      val builder: KStreamBuilder = new KStreamBuilder

      val streamingConfig: Properties = {
        // TODO - move these to config
        val settings = new Properties
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11")
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        // Specify default (de)serializers for record keys and record values.
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
        settings
      }

      // Read the topic with the key/value types matching the serdes above.
      val users = builder.stream[String, Array[Byte]]("tt2")
      users.print()

      val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
      stream.start()
    }
Dependencies:
    // kafka
    "org.apache.kafka" % "kafka-streams" % "0.10.2.0",
    "org.apache.kafka" % "kafka-clients" % "0.10.2.0"
And the error:
    [error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
        at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
        at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272)
        at kafka.SimpleStream$.runStream(SimpleStream.scala:36)
        at kafka.SimpleStream$.delayedEndpoint$kafka$SimpleStream$1(SimpleStream.scala:40)
        at kafka.SimpleStream$delayedInit$body.apply(SimpleStream.scala:12)
        at scala.Function0.apply$mcV$sp(Function0.scala:34)
        at scala.Function0.apply$mcV$sp$(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App.$anonfun$main$1$adapted(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:378)
        at scala.App.main(App.scala:76)
        at scala.App.main$(App.scala:74)
        at kafka.SimpleStream$.main(SimpleStream.scala:12)
        at kafka.SimpleStream.main(SimpleStream.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
    Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V
I've tried different client versions, with no luck. I am using Kafka version 0.10.2.0. Below is the error in ZooKeeper.
    [2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
    [2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
    [2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
I'm not sure what is causing it. I am able to consume and produce fine, though.
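java.lang.NoSuchMethodError: this error happens when multiple versions of the Kafka client jar are available on the classpath, so code compiled against one version ends up calling a class loaded from another. Check your classpath for duplicate kafka-clients jars.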
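One way to check the classpath is to list the artifacts that show up with more than one version. The sketch below is only an illustration: the object name ClasspathCheck and the helper duplicateArtifacts are hypothetical, and it assumes jar files follow the usual name-version.jar naming. Note that when run inside sbt without forking, the java.class.path property may not reflect the application's dependencies, so you may prefer to pass the classpath string in explicitly.

```scala
import java.io.File
import java.util.regex.Pattern

// Hypothetical helper, not part of Kafka or sbt.
object ClasspathCheck {
  // Heuristic: "kafka-clients-0.10.2.0.jar" -> ("kafka-clients", "0.10.2.0")
  private val JarName = """(.+?)-(\d[\w.\-]*)\.jar""".r

  /** Groups the jar entries of a classpath string by artifact name and keeps
    * only the artifacts that appear with more than one version. */
  def duplicateArtifacts(
      classpath: String,
      separator: String = File.pathSeparator
  ): Map[String, Set[String]] =
    classpath
      .split(Pattern.quote(separator))
      .toSeq
      .map(entry => new File(entry).getName)
      .collect { case JarName(artifact, version) => artifact -> version }
      .groupBy(_._1)
      .map { case (artifact, pairs) => artifact -> pairs.map(_._2).toSet }
      .filter { case (_, versions) => versions.size > 1 }

  def main(args: Array[String]): Unit = {
    // Assumption: the JVM was started with the application's real classpath.
    val duplicates = duplicateArtifacts(System.getProperty("java.class.path"))
    if (duplicates.isEmpty) println("No artifact appears with more than one version.")
    else duplicates.foreach { case (artifact, versions) =>
      println(s"$artifact: ${versions.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

If kafka-clients is reported with two versions, removing or aligning the extra one so that it matches the kafka-streams version (0.10.2.0 here) is the likely fix.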
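The KeeperException logged by ZooKeeper is not the issue. These are INFO-level messages emitted while Kafka sets up its nodes (folders) in ZooKeeper, for example when it tries to create a node that already exists (NodeExists) or delete one that does not exist (NoNode).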