1.单线程

2.aof 和 rdb

为什么说他轻量级

:::color4
因为像原来的platform thread创建时,需要在栈空间上分配1mb的内存,1000个线程就需要1g栈空间

而虚拟线程则不用

:::

执行原理

:::color4
当我们创建并且运行虚拟线程时,jvm后台会帮助我们去创建一个platform thread pool,且pool size=Runtime.getAvailableCore

我们可以像创建普通Java对象一样创建虚拟线程,你可以把虚拟对象看做平台线程的普通版的。那他要如何执行呢,它会绑定到平台线程上。且一个平台线程只能绑定一个虚拟线程,当虚拟线程里的任务执行完时就会断开。

:::

适用场景:

:::color4
IO阻塞任务!!!!

为啥?阻塞时,虚拟线程会剥离平台线程,然后会生成堆栈快照。且它阻塞完重新被jvm bind到platform thread时会把原来的platform堆栈删掉,并且把之前剥离时产出的堆栈copy并加载到平台线程的堆栈,从而继续完成

最大的好处就是在IO任务避免浪费了cpu

:::

那如果一个虚拟线程阻塞后发现所有线程都被占用了,那会发生什么?

:::color4
就继续等待,就这么简单

:::

意义

:::color4
1.内存占用小

2.虚拟线程的开销比较小,因为只有mouting or unmouting。但平台线程的parking就会继续上下文切换,这就涉及到用户->内核->用户

:::

:::success
在大多数现代 OS(比如 Linux)里,线程调度由内核完成,所以线程上下文切换通常伴随 用户态 内核态 的切换。

上下文切换 (Context Switch) 指的是 CPU 从执行一个线程(或进程)切换到另一个线程(或进程)。

切换时需要保存和恢复很多内容,例如:

  • 通用寄存器(eax、rbx…)
  • 程序计数器(下一条要执行的指令地址)
  • 栈指针
  • 内存映射(页表信息,切换进程时才会变)

👉 换句话说,就是 CPU 不能“中途换人跑”,必须先把当前线程的“执行现场”保存好,再加载另一个线程的“执行现场”。

:::

JDK1.5开始,开始将工作单元和工作机制分离开来,工作单元就是Runnable和Callable这种,

而具体工作由Excutor具体提供

在hotspot vm的线程模型中,用户线程和内核线程是一对一的

Excutor可以分为两个部分,一个是架构一个是其包含的组件

Excutor的结构

1.任务:执行的任务需要实现的接口:Runnable(无返回值)和Callable(有返回值)

2.任务的执行:任务执行的core interface:Excutor接口,以及继承于这个接口的ExcutorService接口。

ExcutorService接口有两个非常重要的实现类,ThreadPoolExcutor和ScheduleThreadPoolExcutor

3.异步计算的结果:包括接口Future和实现接口Future的FutureTask类

Executor框架的成员:

- ThreadPoolExecutor 
- ScheduledThreadPoolExecutor 
- Future接口 
- Runnable接口和Callable接口 

Runnable、Callable、Future、RunnableFuture、FutureTask、CompletableFuture:

Runnable:只有一个void run()方法,Thread实现了这个接口,所以创建Thread的时候实现这个接口,然后thread.start()就能创建一个线程执行run(),或者往Thread构造方法中传入Runnable的一个实现类。

Callable:有返回值的call()方法

Future:接口,异步获取任务结果

RunnableFuture:接口,其实就相当于异步的Runnable

**FutureTask:RunnableFuture的实现类。内部其实内聚了一个Callable,即使你用第二个构造方法传入Runnable,也会包装成Callable!所以其实就相当于异步的Runnable和Callable(适配器模式),里面的run方法就是执行Callable的call()!其线程安全通过CAS保证 **

CompletableFuture:实现了Future接口,提供了许多更为强大的功能,使用:Java CompletableFuture

ScheduleThreadPoolExcutor

构造方法和普通线程池一样,或者使用Executors.newScheduledThreadPool(int coreSize)

核心方法:

延迟执行一次:schedule(Runnable command, long delay,TimeUnit unit)

