Kafka Java Producer代码实例详解
根据业务需要可以使用Kafka提供的JavaProducerAPI进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer
Kafka的ProducerAPI主要提供下列三个方法:
- publicvoidsend(KeyedMessage
message)发送单条数据到Kafka集群 - publicvoidsend(List
>messages)发送多条数据(数据集)到Kafka集群 - publicvoidclose()关闭Kafka连接资源
一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量);这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:
importkafka.producer.Partitioner; importkafka.utils.VerifiableProperties; /** *Createdbygerryon12/21. */ publicclassJavaKafkaProducerPartitionerimplementsPartitioner{ /** *无参构造函数 */ publicJavaKafkaProducerPartitioner(){ this(newVerifiableProperties()); } /** *构造函数,必须给定 * *@paramproperties上下文 */ publicJavaKafkaProducerPartitioner(VerifiablePropertiesproperties){ //nothings } @Override publicintpartition(Objectkey,intnumPartitions){ intnum=Integer.valueOf(((String)key).replaceAll("key_","").trim()); returnnum%numPartitions; } }
二、JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:
importkafka.javaapi.producer.Producer; importkafka.producer.KeyedMessage; importkafka.producer.ProducerConfig; importorg.apache.log4j.Logger; importjava.util.Properties; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.atomic.AtomicBoolean; importjava.util.concurrent.ThreadLocalRandom; /** *Createdbygerryon12/21. */ publicclassJavaKafkaProducer{ privateLoggerlogger=Logger.getLogger(JavaKafkaProducer.class); publicstaticfinalStringTOPIC_NAME="test"; publicstaticfinalchar[]charts="qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray(); publicstaticfinalintchartsLength=charts.length; publicstaticvoidmain(String[]args){ StringbrokerList="192.168.187.149:9092"; brokerList="192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095"; brokerList="192.168.187.146:9092"; Propertiesprops=newProperties(); props.put("metadata.broker.list",brokerList); /** *0表示不等待结果返回
*1表示等待至少有一个服务器返回数据接收标识
*-1表示必须接收到所有的服务器返回标识,及同步写入
**/ props.put("request.required.acks","0"); /** *内部发送数据是异步还是同步 *sync:同步,默认 *async:异步 */ props.put("producer.type","async"); /** *设置序列化的类 *可选:kafka.serializer.StringEncoder *默认:kafka.serializer.DefaultEncoder */ props.put("serializer.class","kafka.serializer.StringEncoder"); /** *设置分区类 *根据key进行数据分区 *默认是:kafka.producer.DefaultPartitioner==>安装key的hash进行分区 *可选:kafka.serializer.ByteArrayPartitioner==>转换为字节数组后进行hash分区 */ props.put("partitioner.class","JavaKafkaProducerPartitioner"); //重试次数 props.put("message.send.max.retries","3"); //异步提交的时候(async),并发提交的记录数 props.put("batch.num.messages","200"); //设置缓冲区大小,默认10KB props.put("send.buffer.bytes","102400"); //2.构建KafkaProducerConfiguration上下文 ProducerConfigconfig=newProducerConfig(props); //3.构建Producer对象 finalProducerproducer=newProducer (config); //4.发送数据到服务器,并发线程发送 finalAtomicBooleanflag=newAtomicBoolean(true); intnumThreads=50; ExecutorServicepool=Executors.newFixedThreadPool(numThreads); for(inti=0;i<5;i++){ pool.submit(newThread(newRunnable(){ @Override publicvoidrun(){ while(flag.get()){ //发送数据 KeyedMessagemessage=generateKeyedMessage(); producer.send(message); System.out.println("发送数据:"+message); //休眠一下 try{ intleast=10; intbound=100; Thread.sleep(ThreadLocalRandom.current().nextInt(least,bound)); }catch(InterruptedExceptione){ e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+"shutdown...."); } },"Thread-"+i)); } //5.等待执行完成 longsleepMillis=600000; try{ Thread.sleep(sleepMillis); }catch(InterruptedExceptione){ e.printStackTrace(); } flag.set(false); //6.关闭资源 pool.shutdown(); try{ pool.awaitTermination(6,TimeUnit.SECONDS); }catch(InterruptedExceptione){ }finally{ producer.close();//最后之后调用 } } /** *产生一个消息 * *@return */ privatestaticKeyedMessage generateKeyedMessage(){ Stringkey="key_"+ThreadLocalRandom.current().nextInt(10,99); StringBuildersb=newStringBuilder(); intnum=ThreadLocalRandom.current().nextInt(1,5); for(inti=0;i 三、Pom.xml依赖配置如下
0.8.2.1 org.apache.kafka kafka_2.10 ${kafka.version} 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。