凌月风的个人博客

记录精彩人生

Open Source, Open Mind,
Open Sight, Open Future!
  menu

Java笔记系列——08-分布式协调(Zookeeper)

0 浏览

1、Zookeeper

  • zookeeper是一个分布式的协调中间件,能解决分布式下的组件协调问题,所以才有许多应用使用zookeeper
    • kafka 集成Zookeeper,实现集群选举(leader选举)、配置管理
    • hbase 集成zookeeper,实现集群管理
    • sharding jdbc集成zookeeper,实现注册中心、配置中心

配置安装

  • 单机模式。
    1. 下载zookeeper安装包,完成后解压
    2. 初次使用zookeeper,需要将conf目录下的zoo_sample.cfg文件copy一份重命名为zoo.cfg
    3. 修改zoo.cfg文件中的dataDir目录,dataDir表示日志文件存放的路径
    4. 启动服务:bin/zkServer.sh start
    5. 查看服务状态:bin/zkServer.sh status
    6. 停止服务:bin/zkServer.sh stop
    7. 重启服务:bin/zkServer.sh restart
    8. 连接服务器:zkCli.sh -timeout 0 -r -server ip:port
  • 集群模式
    1. 多台服务器按照单机模式准备好配置
    2. 在数据文件目录下创建myid文件内容为数字编号。
    3. zoo.cfg文件中增加内容
      # 编号与myid文件内容一致
      server.1=192.168.221.128:2888:3888
      server.2=192.168.221.129.2888:3888
      server.3=192.168.221.130.2888:2888
      
    4. 如果是observer节点是如下配置
      #只有observer节点有这个
      peerType=observer
      server.1=192.168.221.128:2888:3888
      server.2=192.168.221.129:2888:3888
      server.3=192.168.221.0:2888:3888
      #所有机器都需要配置这个
      server.4=192.168.221.1:2888:3888:observer
      
    5. 启动zookeeper

数据结构

  • zookeeper的视图结构和标准的文件系统非常类似,每一个节点称之为ZNode, 是zookeeper的最小单元。每个znode上都可以保存数据以及挂载子节点,构成一个层次化的树形结构image-20220619123335089
  • 操作命令
    • 创建节点 :create [-s] [-e] [-c] [-t ttl] path [data] [acl]
      • [-s]:sequential 序列化的,即可以重复创建,在路径后面加上序列号
      • [-e]:ephemeral 临时的,断开连接后自动失效
      • [-c] :表示container node(容器节点),
      • [-t ttl]:表示TTL Nodes(带超时时间的节点)
      • [acl]:是针对这个节点创建一个权限的,如果创建权限了,则拥有权限的才可以访问
    • 删除节点:delete [-v version] path 删除节点,-v表示版本号,实现乐观锁机制
    • 更新节点:set [-s] [-v version] path data 给节点赋值 -s返回节点状态
    • 查询节点信息:get [-s] [-w] path 获取指定节点的值
  • Znode分为多种类型
    • 持久化节点,PERSISTENT。不会随客户端的断开而自动删除,是创建节点的默认类型。image-20220619123613172
    • 带序号的持久化节点,PERSISTENT_SEQUENTIAL。持久化节点增加了序号,znode的名字将被附加一个单调递增的数字image-20220619123701310
    • 临时节点,EPHEMERAL。当客户端断开时自动删除。示例:如果该Client创建了/Server1和/Server2这两个节点,当Client的session断开后,这两个节点会被Zookeeper自动删除。临时节点不能存在子节点image-20220619123803988
    • 带序号的临时节点,EPHEMERAL_SEQUENTIAL。临时节点增加了序号,znode的名字将被附加一个单调递增的数字。临时节点不能存在子节点image-20220619123922878
    • 容器节点,CONTAINER。container节点是一个特殊用途的节点,它是为Leader、Lock等操作而设计的节点类型,它的作用是: 当容器节点的最后一个子节点被删除后,容器节点将会被标注并且在一段时间后删除。由于容器节点存在这个特性,所以当我们在容器节点下创建一个子节点时,需要捕获KeeperException.NoNodeException异常,如果捕获到这个异常,就需要重新创建容器节点。
    • 超时节点,TTL。如果某个节点设置为TTL节点类型,那么这个节点在指定TTL时间(单位为毫秒)段内没有修改并且没有子节点时,该节点会在一段时间后被删除。属于zookeeper的扩展类型,默认不开启,如果要想使用该类型,必须在zookeeper的bin/zkService.sh中修改zookeeper的java环境变量zookeeper.extendedTypesEnabled=true
    • 带序号的超时节点,PERSISTENT_SEQUENTIAL_WITH_TTL。超时节点加入了序号。
  • 每一个Znode除了存储数据内容以外,还存储了数据节点本身的一些状态信息,通过get命令可以获得状态信息的详细内容image-20220619122502119
  • zookeeper为数据节点引入了版本的概念,每个数据节点都有三类版本信息,对数据节点任何更新操作都会引起版本号的变化 。version属性就是用来实现乐观锁机制的“写入校验” image-20220620120942406