固定频率执行:scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),首次在initialDelay后执行,之后每隔period时间执行一次。如果任务时间超过周期,会等待上次执行完毕

固定延迟执行:scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit),和上面类似,但每次计算是上次任务结束后开始计算延迟

1.运维问题

原生部署zk需要强制部署 zk,这使得kafka运维人员需要同时具备运维zk和kafka的能力

2.为了保证可用性

原生kafka与zk交互是依赖单个broker即Controller,如果旧broker故障了,会选举出一个

新的Controller节点。新的Controller继任成功,会从zk上拉取元数据同步初始化,同时通知其他broker更新

新的ActiveControllerID。且旧broker需要关闭监听、事件处理线程和定时任务。这个过程如果partition特别多,

那么就会导致整个过程非常漫长,且这个过程kafka是无法处理消息的,处于不可用状态

3.分区瓶颈

当分区数增加时,<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Zookeeper</font>保存的元数据变多,<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Zookeeper</font>集群压力变大,达到一定级别后,监听延迟增加,

<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Kafaka</font>的工作带来了影响。所以,<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Kafka</font>单集群承载的分区数量是一个瓶颈。而这又恰恰是一些业务场景需要

的。

升级

<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">KIP-500</font><font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Quorum Controller</font>代替之前的<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Controller</font><font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Quorum</font>中每个<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Controller</font>节点都会保存所有

元数据,通过<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">KRaft</font>协议保证副本的一致性。这样即使<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Quorum Controller</font>节点出故障了,新的<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">Controlle</font>

<font style="color:rgb(10, 191, 91);background-color:rgb(243, 245, 249);">r</font>迁移也会非常快。

即由单个Controller变成了多个Controller,即由独裁变为了议会制

升级kafka后,官方说轻松支持百万partition

早期kafka集群是由zk来保证,我们公司也是,后面kafka更新了,不需要zk了

早期版本:Consumer靠zk来保存offset,而producer不与zk打交道

生产数据的流程

早期版本:

1. producer需要找到集群,并把那个数据放到某个分区中。而这个leader端口号和分区存在zk中,集群和zk一直处于连接状态,partition的leader端口号回由集群去拿出,然后producer会向集群拿到topic对应的partition以及partition的leader信息
2. <font style="color:rgb(85, 85, 85);background-color:rgb(253, 253, 253);">当Producer通过Sender从集群获取到partition和Leader信息,若</font>**<font style="color:rgb(85, 85, 85);background-color:rgb(253, 253, 253);">有指定partition则使用指定的partition</font>**<font style="color:rgb(85, 85, 85);background-color:rgb(253, 253, 253);">,若没有则使用分区算法对key做操作;当没有key则轮询partition;</font>
3. producer会先把数据存入DQ中,每一个partition一个DQ,就是一种消息聚合的方式,sender会轮询,队列满了或者到达一定的时间周期,就会发送给leader。  

Producer给Leader发数据使用批处理,如果没有批处理每次发送都建立连接在进程间做交互,会使效率很低
4. leader将数据写入本地的log日志分段
5. 后续Consumer轮询从broker拉取消息,Kafka的ACK应答机制(producer,三种,0,1,2 );

三种ACK模式:

当取值为0,则不关心是否到达,尽最大努力交付,效率高,数据可能丢失;

取值为1(默认),Producer的发送数据,需要等待Leader的应答才能发生下一条,不关心Follower是否接收成功,性能稍慢,数据较安全,但当Leader突然宕机,则当Follower还未同步,数据会丢失;

取值为 -1(all) ,Producer发送数据,需要等待ISR内的所有副本(leader和所有Follower)都完成备份,最安全,性能差;需要等待follower一定时间后拉取数据,也就是这个”一定时间”是其效率的主要影响

![](/images/a27e7ae381e0c5a8aefe7ee4d84f556c.png)

详细流程


sender方法:

1.producer会将数据push到broker上,每条消息会被追加到topic下的一个partition里的log文件里的末

尾,保证分区有序性,且会顺序写入磁盘

