弗兰兹·卡夫卡,生活于奥匈帝国统治下的捷克德语小说家,本职为保险业职员。主要作品有小说《审判》、《城堡》、《变形记》等。

another

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量分布式``发布订阅``消息系统,它可以处理消费者规模的网站中的所有动作流数据。

两个 kafka 有一个共同特别: 很会写

消息系统

低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步 RPC 的主要手段之一。

什么是消息系统

producer -> broker:发送数据
broker -> consumer: push数据
consumer -> broker: ack确认
broker -> broker: 删除/备份message

producer 发送消息给 broker,broker 持有数据,在合适的时机发送给 consumer,consumer 确认后,broker 删除消息数据。

优化点无外乎吞吐量、性能、可靠性、事务。

扩展概念
  1. 消息队列模型 sender -> queue -> receiver p2p
  2. 发布 / 订阅模型 publish -> topic -> subscribe
  3. push
  4. pull

什么时候使用消息系统


node 前端服务器 1 AS c1
node 前端服务器 2 AS c2
control metrics服务器 AS m1
c1 -down-> m1
c2 -down-> m1
node 前端服务器1 AS c1
node 前端服务器2 AS c2
node 后台服务器1 AS c3
control metrics服务器 AS s1
control 日志记录服务器 AS s2
control 用户行为服务器 AS s3
c1 -down-> s1
c2 -down-> s1
c1 -down-> s2
c2 -down-> s2
c1 -down-> s3
c2 -down-> s3
c3 -down-> s1
c3 -down-> s2
node 前端服务器1 AS c1<<producer>>
node 前端服务器2 AS c2<<producer>>
node 后台服务器1 AS c3<<producer>>
node 消息系统 AS kafka<<kafka>>
control metrics服务器 AS s1<<consumer>>
control 日志记录服务器 AS s2<<consumer>>
control 用户行为服务器 AS s3<<consumer>>
c1 -down-> kafka :pub
c2 -down-> kafka :pub
c3 -down-> kafka :pub
s1 -up-> kafka :sub
s2 -up-> kafka :sub
s3 -up-> kafka :sub
适合场景
  • 业务解耦,领域更清晰。区分业务核心系统
  • 最终一致性 (反之,强一致性,需要接收方回调确认,同步 RPC 更合适)
  • 广播,1 VS N,稳定上游服务
  • 错峰流控,拉平峰值,避免木桶
  • 日志同步
不适合场景
  • 强事务保证
  • 延迟敏感,实时响应

kafka 好在哪

吞吐量 / 延时

  • 吞吐量: 每秒能够处理的消息 or 字节数。
  • 延时: 客户端发送请求、服务端处理请求并发送相应给客户端。

延时越低,吞吐越高?
通常情况下,我们认为延时越低,单位时间可以处理的请求变多,所以吞吐量增加。但是两者并不是正相关关系。
e.g. kafka 处理一条消息需要花费 2ms,吞吐量为 1000/2=500。如果通过 batch,批量发送,每 8s 发送一次 600 条,延时 =2ms+8ms=10ms,600*(1000/10)=60000。

消息持久化

保存在硬盘,不会丢失,可以重放。and 性能很高!!! 后面聊原因。

负载均衡和故障转移

多副本、多分区,保障高可用。

伸缩性

自身无状态,方便扩展。

名词解析

  1. message -> 消息
  2. broker -> kafka 服务器
  3. topic -> 主题,逻辑概念,一类消息,一个消息内容体
  4. partition -> 分区,消息实际存储的物理位置。有序队列,维护 offset。

kafka_0

  • 同一个 topic 可以在不同 broker 上维护不同的分区 (负载均衡)
  • 同一个 topic 可以在不同 broker 上维护同一个分区 (冗余机制, 故障转移)
  1. replica -> 副本 (partition)。分为 leader replica 和 follower replica。和 Master-Slave 不同,follower 只从 leader 同步数据,不提供读写。只有在 leader 挂了之后,才会选举 follower 作为 leader 提供服务。kafka 保障同一个 partition 的 replica 在不同的 broker,否则无法提供故障转移。同一个 topic 可以有不同的 leader,同一个 topic+partition 只有一个 leader。
  2. ISR(is-sync replica) -> 同步副本集合。如果 follower 延迟过大,会被踢出集合,追赶上数据之后,重新向 leader 申请,加入 ISR 集合。并不是所有的 follower 都可以成为 leader,ISR 集合中的 follower 可以竞选 leader。通过 replica.lag.time.max.ms(默认 10s) 设置 follower 同步时间,通过 RetchRequest(offset) 同步 leader 信息。
  3. offset -> 位移、偏移量

