Kafka 入门实战

一、docker-compose部署kafka

安装compose

在 192.168.1.63 机器进行部署:

# ①、下载,国内镜像

# 版本过老
# curl -L https://get.daocloud.io/docker/compose/releases/download/2.6.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

sudo curl -L "https://get.daocloud.io/docker/compose/releases/download/v2.6.1/docker-compose-$(uname -s)-$(uname -m)"   > /usr/local/bin/docker-compose

# ②、给docker compose 目录授权
chmod +x /usr/local/bin/docker-compose

# ③、查看一下version,显示有版本号那就说明安装成功了
docker-compose version

部署

编写docker-compose.yml文件

version: '3.9'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./zookeeper/data:/data
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.63:9092 # 暴露在外的地址
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - ./kafka/kafka-logs:/kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
  kafka-manager:
    image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面
    environment:
        ZK_HOSTS: zookeeper:2181
        KAFKA_MANAGER_AUTH_ENABLED: "true"
        KAFKA_MANAGER_USERNAME: "admin"
        KAFKA_MANAGER_PASSWORD: "admin"
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
      - kafka
    links:
      - zookeeper

运行:

 docker-compose up -d

 ...
 Status: Downloaded newer image for sheepkiller/kafka-manager:latest
Creating mmp_zookeeper_1 ... done
Creating mmp_kafka_1     ... done
Creating mmp_kafka-manager_1 ... done

2.docker-compose ps查看是否启动成功

[root@node2 mmp]# docker-compose ps
        Name                      Command               State                                  Ports                                
------------------------------------------------------------------------------------------------------------------------------------
mmp_kafka-manager_1   ./start-kafka-manager.sh         Up      0.0.0.0:9000->9000/tcp,:::9000->9000/tcp                            
mmp_kafka_1           start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp                            
mmp_zookeeper_1       /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
[root@node2 mmp]# 

3.查看启动的容器,进入kafka容器

docker ps
docker exec -it kafka_kafka_1 /bin/bash

4.创建一个名称为test的topic:

kafka-topics.sh --create --topic test \
--zookeeper zookeeper:2181 --replication-factor 1 \
--partitions 1

5.查看刚刚创建的topic信息

kafka-topics.sh --zookeeper zookeeper:2181 \
--describe --topic test

6、打开生产者发送若干条消息

kafka-console-producer.sh --topic=test \
--broker-list kafka:9092

7.新开一个shell窗口,进入kafka容器,打开消费者接收消息

kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning --topic test

8.如果发送消息和接收消息成功,则证明kafka可用

9.查看kafka的版本

find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

查出的这个文件名称中,2.13是Scala版本,2.7.0是Kafka版本

10.访问kafka的管理页面
http://192.168.31.249:9000/

file

file

方式二:安装kafdrop

version: '3.9'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./zookeeper/data:/data
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://11.0.1.60:9092 # 暴露在外的地址这里改为自己的
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - ./kafka/kafka-logs:/kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop:3.27.0
    container_name: kafdrop
    restart: "no"
    environment:
        KAFKA_BROKERCONNECT: "kafka:9092"
        JVM_OPTS: "-Xms64M -Xmx128M"
    ports:
      - "9000:9000"
    depends_on:
      - kafka

http://11.0.1.60:9000/

file
file

生产者:
file

消费者:
file

查看消费的消息:
file

file

查看kafka日志:

[root@dev test-0]# pwd
/mmp/kafka/kafka-logs/kafka-logs-fe3aa69d36e0/test-0
[root@dev test-0]# ls -l
total 8
-rw-r--r-- 1 root root 10485760 Nov 26 22:56 00000000000000000000.index
-rw-r--r-- 1 root root      463 Nov 26 23:08 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Nov 26 22:56 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 Nov 26 22:56 leader-epoch-checkpoint

[root@dev test-0]# 

docker-compose卸载:

cd /mmp/

[root@dev mmp]# ls -l
-rw-r--r-- 1 root root 860 Nov 26 06:39 docker-compose.yaml
drwxr-xr-x 4 root root  35 Nov 25 08:56 kafka
drwxr-xr-x 3 root root  18 Nov 25 08:56 zookeeper

# 启动
docker-compose up -d

# 应用卸载
docker-compose down

sudo rm /usr/local/bin/docker-compose

应用:

[root@dev mmp]# ls -l
total 4
-rw-r--r-- 1 root root 984 Nov 25 17:29 docker-compose.yaml
drwxr-xr-x 4 root root  35 Nov 25 08:56 kafka
drwxr-xr-x 3 root root  18 Nov 25 08:56 zookeeper
[root@dev mmp]# docker-compose down
[+] Running 4/4
 ⠿ Container mmp-kafka-manager-1  Removed                                                                                                  0.5s
 ⠿ Container mmp-kafka-1          Removed                                                                                                  6.0s
 ⠿ Container mmp-zookeeper-1      Removed                                                                                                 10.1s
 ⠿ Network mmp_default            Removed                                                                                                  0.1s