功能特性

  • watcher机制

    • zookeeper提供了分布式数据的发布/订阅功能,zookeeper允许客户端向服务端注册一个watcher监 听,当服务端的一些指定事件触发了watcher,那么服务端就会向客户端发送一个事件通知。
    • zookeeper提供以下几种命令来对指定节点设置监听
      • get [-s] [-w] path:监听指定path节点的修改和删除事件。同样该事件也是一次性触发。
        get -w /node
        # 在其他窗口执行下面命令,会触发相关事件
        set /node 123
        delete /node
        
      • ls [-s] [-w] [-R] path : 监控指定path的子节点的添加和删除事件。
        ls -w /node
        # 在其他窗口执行下面命令,会触发相关事件
        create /node/node1
        delete /node/node1
        
      • stat [-w] path:作用和get完全相同
      • addWatch [-m mode] path:作用是针对指定节点添加事件监听,支持两种模式
        • PERSISTENT,持久化订阅,针对当前节点的修改和删除事件,以及当前节点的子节点的删除和新增事件。
        • PERSISTENT_RECURSIVE,持久化递归订阅,在PERSISTENT的基础上,增加了子节点修改 **的事件触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性) **
  • Session会话机制

    • 状态流转
      1. 首先,客户端向Zookeeper Server发起连接请求,此时状态为CONNECTING
      2. 当连接建立好之后,Session状态转化为CONNECTED,此时可以进行数据的IO操作。
      3. 如果Client和Server的连接出现丢失,则Client又会变成CONNECTING状态
      4. 如果会话过期或者主动关闭连接时,此时连接状态为CLOSE
      5. 如果是身份验证失败,直接结束 image-20220620121424247
  • 权限控制

    • Zookeeper作为一个分布式协调框架,内部存储了一些分布式系统运行时的状态的数据,比如master选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此,为了保证zookeeper中的数据的安全性,避免误操作带来的影响。Zookeeper提供了一套ACL权限控制机制来保证数据的安全
    • ACL权限控制,使用: schemei:d:perm 来标识。
      • Scheme(权限模式),标识授权策略

        Zookeeper提供以下权限模式,所谓权限模式,就是使用什么样的方式来进行授权

        • world:默认方式,相当于全部都能访问。
        • auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
        • digest:即用户名:密码这种方式认证,这也是业务系统中最常用的。用字符串sername:password 来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password 来进行的,当用在ACL时,表达式为username:base64,base64是password的SHA1摘要的编码。
        • **ip:通过ip地址来做权限控制,比如 ip:192.168.1.1 表示权限控制都是针对这个ip地址的。也可以针对网段 **ip:192.168.1.1/24,此时addr中的有效位与客户端addr中的有效位进行比对
      • ID(授权对象):指权限赋予的用户或一个指定的实体,不同的权限模式下,授权对象不同 image-20220620122234312

      • Permission:指通过权限检查后可以被允许的操作,create /delete /read/write/admin

        • create: 允许对子节点Create 操作
        • read :允许对本节点GetChildren 和GetData 操作
        • write :允许对本节点SetData 操作
        • delete :允许对子节点Delete 操作
        • admin :允许对本节点setAcl 操作
    • ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,每个znode支持设置多种 权限控制方案和多个权限,子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它 的子节点。

