Flume Kafka channel does not shut down properly
I'm using flume-ng-1.6.0+cdh5.11.1+160 with a custom source -> Kafka channel -> HDFS sink pipeline. When restarting the Flume agents, I see the following error logs:
Error while shutting down consumer.
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1258)
    at org.apache.flume.channel.kafka.KafkaChannel.stop(KafkaChannel.java:150)
    at org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:104)
I dug around in the source code and saw that LifecycleAware.stop() is called from a JVM shutdown hook, so I suppose it runs in a thread different from the one in which the KafkaConsumer was constructed, which results in the ConcurrentModificationException.
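To make the failure mode concrete, here is a minimal sketch of the kind of single-thread guard that KafkaConsumer enforces: the thread that is using the consumer "acquires" it, and any other thread that calls in concurrently (such as a shutdown-hook thread invoking close()) gets a ConcurrentModificationException. All names here are illustrative, not the actual Kafka source.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of KafkaConsumer's single-threaded access check
// (names and structure are illustrative, not Kafka's real code).
public class SingleThreadGuardDemo {
    static final long NO_THREAD = -1L;
    static final AtomicLong ownerThread = new AtomicLong(NO_THREAD);

    // Called at the start of every public consumer method: succeed if we
    // already own the consumer or nobody does; otherwise throw.
    static void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != ownerThread.get()
                && !ownerThread.compareAndSet(NO_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access");
        }
    }

    static void release() {
        ownerThread.set(NO_THREAD);
    }

    public static void main(String[] args) throws InterruptedException {
        acquire();  // the channel's worker thread is using the consumer
        // No release(): mimics a consumer busy in poll() at shutdown time.
        Thread shutdownHook = new Thread(() -> {
            try {
                acquire();  // shutdown-hook thread tries to close()
                System.out.println("close succeeded");
            } catch (ConcurrentModificationException e) {
                System.out.println("CME: " + e.getMessage());
            }
        });
        shutdownHook.start();
        shutdownHook.join();
        // prints: CME: KafkaConsumer is not safe for multi-threaded access
    }
}
```

This matches the stack trace above: KafkaChannel.stop() runs on the shutdown-hook thread while the consumer is still owned by the thread that created and polls it.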
what's more, in lifecyclesupervisor.stop()
, lifecycleaware.stop()
called sequentially without catching exceptions, if 1 stop
fails, following stop
s not called @ all, , guess why there'll unclosed .tmp
files in hdfs every time restart agents, , these data become lost.
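The propagation problem can be sketched like this: a supervisor that stops components sequentially and lets exceptions escape never reaches the components after the failing one, while a per-component try/catch stops everything it can. The StoppableComponent interface and both loops below are hypothetical illustrations, not Flume's actual source.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not Flume's real code) of why one failing stop()
// can leave later components, such as an HDFS sink with open .tmp files,
// never stopped at all.
public class StopLoopDemo {
    interface StoppableComponent {
        String name();
        void stop();
    }

    static StoppableComponent component(String name, boolean failOnStop) {
        return new StoppableComponent() {
            public String name() { return name; }
            public void stop() {
                if (failOnStop) {
                    throw new java.util.ConcurrentModificationException(
                        "KafkaConsumer is not safe for multi-threaded access");
                }
            }
        };
    }

    // Fragile variant: the first exception aborts the loop, so components
    // after the failing one are skipped. Returns how many were stopped.
    static int stopAllFragile(List<StoppableComponent> components) {
        int stopped = 0;
        for (StoppableComponent c : components) {
            c.stop();  // an exception here propagates up immediately
            stopped++;
        }
        return stopped;
    }

    // Defensive variant: catch and log per component, keep going.
    static int stopAllDefensive(List<StoppableComponent> components) {
        int stopped = 0;
        for (StoppableComponent c : components) {
            try {
                c.stop();
                stopped++;
            } catch (RuntimeException e) {
                System.err.println("Failed to stop " + c.name() + ": " + e);
            }
        }
        return stopped;
    }

    public static void main(String[] args) {
        List<StoppableComponent> pipeline = Arrays.asList(
            component("source", false),
            component("kafka-channel", true),  // fails like KafkaChannel.stop()
            component("hdfs-sink", false));    // must still be stopped

        try {
            stopAllFragile(pipeline);
        } catch (RuntimeException e) {
            System.out.println("fragile loop aborted; hdfs-sink never stopped");
        }
        System.out.println("defensive loop stopped "
            + stopAllDefensive(pipeline) + " of 3 components");
    }
}
```

With the fragile loop, the hdfs-sink stub is never stopped once the kafka-channel stub throws, which mirrors the unclosed .tmp files described above.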
Any idea how to eliminate this exception? Thanks.