KAFKA是如何处理粘包拆包的

76次阅读
没有评论

共计 4692 个字符,预计需要花费 12 分钟才能阅读完成。

本篇内容主要讲解“KAFKA 是如何处理粘包拆包的”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“KAFKA 是如何处理粘包拆包的”吧!

一、为什么会出现粘包拆包现象?

我们知道,TCP 数据包都是按照协议进行拆包、编号然后分批发送的;
那么对应我们应用层有意义的数据包,传输层的协议并不了解其含义,更不会去根据你的业务内容去分包和发送,只会按照自己的协议栈去进行数据发送。
因此,就出现了网络数据的粘包,拆包问题。
究其本质,其实就是传输层并不了解上层应用的数据含义,只会按照协议栈进行数据发送。

二、通常有哪些解决粘包拆包问题的方法?

在了解出现这个问题的本质后,那么要想解决这个问题就很简单了。
不就是在进行数据接收的时候,我们应用层收到数据后根据标识判断一下,数据是否完整,如果完整了我们再进行数据包解析,最后交给业务代码不就好了?
通常解决粘包拆包的问题有三种方案:

定长,例如我保证我每一条数据都是 200b,那么我每接收到 200b 就认为是一条完整的数据,接着就可以进行解析,并向业务代码交付。

分隔符,一样的意思,我每条数据末尾都用一个分隔符例如换行符,制表符这种来标识这条数据写完了,那么我们收到数据判找一下这个分割符在哪儿,最后进行切割就可以得到完整的数据包了。

自定义协议,这个也很简单,就是定义一个你的完整数据包的内容格式是什么样子的,例如 len + data,其中 len 是代表 data 的字节长度。这样每次根据前面 4 个字节的 len,就能得到后面还需要多少数据才是一条完整的数据,少了就等,多了就截取。

最后,可能很多不熟悉网络编程的同学会纳闷,那万一 TCP 的数据包丢失了,乱序了,上面这种方法不就出问题了嘛?
其实不是的,TCP 一个可靠的消息传输协议,其协议的根本思想就是提供可靠的数据传输服务。
翻译一下就是,你可以相信 TCP 传输的数据是可靠的,在交付给应用层数据的时候,是不会出现上述这种情况的。
出现这种情况只会在传输层出现,而 TCP 协议也为对应的情况设计了分批、编号、去重、校验和、超时重传等一系列的操作,来保证数据可靠。

三、kakfa 是如何解决粘包拆包问题的呢?

最后,让我们来看下 kafka 是如何解决粘包拆包问题的呢?是以上面提到的哪种方式来解决的呢?
首先看粘包,也就是接收到了多余的数据,该如何拆分数据包,读取到正确完整的数据包?
如下面代码所示,分为三个阶段:

先读取前 4 字节,转换为一个 int,即长度。

根据长度申请内存 buffer。

最后读取指定大小的数据到申请好的 buffer

由此,就完整了一整条数据的正确读取。整个过程其实就是上面提到的 len+data 这么一个简单的自定义协议。

public NetworkReceive read() throws IOException { NetworkReceive result = null; //  新建一个 receive if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //  真正的数据 read receive(receive); //  数据读取完成的后置操作  if (receive.complete()) { //  倒带,等待读 receive.payload().rewind(); //  直接引用赋值  result = receive; //  最后清空当前引用,然后等待下次进入 read 的时候,执行 new  操作  receive = null; } else if (receive.requiredMemoryAmountKnown()   !receive.memoryAllocated()   isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public long readFrom(ScatteringByteChannel channel) throws IOException {int read = 0;
 //  存在数据 if (size.hasRemaining()) { // len + dataint bytesRead = channel.read(size); if (bytesRead   0)throw new EOFException(); read += bytesRead;
 //  如果读满了长度,则直接倒带得到具体的 len 值
 //  这里的 size 是一个 byteBuffer 类型的,也就是接收到的数据  if (!size.hasRemaining()) {size.rewind(); int receiveSize = size.getInt(); if (receiveSize   0)throw new InvalidReceiveException(Invalid receive (size =   + receiveSize + )  if (maxSize != UNLIMITED   receiveSize   maxSize)throw new InvalidReceiveException(Invalid receive (size =   + receiveSize +   larger than   + maxSize + )  requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL) if (receiveSize == 0) {buffer = EMPTY_BUFFER; }
 }
 } //  如果长度已经就绪了,那么就需要接下来的 data 需要多少空间,在这里进行申请 if (buffer == null   requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null)log.trace(Broker low on memory - could not allocate buffer of size {} for source {} , requestedBufferSize, source); }
 //  申请完毕之后,就调用 read 函数,直接 read 出来即可。if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead   0)throw new EOFException(); read += bytesRead; } //  返回读取的总字节数 return read;}

再先看拆包,也就是接收到数据不够组成一条完整的数据,该如何等待完整的数据包?
下面代码最核心的就是 receive.complete() 函数的判断逻辑,这个判断的三个条件分别意味着:

!size.hasRemaining():接收到的 buffer 数据已经读取完成。

buffer != null:buffer 已经创建。

!buffer.hasRemaining():buffer 已经读取完成。

翻译一下,其实就是只要一条数据没读完整,那么 receive.complete() 函数返回值就是 false,那么最终返回的结果就是 null,等待下一次 OP_READ 事件的时候再接着上次没读完的数据读取,直到读取一条完整的数据为止。

public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); }
 receive(receive); if (receive.complete()) {receive.payload().rewind(); result = receive; receive = null; } else if (receive.requiredMemoryAmountKnown()   !receive.memoryAllocated()   isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public boolean complete() { return !size.hasRemaining()   buffer != null   !buffer.hasRemaining();}

最后,我们再补充一点,当我们一次性收到很多条数据的时候,会如何处理呢?
下面的源码告诉了我们答案,就是一次性全部读取出来,然后存入 stageReceives 这个数据结构中等待下一步业务处理。

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no //previous receive(s) already staged or otherwise in progress then read from it if (channel.ready()   (key.isReadable() || channel.hasBytesBuffered())   !hasStagedReceive(channel)
   !explicitlyMutedChannels.contains(channel)) { NetworkReceive networkReceive; //  一次性读取所有的 receives,暂存到 stageReceives 中  while ((networkReceive = channel.read()) != null) { madeReadProgressLastPoll = true; addToStagedReceives(channel, networkReceive); }// isMute 是判断当前 channel 是否关注了 OP_READ 事件  if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure. } else {madeReadProgressLastPoll = true; }
 }
}

到此,相信大家对“KAFKA 是如何处理粘包拆包的”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计4692字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)