Flume Kafka Channel does not shut down properly


I'm using flume-ng-1.6.0+cdh5.11.1+160 with a custom source -> Kafka channel -> HDFS sink pipeline. When I restart the Flume agents, I see the following error logs:

ERROR while shutting down consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1258)
    at org.apache.flume.channel.kafka.KafkaChannel.stop(KafkaChannel.java:150)
    at org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:104)
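
For context, the agent is wired roughly as follows. This is only a simplified sketch: the names (a1, src1, kc1, hdfs1), the custom source class, the broker address, the topic and the HDFS path are placeholders, and the Kafka channel property names can differ between stock Flume 1.6 and the CDH build.

    # simplified agent layout (placeholder names and values)
    a1.sources  = src1
    a1.channels = kc1
    a1.sinks    = hdfs1

    a1.sources.src1.type     = com.example.MyCustomSource   # placeholder custom source class
    a1.sources.src1.channels = kc1

    a1.channels.kc1.type                    = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.kc1.kafka.bootstrap.servers = broker1:9092          # placeholder broker
    a1.channels.kc1.kafka.topic             = flume-channel-topic   # placeholder topic

    a1.sinks.hdfs1.type      = hdfs
    a1.sinks.hdfs1.channel   = kc1
    a1.sinks.hdfs1.hdfs.path = /flume/events/%Y-%m-%d               # placeholder path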

I dug around in the source code and saw that LifecycleAware.stop() is called from a JVM shutdown hook, so I suppose it runs in a thread different from the one in which the KafkaConsumer was constructed and polled, which results in the ConcurrentModificationException.
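
For reference, the thread-safe shutdown pattern for a Kafka consumer is to call wakeup() from the other thread and let the owning thread do the close(). The standalone sketch below (not Flume code; broker, group and topic names are placeholders) illustrates that pattern; calling consumer.close() inside the hook instead is exactly the kind of cross-thread call that produces the error above.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class ConsumerShutdownSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "sketch-group");               // placeholder group
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("sketch-topic")); // placeholder topic

            final Thread mainThread = Thread.currentThread();

            // The shutdown hook runs on its own thread. wakeup() is the only
            // KafkaConsumer method documented as safe to call from another thread;
            // calling consumer.close() here instead would trigger
            // "KafkaConsumer is not safe for multi-threaded access".
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                consumer.wakeup();
                try {
                    mainThread.join(); // let the owning thread finish closing the consumer
                } catch (InterruptedException ignored) {
                }
            }));

            try {
                while (true) {
                    consumer.poll(100); // poll(long) as in the 0.9/0.10-era clients
                }
            } catch (WakeupException e) {
                // wakeup() makes the blocked poll() throw; we are back on the owning thread
            } finally {
                consumer.close(); // safe: same thread that created and polled the consumer
            }
        }
    }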

What's more, in LifecycleSupervisor.stop(), the components' LifecycleAware.stop() methods are called sequentially without catching exceptions, so if one stop() fails, the following stops are not called at all. I guess that is why there are unclosed .tmp files in HDFS every time I restart the agents, and that data ends up lost.
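
If the sequential-stop behaviour is indeed the cause, I would expect per-component exception handling to at least keep one failing channel from blocking the rest. The sketch below is only illustrative, with a made-up SafeStop helper; it is not the actual LifecycleSupervisor code.

    import java.util.List;

    import org.apache.flume.lifecycle.LifecycleAware;

    // Illustrative only: shows per-component exception isolation during shutdown.
    // This is not the real LifecycleSupervisor implementation.
    public final class SafeStop {

        public static void stopAll(List<LifecycleAware> components) {
            for (LifecycleAware component : components) {
                try {
                    component.stop();
                } catch (RuntimeException e) {
                    // Log and continue, so a failing KafkaChannel.stop() does not
                    // prevent the HDFS sink from stopping and renaming its .tmp files.
                    System.err.println("Failed to stop component " + component + ": " + e);
                }
            }
        }

        private SafeStop() {
        }
    }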

Any idea how to eliminate this exception? Thanks.

