004 Kafka异常处理

news/2025/2/27 11:01:34

6.异常处理

文章目录

  • 6.异常处理
      • 1.异常分类与处理原则
      • 2.生产者异常处理
        • 1. 同步发送捕获异常
        • 2. 异步发送回调处理
      • 3.消费者异常处理
        • 1.全局异常处理器
        • 2.方法级处理
        • 3.重试yml配置
      • 4.死信队列(DLQ)配置
        • 1. 启用死信队列
        • 2. 手动发送到DLQ
      • 5.事务场景异常处理
        • 1. 声明式事务
        • 2. 事务异常回滚
      • 6.监控与告警
        • 1. Actuator 健康检查
        • 2. Prometheus 指标
      • 7.完整异常处理流程
      • 8.最佳实践总结

来源参考的deepseek,如有侵权联系立删

1.异常分类与处理原则

异常类型典型场景处理建议
可恢复异常网络抖动、数据库锁冲突重试机制(有限次数 + 退避策略)
不可恢复异常消息格式错误、权限不足直接记录日志并进入死信队列
事务异常事务超时、生产者ID冲突终止事务并回滚操作

2.生产者异常处理

1. 同步发送捕获异常
public void sendSync(String topic, String message) {
    try {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.get(5, TimeUnit.SECONDS); // 阻塞等待结果
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // 记录日志并触发补偿逻辑
        log.error("消息发送失败: {}", e.getMessage());
        throw new BusinessException("消息发送失败", e);
    }
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {
    kafkaTemplate.send(topic, message).addCallback(
        result -> {
            // 发送成功处理
            log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());
        },
        ex -> {
            // 发送失败处理
            log.error("消息发送失败", ex);
            if (ex instanceof RetriableException) {
                // 可重试异常(如网络问题)
                retrySend(topic, message);
            } else {
                // 不可重试异常(如消息过大)
                deadLetterService.saveToDlq(topic, message);
            }
        }
    );
}

3.消费者异常处理

1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {

    // 定义全局错误处理器(支持批量/单消息模式)
    @Bean
    public CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {
        // 重试策略:3次重试,间隔5秒
        DefaultErrorHandler handler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(template), // 死信队列恢复器
                new FixedBackOff(5000L, 3)
        );

        // 指定可重试异常类型
        handler.addRetryableExceptions(NetworkException.class);
        handler.addNotRetryableExceptions(SerializationException.class);

        // 偏移量提交策略
        handler.setCommitRecovered(true);
        return handler;
    }



    // 容器工厂绑定全局处理器
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            CommonErrorHandler globalErrorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(globalErrorHandler);
        return factory;
    }
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;

@Slf4j
@Configuration
public class KafkaExceptionConfig {

