Administrator
发布于 2024-11-24 / 6 阅读
0
0

spring boot引入Kafka

消费方application.yml配置

spring:
  application:
    name: kafka-consumer
  kafka:
    consumer:
      #kafka地址
      bootstrap-servers: localhost:9092
      #消费者id(传给Kafka服务器的ID,用于标识自己)
      client-id: kafka-consumer
      #消费者组id(消费者组的名称,用于标识自己所属的消费者组)
      group-id: kafka-consumer-group
      #提交偏移量时间隔(自动提交偏移量的时间间隔)
      auto-commit-interval: 1000
      #自动重置偏移量(当消费者组中没有可消费的消息时,自动重置偏移量)
      #  earliest:自动重置到最早的偏移量
      #  latest:自动重置到最新的偏移量
      #  none:不自动重置偏移量,需要手动提交偏移量
      #  exception:抛出异常,需要手动处理
      auto-offset-reset: earliest
      #是否开启自动提交偏移量
      enable-auto-commit: true
      #是否开启消费者自动重试
      enable-retry: true
      #消费者每次poll请求时,最多拉取的消息数量
      max-poll-records: 1000
      #消费者每次poll请求时,等待时间(单位:毫秒)
      poll-timeout: 10000
      #消费者每次poll请求时,请求超时时间(单位:毫秒)
      request-timeout: 40000
      #消费者每次poll请求时,请求失败重试次数
      retries: 3
      #服务器应该为消费者返回的最小字节数(单位:字节)
      fetch-min-size: 10000bytes
      #服务器应该为消费者返回的最大字节数(单位:字节)
      fetch-max-size: 1000000bytes
      #消费者的消费者线程数
      concurrency: 4
      #消费者的消费者线程池大小
      pool:
        max-size: 100
      #消费者的消费者线程池队列大小
      queue-size: 1000000
      heartbeat:
        #消费者心跳时间间隔(单位:毫秒)
        interval: 10000
        #消费者心跳超时时间(单位:毫秒)
        timeout: 10000
        #消费者心跳超时时间(单位:毫秒)
        max-retries: 3
        #消费者心跳超时时间(单位:毫秒)
        backoff: 10000
      key:
        #消费者的key反序列化器
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value:
        #消费者的value反序列化器
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
      #消费者的value反序列化器配置
        spring:
          json:
            trusted:
              #消费者所在包路径
              packages: com.test.kafka.consumer

pom引入Kafka的依赖包

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.client.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.client.version}</version>
</dependency>

生产者application.yml配置

spring:
  kafka:
    #生产者配置
    producer:
      #kafka地址
      bootstrap-servers: localhost:9092
      #生产者id(传给Kafka服务器的ID,用于标识自己)
      client-id: kafka-provider
      #生产者组id(生产者组的名称,用于标识自己所属的生产者组)
      group-id: kafka-provider-group
      #key序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #批量发送消息的大小(单位:字节)
      batch-size: 131072  #128kb
      #生产者缓冲区大小(单位:字节)
      buffer-memory: 67108864 #64M
      #重试次数
      retries: 1
      #acks(消息确认机制):
      #0表示生产者不等待broker确认消息,只要把消息发送出去就认为消息发送成功
      #all表示生产者等待所有ISR副本确认消息才认为消息发送成功 all≈-1
      #1表示leader确认消息(leader副本确认消息才认为消息发送成功)
      #-1表示所有ISR确认消息(Kafka中所有的副本都确认消息才认为消息发送成功)
      acks: 0
      #压缩类型(gzip、snappy、lz4、zstd)
      compression-type: gzip  #提升性能很重要
      properties:
        #单个请求的最大字节数(单位:字节)消息的最大大小,超过这个大小的消息会被拆分成多个消息发送,以避免单个请求过大导致的网络拥塞
        max.request.size: 5242880 #5M
        #发送消息的延迟时间(单位:毫秒),超过这个时间就会发送消息,以避免频繁发送消息导致的网络拥塞
        linger.ms: 5


评论