# Distributed Message Queue Kafka (III): Producer Advanced Topics

This article covers:

- Kafka producer send-path source analysis
- Kafka producer best practices (topic constant, producer, consumer)
- Key producer parameters and common interview questions
- Producer and consumer interceptors
- Producer and consumer serialization
- Partitioners: usage and a real-world application of a custom partitioner

## Kafka producer send-path source analysis

### The message: ProducerRecord

```java
public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
```

Field by field:

- `topic`: the topic the message is sent to.
- `partition`: the target partition; it may be set explicitly, otherwise Kafka computes it internally.
- `headers`: custom message headers.
- `key`: used when computing which partition the message goes to.
- `value`: the actual message body.
- `timestamp`: the time the message was sent to Kafka.

### Required configuration

- `bootstrap.servers`: the addresses used to connect to the Kafka cluster, one or more `ip:port` pairs separated by commas. You do not have to list every broker, because the producer discovers the rest of the cluster from the brokers you give it. In practice, however, it is better to list them all: if you configure only one or two nodes and exactly those nodes go down, the producer can no longer find the cluster.
- `key.serializer` and `value.serializer`: the Kafka broker only accepts messages in binary form, so both the key and the value must be serialized.
- `client.id`: identifies this producer. If it is not set, Kafka generates a non-empty string automatically.

To avoid hard-coding the configuration keys, use the constants on `ProducerConfig`, for example `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`.

`KafkaProducer` is thread-safe: a single instance can be shared by multiple threads, or pooled and handed out to other threads or thread pools. `KafkaConsumer`, by contrast, is not thread-safe.
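As a small illustration of that guarantee (not part of the original examples), the sketch below shares one `KafkaProducer` across a fixed thread pool; the broker address and topic name simply reuse the values from the examples that follow.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedProducerDemo {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // One KafkaProducer instance shared by every worker thread.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 20; i++) {
            final int seq = i;
            // each worker reuses the same producer; send() itself is asynchronous
            pool.submit(() -> producer.send(
                    new ProducerRecord<>("topic-normal", "key-" + seq, "message-" + seq)));
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // close() flushes any buffered records before releasing resources
        producer.close();
    }
}
```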
### The ProducerRecord constructors

```java
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    if (topic == null) {
        throw new IllegalArgumentException("Topic cannot be null.");
    } else if (timestamp != null && timestamp < 0L) {
        throw new IllegalArgumentException(
                String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
    } else if (partition != null && partition < 0) {
        throw new IllegalArgumentException(
                String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
    } else {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
}

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
    this(topic, partition, timestamp, key, value, null);
}

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, null, key, value, headers);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}

public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}
```

### KafkaProducer send methods

```java
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
    this.throwIfNoTransactionManager();
    TransactionalRequestResult result = this.transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
    this.sender.wakeup();
    result.await();
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback) null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // the producer interceptors run first, then the record is handed to doSend()
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
```

### The producer retry mechanism

Retries are controlled by the `retries` parameter. Kafka distinguishes retriable from non-retriable exceptions:

- Retriable: for example transient network jitter, where a request or response was lost and the message never made it out.
- Non-retriable: for example the broker's disk is full, or the message body is too large, so the broker rejects it outright.

## Kafka producer best practices

Note: by default Kafka will create a topic automatically the first time a message is sent to a topic that does not yet exist. Automatic topic creation must always be disabled in production.
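With auto-creation disabled, topics have to be created up front, either with the `kafka-topics.sh` script or programmatically through the admin client. A minimal sketch, reusing the broker address from the examples below; the partition count and replication factor are illustrative values:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicCreator {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");

        try (AdminClient adminClient = AdminClient.create(properties)) {
            // topic name, partition count and replication factor are illustrative
            NewTopic topic = new NewTopic("topic-normal", 4, (short) 1);
            // block until the broker has actually created the topic
            adminClient.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("topic created: " + topic.name());
        }
    }
}
```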
### Topic constant

```java
package com.bfxy.kafka.api.constant;

public interface Const {

    String TOPIC_NORMAL = "topic-normal";
}
```
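All the examples below build messages from a `com.bfxy.kafka.api.model.User` object. Its definition is not shown in the original listings; based on how it is used (an `(id, name)` constructor, `getId()`/`getName()` and a readable `toString()`), it is roughly the following sketch:

```java
package com.bfxy.kafka.api.model;

public class User {

    private String id;
    private String name;

    public User() {
    }

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{id='" + id + "', name='" + name + "'}";
    }
}
```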
### Producer

```java
package com.bfxy.kafka.api.producer;

import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.constant.Const;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class NormalProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "normal-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Retry mechanism: RETRIES_CONFIG defaults to 0.
        // On failure the producer checks whether the exception is retriable; if so the record is
        // re-queued and retried on the next loop, otherwise the exception is thrown for the caller to handle.
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Retriable exceptions:
        //   org.apache.kafka.common.errors.NetworkException: transient network jitter, a request or response was lost
        //   org.apache.kafka.common.errors.LeaderNotAvailableException: the partition momentarily has no leader, e.g. during an election
        // Non-retriable exceptions:
        //   org.apache.kafka.common.errors.RecordTooLargeException: the message body is too large

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            User user = new User("001", "xiao xiao");
            ProducerRecord<String, String> record = new ProducerRecord<>(Const.TOPIC_NORMAL, JSON.toJSONString(user));
            /*
             * Every message is stored by partition, and the partition is normally computed from the key.
             * A freshly created record looks like: ProducerRecord(
             *   topic=topic-normal,
             *   partition=null,
             *   headers=RecordHeaders(headers = [], isReadOnly = false),
             *   key=null,
             *   value={"id":"001","name":"xiao xiao"},
             *   timestamp=null)
             * Both key and partition are still null here, so the partition must be computed inside send();
             * the source code shows it is derived by the partitioner.
             */
            System.out.println("new record: " + record);

            // The single-argument send() is also asynchronous: it returns a java.util.concurrent.Future.
            // Calling get() right after send() effectively makes the call synchronous and blocking:
            // Future<RecordMetadata> future = producer.send(record);
            // RecordMetadata recordMetadata = future.get();
            // System.out.println(String.format("partition: %s, offset: %s, timestamp: %s",
            //         recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp()));

            // The two-argument send() is fully asynchronous; the result is delivered to the Callback.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                        return;
                    }
                    System.out.println(String.format("partition: %s, offset: %s, timestamp: %s",
                            recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp()));
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

### Consumer

```java
package com.bfxy.kafka.api.producer;

import com.bfxy.kafka.api.constant.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class NormalConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "normal-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(Const.TOPIC_NORMAL));
            System.out.println("normal consumer started...");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecordList = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecordList.size();
                    System.out.println(String.format("--- topic: %s, partition: %s, record count: %s ---",
                            topic, partition.partition(), size));
                    for (ConsumerRecord<String, String> consumerRecord : partitionRecordList) {
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.out.println(String.format("-- value: %s, offset: %s, offset to commit: %s --",
                                value, offset, commitOffset));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Key producer parameters

**acks**: how many replicas must have received a message before the broker acknowledges it to the producer; this is Kafka's main trade-off between reliability and throughput. The default is `acks=1`.

- `acks=1`: the producer gets a success response as soon as the leader replica has written the message. In practice this covers roughly 99.9% of messages, but it leaves a window: if the leader crashes after writing the message and before the followers have replicated it, a new leader without that message is elected and the message is lost.
- `acks=0`: the producer does not wait for any response from the server at all. If the broker fails while writing, the producer never notices. Suitable only for scenarios that do not need reliable delivery.
- `acks=-1` / `acks=all`: the producer only gets a success response after every replica in the ISR has written the message. This is the setting for scenarios where safe, reliable delivery matters most, but even it cannot guarantee 100% delivery: if the ISR has shrunk to just the leader (all followers have fallen into the OSR), it degenerates to the `acks=1` case and the message can still be lost. To tighten the guarantee, combine it with `min.insync.replicas=N` (N greater than 1), which defines how many in-sync replicas must have the write before it counts as successful; for example, with one leader and two followers you might set `min.insync.replicas=2`. The catch is that if the followers do drop into the OSR, produce requests start failing and Kafka throughput collapses. In other words, 100% reliable delivery is very hard to achieve with the middleware alone, because the middleware always has to balance performance against reliability. The usual answer, much like the RabbitMQ reliable-delivery approach discussed earlier, is a scheduled job that re-sends any message whose delivery was never confirmed: allow duplicates, never allow loss. That is the most practical way to reach "100%" delivery.

**max.request.size**: limits the maximum size of a message the producer client can send, default 1048576 bytes (1 MB). The default is fine for most scenarios and raising it is not recommended; if you do change it, several related settings must be adjusted together with it.

**retries**: number of retries, default 0. On a send failure the producer checks whether the exception is retriable; if so, the record is re-queued and retried on the next loop, otherwise the exception is thrown for the business code to catch.

- Retriable: `org.apache.kafka.common.errors.NetworkException`, transient network jitter where the request or response was lost or the message never went out; `org.apache.kafka.common.errors.LeaderNotAvailableException`, the cluster momentarily has no leader for the partition, for example while an election is in progress.
- Non-retriable: `org.apache.kafka.common.errors.RecordTooLargeException`, the message body is too large.

**retry.backoff.ms**: interval between retries, default 100 ms.

**compression.type**: the message compression codec, default "none"; the options are "gzip", "snappy" and "lz4". For large message bodies, compression reduces network traffic and network I/O cost and can improve overall throughput, at the price of extra CPU and disk usage.

**connections.max.idle.ms**: how long an idle connection is kept before it is closed, default 540000 ms (9 minutes).

**linger.ms**: how long the producer waits for more messages (`ProducerRecord`s) to join a `ProducerBatch` before sending it, default 0.

**batch.size**: how many bytes a batch may accumulate before it is sent, default 16384 (16 KB).

**buffer.memory**: the total memory the producer uses to buffer records, default 32 MB.

Note: these last three parameters work together; they are what configures batched sending for better throughput. When they are all set, the relationship between them is effectively "or": a batch is sent as soon as any one of the thresholds is reached, and until it is actually sent the records sit in memory.

**receive.buffer.bytes**: the size of the socket receive buffer (SO_RCVBUF), default 32768 B (32 KB).

**send.buffer.bytes**: the size of the socket send buffer (SO_SNDBUF), default 131072 B (128 KB).

**request.timeout.ms**: how long the producer waits for the response to a request, default 30000 ms.
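To tie the parameters above together, here is a sketch of a producer configuration that sets the reliability and batching knobs in one place. The values are illustrative starting points rather than recommendations, and note that `min.insync.replicas` is a broker/topic-level setting that has to be configured on the server side:

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TunedProducerConfig {

    public static Properties build() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Reliability: wait for all ISR replicas; pair with a topic/broker-side min.insync.replicas > 1.
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        // Batching: a batch is sent once it reaches batch.size bytes or has waited linger.ms.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);

        // Optional compression to cut network traffic for large payloads.
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return properties;
    }
}
```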
## Common interview questions for this part

- Explain the three possible values of `acks` and the scenarios each one fits.
- Does `acks=-1` / `acks=all` guarantee 100% reliable delivery?

## Kafka producer and consumer interceptors

Kafka has interceptors on both sides:

- Producer side: implement `org.apache.kafka.clients.producer.ProducerInterceptor`.
- Consumer side: implement `org.apache.kafka.clients.consumer.ConsumerInterceptor`.

### Topic constant

```java
package com.bfxy.kafka.api.constant;

public interface Const {

    String TOPIC_INTERCEPTOR = "topic-interceptor";
}
```

### Producer interceptor

```java
package com.bfxy.kafka.api.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    private volatile long success = 0;
    private volatile long failure = 0;

    /**
     * Invoked before the record is sent.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("--------------- producer interceptor: before send ----------------");
        String modifyValue = "prefix-" + producerRecord.value();
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(),
                producerRecord.timestamp(), producerRecord.key(), modifyValue, producerRecord.headers());
    }

    /**
     * Invoked after the record has been acknowledged or has failed.
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("--------------- producer interceptor: after send ----------------");
        if (null == e) {
            success++;
        } else {
            failure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) success / (success + failure);
        System.out.println(String.format("producer closing, send success ratio: %s%%", successRatio * 100));
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
```

### Producer

```java
package com.bfxy.kafka.api.interceptor;

import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.constant.Const;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class InterceptorProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "interceptor-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the producer interceptor; more than one interceptor may be configured.
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(Const.TOPIC_INTERCEPTOR, JSON.toJSONString(user));
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

### Consumer interceptor

```java
package com.bfxy.kafka.api.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /**
     * Invoked before the fetched records are handed to the application.
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        System.out.println("--------------- consumer interceptor: records received ----------------");
        return consumerRecords;
    }

    /**
     * Invoked after offsets are committed.
     * With auto-commit enabled (AUTO_COMMIT_INTERVAL_MS_CONFIG) this runs on every commit interval,
     * even when no records were processed; with manual commits it only runs after records have been
     * processed and their offsets committed.
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((tp, offset) ->
                System.out.println("consumer committed, partition: " + tp + ", offset: " + offset));
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
```

### Consumer

```java
package com.bfxy.kafka.api.interceptor;

import com.bfxy.kafka.api.constant.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class InterceptorConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "interceptor-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        // Register the consumer interceptor.
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(Const.TOPIC_INTERCEPTOR));
            System.out.println("interceptor consumer started...");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecordList = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecordList.size();
                    System.out.println(String.format("--- topic: %s, partition: %s, record count: %s ---",
                            topic, partition.partition(), size));
                    for (ConsumerRecord<String, String> consumerRecord : partitionRecordList) {
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.out.println(String.format("-- value: %s, offset: %s, offset to commit: %s --",
                                value, offset, commitOffset));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
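The `interceptor.classes` setting accepts a list, so several interceptors can be chained; they are invoked in the order they are listed. A sketch of what the registration line in `InterceptorProducer` would look like with a second, purely hypothetical `AuditProducerInterceptor`:

```java
// Hypothetical chain of two producer interceptors, executed in the listed order;
// AuditProducerInterceptor stands in for any additional ProducerInterceptor<String, String>.
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        CustomProducerInterceptor.class.getName() + "," + AuditProducerInterceptor.class.getName());
```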
## Kafka producer and consumer serialization

The producer needs a serializer (`Serializer`) to turn objects into byte arrays before they can be sent to Kafka over the network; on the other side, the consumer needs a deserializer (`Deserializer`) to turn the byte arrays it receives from Kafka back into objects.

- Serializer interface: `org.apache.kafka.common.serialization.Serializer`
- Deserializer interface: `org.apache.kafka.common.serialization.Deserializer`

### Topic constant

```java
package com.bfxy.kafka.api.constant;

public interface Const {

    String TOPIC_SERIAL = "topic-serial";
}
```

### Custom serializer

```java
package com.bfxy.kafka.api.serial;

import com.bfxy.kafka.api.model.User;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class UserSerializer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, User user) {
        if (null == user) {
            return null;
        }
        byte[] idBytes, nameBytes;
        try {
            String id = user.getId();
            String name = user.getName();
            if (null != id) {
                idBytes = id.getBytes(StandardCharsets.UTF_8);
            } else {
                idBytes = new byte[0];
            }
            if (null != name) {
                nameBytes = name.getBytes(StandardCharsets.UTF_8);
            } else {
                nameBytes = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + idBytes.length + nameBytes.length);
            // 4 bytes (one int): putInt(int value) stores the real length of idBytes
            buffer.putInt(idBytes.length);
            // put(byte[] src) stores the actual idBytes content
            buffer.put(idBytes);
            // 4 bytes (one int): putInt(int value) stores the real length of nameBytes
            buffer.putInt(nameBytes.length);
            // put(byte[] src) stores the actual nameBytes content
            buffer.put(nameBytes);
            return buffer.array();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {
    }
}
```

### Producer

```java
package com.bfxy.kafka.api.serial;

import com.bfxy.kafka.api.constant.Const;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SerializerProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "serial-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The value is serialized with the custom serializer instead of StringSerializer:
        // properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());

        try (KafkaProducer<String, User> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, User> record = new ProducerRecord<>(Const.TOPIC_SERIAL, user);
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
### Custom deserializer

```java
package com.bfxy.kafka.api.serial;

import com.bfxy.kafka.api.model.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class UserDeserializer implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public User deserialize(String s, byte[] bytes) {
        if (null == bytes) {
            return null;
        }
        if (bytes.length < 8) {
            throw new SerializationException("size is wrong, must be data.length >= 8");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        // real length of the idBytes array
        int idLen = buffer.getInt();
        byte[] idBytes = new byte[idLen];
        buffer.get(idBytes);
        // real length of the nameBytes array
        int nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        String id, name;
        try {
            id = new String(idBytes, StandardCharsets.UTF_8);
            name = new String(nameBytes, StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new SerializationException("deserializing error!", e);
        }
        return new User(id, name);
    }

    @Override
    public void close() {
    }
}
```

### Consumer

```java
package com.bfxy.kafka.api.serial;

import com.bfxy.kafka.api.constant.Const;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class DeserializerConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The value is deserialized with the custom deserializer instead of StringDeserializer:
        // properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "serial-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(Const.TOPIC_SERIAL));
            System.out.println("serial consumer started...");
            while (true) {
                ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, User>> partitionRecordList = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecordList.size();
                    System.out.println(String.format("--- topic: %s, partition: %s, record count: %s ---",
                            topic, partition.partition(), size));
                    for (ConsumerRecord<String, User> consumerRecord : partitionRecordList) {
                        User user = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.out.println(String.format("-- value: %s, offset: %s, offset to commit: %s --",
                                user, offset, commitOffset));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
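The ByteBuffer layout above has to be kept in sync by hand on both the producer and consumer side. Since fastjson is already used for the plain-String examples, a JSON-based serializer pair is a simpler alternative. This is only a sketch with illustrative class names, assuming a kafka-clients version (2.0+) where `Serializer`/`Deserializer` provide default `configure()`/`close()` methods, which matches the `Duration`-based `poll()` used throughout this article:

```java
package com.bfxy.kafka.api.serial;

import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

// Illustrative JSON-based alternative to the hand-rolled ByteBuffer layout.
public class UserJsonSerializer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        // the whole object becomes a UTF-8 JSON byte array
        return JSON.toJSONString(user).getBytes(StandardCharsets.UTF_8);
    }
}

class UserJsonDeserializer implements Deserializer<User> {

    @Override
    public User deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        // parse the UTF-8 JSON byte array back into a User
        return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), User.class);
    }
}
```

They would be registered through `VALUE_SERIALIZER_CLASS_CONFIG` / `VALUE_DESERIALIZER_CLASS_CONFIG` exactly as `UserSerializer` and `UserDeserializer` are registered above.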
## Kafka partitioners: usage and best practices

When a Kafka producer sends a message, the record passes in turn through the producer interceptors, the serializer and the partitioner; once the partition number is known, the record is written to the Kafka broker. The record here is the `ProducerRecord`. If no partition was specified on the record, the partitioner computes one from the key with its internal algorithm; if a partition was specified, the partitioner is skipped and the record is written directly to that partition.

### Topic constant

```java
package com.bfxy.kafka.api.constant;

public interface Const {

    String TOPIC_PARTITION = "topic-partition";
}
```

### Custom partitioner

```java
package com.bfxy.kafka.api.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int numOfPartition = partitionInfoList.size();
        System.out.println("------ custom partitioner, partition count: " + numOfPartition);
        if (null == keyBytes) {
            // no key: simple round-robin over the partitions
            return counter.getAndIncrement() % numOfPartition;
        } else {
            // key present: murmur2 hash of the key, like the default partitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numOfPartition;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
```

### Producer

```java
package com.bfxy.kafka.api.partition;

import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.constant.Const;
import com.bfxy.kafka.api.model.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PartitionProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "partition-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(Const.TOPIC_PARTITION, JSON.toJSONString(user));
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
### Consumer

```java
package com.bfxy.kafka.api.partition;

import com.bfxy.kafka.api.constant.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class PartitionConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-group");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(Const.TOPIC_PARTITION));
            System.out.println("partition consumer started...");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecordList = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecordList.size();
                    System.out.println(String.format("--- topic: %s, partition: %s, record count: %s ---",
                            topic, partition.partition(), size));
                    for (ConsumerRecord<String, String> consumerRecord : partitionRecordList) {
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.out.println(String.format("-- value: %s, offset: %s, offset to commit: %s --",
                                value, offset, commitOffset));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

### Custom partitioners in real-world use

Suppose `topicA` on our Kafka brokers has 4 partitions, and there are 4 consumers, one per supplier, each consuming from its own partition. A partition, like a queue, is first-in-first-out, which is exactly what gives us ordered messages. Every message carries a supplier ID, and a custom partitioner chooses the partition from that supplier ID, so all messages with the same supplier ID flow into the same partition and different suppliers' messages never interfere with one another: each supplier sees its own orders immediately and consumes them as soon as they arrive. Without this, if a single partition (say partition1) mixed the messages of consumer1 through consumer4 and supplier consumer1 produced far more messages than the others, consumer1's backlog would keep that partition permanently busy and consumer2, consumer3 and consumer4 could not consume their few messages in time. A sketch of this idea follows.
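Here is how the supplier scenario maps onto the key-hashing `CustomPartitioner` above: the supplier ID is used as the record key, so every order from one supplier hashes to the same partition and stays in order, while different suppliers land on different partitions and do not block each other. The topic name `topicA`, the supplier IDs and the order payloads are illustrative:

```java
package com.bfxy.kafka.api.partition;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class SupplierOrderProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.218.21:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Reuse the key-hashing CustomPartitioner shown above.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

        // Hypothetical supplier IDs; "topicA" with 4 partitions is assumed to exist.
        List<String> supplierIds = Arrays.asList("supplier-1", "supplier-2", "supplier-3", "supplier-4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int orderNo = 0; orderNo < 20; orderNo++) {
                String supplierId = supplierIds.get(orderNo % supplierIds.size());
                // The supplier ID is the record key, so all of one supplier's orders
                // hash to the same partition and are consumed in order.
                producer.send(new ProducerRecord<>("topicA", supplierId, "order-" + orderNo));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```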