使用实例

  • 在java中使用Apache Curator完成zookeeper的操作 ,Curator是一个zookeeper的Java客户端
  • 每一个Java客户端其实都是提供了三个功能
    • 建立连接
    • 基于连接进行增删改查
    • 提供了一些解决方案的封装
  • 导入依赖
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>
    
  • Example
    public class CuratorExample {
        final CuratorFramework curatorFramework;
    
        {
            curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString("192.168.221.128:2181")
                // 指定重试策略
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .sessionTimeoutMs(15000)
                .build();
            curatorFramework.start(); //启动
        }
    
    
        /**
         * watch演示
         *
         * @param node 监听的节点
         */
        public void witchTest(String node) {
            CuratorCache curatorCache = CuratorCache.
                build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
            CuratorCacheListener listener = CuratorCacheListener.builder()
                .forAll((type, oldData, data) -> System.out.println("事件类型:" + type + ":childData:" + oldData + ":data" + data)).build();
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
        }
    
        /**
         * 同步CURD
         */
        public void curdTest() throws Exception {
            // 创建持久化节点,如果父节点不存在一并创建,并为节点赋值
            String node = curatorFramework.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath("/node", "Hello World".getBytes());
    
            //获取节点的value,以及节点的状态信息,并将状态存储到stat
            Stat stat = new Stat();
            byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(node);
    
            // 修改节点的值,
            curatorFramework.setData().withVersion(stat.getVersion()).forPath(node, "Update Date Result".getBytes());
    
            // 删除节点
            curatorFramework.delete().forPath(node);
            Stat existStat = curatorFramework.checkExists().forPath(node);
            if (existStat == null) {
                System.out.println("节点删除成功");
            }
        }
    
        /***
         * 异步CURD
         */
        public void asyncCRUD() throws Exception {
            String node = curatorFramework.create().withMode(CreateMode.PERSISTENT)
                .inBackground((session, event) -> {
                    System.out.println(Thread.currentThread().getName() + ":执行创建节点:" + event.getPath());
                }).forPath("/async-node");
            System.out.println("异步执行创建节点:" + node);
        }
    }
    

应用场景

  • 注册中心
    • 使用Watcher机制,配合SpringBoot 的 Environment,实现SpringBoot启动时的配置参数注入
  • 分布式锁
    • 使用唯一节点特性实现分布式锁。如果并发量大的情况下,线程锁释放之后,大量等待线程同时竞争,引发惊群效应,影响服务器性能。不推荐使用
    • 使用有序节点实现分布式锁
      1. 每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么我们可以判断子节点中最小的节点设置为获得锁。
      2. 如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。然后监听编号比自己小的节点删除事件。
      3. 当比自己小的节点删除以后,客户端会收到watcher事件,此时再次判断自己的节点是不是所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致羊群效应,因为每个客户端只需要监控一个节点 image-20220620161739566
    • 可以使用curatorcurator提供的InterProcessMutex 这样一个api。除了分布式锁之外,还提供了leader选举、分布式队列等常用的功能。
      • InterProcessMutex:分布式可重入排它锁
      • InterProcessSemaphoreMutex:分布式排它锁
      • InterProcessReadWriteLock:分布式读写锁
  • leader选举
    • 在分布式计算中,leader election是很重要的一个功能,这个选举过程是这样子的:指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是leader或者coordinator。当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务leader。除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。image-20220620161911730
    • 可以通过Curator基于Zookeeper封装的Leader选举工具类LeaderLatch与LeaderSelector 来实现

