弗兰兹·卡夫卡,生活于奥匈帝国统治下的捷克德语小说家,本职为保险业职员。主要作品有小说《审判》、《城堡》、《变形记》等。 another Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式``发布订阅``消息系统,它可以处理消费者规 ..

浅入浅出 kafka

本贴最后更新于 221 天前,其中的信息可能已经事过景迁

弗兰兹·卡夫卡,生活于奥匈帝国统治下的捷克德语小说家,本职为保险业职员。主要作品有小说《审判》、《城堡》、《变形记》等。

another

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量分布式``发布订阅``消息系统,它可以处理消费者规模的网站中的所有动作流数据。

两个 kafka 有一个共同特别: 很会写

消息系统

低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步 RPC 的主要手段之一。

什么是消息系统

producer -> broker:发送数据
broker -> consumer: push数据
consumer -> broker: ack确认
broker -> broker: 删除/备份message

producer 发送消息给 broker,broker 持有数据,在合适的时机发送给 consumer,consumer 确认后,broker 删除消息数据。

优化点无外乎吞吐量、性能、可靠性、事务。

扩展概念
  1. 消息队列模型 sender -> queue -> receiver p2p
  2. 发布/订阅模型 publish -> topic -> subscribe
  3. push
  4. pull

什么时候使用消息系统


node 前端服务器1 AS c1
node 前端服务器2 AS c2
control metrics服务器 AS m1
c1 -down-> m1
c2 -down-> m1

node 前端服务器1 AS c1
node 前端服务器2 AS c2
node 后台服务器1 AS c3
control metrics服务器 AS s1
control 日志记录服务器 AS s2
control 用户行为服务器 AS s3
c1 -down-> s1
c2 -down-> s1
c1 -down-> s2
c2 -down-> s2
c1 -down-> s3
c2 -down-> s3
c3 -down-> s1
c3 -down-> s2
node 前端服务器1 AS c1<<producer>>
node 前端服务器2 AS c2<<producer>>
node 后台服务器1 AS c3<<producer>>
node 消息系统 AS kafka<<kafka>>
control metrics服务器 AS s1<<consumer>>
control 日志记录服务器 AS s2<<consumer>>
control 用户行为服务器 AS s3<<consumer>>
c1 -down-> kafka :pub
c2 -down-> kafka :pub
c3 -down-> kafka :pub
s1 -up-> kafka :sub
s2 -up-> kafka :sub
s3 -up-> kafka :sub
适合场景
不适合场景

kafka 好在哪

吞吐量/延时

延时越低,吞吐越高?
通常情况下,我们认为延时越低,单位时间可以处理的请求变多,所以吞吐量增加。但是两者并不是正相关关系。
e.g. kafka 处理一条消息需要花费 2ms,吞吐量为 1000/2=500。如果通过 batch,批量发送,每 8ms 发送一次 600 条,延时 =2ms+8ms=10ms,600*(1000/10)=60000。

消息持久化

保存在硬盘,不会丢失,可以重放。and 性能很高!!! 后面聊原因。

负载均衡和故障转移

多副本、多分区,保障高可用。

伸缩性

自身无状态,方便扩展。

名词解析

  1. message -> 消息
  2. broker -> kafka 服务器
  3. topic -> 主题,逻辑概念,一类消息,一个消息内容体
  4. partition -> 分区,消息实际存储的物理位置。有序队列,维护 offset。

kafka_0

  1. replica -> 副本(partition)。分为 leader replica 和 follower replica。和 Master-Slave 不同,follower 只从 leader 同步数据,不提供读写。只有在 leader 挂了之后,才会选举 follower 作为 leader 提供服务。kafka 保障同一个 partition 的 replica 在不同的 broker,否则无法提供故障转移。同一个 topic 可以有不同的 leader,同一个 topic+partition 只有一个 leader。
  2. ISR(is-sync replica) -> 同步副本集合。如果 follower 延迟过大,会被踢出集合,追赶上数据之后,重新向 leader 申请,加入 ISR 集合。并不是所有的 follower 都可以成为 leader,ISR 集合中的 follower 可以竞选 leader。通过 replica.lag.time.max.ms(默认 10s)设置 follower 同步时间,通过 RetchRequest(offset)同步 leader 信息。
  3. offset -> 位移、偏移量

