Flume Kafka Channel does not shutdown properly -


i'm using flume-ng-1.6.0+cdh5.11.1+160 custom source -> kafka channel -> hdfs sink pipeline. when restarting flume agents, i'll see following error logs:

error while shutting down consumer. java.util.concurrentmodificationexception: kafkaconsumer not safe multi-threaded access     @ org.apache.kafka.clients.consumer.kafkaconsumer.close(kafkaconsumer.java:1258)     @ org.apache.flume.channel.kafka.kafkachannel.stop(kafkachannel.java:150)     @ org.apache.flume.lifecycle.lifecyclesupervisor.stop(lifecyclesupervisor.java:104) 

i dig around source code , see lifecycleaware.stop() called in jvm shutdown hook, , suppose running in thread different kafkaconsumer constructed, results in concurrentmodificationexception.

what's more, in lifecyclesupervisor.stop(), lifecycleaware.stop() called sequentially without catching exceptions, if 1 stop fails, following stops not called @ all, , guess why there'll unclosed .tmp files in hdfs every time restart agents, , these data become lost.

any idea eliminate exception? thanks.


Comments

Popular posts from this blog

Is there a better way to structure post methods in Class Based Views -

performance - Why is XCHG reg, reg a 3 micro-op instruction on modern Intel architectures? -

jquery - Responsive Navbar with Sub Navbar -