Java使用kafka发送和生产消息的示例
1.maven依赖包
org.apache.kafka kafka-clients 0.9.0.1
2.生产者代码
packagecom.lnho.example.kafka; importorg.apache.kafka.clients.producer.KafkaProducer; importorg.apache.kafka.clients.producer.Producer; importorg.apache.kafka.clients.producer.ProducerRecord; importjava.util.Properties; publicclassKafkaProducerExample{ publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","master:9092"); props.put("acks","all"); props.put("retries",0); props.put("batch.size",16384); props.put("linger.ms",1); props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producerproducer=newKafkaProducer<>(props); for(inti=0;i<100;i++) producer.send(newProducerRecord<>("topic1",Integer.toString(i),Integer.toString(i))); producer.close(); } }
3.消费者代码
packagecom.lnho.example.kafka; importorg.apache.kafka.clients.consumer.ConsumerRecord; importorg.apache.kafka.clients.consumer.ConsumerRecords; importorg.apache.kafka.clients.consumer.KafkaConsumer; importjava.util.Arrays; importjava.util.Properties; publicclassKafkaConsumerExample{ publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","master:9092"); props.put("group.id","test"); props.put("enable.auto.commit","true"); props.put("auto.commit.interval.ms","1000"); props.put("session.timeout.ms","30000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer=newKafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic1")); while(true){ ConsumerRecords records=consumer.poll(100); for(ConsumerRecord record:records) System.out.printf("offset=%d,key=%s,value=%s\n",record.offset(),record.key(),record.value()); } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。