kafka_1

  1. producer -> 生产者

  2. consumer -> 消费者

  3. group -> 组。通过维护各 group 的 offset,每条消息只会被发送到同一个 group 下的一个 consumer,实现不同模型。

    • 一个 group 有一或多个 consumer
    • 一个消息可以发送给多个 group
  4. controller -> 控制器。选举 broker 作为 controller,管理和协调 kafka 集群,和 zookeeper 通信。

  5. coordinator -> 协调者。用于实现成员管理、消费分配方案制定(rebalance)以及提交位移等,每个 group 选举 consumer 作为协调者。

kafka 高性能的秘密

顺序写?

kafka_2

网上的教程经常看到介绍,写入耗时主要集中在磁头寻道盘片旋转,而数据传输速度很快。kafka 采用了顺序写,所以效率高。不免有些疑问:

  1. 顺序写性能高,为什么还有随机写?
  2. 磁盘不会被占用,每次写入都需要寻道、旋转,那么顺序写的优势在哪?

原因

  1. 因为写入的是不同的文件,占用连续的 page。顺序写,不能修改。
  2. 增加前提:一次写入一个文件且文件足够大。

所以本质原因在于追加写,"每个 partition 是一个文件"。
读取时,识别顺序写,会进行预读。

PageCache

PageCache 为缓存,数据会不会丢失?

因为是操作系统管理,所以 kafka 进程挂了,数据不会丢失。如果操作系统掉电。。。依靠副本

Zero Copy

Java FileChannel.transferTO
Linux sendfile

kafka_3

kafka_4

partition

  1. leader 针对 partition 而不是 broker
  2. partition 不是一个文件而是一个文件夹
  3. partition 是我们能操作的最小概念

kafka_5

如果一直追加会导致文件过大,不便于使用(读写)和维护(删除旧数据),kafka 为此采用了几种措施

  1. 区分 segment
  2. 增加索引,包括 index 和 timeindex

segment

kafka_6

index 和 timeindex

kafka_7

  1. index,位移索引,间隔创建索引指向物理偏移地址。间隔通过 log.index.interval.bytes 设置,默认 4MB。
  2. timeindex,时间索引,为满足时序型统计需求。

  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      ensureOffsetInRange(largestOffset)

      // append the messages
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }
  

索引文件预分配空间,切分时裁剪。

p.s. producer 发送消息时,可以指定时间戳。如果机器时区不同,或者 retry、网络延时等导致时间混乱,按照时间索引进行查询时,导致查询不到消息。?? 时间会在发送时获取本机时间

long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

producer

kafka_8

producer 配置

通过配置文件了解细节

自定义 serializer

public class FastJsonSerializer implements Serializer {
    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return JSONObject.toJSONBytes(data);
    }

    @Override
    public void close() {

    }
}

自定义 partitioner

public class AbsPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (Long.parseLong(String.valueOf(key)) > 0) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

自定义分区策略 机器人发送到同一个 partition,为了快速响应真实用户。如果只是为了均匀分布,不需要指定 key(和旧版本不同)。

如果未指定 key,会通过轮询,避免 skewed

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

producer 拦截器

public class ProducerLogInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        System.out.println("send topic: " + record.topic());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("record metadata");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

100% 送达配置 | 无消息丢失配置

通过配置替换 send().get()

消息内容

CRC 版本号 属性 时间戳 key 长度 key 内容 value 长度 value 内容
4B 1B 1B 8B 4B n 4B n

Consumer