[root@dev mmp]# 

二、kafka理论

file

同步发送

在kafka集群的情况下:
file

在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?
ack应答信号有三种设置:
1、ack=0:kafka集群不需要任何broker收到消息就立即返回ack给生产者。这种方式是最容易丢消息的,但是效率是最高的;
2、ack=1(默认):必须确保多个副本之间的leader已经收到消息,并把消息写入到本地的log中才会返回ack给生产者。这种方式是性能和安全性最均衡的。
3、ack=-1/all:还有一个配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完之后才会返回ack给生产者(也就是此时集群中有两个broker已完成数据的接收),这种方式是最安全、但性能最差的(相当于不仅要leader接收到数据还要完成一份备份)。min.insync.replicas=n(1
个leader+(n-1)个follower完成数据的接收)。

kafka集群消息的消费

从集群中消费消息:

./kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092, 192.168.1.106:9093, 192.168.1.106:9094 --from-beginning --topic my-replicated-topic

file

指定消费组来消费消息:

./kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092, 192.168.1.106:9093, 192.168.1.106:9094 --from-beginning --consumer -property group.id=testGroup1 --topic my-replicated-topic

总结:
1:一个partiton分区只能被一个消费组里面的一个消费者消费,目的是为了保证消费的顺序性,但是只能保证同一个分区的局部有序,一个主题下的多个partition分区的多个消费者消费的总顺序性是不保证的。(一个主题下的消息被拆分成了多个分区,而每个分区的消费都是对应不同的消费者,每个消费者之间是异步的,所以总的顺序是不被保证的——但是是有办法做到消费的总顺序性的);
2:partition分区的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition分区的数量,否则多出来的消费者消费不到消息(因为一个partition只能被消费组里面的一个消费者消费);
3:如果某一个消费者挂了,那么会触发rebalance机制,会让其他消费者来消费这个partition分区。

KafKa线上问题优化

11.1 如何防止消息丢失?
生产者:使用同步发送,将ack设置为1或者-1/all
消费者:将自动提交设置为手动提交

11.2 如何防止消息被重复消费?
file

生产者使用同步发送方式(为了防止消息丢失),kafka收到了来自生产者的消息,但是由于网络原因,生产者没有收到kafka返回的ack响应,于是生产者重发消息,这就会造成下游消费者的重复消费。那么怎么解决呢?
关闭同步发送可以解决重复消费问题但是会带来消息丢失的风险,不建议。
在下游消费者端(业务层面):
方案1:在数据库中创建联合主键(自增id+orderId)——对于入库操作来说
方案2:使用分布式锁 Redission.lock(orderId)

此外,KafKa在重平衡过程中是不能消费的,并且也不能提交offset,这就会导致消息重复消费
11.3 如何做到消息顺序消费?
在KafKa中只保证局部有序:单个partition是有序的,但是多个partition之间的全局有序性是不保证的,怎么做到这一点呢?
首先保证生产者发送过来的数据是有序的,也就是broker中的数据是有序的,
生产者:保证消息不丢失——使用同步发送,ack设置为非0值(也就是必须要有broker收到消息后)
消费者:①一个主题只创建一个partition分区,这样消息都发送到了一个分区里面,保证了消费的顺序;
②生产者在发送消息的时候指定要发往那个分区
其实就是把所有的消息都发送到一个分区里面,kafka保证了各个分区自己的有序性

11.4 如何解决消息积压问题?
消费速度远远赶不上消息的生产速度,导致kafka中有大量数据没有被消费,随着消息的堆积,消费者的寻址性能会越来越差,最后导致整个kafka对外提供的服务性能很差,从而造成其他服务也变慢,造成雪崩。
解决方案:
●在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
●通过业务的架构设计,提升业务层面消费的性能。
●创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度
●创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配.上多个消费者。该消费者将polI下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。- --不常用

11.5 延时队列
使用KafKa来实现一个消息队列:
正常情况下只要生产者发出消息,消费者端都是会立即接收到的,那么要想一个办法来让消息延迟发出去:
以订单超过30分钟未付款就自动取消订单为例:
思路:
1、用户取消了订单,延迟消息准备发出,但是延迟消息并不是直接发送到目标topic,而是专门发送到一个处理延迟消息的topic,例如delay-minutes-30
2、创建消费者拉取delay-minutes-30主题的消息,将满足条件的延迟消息发送到真正的目标topic
实现细节:
在消费者中不能用sleep方法来做延时,因为max.poll.interval.ms设置了两次poll的时间间隔,如果超出这个时间,kafka会认为这个消费者已经挂掉,会进行rebalance操作,而且这个消费者再也poll不到消息。


相关文章:
docker-compose部署kafka
KafKa详细入门实战

为者常成,行者常至