博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafkaAPI使用以及常用配置介绍
阅读量:4218 次
发布时间:2019-05-26

本文共 6599 字,大约阅读时间需要 21 分钟。

这篇文章主要介绍kafka中JAVA API的使用,这里面为了介绍配置,所以使用的是原生的javaapi操作,kafka可以与spring通过xml配置集成,或者更加简单通过spring boot引入starter,通过(AutoConfiguration)自动配置完成集成。但其实无论何种使用方式,其根本都是使用原生pai进行操作。

使用maven依赖管理,引入kafka依赖

org.apache.kafka
kafka-clients
1.1.0

然后是一个生产者的demo,代码如下

public class KafkaProducerDemo extends Thread{    private KafkaProducer
kafkaProducer; private String topic; private boolean isAsync; public KafkaProducerDemo(String topic, boolean isAysnc) { //配置kafka生产者的属性配置 Properties properties = new Properties(); //集群broker地址,多个broker地址逗号隔开 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092"); //设置生产者id properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo"); //设置发送消息ack模式 properties.put(ProducerConfig.ACKS_CONFIG,"-1"); //key序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); //value序列化类 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //设置批量发送消息的size properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 5); //延迟发送的时间,延迟时间内的消息一起发送到broker' properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000); //每次请求最大的字节数 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024); kafkaProducer=new KafkaProducer
(properties); this.topic=topic; this.isAsync=isAysnc; } public KafkaProducerDemo(){ } @Override public void run() { int count = 50; if (isAsync){ do { kafkaProducer.send(new ProducerRecord
(topic, count,"isAsyncSend" + count), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { //异步发送回调函数,异步发送过程是类似队列消费过程,先将消息放到列表,然后有一个线程扫描这个列表,发现有消息则进行发送消费 if(recordMetadata!=null){ System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } } }); count--; }while (count >0); }else { //同步发送消息,get是阻塞进行的 do { try { RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord
(topic, count,"isAsyncSend" + count)).get(); System.out.println("分区"+recordMetadata.partition()+ "\n 偏移"+recordMetadata.offset()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } count--; }while (count >0); } } public void init() { } public static void main(String[] args) { KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo("test", true); kafkaProducerDemo.start(); }}

拿出比较重要的配置详细介绍,其实上面的配置常量,在对应的源码里面都有很详细的作用介绍,有兴趣可以自己去看。 

ProducerConfig.ACKS_CONFIG 
0:表示producer不会发送消息后等待任何broker的响应,无法保证消息是否成功发送到broker中。 
1:producer只需要得到分区副本中leader的确认就可以。 
all:producer需要等到分区副本中所有的副本对消息确认,才可以进行确认。

ProducerConfig.BATCH_SIZE_CONFIG 

属于一种优化策略,批量发送消息 
为了减少网络请求次数,采取批量发送消息的策略。同时批量发送时消息里面可能有发送到不同分区的消息,而分区也可能落在不同的broker,所以发送时时按不同分区来分发的。 
ProducerConfig.LINGER_MS_CONFIG 
同样属于一种优化策略,表示延时时间,在延时时间内进行一次消息批量发送,可以配合batch_size使用

kafka生产者的同步发送与异步发送 

这里的生产者发送消息存在两种模式:同步和异步。 

同步发送的时候,调用send方法后利用futrue阻塞等待返回结果,get()获得发送结果。 
异步发送时,先把要发送的消息放到一个列表,然后有一个工作线程负责扫描列表,发现有待发送消息的话就进行消息发送,同时获得发送结果后调用回调函数。

然后是kafka消费者api使用demo,代码如下。

public class KafkaConsumerDemo implements Runnable{    private KafkaConsumer kafkaConsumer = null;    public KafkaConsumerDemo(String topic) {        Properties properties=new Properties();        //连接broker集群地址,多个broker逗号隔开        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.15:9092");        //配置消费者所属的分组id        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");        //是否自动ack        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");        //若果是自动ack,那么自动ack的频率是多长        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");        //反序列化key        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,                "org.apache.kafka.common.serialization.IntegerDeserializer");        //反序列化value        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                "org.apache.kafka.common.serialization.StringDeserializer");        //消费者加入group时,消费offset设置策略,earliest重置offset为最早的偏移地址,latest重置ofsset为已经消费的最大偏移,none没有offset时抛异常        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");        //每次允许拉取最大的消息数量        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);        kafkaConsumer=new KafkaConsumer(properties);        kafkaConsumer.subscribe(Collections.singletonList(topic));    }    public KafkaConsumerDemo() {    }    @Override    public void run() {        while(true){            //拉取消息,暂时不确认消息            ConsumerRecords
consumerRecord=kafkaConsumer.poll(1000); for(ConsumerRecord record:consumerRecord){ System.out.println("-------------message receive:"+record.value()); kafkaConsumer.commitAsync(); } } } public static void main(String[] args) { new Thread(new KafkaConsumerDemo("test")).start(); }}

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 

跟别的mq中间件类似,消费者消费消息也有消息确认机制,这个配置true表示开启自动消息确认机制,否则需要手动确认消息

ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG 

假如开启了自动ack的话,表示多久进行一次批量ack,批量ack也是一种优化策略。类似 activeMQ的 optimizeAcknowledge批量优化。

ConsumerConfig.MAX_POLL_RECORDS_CONFIG 

表示一次可以最多拉取多少条消息进行消费,这个属于批量获取消息,类似activeMQ的prefecthSize.

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 

这个配置是针对新加入消费者组的消费者配置的,配置新消费者从哪个offset开始消费消息。

public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in been deleted): 
  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if consumer.
";

看了上面一段解释,可以知道这个设置是为了没有offset(初始化时或者对应 的offset消息被删了)的comsumer重置offset的策略 

earliest:重置offset为最小的未被消费的offset 
latest:重置offset为最新的offset 
none:没有offset时直接抛出异常

原文:https://blog.csdn.net/qq_20597727/article/details/81639283 

你可能感兴趣的文章
MySQL中乐观锁和悲观锁
查看>>
Java 中包装类型的比较注意事项
查看>>
Java查看字节码文件(基于JDK和IDEA)
查看>>
Java中如何存储金额的问题
查看>>
Spring 线程池定时监控
查看>>
Java 注解的原理及自定义注解
查看>>
Spring MyBatis generator自动生成配置
查看>>
java web中通过fork join来子任务拆分提高处理速度
查看>>
java面试题及答案
查看>>
常见的java查错题
查看>>
java面试题大全-代码与编程题
查看>>
java中equals和==的区别
查看>>
java中&与&&的区别
查看>>
JAVA数据类型间的相互转换
查看>>
js 操作select和option
查看>>
Java接口和Java抽象类
查看>>
java抽象类、接口和继承之间关系
查看>>
区分Tomcat与Web服务器、应用服务器的关系
查看>>
SQL语句效率问题的几点总结
查看>>
in和exists 区别
查看>>