group 保存位移 offset,替换 zookeeper 保存(/consumers/groupid/offsets/topic/partition节点)。checkpointing 定期从 consumer 到 broker 对 offset 进行持久化。(log.flush.offset.checkpoint.interval.ms 默认 60s)
offset 格式 =map(groupId+topic+partition, offset)

为什么不用 zookeeper 保存?

为什么不用 broker 保存?

  1. 增加应答机制,确认消费成功,影响吞吐
  2. 保存多个 consumer 的 offset,数据结构复杂
  /**
   *  Start the background threads to flush logs and do log cleanup
   */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher",
                         flushDirtyLogs _,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointLogRecoveryOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushRecoveryOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-log-start-offset-checkpoint",
                         checkpointLogStartOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushStartOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()
  }

Consumer 配置

partition 分配策略

每个 partition 分配给一个 consumer。

e.g. 如果一个 group 订阅一个 topic,一个 topic 有 100 个 partition,一个 group 有 5 个 consumer。则每个 consumer 消费 20 个 partition

partition 分配策略,继承 AbstractPartitionAssignor 自定义策略规则,加权重等。自带分配规则:

  1. range 分区顺序排列、分组、分配给 consumer
  2. round-robin 分区顺序排列, 轮询 consumer,读取分区
  3. sticky 基于历史分配方案,避免数据倾斜
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }

}

误解

使用过程中对 kafka consumer 的一些误解

误解 1

poll(long timeout)max.poll.records 按照时间或者消息记录数,控制每次获取消息。

poll 表示轮询,使用 poll 而不是 pull,并不需要 wakeup。所以可以使用poll(Long.MAX_VALUE),每次数据流准备好后,会返回并进行业务处理。

误解 2

"consumer 只能订阅一个 topic。"

consumer.subscribe(Pattern.compile("kafka.*"))
误解 3

"commitSync 同步提交,阻塞消费。commitAsync 异步提交,不阻塞消费。"

commitSync 和 commitAsync 都会阻塞 poll,因为在 poll 执行时轮询时会判断 commit 状态。commitAsync 不阻塞业务处理后续方法执行。

    void invokeCompletedOffsetCommitCallbacks() {
        while (true) {
            OffsetCommitCompletion completion = completedOffsetCommits.poll();
            if (completion == null)
                break;
            completion.invoke();
        }
    }

EOS(Exactly-once Semantics)

消费指定 partition

自定义分配策略?不需要,可以通过 assign 指定 topic partition

consumer.assign(Collections.singletonList(new TopicPartition(TOPIC, partition)));

控制提交分区 offset

Map<TopicPartition, OffsetAndMetadata> offsets 控制提交分区 offset,细粒度

consumer.commitSync(Collections.singletonMap(tp, offset));

rebalance

状态机

触发条件

  1. consumer 加入、退出、崩溃
  2. topic 发生变更,如正则匹配,增加 topic
  3. 分区发生变动
  4. 消费处理超时

rebalance 开销大,合理设置 request.timeout.ms、max.poll.records 和 max.poll.interval.ms 减少 rebalance 次数

rebalance generation 标识 rebalance,每次 +1, 延迟提交 offset 会被 group 拒绝 ILLEGAL_GENERATION

协议

  1. joinCroup 请求
  2. SyncGroup 请求,group leader 同步分配方案
  3. Heartbeat 请求 向 coordinator 汇报心跳
  4. LeaveGroup 请求
  5. DescribeGroup 查看组信息

确认 所在 broker
Math.abs(groupId.hashCode) % offsets.topic.num.partitions 确认分区,此分区所在的 leader broker

  1. 收集 join consumer,选取 leader,同步给 coordinator。 leader 负责分配
  2. 同步更新分配方案,发送 SyncGroup 请求给 coordinator,每个 consumer 都发送,coordinator 接受 leader 的方案,分配,返回 response