kafka_1

  • 上次提交的位移:group 确认的 offset
  • 当前位置: 读取后,未提交
  • HW:ISR 确认已同步后,leader 增加 HW。
  • LEO: leader 接收到的最新一条 producer 发送的数据
  • consumer 只能消费到 HW,未同步给所有 ISR 成员的消息无法消费
  • leader 保存 LEO、HW 和 remote LEO, min(LEO, remote LEO) 更新 HW
  • follower 轮询 leader,purgatory 暂存请求,500ms
  • 新版本 epoch 保存 leader 变更版本,维护 kv (epoch, offset)
  1. producer -> 生产者
  2. consumer -> 消费者
  3. group -> 组。通过维护各 group 的 offset,每条消息只会被发送到同一个 group 下的一个 consumer,实现不同模型。
    • 一个 group 有一或多个 consumer
    • 一个消息可以发送给多个 group
  1. controller -> 控制器。选举 broker 作为 controller,管理和协调 kafka 集群,和 zookeeper 通信。
  2. coordinator -> 协调者。用于实现成员管理、消费分配方案制定 (rebalance) 以及提交位移等,每个 group 选举 consumer 作为协调者。

kafka 高性能的秘密

顺序写?

kafka_2

网上的教程经常看到介绍,写入耗时主要集中在磁头寻道盘片旋转,而数据传输速度很快。kafka 采用了顺序写,所以效率高。不免有些疑问:

  1. 顺序写性能高,为什么还有随机写?
  2. 磁盘不会被占用,每次写入都需要寻道、旋转,那么顺序写的优势在哪?

原因

  1. 因为写入的是不同的文件,占用连续的 page。顺序写,不能修改。
  2. 增加前提: 一次写入一个文件且文件足够大。

所以本质原因在于追加写,"每个 partition 是一个文件"。
读取时,识别顺序写,会进行预读。

PageCache

  • Kafka 不会每次都写磁盘,而是写入分页存储PageCache就认为 producer 成功。
  • 操作系统决定什么时候将 PageCache 写入磁盘 (flush)。增加 flush 时间间隔,可以提升吞吐
  • flush 时为顺序写入,不会有额外的性能损耗。
  • 读取时,优先读取 PageCache。

PageCache 为缓存,数据会不会丢失?

因为是操作系统管理,所以 kafka 进程挂了,数据不会丢失。如果操作系统掉电。。。依靠副本

Zero Copy

java FileChannel.transferTO
linux sendfile

kafka_3

kafka_4

partition

  1. leader 针对 partition 而不是 broker
  2. partition 不是一个文件而是一个文件夹
  3. partition 是我们能操作的最小概念

kafka_5

如果一直追加会导致文件过大,不便于使用 (读写) 和维护 (删除旧数据),kafka 为此采用了几种措施

  1. 区分 segment
  2. 增加索引, 包括 index 和 timeindex

segment

kafka_6

  • segment=log+index+timeindex
  • 命名规则为segment 文件最后一条消息的 offset 值
  • log.segment.bytes 日志切割 (默认 1G)

index 和 timeindex

kafka_7

  1. index,位移索引,间隔创建索引指向物理偏移地址。间隔通过 log.index.interval.bytes 设置,默认 4MB。
  2. timeindex,时间索引,为满足时序型统计需求。

  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      ensureOffsetInRange(largestOffset)

      // append the messages
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

索引文件预分配空间,切分时裁剪。

p.s. producer 发送消息时,可以指定时间戳。如果机器时区不同,或者 retry、网络延时等导致时间混乱,按照时间索引进行查询时,导致查询不到消息。?? 时间会在发送时获取本机时间

long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

producer

kafka_8

producer 配置

通过配置文件了解细节

  • bootstrap.servers 指定其中一个,会自动找到 leader,但是如果指定的机器挂了,无法切换
  • acks 0, 1, all|-1。 0 表示无需确认,1 表示 leader 确认,-1 表示所有 ISR 确认。
  • buffer.memory 缓存消息的缓冲区大小 32MB,过小会影响吞吐。写入速度超过发送速度,停止 & 等待 IO 发送,still 追不上会报错。
  • compression.type 开启压缩,提升 IO 吞吐,增加 CPU 压力。需要看服务器是 IO 密集型 or 计算密集型。 属性 0: 无压缩,1: GZIP,2: Snappy,3: LZ4
  • retries 重试,屏蔽网络抖动 or leader 选举 or NotController,导致消息重复发送。详细参见RetriableException
  • retry.backoff.ms 重试间隔
  • batch.size 批量发送大小,默认 16KB,增加可提升吞吐
  • linger.ms 发送时间,默认为 0,立即发送,不判断 batch.size 大小。
  • max.request.size 消息大小,因为存在 header 等,实际大小大于消息本身
  • request.timeout.ms 超时时间,默认 30s。broker 给 producer 反馈
  • partitioner.class
  • key.serializer & value.serializer
  • interceptor.classes 自定义拦截器

