Kafka producer and consumer with Avro do not work


I am trying to produce and consume Avro messages using Kafka, as described in http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

kafkaproducer

package com.company.telemetry.avro;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;

import java.util.Properties;

/**
 * Sends a single Avro-encoded {@link GenericRecord} to the Kafka topic {@code topic2},
 * using a {@code Long} key and the Confluent Avro serializer for the value.
 */
public class KafkaAvroProducer {

    /**
     * Builds one record with the inline schema {@code myrecord} (a single string field
     * {@code f1}) and publishes it to {@code topic2}.
     *
     * <p>Any exception raised while configuring, serializing or sending is caught and its
     * stack trace is printed, as in the original code.
     */
    private void produceAvroData() {
        try {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.2.3.4:9092");
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "avroproducer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://1.2.3.4:8081");

            long key = 1L;
            String userSchema = "{\"type\":\"record\","
                    + "\"name\":\"myrecord\","
                    + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(userSchema);
            GenericRecord avroRecord = new GenericData.Record(schema);
            avroRecord.put("f1", "value1");

            ProducerRecord<Object, Object> record =
                    new ProducerRecord<Object, Object>("topic2", key, avroRecord);

            // send() only enqueues the record; it is transmitted asynchronously by a
            // background thread. Without an explicit flush/close the JVM can exit before
            // the record ever reaches the broker, so the producer is flushed and then
            // closed by try-with-resources before success is reported.
            try (Producer<Object, Object> producer = new KafkaProducer<Object, Object>(props)) {
                producer.send(record);
                producer.flush();
            }
            System.out.println("sent complete");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point: creates a producer instance and sends the sample record.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        KafkaAvroProducer kafkaAvroProducer = new KafkaAvroProducer();
        kafkaAvroProducer.produceAvroData();
    }
}

kafkaconsumer

package com.company.telemetry.avro;

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

/**
 * Subscribes to the Kafka topic {@code topic2} as consumer group {@code group1} and prints
 * every record received, decoding keys as {@code Long} and values with the Confluent Avro
 * deserializer.
 */
public class ConsumerGroup {

    /**
     * Polls {@code topic2} forever and prints offset, key and value of each record.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the consumer cannot be created or polling fails
     */
    public static void main(String[] args) throws Exception {
        String topic = "topic2";
        String group = "group1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "1.2.3.4:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", LongDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        // The registry URL needs an explicit scheme, matching the producer's setting;
        // a bare "host:port" cannot be parsed as a URL when the schema is fetched.
        props.put("schema.registry.url", "http://1.2.3.4:8081");
        // NOTE(review): "auto.offset.reset" is left at its default. If this group has no
        // committed offsets and the producer ran before the consumer started, consider
        // setting it to "earliest" -- confirm against the intended behavior.

        // Type parameters follow the configured deserializers: Long keys, and Avro values
        // whose concrete type is decided by the deserializer at runtime.
        try (KafkaConsumer<Long, Object> consumer = new KafkaConsumer<Long, Object>(props)) {
            consumer.subscribe(Arrays.asList(topic));
            System.out.println("subscribed topic " + topic);

            while (true) {
                ConsumerRecords<Long, Object> records = consumer.poll(100);
                for (ConsumerRecord<Long, Object> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}

When I run the producer, I see the following message in the Schema Registry log:

[2017-08-18 17:13:45,314] info 8.7.65.5 - - [18/aug/2017:17:13:45 +0000] "post /subjects/topic2-value/versions http/1.1" 200 10 27 (io.confluent.rest-utils.requests)

But apart from that, I cannot see anything in the consumer log. Any help is appreciated!


Comments