def handleJoinGroupRequest(request: RequestChannel.Request) {
    val joinGroupRequest = request.body[JoinGroupRequest]

    // the callback for sending a join-group response
    def sendResponseCallback(joinResult: JoinGroupResult) {
      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
      def createResponse(requestThrottleMs: Int): AbstractResponse = {
        val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)

        trace("Sending join group response %s for correlation id %d to client %s."
          .format(responseBody, request.header.correlationId, request.header.clientId))
        responseBody
      }
      sendResponseMaybeThrottle(request, createResponse)
    }

    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new JoinGroupResponse(
          requestThrottleMs,
          Errors.GROUP_AUTHORIZATION_FAILED,
          JoinGroupResponse.UNKNOWN_GENERATION_ID,
          JoinGroupResponse.UNKNOWN_PROTOCOL,
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
          Collections.emptyMap())
      )
    } else {
      // let the coordinator handle join-group
      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
        (protocol.name, Utils.toArray(protocol.metadata))).toList
      groupCoordinator.handleJoinGroup(
        joinGroupRequest.groupId,
        joinGroupRequest.memberId,
        request.header.clientId,
        request.session.clientAddress.toString,
        joinGroupRequest.rebalanceTimeout,
        joinGroupRequest.sessionTimeout,
        joinGroupRequest.protocolType,
        protocols,
        sendResponseCallback)
    }
  }

多线程消费

自己实现缓存区,批量执行及确认 consumer.commitSync

  1. 多 consumer thread 效率高,单独 offset。 缺点:受限于 topic 分区数,broker 压力大,rebalance 可能性大
  2. 单 consumer 多 handler thread ,获取和处理节藕,伸缩性好。难于管理分区内消息顺序,位移提交困难,处理不当导致数据丢失。

其他

日志留存

暂停 consumer 消费

e.g. 消费逻辑为调用三方接口,如果三方接口不稳定,需要关闭一段时间。

compaction

manager 指定 lisnterners

因为 kafka 内部使用全称域名,不统一,导致无法获取元数据

生产环境

  1. 优雅的启动和关闭(Spring 生命周期)
  2. offset 跳过与重放

附录

  1. Apache Kafka实战
  2. Page Cache的落地问题
  3. Kafka文件存储机制那些事
  4. Netty 之 Zero-copy 的实现(下)
  5. 磁盘I/O那些事
  6. Kafka消费组(consumer group)
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    24 引用 • 35 回帖 • 106 关注
13 回帖
请输入回帖内容...
  • crick77

    @88250 为什么markdown不支持plantuml ?

    2 回复
  • 88250

    等 @Vanessa 研究一下是否能够支持。

    1 回复
  • Vanessa

    收到

  • pkwenda

    666

  • Vanessa

    欢迎体验。

    张三->李四: 嘿,小四儿, 写博客了没?
    Note right of 李四: 李四愣了一下,说:
    李四-->张三: 忙得吐血,哪有时间写。
    

    @88250 请部署

    1 回复
  • crick77

    跟我想的不太一样。。。

    之前静态博客使用的是 https://github.com/wafer-li/hexo-filter-plantuml 插件, 是做了一个转换

    1 回复
  • Vanessa

    有在线的 Demo 么?

    1 回复
  • crick77

    http://blog.crick.wang/2018/07/05/kafka/

    1 回复
  • Vanessa

    一样也可以使用的。黑客派上用的是本地的、矢量方案。hexo 用的是依赖第三方接口生成的图片。

    producer -> broker:发送数据 
    broker -> consumer: push数据 
    consumer -> broker: ack确认 
    broker -> broker: 删除/备份message
    

    你这样写就可以了

    imagepng

  • meetme

    满满的干货啊, 楼主之前作什么的

    1 回复
  • crick77

    搬砖 Java 码农

  • Michael19930815
    该回帖仅作者和楼主可见
    1 回复
  • crick77

    感谢纠正

请输入回帖内容 ...