欢迎光临
kafka报错nested exception is org.apache.kafka.common.errors.RecordTooLargeException
   

kafka报错nested exception is org.apache.kafka.common.errors.RecordTooLargeException

前言:之前使用的是feign请求单独的kafka服务,因为项目需要申请防火墙策略,所以直接使用KafkaTemplate,上周五刚改,今天就报错了。

错误信息:

Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.njwx.yq.business.kafka.listener.KafkaTaskListener.listenerTask(org.apache.kafka.clients.consumer.ConsumerRecord,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1270600 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1270600 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2084)
	... 10 common frames omitted
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1270600 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:573)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:388)
	at com.njwx.yq.business.kafka.service.KafkaService.send(KafkaService.java:26)
	at com.njwx.yq.business.kafka.controller.KafkaMessageController.sendMessage(KafkaMessageController.java:37)
	at com.njwx.yq.business.kafka.controller.KafkaMessageController$$FastClassBySpringCGLIB$$873a3c4b.invoke()
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.framework.adapter.AfterReturningAdviceInterceptor.invoke(AfterReturningAdviceInterceptor.java:55)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:56)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
	at com.njwx.yq.business.kafka.controller.KafkaMessageController$$EnhancerBySpringCGLIB$$6600c573.sendMessage()
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl.sendAppMessage(TaskQueueServiceImpl.java:388)
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl.saveBatchTaskReceiver(TaskQueueServiceImpl.java:351)
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl.createTaskReceiver(TaskQueueServiceImpl.java:246)
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl.taskManager(TaskQueueServiceImpl.java:98)
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl$$FastClassBySpringCGLIB$$9735e29a.invoke()
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
	at com.njwx.yq.business.kafka.service.impl.TaskQueueServiceImpl$$EnhancerBySpringCGLIB$$3af4caba.taskManager()
	at com.njwx.yq.business.kafka.listener.KafkaTaskListener.listenerTask(KafkaTaskListener.java:29)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2039)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2021)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1958)
	... 8 common frames omitted
提示:The message is 1270600 bytes when serialized which is larger than 1048576,原因是因为生产者发送到Kafka的单个请求中的所有消息总大小默认的max.request.size大小是1M,所以需要在yml里面配置producer.properties.max.request.size
  producer:
    properties:
        max.request.size: 10240000
有需要可以加上
batch-size:控制默认批量大小(以字节为单位)
buffer-memory:缓冲等待发送到服务器的记录的内存总字节数
 
 无能为力的反义词  海棠果几月份可以吃  2020109期双色球开奖结果  保护环境建议书  首因效应近因效应  色界 
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《kafka报错nested exception is org.apache.kafka.common.errors.RecordTooLargeException》
文章链接:https://goodmancom.com/wl/175763.html