centos7 下 kafka 集群安装部署

下载 kafka

[root@kafka1 ~]# wget http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
--2018-04-20 23:57:35--  http://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
正在解析主机 mirrors.hust.edu.cn (mirrors.hust.edu.cn)... 202.114.18.160
正在连接 mirrors.hust.edu.cn (mirrors.hust.edu.cn)|202.114.18.160|:80... 已连接。
已发出 HTTP 请求,正在等待回应... 302 Found
位置:http://123.147.165.12:9990/mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz [跟随至新的 URL]
--2018-04-20 23:57:36--  http://123.147.165.12:9990/mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
正在连接 123.147.165.12:9990... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度:56969154 (54M) [application/octet-stream]
正在保存至: “kafka_2.11-1.1.0.tgz”

100%[=====================================================================================================================================================>] 56,969,154  22.5MB/s 用时 2.4s   

2018-04-20 23:57:39 (22.5 MB/s) - 已保存 “kafka_2.11-1.1.0.tgz” [56969154/56969154])

解压

tar zxvf kafka_2.11-1.1.0.tgz -C /usr/local/

安装 zookeeper

ZooKeeper 高可用集群的安装及配置

host ip port env usage os
kafka1 192.168.31.219 9092 zk3.4.11 集群 node1 centos7.4
kafka2 192.168.31.220 9092 zk3.4.11 集群 node2 centos7.4
kafka3 192.168.31.221 9092 zk3.4.11 集群 node3 centos7.4
说明:kafka 名中的 2.11 是 Scala 语言版本,后面的 1.1.0 是 kafka 版本,端口默认为 9092。

安装步骤

下载 kafka 并解压

tar zxvf kafka_2.11-1.1.0.tgz -C /usr/local/

image

编辑配置文件

vim server.properties

broker.id=1
host.name=192.168.31.219 --新增的
log.dirs=/usr/local/kafka_2.11-1.1.0/kafka-logs
zookeeper.connect=192.168.31.160:2181,192.168.31.161:2181,192.168.31.162:2181

其他配置可以保持默认,保持,退出;
在其他 2 台执行同样的操作,broker.id=1
host.name=192.168.31.219 对应需要变化

scp -r kafka_2.11-1.1.0 root@192.168.31.220:/usr/local/
scp -r kafka_2.11-1.1.0 root@192.168.31.221:/usr/local/

每个 kafka broker 中配置文件 server.properties 默认必须配置的属性如下:

broker.id=0  
num.network.threads=2  
num.io.threads=8  
socket.send.buffer.bytes=1048576  
socket.receive.buffer.bytes=1048576  
socket.request.max.bytes=104857600  
log.dirs=/tmp/kafka-logs  
num.partitions=2  
log.retention.hours=168  

log.segment.bytes=536870912  
log.retention.check.interval.ms=60000  
log.cleaner.enable=false  

zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=1000000  

server.properties 中所有配置参数说明 (解释) 如下列表:

参数 说明 (解释)
broker.id=0 每一个 broker 在集群中的唯一表示,要求是正数。当该服务器的 IP 地址发生改变时,broker.id 没有变化,则不会影响 consumers 的消息情况
log.dirs=/data/kafka-logs kafka 数据的存放地址,多个地址的话用逗号分割, 多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2
port=9092 broker server 服务端口
message.max.bytes =6525000 表示消息体的最大大小,单位是字节
num.network.threads =4 broker 处理消息的最大线程数,一般情况下数量为 cpu 核数
num.io.threads =8 broker 处理磁盘 IO 的线程数,数值为 cpu 核数 2 倍
background.threads=4 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
queued.max.requests =500 等待 IO 线程处理的请求队列最大数,若是等待 IO 的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
host.name broker 的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到 ZK,一般不设置
socket.send.buffer.bytes=100*1024 socket 的发送缓冲区,socket 的调优参数 SO_SNDBUFF
socket.receive.buffer.bytes =100*1024 socket 的接受缓冲区,socket 的调优参数 SO_RCVBUFF
socket.request.max.bytes =10010241024 socket 请求的最大数值,防止 serverOOM,message.max.bytes 必然要小于 socket.request.max.bytes,会被 topic 创建时的指定参数覆盖
log.segment.bytes =102410241024 topic 的分区是以一堆 segment 文件存储的,这个控制每个 segment 的大小,会被 topic 创建时的指定参数覆盖
log.roll.hours =24*7 这个参数会在日志 segment 没有达到 log.segment.bytes 设置的大小,也会强制新建一个 segment 会被 topic 创建时的指定参数覆盖
log.cleanup.policy = delete 日志清理策略选择有:delete 和 compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic 创建时的指定参数覆盖
log.retention.minutes=300 或 log.retention.hours=24 数据文件保留多长时间, 存储的最大时间超过这个时间会根据 log.cleanup.policy 设置数据清除策略 log.retention.bytes 和 log.retention.minutes 或 log.retention.hours 任意一个达到要求,都会执行删除; 有 2 删除数据文件方式:按照文件大小删除:log.retention.bytes; 按照 2 中不同时间粒度删除:分别为分钟,小时
log.retention.bytes=-1 topic 每个分区的最大文件大小,一个 topic 的大小限制 = 分区数 *log.retention.bytes。-1 没有大小限 log.retention.bytes 和 log.retention.minutes 任意一个达到要求,都会执行删除,会被 topic 创建时的指定参数覆盖
log.retention.check.interval.ms=5minutes 文件大小检查的周期时间,是否处罚 log.cleanup.policy 中设置的策略
log.cleaner.enable=false 是否开启日志清理
log.cleaner.threads = 2 日志清理运行的线程数
log.cleaner.io.max.bytes.per.second=None 日志清理时候处理的最大大小
log.cleaner.dedupe.buffer.size=50010241024 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好
log.cleaner.io.buffer.size=512*1024 日志清理时候用到的 IO 块大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9 日志清理中 hash 表的扩大因子一般不需要修改
log.cleaner.backoff.ms =15000 检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被 topic 创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同 log.retention.minutes 的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被 topic 创建时的指定参数覆盖
log.index.size.max.bytes =1010241024 对于 segment 日志的索引文件大小限制,会被 topic 创建时的指定参数覆盖
log.index.interval.bytes =4096 当执行一个 fetch 操作后,需要一定的空间来扫描最近的 offset 大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.flush.interval.messages=None 例如 log.flush.interval.messages=1000; 表示每当消息记录数达到 1000 时 flush 一次数据到磁盘 log 文件”sync”到磁盘之前累积的消息条数, 因为磁盘 IO 操作是一个慢操作, 但又是一个”数据可靠性 "的必要手段, 所以此参数的设置, 需要在" 数据可靠性 "与" 性能 "之间做必要的权衡. 如果此值过大, 将会导致每次"fsync"的时间较长 (IO 阻塞), 如果此值过小, 将会导致"fsync" 的次数较多, 这也意味着整体的 client 请求有一定的延迟. 物理 server 故障, 将会导致没有 fsync 的消息丢失.
log.flush.scheduler.interval.ms =3000 检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms = None 例如:log.flush.interval.ms=1000; 表示每间隔 1000 毫秒 flush 一次数据到磁盘仅仅通过 interval 来控制消息的磁盘写入时机, 是不足的. 此参数用于控制 "fsync" 的时间间隔, 如果消息量始终没有达到阀值, 但是离上一次磁盘同步的时间间隔达到阀值, 也将触发.
log.delete.delay.ms =60000 文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
auto.create.topics.enable =true 是否允许自动创建 topic,若是 false,就需要通过命令创建 topic
default.replication.factor =1 是否允许自动创建 topic,若是 false,就需要通过命令创建 topic
num.partitions =1 每个 topic 的分区个数,若是在 topic 创建时候没有指定的话会被 topic 创建时的指定参数覆盖

以下是 kafka 中 Leader,replicas 配置参数

