8Kafka 使用 · SpringCloud微服务实战 · 看云

导航


本节代码地址


前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。

1.新建项目

新建项目模块fw-cloud-mq-kafka用来测试Kafka 的生产和消息,本文并不会测试Kafka 的海量数据,仅仅为了Kafka的使用。
31ca53986963d869f35c5248a7601a05_MD5.webp

2.maven 配置

配置中我们主要引入了spring-kafka,这是Spring 为Kafka 提供的工具包

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

3.新建启动类

本项目并没有加入到微服务中,如果有需要,读者可以自行加入,当然如果加入了可以从RESTFUL接口接收数据,再推送到对应的Topic 中。

@SpringBootApplication
public class FwKafkaMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(FwKafkaMqApplication.class, args);
    }
}

4.应用配置

server:
  port: 8781
spring:
  application:
    name: fw-cloud-mq-kafka
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 16384
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      group-id: testGroup
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer



kafka 每个配置对应的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多个,中间,逗号隔开
  • kafka.producer.asks
    可以设置的值为:all, -1, 0, 1
    procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
  • kafka.producer.retries
    写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
    当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
  • kafka.producer.batch-size
    每次批量发送消息的数量,produce积累到一定数据,一次发送
  • kafka.producer.key-serializer
    指定消息key编解码方式
  • kafka.producer.value-serializer
    指定消息体的编解码方式
  • kafka.consumer.group-id
    指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
  • kafka.consumer.auto-offset-reset
    smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
  • kafka.consumer.enable-auto-commit
    设置自动提交offset
  • kafka.consumer.auto-commit-interval
    如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  • kafka.consumer.key-deserializer
    指定消息key编解码方式
  • kafka.consumer.value-deserializer
    指定消息体编解码方式
    当然kafka 还有很多配置,我们简单使用这些已经够了

5.新建发送方

发送方需要引入KafkaTemplate,用来发送消息,发送的时候我们指定消息的topic 和内容。并把消息内容打印出来。并把发送类注册为组件。


@Component
@Slf4j
public class FwSender {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    public boolean send(){
        String message="Hello World:"+ DateUtil.now();
        log.info("FwSender:"+message);
        
        kafkaTemplate.send("fwcloud",message);
        return true;
    }
}

6.新建消费方

消费放需要添加一个公共方法并设置@KafkaListener和需要监听的Topic,就可以实现消息的监听并消费了。


@Component
@Slf4j
public class FwReceiver {

    @KafkaListener(topics = "fwcloud")
    public void onMessage(String message){
        log.info(message);
    }
}

7.启动项目

启动项目前,需要先启动kafka
然后启动单元测试FwSenderTest发送消息
通过控制台我们看推送的消息

2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34

然后我们再看一下消费的信息

2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34