    /**
     * 自定义异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler orderErrorHandler() {
        return (message, exception, consumer) -> {
            // 业务相关错误处理(如库存不足)
         /*   if (exception instanceof InventoryException) {
                retryService.scheduleRetry(message.getPayload());
            }*/
            System.out.println("异常执行:"+exception);
            return null;
        };
    }



    /**
     * 注册全局异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler globalExceptionHandler() {
        return (message, exception, consumer) -> {
            log.error("捕获消费异常: topic={}, message={}",
                    message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
                    message.getPayload(),
                    exception);

            // 反序列化异常特殊处理
            if (exception.getCause() instanceof DeserializationException) {
                // 跳过错消息并提交偏移量
                return null;
            }
            throw exception; // 其他异常继续抛出
        };
    }

}

    @KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")
    public void listenBatch(List<String> messages, Acknowledgment ack) {
        messages.forEach(msg -> System.out.println("批量消息:" + msg));
        //异常测试
        int i = 1/0;
        ack.acknowledge();
    }
3.重试yml配置
spring:
  kafka:
    listener:
      retry:
        max-attempts: 3               # 最大重试次数
        backoff:
          initial-interval: 1000     # 初始间隔(毫秒)
          multiplier: 2.0            # 间隔倍数
        exclude-exceptions:          # 不重试的异常
          - javax.validation.ValidationException

4.死信队列(DLQ)配置

1. 启用死信队列
spring:
  kafka:
    listener:
      dead-letter-publish:
        enable: true                  # 自动发布到死信队列
      dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {
    try {
        paymentService.process(event);
        ack.acknowledge();
    } catch (InvalidPaymentException ex) {
        // 手动发送到DLQ
        kafkaTemplate.send("dlq-payments", event);
        ack.acknowledge(); // 避免重复消费
    }
}

5.事务场景异常处理

1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {
    // 数据库操作
    orderRepository.save(order);
    // Kafka事务消息
    kafkaTemplate.send("orders", order.toEvent());
    // 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {
    return new KafkaTransactionManager<>(pf);
}

@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {
    // 数据库与Kafka操作
}

6.监控与告警

1. Actuator 健康检查
management:
  endpoints:
    web:
      exposure:
        include: health,kafka
  health:
    kafka:
      enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {
    return new MicrometerConsumerListener<>("kafka.consumer");
}

@Bean
public MicrometerProducerListener<K, V> producerMetrics() {
    return new MicrometerProducerListener<>("kafka.producer");
}

7.完整异常处理流程

  1. 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警

8.最佳实践总结

  • 分层处理:全局处理器兜底 + 方法级精细控制
  • 幂等消费:确保消息重复消费时的数据安全性
  • 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
  • 事务隔离@Transactional + read_committed 保证数据一致性

http://www.niftyadmin.cn/n/5870005.html

相关文章

「拼好帧」小黄鸭 Lossless Scaling 软件介绍与下载

「拼好帧」小黄鸭 Lossless Scaling 软件介绍与下载 在游戏和视频播放时&#xff0c;你是否遇到过分辨率不匹配、画质模糊的问题&#xff1f;今天给大家介绍一款神器——Lossless Scaling&#xff08;拼好帧&#xff09;&#xff0c;也被玩家们亲切地称为“小黄鸭”&#xff0…

磁场定向控制 (FOC)模型的C语言实现(STM32G4)

目录 概述 1 磁场定向控制 (FOC)介绍 1.1 FOC控制模型介绍 1.2 模型功能 2 FOC模型的几个重要的转换关系 2.1 Clarke Transform 2.2 Inverse Clarke Transform 2.3 Park Transform 2.4 Inverse Park Transform 3 STM32模拟实现FOC 3.1 FOC算法的C语言实现 3.2…

网络安全词汇

什么是注入&#xff1f; (转自360安全论坛)   随着B/S模式应用开发的发展&#xff0c;使用该模式编写程序的程序员越来越来越多&#xff0c;但是由于程序员的水平参差不齐&#xff0c;相当大一部分应用程序存在安全隐患。用户可以提交一段数据库查询代码&#xff0c…

SpringBoot——生成Excel文件

在Springboot以及其他的一些项目中&#xff0c;或许我们可能需要将数据查询出来进行生成Excel文件进行数据的展示&#xff0c;或者用于进行邮箱发送进行附件添加 依赖引入 此处demo使用maven依赖进行使用 <dependency><groupId>org.apache.poi</groupId>&…

在线会议时, 笔记本电脑的麦克风收音效果差是为什么

背景 最近在线面试. 使用腾讯会议或者飞书, 戴耳机参加在线面试, 遇到好几个面试官说我的音质不好. 一直没在意, 后来反思, 应该是电脑哪里出了问题. 排查 先买了一副品牌有线耳机, 测试后本地录制的声音仍然品质很差去掉耳机延长线后, 麦克风品质仍然很差最终找到答案, 原…

高可用、高性能、负载均衡集群的区别

维度高可用集群高性能集群负载均衡集群核心目标服务持续可用&#xff0c;减少停机加速计算任务&#xff0c;提升处理能力请求分发算法、健康检查关键技术冗余、心跳检测、鼓掌转移并行计算、高速网络、分布式存储请求分发算法、健康检查典型应用数据库主从切换、关键业务系统科…

【字符串】最长公共前缀 最长回文子串

文章目录 14. 最长公共前缀解题思路&#xff1a;模拟5. 最长回文子串解题思路一&#xff1a;动态规划解题思路二&#xff1a;中心扩散法 14. 最长公共前缀 14. 最长公共前缀 ​ 编写一个函数来查找字符串数组中的最长公共前缀。 ​ 如果不存在公共前缀&#xff0c;返回空字符…

【生成模型】【ComfyUI(三)】使用WebAPI批量调用ComfyUI

可以参考【生成模型】【ComfyUI&#xff08;一&#xff09;】Flux与Flux-Fill部署与API调用中Flux-Fill部分 1. 调整Workflow 我们要部署以下workflow 做两个修改 输入改为从Load Image(Base64) 读入图片&#xff0c;当然使用上面的从路径中读图也是可以的输出改为SaveImag…