Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 1524|回复: 0

[默认分类] kafkaStream解析json出错导致程序中断的解决方法

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2020-8-17 10:16:42 | 显示全部楼层 |阅读模式
    出错在 KStreamFlatMapValues 方法执行时,由于json异常数据无法解析,结果生成的值为null,报错信息如下:
    1. [code]2018-04-18 19:21:04,776 ERROR [app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1] com.gw.stream.KStream103.lambda$main$1(100) | 捕获到异常:hello world hello world king
    2. Exception in thread "app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1" java.lang.NullPointerException
    3.         at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
    4.         at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    5.         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    6.         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    7.         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    8.         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    9.         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
    10.         at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    11.         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
    12.         at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
    13.         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
    14.         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    15.         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
    复制代码
    [/code]
    问题解决方案:

    对json解析的bean添加未知字段忽略
    1. [code]
    2. import java.util.List;
    3. import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    4. @JsonIgnoreProperties(ignoreUnknown = true)
    5. public class Bean103 {
    6.     private List<String> key1;
    7.     private List<List<String>> key2;
    8.     public void setKey1(List<String> key1) {
    9.         this.key1 = key1;
    10.     }
    11.     public List<String> getKey1() {
    12.         return key1;
    13.     }
    14.     public void setKey2(List<List<String>> key2) {
    15.         this.key2 = key2;
    16.     }
    17.     public List<List<String>> getKey2() {
    18.         return key2;
    19.     }
    20. }
    复制代码
    [/code]
    由于报空指针错误,所以解决空指针问题,即判断为null时创建一个空对象.
    1. [code]return list == null ? new ArrayList<String>():list;
    复制代码
    [/code]
    完整的示例代码如下:
    1. [code]package com.gw.stream;
    2. import java.util.ArrayList;
    3. import java.util.List;
    4. import java.util.Properties;
    5. import java.util.stream.Collectors;
    6. import org.apache.kafka.clients.consumer.ConsumerConfig;
    7. import org.apache.kafka.common.serialization.Serdes;
    8. import org.apache.kafka.streams.KafkaStreams;
    9. import org.apache.kafka.streams.KeyValue;
    10. import org.apache.kafka.streams.StreamsBuilder;
    11. import org.apache.kafka.streams.StreamsConfig;
    12. import org.apache.kafka.streams.kstream.KStream;
    13. import org.apache.kafka.streams.kstream.Produced;
    14. import org.apache.log4j.Logger;
    15. import com.alibaba.fastjson.JSONObject;
    16. public class KStream103 {
    17.     private static Logger log = Logger.getLogger(KStream103.class);
    18.     public static void main(String[] args) {
    19.         if(args.length < 6){
    20.             log.error("错误:参数个数不正确[application_id bootstarp_server groupid source_topic target_topic auto_offset_reset]");
    21.             return ;
    22.         }
    23.         String application_id=args[0];
    24.         String bootstarp_server = args[1];
    25.         String groupid = args[2];
    26.         String source_topic = args[3];
    27.         String target_topic = args[4];
    28.         String auto_offset_reset = args[5];
    29.         Properties props = new Properties();
    30.         // consumer group
    31.         // 指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
    32.         props.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id);
    33.         props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
    34.         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarp_server);
    35.         // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
    36.         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
    37.         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    38.         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    39.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
    40.         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  //自动提交
    41.         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
    42.         //针对时间异常解决方法
    43.         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    44.         final String splitChar = "\001";
    45.         StreamsBuilder builder = new StreamsBuilder();
    46.         KStream<String, String> textLines = builder.stream(source_topic); // 接收第一个topic
    47.         textLines.flatMapValues(value -> {
    48.             Bean103 bean103 = null;
    49.             List<String> list = null;
    50.             try {
    51.                 //这里是value的业务处理逻辑...最终返回的是一个list
    52.             } catch (Exception e) {
    53.                 log.error("捕获到异常:" + value);
    54.                 log.error("error message:" + e.getMessage());
    55.             }
    56.             return list == null ? new ArrayList<String>():list;
    57.         }).filter((k,v)-> v !=null).map((k, v) -> new KeyValue<>(k, v))
    58.         .to(target_topic, Produced.with(Serdes.String(), Serdes.String()));
    59.         KafkaStreams streams = new KafkaStreams(builder.build(), props);
    60.         streams.start();
    61.     }
    62. }
    复制代码
    [/code]
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-19 06:31 , Processed in 0.392651 second(s), 37 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表