参数 说明 (解释)
controller.socket.timeout.ms =30000 partition leader 与 replicas 之间通讯时,socket 的超时时间
controller.message.queue.size=10 partition leader 与 replicas 数据同步时, 消息的队列尺寸
replica.lag.time.max.ms =10000 replicas 响应 partition leader 的最长等待时间,若是超过这个时间,就将 replicas 列入 ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages =4000 如果 follower 落后与 leader 太多, 将会认为此 follower[或者说 partition relicas] 已经失效 ## 通常, 在 follower 与 leader 通讯时, 因为网络延迟或者链接断开, 总会导致 replicas 中消息同步滞后 ## 如果消息之后太多,leader 将认为此 follower 网络延迟较大或者消息吞吐能力有限, 将会把此 replicas 迁移 ## 到其他 follower 中.## 在 broker 数量较少, 或者网络不足的环境中, 建议提高此值.
replica.socket.timeout.ms=30*1000 follower 与 leader 之间的 socket 超时时间
replica.socket.receive.buffer.bytes=64*1024 leader 复制时候的 socket 缓存大小
replica.fetch.max.bytes =1024*1024 replicas 每次获取数据的最大大小
replica.fetch.wait.max.ms =500 replicas 同 leader 之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes =1 fetch 的最小数据尺寸, 如果 leader 中尚未同步的数据不足此值, 将会阻塞, 直到满足条件
num.replica.fetchers=1 leader 进行复制的线程数,增大这个数值会增加 follower 的 IO
replica.high.watermark.checkpoint.interval.ms =5000 每个 replica 检查是否将最高水位进行固化的频率
controlled.shutdown.enable =false 是否允许控制器关闭 broker , 若是设置为 true, 会关闭所有在这个 broker 上的 leader,并转移到其他 broker
controlled.shutdown.max.retries =3 控制器关闭的尝试次数
controlled.shutdown.retry.backoff.ms =5000 每次关闭尝试的时间间隔
leader.imbalance.per.broker.percentage =10 leader 的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.check.interval.seconds =300 检查 leader 是否不平衡的时间间隔
offset.metadata.max.bytes 客户端保留 offset 信息的最大空间大小

kafka 中 zookeeper 参数配置

参数 说明 (解释)
zookeeper.connect = localhost:2181 zookeeper 集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=6000 ZooKeeper 的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms =6000 ZooKeeper 的连接超时时间
zookeeper.sync.time.ms =2000 ZooKeeper 集群中 leader 和 follower 之间的同步时间

启动 kafka

/usr/local/kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-1.1.0/config/server.properties -后台进程启动

/usr/local/kafka_2.11-1.1.0/bin/kafka-server-stop.sh

自动启动

chmod a+x /etc/rc.local
vim /etc/rc.local
/usr/local/kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-1.1.0/config/server.properties

检查测试

./bin/zkCli.sh -server 192.168.31.160:2181

ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config, hbase]
[zk: 192.168.31.160:2181(CONNECTED) 1] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config, hbase]
[zk: 192.168.31.160:2181(CONNECTED) 2] ls /brokers
[ids, topics, seqid]
[zk: 192.168.31.160:2181(CONNECTED) 3] ls /brokers/ids
[1, 2, 3]
[zk: 192.168.31.160:2181(CONNECTED) 4] get /brokers/ids
null
cZxid = 0x30000005a
ctime = Sat Apr 21 00:50:05 CST 2018
mZxid = 0x30000005a
mtime = Sat Apr 21 00:50:05 CST 2018
pZxid = 0x300000091
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 3
[zk: 192.168.31.160:2181(CONNECTED) 5] ls /brokers/ids
[1, 2, 3]
[zk: 192.168.31.160:2181(CONNECTED) 6] 

参考资料:

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:PipeSoloSymWide 等,欢迎大家加入,贡献开源。

    3140 引用 • 3905 回帖 • 654 关注
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    146 引用 • 128 回帖 • 725 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    19 引用 • 18 回帖 • 207 关注
感谢    关注    收藏    赞同    反对    举报    分享