java - Kafka Producer Request Timeout setting
I set a request timeout by adding the request.timeout.ms parameter, but when I deliberately break the connection to the broker, no timeout error occurs.
What is missing in my configuration? Do I need to modify a server setting as well?
    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // The fields logger, topic, brokerList and producer are declared elsewhere in the class.
    public void init() {
        logger.info("Initializing KafkaProducer: topic name: {}", topic);
        // println does not expand {} placeholders, so concatenate the topic name.
        System.out.println("Initializing KafkaProducer: topic name: " + topic);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1");
        properties.put("retries", "3");
        properties.put("linger.ms", 5);
        properties.put("block.on.buffer.full", false);
        properties.put("request.timeout.ms", "1000");
        //properties.put("metadata.fetch.timeout.ms", 1000);
        producer = new KafkaProducer<>(properties);
    }

    public void produce(String txnLogStr) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, txnLogStr);
        producer.send(record, new ProducerCallback());
    }

    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // Check only the exception: recordMetadata can be null when the send
            // fails, so it must not be dereferenced before this check.
            if (e != null) {
                logger.error("Kafka queue problem. topic: {}", topic, e);
                e.printStackTrace();
            } else {
                System.out.println("No error, offset: " + recordMetadata.offset());
            }
        }
    }
Kafka version: kafka_2.11-0.10.2.0
pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
    </dependencies>
It worked with the following property setting:
properties.put("metadata.fetch.timeout.ms", "1000");
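A likely explanation, as far as I can tell: request.timeout.ms only bounds how long the client waits for a response to a request it has already sent, while the time send() may block waiting for metadata when the broker is unreachable is governed by a separate setting. metadata.fetch.timeout.ms is the older name for that setting, and I believe max.block.ms is its non-deprecated replacement in this client version. Below is a minimal sketch of the configuration under that assumption. The class ProducerTimeoutConfig and the method build are hypothetical names made up for illustration, and the sketch only assembles the Properties object, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post) that assembles producer
// settings with both timeouts set to the same value.
class ProducerTimeoutConfig {

    static Properties build(String brokerList, long timeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("retries", "3");
        // Bounds the wait for a broker response to a request already sent.
        props.put("request.timeout.ms", String.valueOf(timeoutMs));
        // Assumption: bounds how long send() may block, for example while
        // waiting for metadata when no broker can be reached.
        props.put("max.block.ms", String.valueOf(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", 1000L);
        System.out.println("request.timeout.ms = " + props.getProperty("request.timeout.ms"));
        System.out.println("max.block.ms = " + props.getProperty("max.block.ms"));
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(...) in place of the one built in init(). If the assumption holds, a send attempted while the broker is down should fail with a timeout exception after roughly the configured time instead of hanging.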