纯净、安全、绿色的下载网站

首页|软件分类|下载排行|最新软件|IT学院

当前位置:首页IT学院IT技术

C# 实现Zookeeper分布式锁 C# 实现Zookeeper分布式锁的参考示例

没有星星的夏季   2021-06-24 我要评论
想了解C# 实现Zookeeper分布式锁的参考示例的相关内容吗没有星星的夏季在本文为您仔细讲解C# 实现Zookeeper分布式锁的相关知识和一些Code实例欢迎阅读和指正我们先划重点:C#,实现分布式锁,c#,Zookeeper下面大家一起来学习吧

  分布式锁 

  互联网初期我们系统一般都是单点部署也就是在一台服务器完成系统的部署后期随着用户量的增加服务器的压力也越来越大响应速度越来越慢甚至出现服务器崩溃的情况

  为解决服务器压力太大响应慢的特点分布式系统部署出现了

  简单的说就是我们将系统资源部署到多台服务器中然后使用一台服务器做入口代理根据一些决策将接收到的请求转发到资源服务器这也就是我们常说的 反向代理(一般就是使用nginx)

   虽然分布式解决了服务器压力的问题但也带来了新的问题

  比如我们有一个下单统计的功能当完成下单后需要执行统计功能而在高访问的情况下可能有两个下单请求(A和B)同时完成然后一起执行了统计功能这样可能导致的结果就是A请求未将B请求数据统计在内而B请求可能也未将A请求数据统计在内这样就造成了数据的统计错这个问题的产生的根本原因就是统计功能的并发导致的如果是单点部署的系统我们简单的使用一个锁操作就能完成了但是在分布式环境下A和B请求可能同时运行在两个服务器中普通的锁就不能起到效果了这个时候就要使用分布式锁了 

  Zookeeper分布式锁原理

  分布式锁的实现发放有多种简单的我们可以使用数据库表去实现它也可以使用redis去实现它这里要使用的Zookeeper去实现分布式锁

  Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制监听机制很简单就是我们可以给znode添加一个监听器当znode节点状态发生改变时(如:数据内容改变节点被删除)会通知到监听器

  前面几节介绍过znode有三种类型  

  PERSISTENT:持久节点即使在创建该特定znode的客户端断开连接后持久节点仍然存在默认情况下除非另有说明否则所有znode都是持久的
  EPHEMERAL:临时节点客户端是连接状态时临时节点就是有效的当客户端与ZooKeeper集合断开连接时临时节点会自动删除临时节点不允许有子节点临时节点在leader选举中起着重要作用
  SEQUENTIAL:顺序节点可以是持久的或临时的当一个新的znode被创建为一个顺序节点时ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径顺序节点在锁定和同步中起重要作用

  其中顺序节点可以是持久的或临时的而临时节点有个特点就是它属于创建它的那个会话当会话断开临时节点就会自动删除如果在临时节点上注册了监听器那么监听器就会收到通知如果临时节点有了时间顺序那我们为实现分布式锁就又有一个想法:

  假如在Zookeeper中有一个znode节点/Locker

  1、当client1连接Zookeeper时先判断/Locker节点是否存在子节点如果没有子节点那么会在/Locker节点下创建一个临时顺序的znode节点假如是/client1表示client1获取了锁状态client1可以继续执行

  2、当client2连接Zookeeper时先判断/Locker节点是否存在子节点发现已经存在子节点了然后获取/Locker下的所有子节点同时按时间顺序排序在最后一个节点也就是/client1节点上注册一个监听器(watcher1)同时在/Locker节点下创建一个临时顺序的znode节点假如是/client2同时client2将被阻塞而阻塞状态的释放是在监听器(watcher1)中的

  3、当client3连接Zookeeper时先判断/Locker节点是否存在子节点发现已经存在子节点了然后获取/Locker下的所有子节点同时按时间顺序排序在最后一个节点也就是/client2节点上注册一个监听器(watcher2)同时在/Locker节点下创建一个临时顺序的znode节点假如是/client3同时client2将被阻塞而阻塞状态的释放是在监听器(watcher2)中的

  以此类推

  4、当client1执行完操作了断开Zookeeper的连接因为/client1是临时顺序节点于是将会自动删除而client2已经往/client1节点中注册了一个监听器(watcher1),于是watcher1将会受到通知而watcher1又会释放client2的阻塞状态于是client2获取锁状态继续执行

  5、当client2执行完操作了断开Zookeeper的连接因为/client2是临时顺序节点于是将会自动删除而client3已经往/client2节点中注册了一个监听器(watcher2),于是watcher2将会受到通知而watcher2又会释放client3的阻塞状态于是client3获取锁状态继续执行

  以此类推

  这样不管分布式环境中有几台服务器都可以保证程序的排队似的执行了

  C#实现Zookeeper分布式锁

  上一节有封装过一个ZookeeperHelper的辅助类(Zookeeper基础教程(四):C#连接使用Zookeeper)使用这个辅助类实现了一个ZookeeperLocker类: 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AspNetCore.ZookeeperConsole
{
    /// <summary>
    /// 基于Zookeeper的分布式锁
    /// </summary>
    public class ZookeeperLocker : IDisposable
    {
        /// <summary>
        /// 单点锁
        /// </summary>
        static object locker = new object();
        /// <summary>
        /// Zookeeper集群地址
        /// </summary>
        string[] address;
        /// <summary>
        /// Zookeeper操作辅助类
        /// </summary>
        ZookeeperHelper zookeeperHelper;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="lockerPath">分布式锁的根路径</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address)
        {
        }
        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="lockerPath">分布式锁的根路径</param>
        /// <param name="sessionTimeout">回话过期时间</param>
        /// <param name="address">集群地址</param>
        public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address)
        {
            this.address = address.ToArray();

            zookeeperHelper = new ZookeeperHelper(address, lockerPath);
            if (sessionTimeout > 0)
            {
                zookeeperHelper.SessionTimeout = sessionTimeout;
            }
            if (!zookeeperHelper.Connect())
            {
                throw new Exception("connect failed:" + string.Join(",", address));
            }
            lock (locker)
            {
                if (!zookeeperHelper.Exists())//根节点不存在则创建
                {
                    zookeeperHelper.SetData("", "", true);
                }
            }
        }
        /// <summary>
        /// 生成一个锁
        /// </summary>
        /// <returns>返回锁名</returns>
        public string CreateLock()
        {
            var path = Guid.NewGuid().ToString().Replace("-", "");
            while (zookeeperHelper.Exists(path))
            {
                path = Guid.NewGuid().ToString().Replace("-", "");
            }
            return CreateLock(path);
        }
        /// <summary>
        /// 使用指定的路径名称设置锁
        /// </summary>
        /// <param name="path">锁名不能包含路径分隔符(/)</param>
        /// <returns>返回锁名</returns>
        public string CreateLock(string path)
        {
            if (path.Contains("/"))
            {
                throw new ArgumentException("invalid path");
            }
            return zookeeperHelper.SetData(path, "", false, true);
        }
        /// <summary>
        /// 获取锁
        /// </summary>
        /// <param name="path">锁名</param>
        /// <returns>如果获得锁返回true否则一直等待</returns>
        public bool Lock(string path)
        {
            return LockAsync(path).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 获取锁
        /// </summary>
        /// <param name="path">锁名</param>
        /// <param name="millisecondsTimeout">超时时间单位:毫秒</param>
        /// <returns>如果获得锁返回true否则等待指定时间后返回false</returns>
        public bool Lock(string path, int millisecondsTimeout)
        {
            return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult();
        }
        /// <summary>
        /// 异步获取锁等等
        /// </summary>
        /// <param name="path">锁名</param>
        /// <returns>如果获得锁返回true否则一直等待</returns>
        public async Task<bool> LockAsync(string path)
        {
            return await LockAsync(path, System.Threading.Timeout.Infinite);
        }
        /// <summary>
        /// 异步获取锁等等
        /// </summary>
        /// <param name="path">锁名</param>
        /// <param name="millisecondsTimeout">超时时间单位:毫秒</param>
        /// <returns>如果获得锁返回true否则等待指定时间后返回false</returns>
        public async Task<bool> LockAsync(string path, int millisecondsTimeout)
        {
            var array = await zookeeperHelper.GetChildrenAsync("", true);
            if (array != null && array.Length > 0)
            {
                var first = array.FirstOrDefault();
                if (first == path)//正好是优先级最高的则获得锁
                {
                    return true;
                }

                var index = array.ToList().IndexOf(path);
                if (index > 0)
                {
                    //否则添加监听
                    var are = new AutoResetEvent(false);
                    var watcher = new NodeWatcher();
                    watcher.NodeDeleted += (ze) =>
                    {
                        are.Set();
                    };
                    if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//监听顺序节点中的前一个节点
                    {
                        if (!are.WaitOne(millisecondsTimeout))
                        {
                            return false;
                        }
                    }

                    are.Dispose();
                }
                else
                {
                    throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}");
                }
            }
            return true;
        }
        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            zookeeperHelper.Dispose();
        }
    }
}

  现在写个程序可以模拟一下

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace AspNetCore.ZookeeperConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            //Zookeeper连接字符串采用host:port格式多个地址之间使用逗号(,)隔开
            string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" };
            //会话超时时间,单位毫秒
            int sessionTimeOut = 10000;
            //锁节点根路径
            string lockerPath = "/Locker";

            for (var i = 0; i < 10; i++)
            {
                string client = "client" + i;
                //多线程模拟并发
                new Thread(() =>
                {
                    using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address))
                    {
                        string path = zookeeperLocker.CreateLock();
                        if (zookeeperLocker.Lock(path))
                        {
                            //模拟处理过程
                            Console.WriteLine($"【{client}】获得锁:{DateTime.Now}");
                            Thread.Sleep(3000);
                            Console.WriteLine($"【{client}】处理完成:{DateTime.Now}");
                        }
                        else
                        {
                            Console.WriteLine($"【{client}】获得锁失败:{DateTime.Now}");
                        }
                    }
                }).Start();
            }
                        
            Console.ReadKey();
        }
    }
}

  运行结果如下:

   

   可以发现锁功能是实现了的

  如果程序运行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2

  或者直接抛出异常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”

  只需要适当调整sessionTimeOut时间即可


相关文章

猜您喜欢

  • python视频传输 python实现不同电脑之间视频传输功能

    想了解python实现不同电脑之间视频传输功能的相关内容吗景唯acr在本文为您仔细讲解python视频传输的相关知识和一些Code实例欢迎阅读和指正我们先划重点:python视频传输,python电脑之间视频传输下面大家一起来学习吧..
  • java销售系统 java实现鲜花销售系统

    想了解java实现鲜花销售系统的相关内容吗司天宏在本文为您仔细讲解java销售系统的相关知识和一些Code实例欢迎阅读和指正我们先划重点:java鲜花销售系统,java鲜花销售,java销售系统下面大家一起来学习吧..

网友评论

Copyright 2020 www.gamerfx.net 【游戏天空】 版权所有 软件发布

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 点此查看联系方式