集群

  • Zookeeper中,集群角色分为三种,分别是Leader、Follower、Observe
    • Leader服务器是整个zookeeper集群的核心,主要的工作任务有两项
      • 事物请求的唯一调度和处理者,保证集群事物处理的顺序性
      • 集群内部各服务器的调度者
    • Follower角色,Follower角色的主要职责是
      • 处理客户端非事物请求、转发事物请求给leader服务器
      • 参与事物请求Proposal的投票(需要半数以上服务器通过才能通知leader commit数据; Leader发 起的提案,要求Follower投票)
      • 参与Leader选举的投票
    • Observer角色,Observer是zookeeper3.3开始引入的一个全新的服务器角色,从字面来理解,该角色充当了观察者的角色。
      • 观察zookeeper集群中的最新状态变化并将这些状态变化同步到observer服务器上。
      • Observer的工作原理与follower角色基本一致,而它和follower角色唯一的不同在于observer不参与任何形式的投票,包括事物请求Proposal的投票和leader选举的投票。
      • **简单来说,observer服务器只提供非事物请求服务,通常在于不影响集群事物处理能力的前提下提升集群非事物处理的能力 **
  • 通常zookeeper是由2n+1台server组成,每个server都知道彼此的存在。每个server都维护的内存状态镜像以及持久化存储的事务日志和快照。

    **之所以要满足2n+1台server,是因为一个节点要成为集群中的leader,需要有超过及群众过半数的节点支持,这个涉及到leader选举算法。同时也涉及到事务请求的提交投票 **

  • zookeeper并不是强一致性服务,它是一个最终一致性模型:顺序一致性模型 。

    zookeeper不保证在每个实例中,两个不同的客户端具有相同的zookeeper数据视图,由于网络延迟等 **因素,一个客户端可能会在另外一个客户端收到更改通知之前执行更新,考虑到2个客户端A和B的场景,如果A把znode /a的值从0设置为1,然后告诉客户端B读取 /a, 则客户端B可能会读取到旧的值0,具体取决于他连接到那个服务器,如果客户端A和B要读取必须要读取到相同的值,那么client B在读取操作之前执行sync方法。 zooKeeper.sync(); **

    参考官网描述ZooKeeper: Because Coordinating Distributed Systems is a Zoo (apache.org)

  • zookeeper持久化
    • Zookeeper的数据是持久化在磁盘上的,默认的目录是在/tmp/zookeeper下,这个目录中会存放事务日志和快照日志。该路径可以通过zoo.cfg文件来修改。
    • **在Zab协议中每当有接收到客户端的事务请求后Leader与Follower都会将把该事务日志存入磁盘日志文件中,该日志文件名称格式为 **log.zxid, 其中zxid表示当前日志文件中开始记录的第一条数据的zxid
    • zxid,是Zookeeper中的数据对应的事务ID。为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。可以stat /node命令查看节点的信息
    • zxid是一个64位的数字,它高32位是epoch,用来标识leader关系是否改变。低32位用于递增计数 。

      每次一个leader被选出来,它都会计算一个新的epoch(原来的epoch+1),标识当前属于新leader的统治时期。

    • epoch标识了当前Leader周期,会存储到currentEpoch中。集群机器相互通信时,会带上这个epoch以确保彼此在同一个Leader周期中。
  • zookeeper使用了zab协议,用来进行原子广播以及崩溃恢复。
  • zookeeper数据同步(zab协议原子广播)
    • 数据同步原理
      • 使用到了zab协议的原子广播,这个原子广播其实就是相当于简化版本的2pc,半数提交

        **和完整的2pc事务不一样的地方在于,zab协议不能终止事务,follower节点要么ACK给leader,要么抛弃leader,只需要保证过半数的节点响应这个消息并提交了即可。虽然在某一个时刻follower节点和leader节点的状态会不一致,但是也是这个特性提升了集群的整体性能。 **

    • 在zookeeper中,客户端会随机连接到zookeeper集群中的一个节点。所有事务请求必须由一个全局唯一的服务器来协调处理,这个服务器就是Leader服务器,其他的服务器就是follower。
    • 数据同步流程
      1. 如果是读请求,就直接从当前节点中读取数据,返回给客户端
      2. 如果是写请求,那么请求会被转发给leader
      3. Leader服务器把客户端的请求转化成一个事务Proposal(提议),并把这个Proposal分发给集群中的所有Follower服务器。
      4. Follower收到Proposal ,本地操作,记录日志,还没有进行最终提交,然后返回消息给Leader。
      5. Leader服务器需要等待所有Follower服务器的反馈,一旦超过半数的Follower服务器进行了正确的反馈,那么Leader就会再次向所有的Follower服务器发送Commit消息,要求各个follower节点对前面的一个Proposal进行提交 。
    • 由于是半数提交,如果Leader服务器崩溃会带来的数据不一致问题。崩溃之后会进入到崩溃恢复模式,先选举出新的Leader,然后重新同步数据
  • zookeeper崩溃恢复
    • 由于使用的zab协议“半数提交”,当 Leader挂掉,然后选举的过程中要考虑数据的一致性问题
      • 如果Leader在收到合法数量 follower 的 ACK 后,就向各个 follower 广播 COMMIT 命令,同时也会在本地执行 COMMIT 并向连接的客户端返回「成功」。但是如果在各个 follower 在收到 COMMIT 命令前Leader 就挂了,导致剩下的服务器并没有执行都这条消息 。那么要保证最后的这个事务最终能够在所有的服务器上都能被提交成功,否则将会出现不一致
      • 当 leader 接收到消息请求生成 proposal 后就挂了,其他 follower 并没有收到此proposal,因此经过恢复模式重新选了 leader 后,这条消息是被跳过的。此时,之前挂了的 leader 重新启动并注册成了 follower,他保留了被跳过消息的 proposal 状态,与整个系统的状态是不一致的,需要将其删除
    • Leader选举算法需要能够确保已经被leader提交的事务Proposal能够提交、同时丢弃已经被跳过的事务Proposal。选举算法的规则是比较myid和zxid、epoch
    • 若进行Leader选举,则至少需要两台机器。
      • 集群初始化阶段,当有一台服务器Server1启动时,其单独无法进行和完成Leader选举,当第二台服务器Server2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进入Leader选举阶段,选举步骤如下
        1. 每个Server发出一个投票。由于是初始情况,Server1和Server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid、epoch,使用(myid, zxid,epoch)来表示
        2. 此时Server1的投票为(1,0,0),Server2的投票为(2,0,0),然后各自将这个投票发给集群中其他机器。
        3. 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票(epoch)、是否来自LOOKING状态的服务器。
        4. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行比较。首先比较epoch,虽然存储在zxid中的高32位,但是,选择epoch大的服务器作为Leader。
        5. 如果epoch一样大,检查zxid。zxid比较大的服务器优先作为Leader
        6. 如果zxid相同,那么就比较myid。myid较大的服务器作为Leader服务器。
        7. 对于Server1而言,它的投票是(1,0,0),接收Server2的投票为(2,0,0),首先会比较两者的zxid,均为0,再比较myid,此时Server2的myid最大,于是更新自己的投票为(2,0,0),然后重新投票,对于Server2而言,其无须更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。
        8. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于Server1、Server2而言,都统计出集群中已经有两台机器接受了(2,0,)的投票信息,此时便认为已经选出了Leader。
        9. 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING
      • 运行过程中的leader选举 。当集群中的leader服务器出现宕机或者不可用的情况时,那么整个集群将无法对外提供服务,而是进入新一轮的Leader选举,服务器运行期间的Leader选举和启动时期的Leader选举基本过程是一致的。
        1. 变更状态。Leader挂后,余下的非Observer服务器都会将自己的服务器状态变更为LOOKING,然后开始进入Leader选举过程。
        2. 每个Server会发出一个投票。在运行期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第一轮投票中,Server1和Server3都会投自己,产生投票(1,123,1),(3,122,1),然后各自将投票发送给集群中所有机器。接收来自各个服务器的投票。与启动时过程相同。
        3. 处理投票。与启动时过程相同,此时,Server1将会成为Leader。
        4. 统计投票。与启动时过程相同。
        5. 改变服务器的状态。与启动时过程相同

心中无我,眼中无钱,念中无他,朝中无人,学无止境

纸上得来终觉浅,绝知此事要躬行

image/svg+xml