自定义 serializer

public class FastJsonSerializer implements Serializer {
    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return JSONObject.toJSONBytes(data);
    }

    @Override
    public void close() {

    }
}

自定义 partitioner

public class AbsPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (Long.parseLong(String.valueOf(key)) > 0) {
            return 0;
        } else {
            return 1;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

自定义分区策略 机器人发送到同一个 partition,为了快速响应真实用户。如果只是为了均匀分布,不需要指定 key(和旧版本不同)。

如果未指定 key,会通过轮询,避免 skewed

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

producer 拦截器

public class ProducerLogInterceptor implements ProducerInterceptor<String, Object> {

    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        System.out.println("send topic:" + record.topic());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("record metadata");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

100% 送达配置 | 无消息丢失配置

通过配置替换 send().get()

  • max.block.ms=999999
  • acks=-1
  • retries=999 不会重试非 RetriableException 异常
  • max.in.flight.requests.per.connection=1 发送未响应请求的数量
  • KafkaProducer.send(record,callback)
  • clonse(0)

消息内容

CRC 版本号 属性 时间戳 key 长度 key 内容 value 长度 value 内容
4B 1B 1B 8B 4B n 4B n

Consumer

group 保存位移 offset,替换 zookeeper 保存 (/consumers/groupid/offsets/topic/partition节点 )。checkpointing 定期从 consumer 到 broker 对 offset 进行持久化。(log.flush.offset.checkpoint.interval.ms 默认 60s)
offset 格式 =map(groupId+topic+partition, offset)

为什么不用 zookeeper 保存?

  • zookeeper 不擅长频繁写 (强一致性)

为什么不用 broker 保存?

  1. 增加应答机制,确认消费成功,影响吞吐
  2. 保存多个 consumer 的 offset,数据结构复杂
  /**
   *  Start the background threads to flush logs and do log cleanup
   */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention",
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher",
                         flushDirtyLogs _,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointLogRecoveryOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushRecoveryOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-log-start-offset-checkpoint",
                         checkpointLogStartOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushStartOffsetCheckpointMs,
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()
  }

Consumer 配置

  • session.timeout.ms 协调者 (coordinator) 检测失败的时间,踢出 consumer rebalance
  • heartbeat.interval.ms 如果需要 rebalance,会在心跳线程的 response 中 set rebalance_in_progress,心跳线程间隔。必须小于 session.timeout.ms
  • max.poll.interval.ms consumer 处理逻辑最大时间 & consumer 启动选举 coordinator 时间
  • auto.offset.reset earliest|lastest 更换 group 后,重新消费。 默认 lastest
  • enable.auto.commit false 手动提交位移
  • auto.commit.interval.ms 自动提交位移时间间隔
  • fetch.max.bytes 如果消息很大,需要手动设置 50 * 1024 * 1024
  • max.poll.records 单次调用返回的消息数 500
  • connections.max.idle.ms 默认 9 分钟,推荐 -1。不关闭空闲连接,周期性请求处理时间增加。
  • partition.assignment.strategy partition 分配策略, 默认 RangeAssignor

partition 分配策略

每个 partition 分配给一个 consumer。

e.g. 如果一个 group 订阅一个 topic,一个 topic 有 100 个 partition,一个 group 有 5 个 consumer。则每个 consumer 消费 20 个 partition

partition 分配策略,继承 AbstractPartitionAssignor 自定义策略规则,加权重等。自带分配规则:

  1. range 分区顺序排列、分组、分配给 consumer
  2. round-robin 分区顺序排列, 轮询 consumer,读取分区
  3. sticky 基于历史分配方案,避免数据倾斜
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<TopicPartition>());

        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);

            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }

}

误解

使用过程中对 kafka consumer 的一些误解

误解 1

poll(long timeout)max.poll.records 按照时间或者消息记录数,控制每次获取消息。

poll 表示轮询,使用 poll 而不是 pull,并不需要 wakeup。所以可以使用poll(Long.MAX_VALUE),每次数据流准备好后,会返回并进行业务处理。

误解 2

"consumer 只能订阅一个 topic。"

consumer.subscribe(Pattern.compile("kafka.*"))
误解 3

"commitSync 同步提交,阻塞消费。commitAsync 异步提交,不阻塞消费。"

commitSync 和 commitAsync 都会阻塞 poll,因为在 poll 执行时轮询时会判断 commit 状态。commitAsync 不阻塞业务处理后续方法执行。

    void invokeCompletedOffsetCommitCallbacks() {
        while (true) {
            OffsetCommitCompletion completion = completedOffsetCommits.poll();
            if (completion == null)
                break;
            completion.invoke();
        }
    }

