本文共 6599 字,大约阅读时间需要 21 分钟。
这篇文章主要介绍kafka中JAVA API的使用,这里面为了介绍配置,所以使用的是原生的javaapi操作,kafka可以与spring通过xml配置集成,或者更加简单通过spring boot引入starter,通过(AutoConfiguration)自动配置完成集成。但其实无论何种使用方式,其根本都是使用原生pai进行操作。
使用maven依赖管理,引入kafka依赖
org.apache.kafka kafka-clients 1.1.0
然后是一个生产者的demo,代码如下
public class KafkaProducerDemo extends Thread{ private KafkaProducerkafkaProducer; private String topic; private boolean isAsync; public KafkaProducerDemo(String topic, boolean isAysnc) { //配置kafka生产者的属性配置 Properties properties = new Properties(); //集群broker地址,多个broker地址逗号隔开 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092"); //设置生产者id properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo"); //设置发送消息ack模式 properties.put(ProducerConfig.ACKS_CONFIG,"-1"); //key序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); //value序列化类 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //设置批量发送消息的size properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 5); //延迟发送的时间,延迟时间内的消息一起发送到broker' properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000); //每次请求最大的字节数 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024); kafkaProducer=new KafkaProducer (properties); this.topic=topic; this.isAsync=isAysnc; } public KafkaProducerDemo(){ } @Override public void run() { int count = 50; if (isAsync){ do { kafkaProducer.send(new ProducerRecord (topic, count,"isAsyncSend" + count), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { //异步发送回调函数,异步发送过程是类似队列消费过程,先将消息放到列表,然后有一个线程扫描这个列表,发现有消息则进行发送消费 if(recordMetadata!=null){ System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } } }); count--; }while (count >0); }else { //同步发送消息,get是阻塞进行的 do { try { RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord (topic, count,"isAsyncSend" + count)).get(); System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } count--; }while (count >0); } } public void init() { } public static void main(String[] args) { KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo("test", true); kafkaProducerDemo.start(); }}
拿出比较重要的配置详细介绍,其实上面的配置常量,在对应的源码里面都有很详细的作用介绍,有兴趣可以自己去看。
ProducerConfig.ACKS_CONFIG 0:表示producer不会发送消息后等待任何broker的响应,无法保证消息是否成功发送到broker中。 1:producer只需要得到分区副本中leader的确认就可以。 all:producer需要等到分区副本中所有的副本对消息确认,才可以进行确认。ProducerConfig.BATCH_SIZE_CONFIG
属于一种优化策略,批量发送消息 为了减少网络请求次数,采取批量发送消息的策略。同时批量发送时消息里面可能有发送到不同分区的消息,而分区也可能落在不同的broker,所以发送时时按不同分区来分发的。 ProducerConfig.LINGER_MS_CONFIG 同样属于一种优化策略,表示延时时间,在延时时间内进行一次消息批量发送,可以配合batch_size使用kafka生产者的同步发送与异步发送
这里的生产者发送消息存在两种模式:同步和异步。
同步发送的时候,调用send方法后利用futrue阻塞等待返回结果,get()获得发送结果。 异步发送时,先把要发送的消息放到一个列表,然后有一个工作线程负责扫描列表,发现有待发送消息的话就进行消息发送,同时获得发送结果后调用回调函数。然后是kafka消费者api使用demo,代码如下。
public class KafkaConsumerDemo implements Runnable{ private KafkaConsumer kafkaConsumer = null; public KafkaConsumerDemo(String topic) { Properties properties=new Properties(); //连接broker集群地址,多个broker逗号隔开 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092"); //配置消费者所属的分组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo"); //是否自动ack properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //若果是自动ack,那么自动ack的频率是多长 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //反序列化key properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //反序列化value properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //消费者加入group时,消费offset设置策略,earliest重置offset为最早的偏移地址,latest重置ofsset为已经消费的最大偏移,none没有offset时抛异常 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //每次允许拉取最大的消息数量 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5); kafkaConsumer=new KafkaConsumer(properties); kafkaConsumer.subscribe(Collections.singletonList(topic)); } public KafkaConsumerDemo() { } @Override public void run() { while(true){ //拉取消息,暂时不确认消息 ConsumerRecordsconsumerRecord=kafkaConsumer.poll(1000); for(ConsumerRecord record:consumerRecord){ System.out.println("-------------message receive:"+record.value()); kafkaConsumer.commitAsync(); } } } public static void main(String[] args) { new Thread(new KafkaConsumerDemo("test")).start(); }}
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
跟别的mq中间件类似,消费者消费消息也有消息确认机制,这个配置true表示开启自动消息确认机制,否则需要手动确认消息ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
假如开启了自动ack的话,表示多久进行一次批量ack,批量ack也是一种优化策略。类似 activeMQ的 optimizeAcknowledge批量优化。ConsumerConfig.MAX_POLL_RECORDS_CONFIG
表示一次可以最多拉取多少条消息进行消费,这个属于批量获取消息,类似activeMQ的prefecthSize.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
这个配置是针对新加入消费者组的消费者配置的,配置新消费者从哪个offset开始消费消息。public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in been deleted):
看了上面一段解释,可以知道这个设置是为了没有offset(初始化时或者对应 的offset消息被删了)的comsumer重置offset的策略
earliest:重置offset为最小的未被消费的offset latest:重置offset为最新的offset none:没有offset时直接抛出异常 原文:https://blog.csdn.net/qq_20597727/article/details/81639283