随心一记

一二三四五,上山打老鼠


  • 首页

  • 归档

  • 标签
ywcsb

ywcsb

游戏可以不玩,小说不能不看。

153 日志
3 分类
42 标签
RSS
GitHub 知乎 随心一记
Links
  • 随心一记
  • 追梦人物的
  • MSDN

Python基于Redis实现分布式锁

发表于 2022-05-10 | 阅读 2253 | 分类于 Python |

分布式锁应该具备的条件:

  • 在分布式系统环境下,任意时刻,只能有一个客户端能持有锁;
  • 高可用/高性能的获取锁和释放锁,必须保证加锁和解锁是同一个客户端所为;
  • 具备锁失效机制(防止死锁),即使有一个客户端在持有锁期间崩溃而没有主动释放锁,也能保证后续其他客户端能正常加锁;
  • 具备非堵塞特性,即没有获取到锁,直接返回获取锁失败。

示例代码:

class RedisLock:
    """
    分布式锁
    """

    def __init__(self, redis_conn, Django=False):
        """

        @param redis_conn: redis 实例对象
        @param Django: 是否是django,如果是django框架就使用django自带缓存
        @type Django: bool
        """
        self.redis_conn = redis_conn
        self.ip = socket.gethostbyname(socket.gethostname())
        self.pid = os.getpid()
        self.sentinel = object()
        self.django = Django

    @staticmethod
    def get_lock_key(key: str) -> str:
        """
        格式化锁名称
        @param key: 名称
        @type key: str
        @return: key 名称
        @rtype: str
        """
        lock_key = f'lock_{key}'
        return lock_key

    def gen_unique_value(self) -> str:
        """
        获取锁value使锁有唯一性,防止其它线程误删
        @return:
        @rtype:
        """
        thread_name = threading.current_thread().name
        time_now = time.time()
        unique_value = f'{self.ip}-{self.pid}-{thread_name}-{time_now}'
        return unique_value

    def get_lock(self, key, timeout: int = 100) -> str:
        """
        获取锁
        @param key: 锁名
        @type key: str
        @param timeout: 锁过期时间,防止死锁
        @type timeout: int
        @return:
        @rtype: str
        """

        lock_key = self.get_lock_key(key)
        unique_value = self.gen_unique_value()
        while True:
            if self.django:
                judge = self.redis_conn.add(lock_key, unique_value, timeout)
            else:
                judge = self.redis_conn.set(lock_key, unique_value, nx=True, ex=timeout)
            if judge:
                return unique_value
            time.sleep(0.001)


    def del_lock(self, key, value):
        # 释放锁
        lock_key = self.get_lock_key(key)
        old_lock_value = self.redis_conn.get(lock_key)
        if old_lock_value == value:
            return self.redis_conn.delete(lock_key)

加锁过程

  1. 首先需要为锁生成一个唯一标识value;
  2. 然后使用redis set 命令设置锁,从 v2.6.12 版本开始,set命令支持nx和ex参数,具体内容可点击进行查看;如果锁之前不存在,则加锁成功,并设置锁的过期时间,返回锁唯一标识;

解锁过程

  1. 查询锁对应的标识是否与本次解锁的标识相同;
  2. 如果标识相同,则在事务中删除锁。

下面对刚才实现的分布式锁进行测试,使用50个线程,模拟秒杀10张票,从结果的有序性可以看出是否为加锁状态,代码如下:

from threading import Thread

import redis



count = 10


def ticket(i, lock, key):
    lock_value = lock.get_lock(key, 10)
    print(f"线程{i}--获得了锁")
    time.sleep(1)
    global count
    if count < 1:
        print(f"线程{i}没抢到票, 票已经抢完了")
        return
    count -= 1
    print(f"线程{i}抢到票了, 还剩{count}张票")
    lock.del_lock(key, lock_value)
    print(f"线程{i}--释放了锁")

if __name__ == '__main__':
    # Redis 存字符串返回的是byte,指定decode_responses=True可以解决
    pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
    _redis = redis.Redis(connection_pool=pool)
    lock = RedisLock(_redis)
    for i in range(10):
        t = Thread(target=ticket, args=(i, lock, 'test_key'))
        t.start()

输出如下:

线程1--获得了锁
线程1抢到票了, 还剩4张票
线程1--释放了锁
线程2--获得了锁
线程4--获得了锁
线程3--获得了锁 
线程0--获得了锁 
线程6--获得了锁 
线程7--获得了锁 
线程8--获得了锁 
线程5--获得了锁 
线程9--获得了锁 
线程11--获得了锁
线程10--获得了锁
线程13--获得了锁
线程2抢到票了, 还剩3张票
线程2--释放了锁
线程14--获得了锁
线程12--获得了锁
线程15--获得了锁
线程16--获得了锁
线程17--获得了锁
线程19--获得了锁
线程18--获得了锁
线程4抢到票了, 还剩2张票
线程4--释放了锁
线程3抢到票了, 还剩1张票
线程0抢到票了, 还剩0张票
线程8没抢到票, 票已经抢完了
线程6没抢到票, 票已经抢完了
线程7没抢到票, 票已经抢完了
线程3--释放了锁
线程5没抢到票, 票已经抢完了
线程0--释放了锁
线程9没抢到票, 票已经抢完了
线程11没抢到票, 票已经抢完了
线程10没抢到票, 票已经抢完了
线程13没抢到票, 票已经抢完了
线程14没抢到票, 票已经抢完了
线程12没抢到票, 票已经抢完了
线程15没抢到票, 票已经抢完了
线程16没抢到票, 票已经抢完了
线程17没抢到票, 票已经抢完了
线程19没抢到票, 票已经抢完了
线程18没抢到票, 票已经抢完了

装饰器

def redis_lock(key: str, timeout: int = 10, redis_conn: object = cache, Django=Ture):
    """
    @param key: 锁名称
    @type key: str
    @param timeout: 锁过期时间
    @type timeout: int 
    @param redis_conn: Redis实例
    @type Django bool
    @param Django 是否是django框架缓存 默认使用redis缓存
    """
    lock = RedisLock(redis_conn, Django=Django)
    def run_lock(func):
        def wrapper(*args, **kwargs):
            thread_name = threading.current_thread().name
            logger.info(f"{thread_name} 线程, {key} key,正在获取锁")
            lock_value = lock.get_lock(key, timeout)
            logger.info(f"{thread_name} 线程, {key} key,已获取锁")
            res = func(*args, **kwargs)
            lock.del_lock(key, lock_value)
            logger.info(f'{thread_name} 线程, {key} key,释放锁')
            return res

        return wrapper

    return run_lock



  ### 使用装饰器
@redis_lock(key='test', timeout=15)
def increase_data():
    # lock = RedisLock(cache)
    # lock_value = lock.get_lock('test')
    time.sleep(10)
    # lock.del_lock('test', lock_value)
觉得不错,支持一下!
geerniya WeChat Pay

微信打赏

geerniya Alipay

支付宝打赏

# Python # Redis
kubernetes node节点失效 快速重新调度
java CPU占用过高问题的排查及解决

发表评论

共 0 条评论

    暂无评论
© 2018 - 2022 ywcsb
冀ICP备17022045号-1
Supported by 腾讯云