Flume Kafka Channel does not shut down properly
I'm using flume-ng-1.6.0+cdh5.11.1+160 with a custom source -> Kafka channel -> HDFS sink pipeline. When restarting the Flume agents, I see the following error logs:
    Error while shutting down consumer.
    java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1258)
        at org.apache.flume.channel.kafka.KafkaChannel.stop(KafkaChannel.java:150)
        at org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:104)

I dug around in the source code and saw that LifecycleAware.stop() is called from a JVM shutdown hook, so I suppose it runs in a thread different from the one the KafkaConsumer was constructed and used in, which results in the ConcurrentModificationException.
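To illustrate the failure mode: KafkaConsumer guards every public method with a check that only one thread uses it at a time, and close() is guarded too, so a shutdown-hook thread calling close() while the channel's own thread is inside the consumer triggers exactly this exception. Below is a minimal sketch of that ownership check; the class and method names here (SingleThreadedClient, acquire, release, simulateShutdown) are my own illustrative stand-ins, not Kafka's internals.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    // Simplified mimic of the single-thread ownership check KafkaConsumer
    // performs internally: remember which thread is currently inside the
    // client, and throw if a different thread calls in.
    static class SingleThreadedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);

        void acquire() {
            long me = Thread.currentThread().getId();
            if (!ownerThreadId.compareAndSet(NO_OWNER, me) && ownerThreadId.get() != me) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
        }

        void release() {
            ownerThreadId.set(NO_OWNER);
        }

        void close() {
            acquire();      // this is where the shutdown-hook thread fails
            try { /* real cleanup would go here */ } finally { release(); }
        }
    }

    // Simulates the restart scenario: one thread is "inside" the consumer
    // (e.g. blocked in poll()) while the shutdown hook calls close().
    static String simulateShutdown() throws InterruptedException {
        SingleThreadedClient client = new SingleThreadedClient();
        CountDownLatch polling = new CountDownLatch(1);
        CountDownLatch shutdownDone = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            client.acquire();                 // channel thread enters the consumer
            polling.countDown();
            try { shutdownDone.await(); } catch (InterruptedException ignored) { }
            client.release();
        });
        poller.start();
        polling.await();

        String result;
        try {
            client.close();                   // "shutdown-hook" thread
            result = "closed cleanly";
        } catch (ConcurrentModificationException e) {
            result = "CME: " + e.getMessage();
        } finally {
            shutdownDone.countDown();
        }
        poller.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(simulateShutdown());
    }
}
```

The usual way around this with the real client is to have the owning thread do the close itself, and signal it from the other thread (KafkaConsumer.wakeup() is documented as the one thread-safe method for this purpose).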
What's more, in LifecycleSupervisor.stop(), the components' LifecycleAware.stop() methods are called sequentially without catching exceptions, so if one stop() fails, the following stops are not called at all. I guess that is why there are unclosed .tmp files in HDFS every time I restart the agents, and the data in them is lost.
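A sketch of the failure-isolation the paragraph above is asking for (this is not Flume's actual code; stopAll, component, and runDemo are hypothetical names): wrapping each component's stop() in its own try/catch means one failing KafkaChannel.stop() no longer prevents the HDFS sink from stopping and finalizing its .tmp files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class Main {
    // Minimal stand-in for Flume's LifecycleAware; only what the sketch needs.
    interface LifecycleAware {
        String getName();
        void stop();
    }

    // Stop every component, isolating failures so one bad stop() cannot
    // skip the components after it; report how many stops failed.
    static int stopAll(List<LifecycleAware> components) {
        int failures = 0;
        for (LifecycleAware c : components) {
            try {
                c.stop();
            } catch (RuntimeException e) {
                failures++;
                System.err.println("Failed to stop " + c.getName() + ": " + e);
            }
        }
        return failures;
    }

    static LifecycleAware component(String name, List<String> stopped, boolean fail) {
        return new LifecycleAware() {
            public String getName() { return name; }
            public void stop() {
                if (fail) {
                    // simulate KafkaChannel.stop() hitting the consumer CME
                    throw new ConcurrentModificationException("not safe for multi-threaded access");
                }
                stopped.add(name);
            }
        };
    }

    static String runDemo() {
        List<String> stopped = new ArrayList<>();
        List<LifecycleAware> components = Arrays.asList(
                component("custom-source", stopped, false),
                component("kafka-channel", stopped, true),
                component("hdfs-sink", stopped, false));
        int failures = stopAll(components);
        return "stopped=" + stopped + " failures=" + failures;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With the sequential, uncaught-exception behavior described above, the hdfs-sink stop would never run once kafka-channel throws; with per-component try/catch, only the failing component is reported and the rest still shut down.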
Any idea how to eliminate this exception? Thanks.