解决 spark streaming 集成 kafka 时只能读取每个 topic 的其中一个分区数据的问题

本贴最后更新于 1885 天前,其中的信息可能已经时移俗易

1. 问题描述

我创建了一个名称为 myTest 的 topic,该 topic 有三个分区,在我的应用中 spark streaming 以 direct 方式连接 kakfa,但是发现只能消费一个分区的数据,多次更换 comsumer group 依然如此。

2 环境配置

kafka 集群环境

主机 IP 操作系统 kakfa
node1 192.168.1.101 Centos 6.5 kafka_2.11-0.10.1.1
node2 192.168.1.102 Centos 6.5 kafka_2.11-0.10.1.1
node3 192.168.1.103 Centos 6.5 kafka_2.11-0.10.1.1

应用依赖:spark 版本是 2.1.1、kakfa 版本是 0.10.1.1;
maven 依赖配置如下

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>$2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.1</version>
</dependency>

相关配置代码(Java)如下:

Map<String, Object> kafkaParams  =  new  HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  
Set<String> topics  =  new  HashSet<String>(Arrays.asList("testTopic"));
JavaInputDStream<ConsumerRecord<Object, Object>> dStream  =  KafkaUtils.createDirectStream(
      jssc,
      LocationStrategies.PreferConsistent(),
      ConsumerStrategies.Subscribe(topics, kafkaParams));

3. 解决方案

经过查阅相关资料发现是由于 Kafka 0.10.1.1 的 bug 导致的。其实不仅仅是 0.10.1.1,另外 0.10.1.0 和 0.10.0.2 也有这个问题。详细描述参考 https://issues.apache.org/jira/browse/KAFKA-4547
最后我将 kafka 版本降到了 0.10.0.1,解决了这个问题。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 550 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    35 引用 • 35 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...