凌月风的个人博客

记录精彩人生

Open Source, Open Mind,
Open Sight, Open Future!
  menu

Java笔记系列——06-网络通信(1)

0 浏览

网络通信

TCP/IP网络通信

  • 七层模型,亦称OSI(Open System Interconnection)。 参考模型是 国际标准化组织 (ISO)制定的一个用于 计算机 或 通信系统 间互联的标准体系,一般称为 OSI参考模型 或七层模型
  • TCP/IP协议是由七层模型简化成四层而来
    image-20220621212341157
  • 通信流程
    image-20220621212434128
  • 三次握手
    image-20220621212515620
  • 四次挥手
    image-20220621212532088

IO模型

  • 对于TCP通信来说,每个TCP Socket的内核中都有一个发送缓冲区和一个接收缓冲区 接收缓冲区把数据缓存到内核
    • 若应用进程一直没有调用Socket的read方法进行读取,那么该数据会一 直被缓存在接收缓冲区内。
    • 不管进程是否读取Socket,对端发来的数据都会经过内核接收并缓存到 Socket的内核接收缓冲区。
    • read所要做的工作,就是把内核接收缓冲区中的数据复制到应用层用户的Buffer里。
    • 进程调用Socket的send发送数据的时候,一般情况下是将数据从应用层用户的Buffer里复制到Socket的 内核发送缓冲区,然后send就会在上层返回。换句话说,send返回时,数据不一定会被发送到对端image-20220621213601256

同步阻塞IO(BIO)

  • 同步阻塞IO:客户端向服务端发起一个数据读取请求,客户端在收到服务端返回数据之前,一直处于阻塞状态,直到服务端返回数据后完成本次会话。这个交互模型就叫做同步阻塞IO模型,也叫BIO模型。
    image-20220621214428157

  • 同步阻塞IO主要体现在两个阻塞点

    • 服务端接收客户端连接时的阻塞。

    • 客户端和服务端的IO通信时,数据未就绪的情况下的阻塞。

      public class BIOExample {
          public void server() throws Exception {
              ServerSocket socket = new ServerSocket(8888);
              while (true) {
                  Socket accept = socket.accept(); // 连接阻塞
                  System.out.println("监听到新客户端连接,客户端端口:" + accept.getPort());
                  InputStream inputStream = accept.getInputStream(); // IO阻塞
                  BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                  System.out.println("收到客户端消息:" + reader.readLine());
                  OutputStream outputStream = accept.getOutputStream();
                  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream));
                  writer.write("Hello Client : " + accept.getPort());
                  writer.flush();
              }
          }
          public static void main(String[] args) throws Exception {
              new BIOExample().server();
          }
      }
      

  • 在单线程BIO模型中,服务端在同一个时刻只能处理一个客户端的连接,而如果一个网站同时 有1000个用户访问,那么剩下的999个用户都需要等待,而这个等待的耗时取决于前面的请求的处理时 长
    image-20220614200158645

  • 使用多线程优化BIO模型,将阻塞部分交给多线程去处理,就可以让服务端能够同时处理更多的客户端连接,避免因为某个客户端连接阻塞导致后续请求被阻塞。使用多线程的BIO模型也就是一个请求对应一个线程。

    public class BIOThreadExample {
        static ExecutorService executorService = Executors.newFixedThreadPool(5);
    
        public void server() throws Exception {
            ServerSocket socket = new ServerSocket(8888);
            while (true) {
                Socket accept = socket.accept();
                System.out.println("监听到新客户端连接,客户端端口:" + accept.getPort());
                executorService.submit(new BIOThread(accept));
            }
        }
    
        class BIOThread implements Runnable {
            Socket accept;
            public BIOThread(Socket accept) {
                this.accept = accept;
            }
            @Override
            public void run() {
                try {
                    InputStream inputStream = accept.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                    System.out.println("收到客户端消息:" + reader.readLine());
                    OutputStream outputStream = accept.getOutputStream();
                    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream));
                    writer.write("Hello Client : " + accept.getPort());
                    writer.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            new BIOThreadExample().server();
        }
    }
    

  • 当引入了多线程之后,每个客户端的链接(Socket),我们可以直接给到线程池去执 行,而由于这个过程是异步的,所以并不会同步阻塞影响后续链接的监听,因此在一定程度上可以提升 服务端链接的处理数量。
    image-20220614200129406

  • 使用多线程的方式来解决这个问题,仍然有一个缺点,线程的数量取决于硬件配置,所以线程数量是有 限的,如果请求量比较大的时候,线程本身会收到限制从而并发量也不会太高。那怎么办呢,我们可以 采用非阻塞IO。