EOS(Exactly-once Semantics)

  • at most once 最多一次,消息可能丢失,但不会被重复处理。获取消息后,先 commit,然后业务处理。
  • at least once 最少一次 消息不会丢失,但可能被处理多次。获取消息后,先业务处理,然后 commit。
  • exactly once 会被处理且只会被处理一次

消费指定 partition

自定义分配策略?不需要,可以通过 assign 指定 topic partition

consumer.assign(Collections.singletonList(new TopicPartition(TOPIC, partition)));
  • assign + subscribe 冲突错误java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
  • assign + assign 后一个生效
  • 2 个 consumer assign 同一个 partition 消费两次
  • 一个 consumer assign 一个 consumer subscribe, rebalance 踢出 assign

控制提交分区 offset

Map<TopicPartition, OffsetAndMetadata> offsets 控制提交分区 offset,细粒度

consumer.commitSync(Collections.singletonMap(tp, offset));

rebalance

状态机

触发条件

  1. consumer 加入、退出、崩溃
  2. topic 发生变更,如正则匹配,增加 topic
  3. 分区发生变动
  4. 消费处理超时

rebalance 开销大,合理设置 request.timeout.ms、max.poll.records 和 max.poll.interval.ms 减少 rebalance 次数

rebalance generation 标识 rebalance,每次 +1, 延迟提交 offset 会被 group 拒绝 ILLEGAL_GENERATION

协议

  1. joinCroup 请求
  2. SyncGroup 请求,group leader 同步分配方案
  3. Heartbeat 请求 向 coordinator 汇报心跳
  4. LeaveGroup 请求
  5. DescribeGroup 查看组信息

确认 所在 broker
Math.abs(groupId.hashCode) % offsets.topic.num.partitions 确认分区,此分区所在的 leader broker

  1. 收集 join consumer,选取 leader,同步给 coordinator。 leader 负责分配
  2. 同步更新分配方案,发送 SyncGroup 请求给 coordinator,每个 consumer 都发送,coordinator 接受 leader 的方案,分配,返回 response
def handleJoinGroupRequest(request: RequestChannel.Request) {
    val joinGroupRequest = request.body[JoinGroupRequest]

    // the callback for sending a join-group response
    def sendResponseCallback(joinResult: JoinGroupResult) {
      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
      def createResponse(requestThrottleMs: Int): AbstractResponse = {
        val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)

        trace("Sending join group response %s for correlation id %d to client %s."
          .format(responseBody, request.header.correlationId, request.header.clientId))
        responseBody
      }
      sendResponseMaybeThrottle(request, createResponse)
    }

    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new JoinGroupResponse(
          requestThrottleMs,
          Errors.GROUP_AUTHORIZATION_FAILED,
          JoinGroupResponse.UNKNOWN_GENERATION_ID,
          JoinGroupResponse.UNKNOWN_PROTOCOL,
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
          JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
          Collections.emptyMap())
      )
    } else {
      // let the coordinator handle join-group
      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
        (protocol.name, Utils.toArray(protocol.metadata))).toList
      groupCoordinator.handleJoinGroup(
        joinGroupRequest.groupId,
        joinGroupRequest.memberId,
        request.header.clientId,
        request.session.clientAddress.toString,
        joinGroupRequest.rebalanceTimeout,
        joinGroupRequest.sessionTimeout,
        joinGroupRequest.protocolType,
        protocols,
        sendResponseCallback)
    }
  }

多线程消费

自己实现缓存区,批量执行及确认 consumer.commitSync

  1. 多 consumer thread 效率高,单独 offset。 缺点:受限于 topic 分区数,broker 压力大,rebalance 可能性大
  2. 单 consumer 多 handler thread ,获取和处理节藕,伸缩性好。难于管理分区内消息顺序,位移提交困难,处理不当导致数据丢失。

其他

日志留存

  • log.retemtopm.{hours|minutes|ms}
  • log.retention.bytes 字节 默认 -1
  • 当前日志段不会清除
  • 和日志最近修改时间比较、比较记录时间戳

暂停 consumer 消费

e.g. 消费逻辑为调用三方接口,如果三方接口不稳定,需要关闭一段时间。

  • 暂停 consumer.pause(consumer.assignment());
  • 启动 consumer.resume(consumer.assignment());

compaction

  • 订阅 binlog see canal
  • 高可用日志化

manager 指定 lisnterners

因为 kafka 内部使用全称域名,不统一,导致无法获取元数据

生产环境

  1. 优雅的启动和关闭 (Spring 生命周期)
  2. offset 跳过与重放

附录

  1. Apache Kafka 实战
  2. Page Cache 的落地问题
  3. Kafka 文件存储机制那些事
  4. Netty 之 Zero-copy 的实现(下)
  5. 磁盘 I/O 那些事
  6. Kafka 消费组 (consumer group)

  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    19 引用 • 18 回帖 • 207 关注
感谢    关注    收藏    赞同    反对    举报    分享