Kafka producer, consumer with Avro does not work. -
i trying produce / consume avro messages using kafka mentioned in http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
kafkaproducer
package com.company.telemetry.avro; import io.confluent.kafka.serializers.kafkaavroserializer; import io.confluent.kafka.serializers.kafkaavroserializerconfig; import org.apache.avro.schema; import org.apache.avro.generic.genericdata; import org.apache.avro.generic.genericrecord; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.producerconfig; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.common.serialization.longserializer; import java.util.properties; public class kafkaavroproducer { private void produceavrodata(){ try { properties props = new properties(); props.put(producerconfig.bootstrap_servers_config, "1.2.3.4:9092"); props.put(producerconfig.client_id_config, "avroproducer"); props.put(producerconfig.key_serializer_class_config, longserializer.class.getname()); props.put(producerconfig.value_serializer_class_config, kafkaavroserializer.class.getname()); props.put(kafkaavroserializerconfig.schema_registry_url_config, "http://1.2.3.4:8081"); producer producer = new kafkaproducer(props); long key = 1l; string userschema = "{\"type\":\"record\"," + "\"name\":\"myrecord\"," + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; schema.parser parser = new schema.parser(); schema schema = parser.parse(userschema); genericrecord avrorecord = new genericdata.record(schema); avrorecord.put("f1", "value1"); producerrecord<object, object> record = new producerrecord<object, object>("topic2", key, avrorecord); producer.send(record); system.out.println("sent complete"); } catch(exception e) { e.printstacktrace(); } } public static void main(string[] args) { kafkaavroproducer kafkaavroproducer = new kafkaavroproducer(); kafkaavroproducer.produceavrodata(); } } kafkaconsumer
package com.company.telemetry.avro; import java.util.properties; import java.util.arrays; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.consumerrecord; import io.confluent.kafka.serializers.kafkaavrodeserializer; import org.apache.kafka.common.serialization.longdeserializer; public class consumergroup { public static void main(string[] args) throws exception { string topic = "topic2"; string group = "group1"; properties props = new properties(); props.put("bootstrap.servers", "1.2.3.4:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", longdeserializer.class.getname()); props.put("value.deserializer", kafkaavrodeserializer.class.getname()); props.put("schema.registry.url","1.2.3.4:8081"); kafkaconsumer<string, string> consumer = new kafkaconsumer<string, string>(props); consumer.subscribe(arrays.aslist(topic)); system.out.println("subscribed topic " + topic); int = 0; while (true) { consumerrecords<string, string> records = consumer.poll(100); (consumerrecord<string, string> record : records) system.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } when run producer, see following message in schema registry
[2017-08-18 17:13:45,314] info 8.7.65.5 - - [18/aug/2017:17:13:45 +0000] "post /subjects/topic2-value/versions http/1.1" 200 10 27 (io.confluent.rest-utils.requests)
but apart that, cannot see in consumer log. appreciated !
Comments
Post a Comment