同步非阻塞IO(NIO)

  • 同步非阻塞IO:NIO的本意是New IO,jdk1.4 提出的,它的出现为了弥补原本BIO的不足,提供了更高效的方式。
    • 当客户端向服务端发起请求时,如果服务端的数据未就绪的情况下, 客户端请求不会被阻塞,而是直接返回。
    • 有可能服务端的数据还未准备好的时候,客户端收到的返回是一个空的。
    • 在此模型下客户端想要获取准备好的数据,需要通过轮询的方式来获得请求结果。
    • NIO相比BIO来说,少了阻塞的过程,因此在性能和连接数上都会有明显提高
      image-20220614210454540

  • NIO模型中,将连接变为非阻塞

    public class NIOExample {
        public void server() throws IOException {
            ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress(8888));
            channel.configureBlocking(false);// 设置连接非阻塞
            while (true) {
                SocketChannel accept = channel.accept();
                if (accept == null) {
                    // System.out.println("客户端未连接");
                    continue;
                }
                System.out.println("新的客户端连接,端口:" + accept.getRemoteAddress());
            }
        }
        public static void main(String[] args) throws IOException {
            new NIOExample().server();
        }
    }
    

  • 在NIO模型中,客户端或者服务端需要通过一个线程不断轮询才能 获得结果,而这个轮询过程中会浪费线程资源。
    • 轮询过程中会有很多空轮询,而这个轮询会存在大量的系统调用(发起内核指令从网卡缓冲区中加载数据,用户空间到内核空间的切换),随着连接数量的增加,会导致性能问题。
    • 我们能不能够设计成,当客户端调用 read 方法之后,不仅仅不阻塞,同时也不需要轮询。而是等到服 务端的数据就绪之后, 告诉客户端。然后客户端再去读取服务端返回的数据呢?
    • 设计升级,引入I/O多路复用解决这个痛点