2.为了不让log文件过大,kafka还采用了分区分段存储,即log体积达到了某个阈值,就会生成新的log文件,

后面来的消息会追加到新的文件里

3.文件结构

一个topic里有多个partition,partition内有多个segement,一个segement包括三个文件index,log, timeIndex

partition就是一个文件夹,属于物理概念。命名规则为 topic + 分区号 如 first-1

而segement也是个逻辑概念

index,log文件命名为 segement里第一条消息的offset

首先kafka是分布式集群,吞吐量大。天生分布式,随便加机器

零拷贝

这里的零拷贝是对于应用层而言

我们要将磁盘的数据发送到远端的服务器去,一般的应用做法是

0.调用系统函数read,切换为内核态

1.先将数据从磁盘拷贝到page cache

2.然后切换到**用户态,**将数据拷贝到应用层

3.切换为内核态,调用write,将数据写入page cache

4.最后将page cache 拷贝到网卡 缓冲区

5.切换为用户态,继续程序的运行

这其中需要经过四次拷贝,四次用户内核态切换

而对于sendFile的零拷贝

**数据不需要再拷贝到应用层,而是由内核态全程负责,让数据从磁盘拷贝到内核缓存,然后直接发给NIC **

网卡缓冲区,且用户态内核态也只需要切换两次

PS:DMA

顺序写磁盘

page cache预热

写page cache

分段日志

Leader将文件切分为多个segement,好处就是让 IO更快

Topic→partition→segment

index、log以当前segement的第一条消息的offset命名,以便可以快速查找

log里面存的是消息,而index里存的是offset和其对应的 物理偏移

Kafka消息(存储)格式及索引组织方式 - 杭州.Mark - 博客园

双端队列与批处理

在producer这边,每一个partition在其producer都会对应着一个DQ

1.producer发送数据时都会往这个DQ里存消息

2.DQ满了或者一定的时间周期到了,producer就会拿出里面的消息,send到broker的Leader里。这即满足了

partition里消息的有序性,还提高了效率性

但消息聚合会降低一定的实时性

而Consumer这边也是一样,先拉取一批数据到本地的DQ里,然后消费

压缩:给的字符串,会被压缩成byte数组,压缩后数据小,传输快

提供可靠性支持,每个partition都会其replication。broker会从这些里面选举其Leader和follower。

Leader和follower其实都是副本

producer和Consumer只会和 Leader交互

Leader会将数据同步到follower,如果Leader主动push数据,这时候会让Leader负载过大,故这时候

应该让follower主动定时去pull数据

Kafka集群的目的是保存消息,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,生产者把数据以K、V的方式传给集群;

首先Kafka需要多台机器组成一个Kafka集群才能承载负荷,每一台服务器是一个Broker,

数据有多种进行分类 → 一个Broker中有Topic主题

为了提高负载 → 一个Topic有多个partition分区,一个partition可以放在多个不同的Broker上,而数据放在不同的partition当中(取模的方式),可以让不同的消费者来消费,提高消费速率

为了防止数据丢失 → 每个分区下有多个副本,即Leader和Follower,Leader做io处理,Follower只作备份,多个分区进行“交叉备份”,Follower从Leader中实时同步数据,当Leader挂掉后,Follower会代替Leader。

以【消费者组】为单位进行读取数据,其中一个消费者读取一个partition分区,所以一般partition分区数量=消费者组中的Consumer消费者数量,以保证分区内数据有序

Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

Kafka -> Broker -> Topic -> partition -> Replication(Leader、Follower) ->Consumer

1)Producer :消息生产者,就是向 kafka broker中的Topic主题发消息的客户端;

2)Consumer :消息消费者,向 kafka broker中的Topic中 取消息的客户端;

3)Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

4)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。 一个 broker可以容纳多个topic。

5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;topic逻辑上的概念,partition是物理上的概念;

6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;partition中的每条消息都会被分配一个有序的id(offset)

7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,

且kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

9)follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

10)Offset:偏移量, kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。

当然the first offset就是00000000000.kafka。

点到点

同一条消息只会被一个消费者消费

发布订阅


一个消费者可以被多个消费者消费