Flume Kafka Channel does not shut down properly


I'm using flume-ng-1.6.0+cdh5.11.1+160 with a custom source -> Kafka channel -> HDFS sink pipeline. When restarting the Flume agents, I see the following error logs:

Error while shutting down consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1258)
    at org.apache.flume.channel.kafka.KafkaChannel.stop(KafkaChannel.java:150)
    at org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:104)

I dug around in the source code and saw that LifecycleAware.stop() is called from a JVM shutdown hook. I suppose it runs in a thread different from the one in which the KafkaConsumer was constructed and used, which results in the ConcurrentModificationException.
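For illustration, here is a simplified sketch (not Kafka's actual code) of the single-thread ownership guard that KafkaConsumer uses internally: acquiring records the owning thread id, and any other thread touching the consumer while it is held gets a ConcurrentModificationException, which matches what a shutdown-hook thread calling close() would hit. The class and method names here are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical re-creation of KafkaConsumer's thread-ownership check.
public class ThreadGuardDemo {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    // Called at the start of every consumer operation: claim ownership,
    // or fail if another thread currently holds the consumer.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    // Called when the operation finishes: release ownership.
    void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }

    public static void main(String[] args) throws Exception {
        ThreadGuardDemo guard = new ThreadGuardDemo();
        guard.acquire(); // the source/sink thread "owns" the consumer

        // Simulate a JVM shutdown hook closing the consumer from another thread
        // while the owning thread still holds it.
        Thread shutdownHook = new Thread(() -> {
            try {
                guard.acquire();
                System.out.println("no exception");
            } catch (ConcurrentModificationException e) {
                System.out.println("CME: " + e.getMessage());
            }
        });
        shutdownHook.start();
        shutdownHook.join();
        guard.release();
    }
}
```

Running this prints the same "not safe for multi-threaded access" message, which supports the theory that the shutdown hook thread is not the thread that owns the consumer.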

What's more, in LifecycleSupervisor.stop(), each LifecycleAware.stop() is called sequentially without catching exceptions, so if one stop() fails, the following stops are not called at all. I guess that is why there are unclosed .tmp files left in HDFS every time I restart the agents, and that data becomes lost.
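To make the failure mode concrete, here is a minimal, self-contained sketch (not Flume's actual code; the Stoppable interface and component names are invented) of the difference between aborting on the first stop() failure and isolating each failure so the remaining components still get stopped:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an exception-isolating shutdown loop, in contrast
// to a sequential loop where one failing stop() skips all the rest.
public class SafeShutdownDemo {
    interface Stoppable {
        String name();
        void stop();
    }

    static Stoppable component(String name, boolean fail) {
        return new Stoppable() {
            public String name() { return name; }
            public void stop() {
                if (fail) throw new RuntimeException("stop failed: " + name);
            }
        };
    }

    public static void main(String[] args) {
        List<Stoppable> components = Arrays.asList(
            component("kafka-channel", true),  // simulates the CME on close()
            component("hdfs-sink", false));    // must still run to close .tmp files

        for (Stoppable c : components) {
            try {
                c.stop();
                System.out.println("stopped " + c.name());
            } catch (RuntimeException e) {
                // Log and continue instead of letting the exception
                // abort the entire shutdown sequence.
                System.out.println("failed to stop " + c.name() + ": " + e.getMessage());
            }
        }
    }
}
```

With the try/catch inside the loop, the simulated HDFS sink is still stopped even though the channel's stop() throws, which is the behavior that would let the sink rename its .tmp files on restart.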

Any idea how to eliminate this exception? Thanks.