IO多路复用

  • 多路复用提供了一种机制,当调用方调用 read 方法之后,不仅仅不阻塞,同时也不需要轮询。而是等到被调用方的数据就绪之后, 告诉调用方。然后调用方再去读取被调用方返回的数据。
  • 在Linux中,内核把所有的外部设备都当成是一个文件来操作,对一个文件的读写会调 用内核提供的系统命令,返回一个fd(文件描述符)。而对于一个socket的读写也会有相应的文件描 述符,称为socketfd
    • I/O多路复用的本质是通过一种机制(系统内核缓冲I/O数据),让单个进程可以监视多个文件描述符, 一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作。

  • Linux API提供的常见IO复用方式
    • select:进程可以通过把一个或者多个fd传递给select系统调用,进程会阻塞在select操作上,这 样select可以帮我们检测多个fd是否处于就绪状态,这个模式有两个缺点 由于他能够同时监听多个文件描述符,假如说有1000个,这个时候如果其中一个fd 处于就绪 状态了,那么当前进程需要线性轮询所有的fd,也就是监听的fd越多,性能开销越大。 同时,select在单个进程中能打开的fd是有限制的,默认是1024,对于那些需要支持单机上 万的TCP连接来说确实有点少
      image-20220621212650237
    • poll:本质上跟select类似,优化了单个进程能打开的fd上限
    • epoll:是基于事件驱动方式来代替顺序扫描,因此性能相 对来说更高,主要原理是,当被监听的fd中,有fd就绪时,会告知当前进程具体哪一个fd就绪,那 么当前进程只需要去从指定的fd上读取数据即可,另外,epoll所能支持的fd上线是操作系统的最 大文件句柄,这个数字要远远大于1024
      image-20220621212703509

  • NIO使用多路复用流程
    1. 客户端请求到服务端后,此时客户端在传输数据过程中,为了避免Server端在获取客户端数据过程中阻 塞,服务端会把该请求注册到Selector复用器上
    2. 服务端此时不需要等待,只需要启动一个线程,通过 selector.select()阻塞后等待复用器上就绪的channel即可。
    3. 如果某个客户端连接数据传输完 成,那么select()方法会返回就绪的channel,然后执行相关的处理即可。
      image-20220614210743605

  • 基于多路复用的NIO模型实现

    public class NIOSelectorExample implements Runnable {
        Selector selector;
        ServerSocketChannel channel;
        public NIOSelectorExample() throws IOException {
            // 多路复用器
            selector = Selector.open();
            // 异步通道监听TCP连接
            channel = ServerSocketChannel.open().bind(new InetSocketAddress(8888));
            // 设置连接非阻塞
            channel.configureBlocking(false);
            // 将接收TCP连接事件注册到多路复用器上
            channel.register(selector, SelectionKey.OP_ACCEPT);
        }
        public static void main(String[] args) throws IOException {
            new Thread(new NIOSelectorExample()).start();
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    // 多路复用器阻塞,等待事件就绪
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        // 说明有连接准备就绪进来了
                        dispatch(iterator.next());
                        // 移除,防止多次处理
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public void dispatch(SelectionKey key) throws IOException {
            // 目前只注册了一个 SelectionKey.OP_ACCEPT 的监听
            if (key.isAcceptable()) {
                // 因为注册的时候是ServerSocketChannel,所以拿到也是ServerSocketChannel
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                // 根据就绪的key拿到对应的连接
                SocketChannel accept = ((ServerSocketChannel) key.channel()).accept();
                System.out.println("新客户端连接:" + accept.getRemoteAddress());
                // 设置IO非阻塞
                accept.configureBlocking(false);
                // 获取连接之后需要IO获取信息了,这里体现了多路复用,注册到了同一个Selector上
                accept.register(selector, SelectionKey.OP_READ);
            } else if (key.isReadable()) {
                // 此时说明是有一个IO读取准备就绪了
                // 因为注册的Channel 就是 SocketChannel,所以直接拿到
                SocketChannel channel = (SocketChannel) key.channel();
                // 读取数据
                ByteBuffer allocate = ByteBuffer.allocate(1024);
                channel.read(allocate);
                System.out.println("收到消息:" + new String(allocate.array()));
                // 读取完数据,下一步进行写回。注册到同一个Selector
                channel.register(selector, SelectionKey.OP_WRITE);
            } else if (key.isWritable()) {
                // 因为注册的Channel 就是 SocketChannel,所以直接拿到
                SocketChannel channel = (SocketChannel) key.channel();
                // 写回数据
                ByteBuffer allocate = ByteBuffer.allocate(1024);
                allocate.put("Hello Client".getBytes(StandardCharsets.UTF_8));
                allocate.flip();
                channel.write(allocate);
                // 写回操作,然后继续
                channel.register(selector, SelectionKey.OP_READ);
            }
        }
    }
    

  • 使用epoll模型的I/O多路复用的优点:

    • 通过NIO的多路复用机制,解决了IO阻塞导致客户端连接处理受限的问题
    • 可以把多个I/O的阻塞复用到同一个selector上,从而使得系统在单线程 的情况下可以同时处理多个客户端请求。它的最大优势是系统开销小,并且不需要创建新的进程或者线 程,降低了系统的资源开销。

异步非阻塞IO(AIO)

  • 异步IO 和 多路复用机制,最大的区别在于:当数据就绪后,客户端不需要发送内核指令从内核空间读取数据,而是系统会异步把这个数据直接拷贝到用户空间,应用程序只需要直接使用该数据即可
    image-20220621215537605

Reactor 模型

  • Reactor本质上还是NIO的多路复用,是基于NIO多路复用机制提出的一个高性能IO设计模式。它的核心思想是把响应IO事件和业务处理进行分离,通过一个或者多个线程来处理IO事件,根据职责进行了拆分,多个角色负责各自的工作。模型有三个重要的组件或者说角色:
    • Reactor :负责监听事件并转发请求
    • Acceptor :处理客户端连接请求
    • Handler :执行非阻塞读/写

  • 单线程单Reactor模型
    • 其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给handler 处理。
    • Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的
      handler上,对应的SocketChannel有读写事件之后,基于reactor 分发,handler就可以处理了(所有的IO事件都绑定到selector上,由Reactor分发)
    • 执行过程中,Handler的处理是串行的
      image-20220615104156495

  • 多线程单Reactor模型
    • 由于单线程Reactor模型存在着缺点,handler的执行是串行的,如果其中一个handler处理线程阻塞将导致其他的业务处理阻塞。由于handler和reactor在同一个线程中的执行,这也将导致新的无法接收新的请求
    • 使用多线程的方式来处理业务,也就是在业务处理的地方加入线程池异步处理,将reactor和handler在不同的线程来执行 。执行过程中,将Handler的处理交给多线程去处理。
    • 相较于单线程单Reactor模型,只是IO操作的步骤交给了多线程处理。
      image-20220615111114890

  • 多线程多Reactor

    • 在多线程单Reactor模型中,我们发现所有的I/O操作是由一个Reactor来完成,而Reactor运行在单个线程中,它需要处理包括 Accept / read / write / connect 操作,对于小容量的场景,影响不大。但是对于高负载、大并发或大数据量的应用场景时,容易成为瓶颈 。为此引入多Reactor多线程模式

      • 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;
      • 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
    • 多线程中的角色及作用

      • Main Reactor,负责接收客户端的连接请求,然后把接收到的请求进行转发
      • Acceptor,主要负责连接事件,并将IO读写请求转发到 SubReactor线程池。
      • Sub Reactor,Main Reactor 通常监听客户端连接后会将通道的读写转发到 Sub Reactor 线程池中一个线程(负载均衡),负责数据的读写。在 NIO 中 通常注册通道的读(OP_READ)、写事件(OP_WRITE) image-20220615114013078

  • 单线程单Reactor模型代码示例

    public class ReactorExample implements Runnable {
        final Selector selector;
        final ServerSocketChannel socketChannel;
    
        public ReactorExample() throws IOException {
            selector = Selector.open();
            socketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8888));
            socketChannel.configureBlocking(false);
            //  注册的时候将Acceptor处理器,作为一个附加对象传入
            socketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, socketChannel));
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
    
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
    
                        dispatch(iterator.next());
                        iterator.remove();
    
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        private void dispatch(SelectionKey key) {
            // 获取注册时传入的附加对象
            Runnable runnable = (Runnable) key.attachment();
            // 之所以采用Runnable实际是为了更好的复用,实现其他接口也是一样的
            runnable.run();
        }
    
        class Acceptor implements Runnable {
            final Selector selector;
            final ServerSocketChannel socketChannel;
    
            public Acceptor(Selector selector, ServerSocketChannel socketChannel) {
                this.selector = selector;
                this.socketChannel = socketChannel;
            }
    
            @Override
            public void run() {
                SocketChannel accept = null;
                try {
                    accept = socketChannel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ, new Handler(selector, accept));
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
        class Handler implements Runnable {
            Selector selector;
            SocketChannel channel;
    
            public Handler(Selector selector, SocketChannel channel) {
                this.selector = selector;
                this.channel = channel;
            }
    
            @Override
            public void run() {
                ByteBuffer allocate = ByteBuffer.allocate(1024);
                try {
                    channel.read(allocate);
                    System.out.println("收到消息:" + new String(allocate.array()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

  • 多线程单Reactor模型示例

    public class ReactorThreadExample implements Runnable {
        final Selector selector;
        final ServerSocketChannel socketChannel;
        static ExecutorService executorService =  Executors.newFixedThreadPool(5);
        public ReactorThreadExample() throws IOException {
            selector = Selector.open();
            socketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8888));
            socketChannel.configureBlocking(false);
            //  注册的时候将Acceptor处理器,作为一个附加对象传入
            socketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, socketChannel));
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        dispatch(iterator.next());
                        iterator.remove();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        private void dispatch(SelectionKey key) {
            // 获取注册时传入的附加对象
            Runnable runnable = (Runnable) key.attachment();
            // 之所以采用Runnable实际是为了更好的复用,实现其他接口也是一样的
            runnable.run();
        }
        class Acceptor implements Runnable {
            final Selector selector;
            final ServerSocketChannel socketChannel;
            public Acceptor(Selector selector, ServerSocketChannel socketChannel) {
                this.selector = selector;
                this.socketChannel = socketChannel;
            }
            @Override
            public void run() {
                SocketChannel accept = null;
                try {
                    accept = socketChannel.accept();
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ, new Handler(selector, accept));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        class Handler implements Runnable {
            Selector selector;
            SocketChannel channel;
            public Handler(Selector selector, SocketChannel channel) {
                this.selector = selector;
                this.channel = channel;
            }
            @Override
            public void run() {
                executorService.execute(() -> {
                    ByteBuffer allocate = ByteBuffer.allocate(1024);
                    try {
                        channel.read(allocate);
                        System.out.println("收到消息:" + new String(allocate.array()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    

序列化

  • 把一个对象实现跨JVM、跨网络传输 需要将对象进行序列化,收到信息之后进行反序列化得到对象
  • 常用序列化技术:json (fastjson/gson/jackson)xmljavaprotobufkyroavrojutemessagepackmarshallingthrifthessianhessian(dubbo)hessian(sofa)

  • Protobuf序列化
    • Protobuf是Google的一种数据交换格式,它独立于语言、独立于平台。Google提供了多种语言来实现,比如Java、C、Go、Python,每一种实现都包含了相应语言的编译器和库文件,Protobuf是一个纯粹的表示层协议,可以和各种传输层协议一起使用。
    • Protobuf使用比较广泛,主要是空间开销小和性能比较好,非常适合用于公司内部对性能要求高的RPC调用。 另外由于解析性能比较高,序列化以后数据量相对较少,所以也可以应用在对象的持久化场景中
    • 使用Protobuf会相对来说麻烦些,因为他有自己的语法,有自己的编译器,如果需要用到的话必须要投入成本在这个技术的学习中
    • protobuf有个缺点就是要传输的每一个类的结构都要生成对应的.proto文件,如果某个类发生修改,还需要重新生成该类对应的.proto文件
    • 使用protobuf开发的一般步骤是
      1. 配置开发环境,安装protocol compiler代码编译器
      2. 编写.proto文件,定义序列化对象的数据结构
      3. 基于编写的.proto文件,使用protocol compiler编译器生成对应的序列化/反序列化工具类
      4. 基于自动生成的代码,编写自己的序列化应用
    • Protobuf序列化存储格式image-20220621235221408 image-20220621235154896
    • protobuf里面用到了两种压缩算法,一种是varint,另一种是zigzag

  • 常用的序列化方式有很多,那么在工具的选择上要有以下考虑:
    • 性能:空间-> 序列化之后的数据报文大小,时间-> 消耗的时间
    • 语言特性:是否支持多种开发语言,是否支持跨平台
    • 成熟度
    • 扩展性、兼容性
  • 通常情况下,在性能要求不严格的网络场景可以使用json序列化,性能更高的使用protobuf、avro

分布式一致性协议

  • 分布式存储系统通常通过维护多个副本来进行容错,提高系统的可用性。要实现此目标,就必须要解决分布式存储系统的最核心问题:维护多个副本的一致性。

    • 一致性是构建具有容错性(fault-tolerant)的分布式系统的基础。
    • 在一个具有一致性的性质的集群里面,同一时刻所有的结点对存储在其中的某个值都有相同的结果,即对其共享的存储保持一致。
    • 集群具有自动恢复的性质,当少数结点失效的时候不影响集群的正常工作,当大多数集群中的结点失效的时候,集群则会停止服务(不会返回一个错误的结果)。
  • 一致性协议的作用就是为了保证 基于分布式系统架构下的所有节点进行事务处理过程中能够保持一致性

    • 一致性协议通常基于replicated state machines,即所有结点都从同一个state出发,都经过同样的一些操作序列(log),最后到达同样的state。

CAP理论

  • CAP理论表达了一个分布式系统里不可能同时满足以下的三个特性:

    • Consistency(一致性)
    • Availability(可用性)
    • Partition tolerance(分区容忍性)
  • 这三个性质对应了分布式系统的三个指标:

    • **一致性:**分布式系统节点中,各节点存储的数据都是一致的。对于客户端的每次读操作,分布式系统中每个节点响应的都是最新数据,强调的是数据正确。
    • **可用性:**指系统提供的服务必须一直处于可用的状态,每次请求都能获取到非错的响应——但是不保证获取的数据为最新数据。任何客户端的请求都能得到响应数据,但不保证数据最新,强调的是必须有响应。
    • **分区容忍性:**由于分布式系统通过网络进行通信,网络是不可靠的。当任意数量的消息丢失或延迟到达时,系统仍会继续提供服务,不会挂掉。换句话说,分区容忍性是站在分布式系统的角度,对访问本系统的客户端的再一种承诺:我会一直运行,不管我的内部出现何种数据同步问题,强调的是不挂掉。
  • CAP理论说的就是:一个分布式系统,不可能同时做到这三点。在不存在网络失败的情况下(分布式系统正常运行时),C和A能够同时保证。只有当网络发生分区或失败时,才会在C和A之间做出选择。

    • 对于一个分布式系统而言,P是前提,必须保证,因为只要有网络交互就一定会有延迟和数据丢失,这种状况我们必须接受,必须保证系统不能挂掉。所以只剩下C、A可以选择。要么保证数据一致性(保证数据绝对正确),要么保证可用性(保证系统不出错)。
    • 当选择了C(一致性)时,如果由于网络分区而无法保证特定信息是最新的,则系统将返回错误或超时。
    • 当选择了A(可用性)时,系统将始终处理客户端的查询并尝试返回最新的可用的信息版本,即使由于网络分区而无法保证其是最新的。

Raft协议

  • Raft协议规定一个节点会处于三种状态之一:Follower state、Candidate state、Leader state。
    • Follower:请求的被动更新者,从Leader接受更新请求,然后写入本地日志文件
    • Candidate:如果Follower状态在一段时间内没有收到Leader的心跳,则判断Leader可能已经故障,此时启动选主过程,此时节点会变成Candidate状态,直到选主结束。
    • Leader:所有请求的处理者,Leader节点接受Client的更新请求,本地处理后再同步至多个其他节点。
  • Raft协议将一致性协议的核心内容分拆成为几个关键阶段,以简化流程,提高协议的可理解性。
    • Leader election:选举Leader
      1. 初始状态下,所有的节点都是Follower状态,我们认为这些处于Follower状态的节点叫Follower(追随者/爱好者)
      2. 当Follower无法感知到Leader存在时(收不到Leader的心跳),会给自己设置一个 election timeout(选举超时时间),这个时间控制在150ms300ms之间的随机时间,等过了超时时间,该节点会变成一个Candidate(参选者)
      3. 成为Candidate后,会在本地记录一个第一届选举,然后投自己一票,并通知其他节点进行投票
      4. 节点在收到其他Candidate的投票请求之后,如果自己还没投过票,那就投请求节点一票然后返回给请求节点。同时自己的选举超时时间重新开始计时。
      5. 当Candidate收到半数以上的选票之后就会变为Leader
      6. Leader会不断的向其他节点发送信号,如果其他节点收到了信号,就会根据自己的选举超时时间重新计时
      7. 一旦节点没有收到Leader的信号,等待自己的选举超时时间计时完成,那么该节点变为一个Candidate,开始重复第3个步进行选举。
    • Log Replication
      • 当Leader被选出来后,就可以接受客户端发来的请求了。
      • Leader会把客户端发来的请求作为一个log entry 记录到自己的日志中,然后向其他节点发送AppendEntriesRPC请求。
      • 当其他节点收到Leader节点的请求,也会将这个操作记录到自己的日志中,然后返回给Leader一个消息,说自己记录完毕。
      • 当Leader收到大多数副本的记录成功的返回消息,那么Leader会将自己记录在日志中的操作执行,并返回给客户端说已经操作成功了。同时向其他节点发送消息,通知各节点提交各自节点的日志操作。
      • 由此来保证整个集群节点数据一致性
  • 当脑裂现象发生处理
    • 如果Leader节点在少数派
      • 此时客户端的请求会记录到Leader自己的日志中,但是因为自己下边的节点少于一半,所以不会提交。
      • 当脑裂恢复时,原来的Leader发现自己是上一届的Leader,那么会放弃自己的身份,从新的Leader选取同步数据,同时将自己本地未提交的日志记录的操作回滚

Gossip协议

  • Gossip protocol 也叫 Epidemic Protocol (流行病协议),别名很多比如:“流言算法”、“疫情传播算 法”等。 Gossip是一种去中心化、容错并保证最终一致性的协议。假设我们提前设置如下规则:
    1. Gossip 是周期性的散播消息,把周期限定为 1 秒
    2. 被感染节点随机选择 k 个邻接节点(fan-out)散播消息,这里把 fan-out 设置为 3,每次最多往 3 个节点散播。
    3. 每次散播消息都选择尚未发送过的节点进行散播
    4. 收到消息的节点不再往发送节点散播,比如 A -> B,那么 B 进行散播的时候,不再发给 A。
    5. 这里一共有 16 个节点,节点 1 为初始被感染节点,通过 Gossip 过程,最终所有节点都被感染
      image-20220617120610156

  • gossip协议包含多种消息,包括pingpongmeetfail等等。
    • ping:每个节点都会频繁给其他节点发送ping,其中包含自己的状态还有自己维护的集群元数据,通过ping交换元数据;
    • pong: 返回pingmeet,包含自己的状态和其他信息,也可以用于信息广播和更新;
    • fail: 某个节点判断另一个节点fail之后,就发送fail给其他节点,通知其他节点,指定的节点宕机了。
    • meet:某个节点发送meet给新加入的节点,让新节点加入集群中

  • gossip协议的特点

    • 在节点数量有限的网络中,每个节点都会“随机”(不是真正随机,而是根据规则选择通信 节点)与部分节点通信,经过一番杂乱无章的通信后,每个节点的状态在一定时间内会达成一致
    • 元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续,打 到所有节点上去更新有一定的延时,降低了压力;
    • 去中心化、可扩展、容错、一致性收敛、简单。
    • 由 于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终 一致性协议。
    • 元数据更新有延时可能导致集群的一些操作会有一些滞后。 造成消息的延迟 , 消息冗余 。

2PC协议

  • 两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。
    • 将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
    • 二阶段提交协议也被认为是一种一致性协议,用来保证分布式系统数据的一致性。
  • 在两阶段提交协议中,系统一般包含两类角色
    • 一类为协调者(coordinator),通常一个系统中只有一个;
    • 另一类为事务参与者(participants,cohorts或workers),一般包含多个

  • 步骤

    • 我们的应用程序(client)发起一个开始请求到协调者;
    • 协调者先将消息写到本地日志,之后向所有的事务参与者发起消息。
    • 事务参与者在收到消息后,执行具体本机事务,但不会进行commit,如果成功返回,不成功返回。返回前都应把要返回的消息写到日志里,当作凭证。
    • 协调者收集所有执行器返回的消息,如果所有执行器都返回,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。image-20220617222646806
image/svg+xml