IT虾米网

Java分布式锁三种实现方案

leader 2022年05月06日 编程语言 160 0

方案一:数据库乐观锁

乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

异常实现流程

-- 可能会发生的异常情况 
-- 线程1查询,当前left_count为1,则有记录 
select * from t_bonus where id = 10001 and left_count > 0 
 
-- 线程2查询,当前left_count为1,也有记录 
select * from t_bonus where id = 10001 and left_count > 0 
 
-- 线程1完成领取记录,修改left_count为0, 
update t_bonus set left_count = left_count - 1 where id = 10001 
 
-- 线程2完成领取记录,修改left_count为-1,产生脏数据 
update t_bonus set left_count = left_count - 1 where id = 10001

通过乐观锁实现

-- 添加版本号控制字段 
ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus; 
 
-- 线程1查询,当前left_count为1,则有记录,当前版本号为1234 
select left_count, version from t_bonus where id = 10001 and left_count > 0 
 
-- 线程2查询,当前left_count为1,有记录,当前版本号为1234 
select left_count, version from t_bonus where id = 10001 and left_count > 0 
 
-- 线程1,更新完成后当前的version为1235,update状态为1,更新成功 
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234 
 
-- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理 
update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

方案二:基于Redis的分布式锁

SETNX命令(SET if Not eXists)
语法:SETNX key value
功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
Expire命令
语法:expire(key, expireTime)
功能:key设置过期时间
GETSET命令
语法:GETSET key value
功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
GET命令
语法:GET key
功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
DEL命令
语法:DEL key [KEY …]
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

第一种:使用redis的setnx()、expire()方法,用于分布式锁

  1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  3. 执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题

  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
  3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
  4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
import cn.com.tpig.cache.redis.RedisService; 
import cn.com.tpig.utils.SpringUtils; 
 
/** 
 * Created by IDEA 
 * User: shma1664 
 * Date: 2016-08-16 14:01 
 * Desc: redis分布式锁 
 */ 
public final class RedisLockUtil { 
 
    private static final int defaultExpire = 60; 
 
    private RedisLockUtil() { 
        // 
    } 
 
    /** 
     * 加锁 
     * @param key redis key 
     * @param expire 过期时间,单位秒 
     * @return true:加锁成功,false,加锁失败 
     */ 
    public static boolean lock(String key, int expire) { 
 
        RedisService redisService = SpringUtils.getBean(RedisService.class); 
        long status = redisService.setnx(key, "1"); 
 
        if(status == 1) { 
            redisService.expire(key, expire); 
            return true; 
        } 
 
        return false; 
    } 
 
    public static boolean lock(String key) { 
        return lock2(key, defaultExpire); 
    } 
 
    /** 
     * 加锁 
     * @param key redis key 
     * @param expire 过期时间,单位秒 
     * @return true:加锁成功,false,加锁失败 
     */ 
    public static boolean lock2(String key, int expire) { 
 
        RedisService redisService = SpringUtils.getBean(RedisService.class); 
 
        long value = System.currentTimeMillis() + expire; 
        long status = redisService.setnx(key, String.valueOf(value)); 
 
        if(status == 1) { 
            return true; 
        } 
        long oldExpireTime = Long.parseLong(redisService.get(key, "0")); 
        if(oldExpireTime < System.currentTimeMillis()) { 
            //超时 
            long newExpireTime = System.currentTimeMillis() + expire; 
            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime))); 
            if(currentExpireTime == oldExpireTime) { 
                return true; 
            } 
        } 
        return false; 
    } 
 
    public static void unLock1(String key) { 
        RedisService redisService = SpringUtils.getBean(RedisService.class); 
        redisService.del(key); 
    } 
 
    public static void unLock2(String key) {     
        RedisService redisService = SpringUtils.getBean(RedisService.class);     
        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));    
        if(oldExpireTime > System.currentTimeMillis()) {         
            redisService.del(key);     
        } 
   } 
 
} 
 
public void drawRedPacket(long userId) { 
    String key = "draw.redpacket.userid:" + userId; 
 
    boolean lock = RedisLockUtil.lock2(key, 60); 
    if(lock) { 
        try { 
            //领取操作 
        } finally { 
            //释放锁 
            RedisLockUtil.unLock(key); 
        } 
    } else { 
        new RuntimeException("重复领取奖励"); 
    } 
}

Spring AOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略

import java.lang.annotation.ElementType; 
import java.lang.annotation.Retention; 
import java.lang.annotation.RetentionPolicy; 
import java.lang.annotation.Target; 
 
/** 
 * RUNTIME 
 * 定义注解 
 * 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。 
 * @author shma1664 
 * 
 */ 
@Retention(RetentionPolicy.RUNTIME) 
@Target(ElementType.METHOD) 
public @interface RedisLockable { 
 
 
    String[] key() default ""; 
 
    long expiration() default 60; 
} 
import javax.annotation.Resource; 
 
import java.lang.reflect.Method; 
 
import com.autohome.api.dealer.util.cache.RedisClient; 
import com.google.common.base.Joiner; 
import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.Signature; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.Pointcut; 
import org.aspectj.lang.reflect.MethodSignature; 
import org.springframework.expression.EvaluationContext; 
import org.springframework.expression.Expression; 
import org.springframework.expression.ExpressionParser; 
import org.springframework.expression.spel.standard.SpelExpressionParser; 
import org.springframework.expression.spel.support.StandardEvaluationContext; 
import org.springframework.stereotype.Component; 
 
/** 
 * Created by IDEA 
 * User: mashaohua 
 * Date: 2016-09-28 18:08 
 * Desc: 
 */ 
@Aspect 
@Component 
public class RedisLockAop { 
 
    @Resource 
    private RedisClient redisClient; 
 
    @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))") 
    public void pointcut(){} 
 
    @Around("pointcut()") 
    public Object doAround(ProceedingJoinPoint point) throws Throwable{ 
        Signature signature = point.getSignature(); 
        MethodSignature methodSignature = (MethodSignature) signature; 
        Method method = methodSignature.getMethod(); 
        String targetName = point.getTarget().getClass().getName(); 
        String methodName = point.getSignature().getName(); 
        Object[] arguments = point.getArgs(); 
 
        if (method != null && method.isAnnotationPresent(RedisLockable.class)) { 
            RedisLockable redisLock = method.getAnnotation(RedisLockable.class); 
            long expire = redisLock.expiration(); 
            String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments); 
            boolean isLock = RedisLockUtil.lock2(redisKey, expire); 
            if(!isLock) { 
                try { 
                    return point.proceed(); 
                } finally { 
                    unLock2(redisKey); 
                } 
            } else { 
                throw new RuntimeException("您的操作太频繁,请稍后再试"); 
            } 
        } 
 
        return point.proceed(); 
    } 
 
    private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) { 
 
        StringBuilder sb = new StringBuilder(); 
        sb.append("lock.").append(targetName).append(".").append(methodName); 
 
        if(keys != null) { 
            String keyStr = Joiner.on(".").skipNulls().join(keys); 
            String[] parameters = ReflectParamNames.getNames(targetName, methodName); 
            ExpressionParser parser = new SpelExpressionParser(); 
            Expression expression = parser.parseExpression(keyStr); 
            EvaluationContext context = new StandardEvaluationContext(); 
            int length = parameters.length; 
            if (length > 0) { 
                for (int i = 0; i < length; i++) { 
                    context.setVariable(parameters[i], arguments[i]); 
                } 
            } 
            String keysValue = expression.getValue(context, String.class); 
            sb.append("#").append(keysValue); 
        } 
        return sb.toString(); 
    }
<!-- https://mvnrepository.com/artifact/javassist/javassist --> 
<dependency> 
    <groupId>org.javassist</groupId> 
    <artifactId>javassist</artifactId> 
    <version>3.18.1-GA</version> 
</dependency>
import javassist.*; 
import javassist.bytecode.CodeAttribute; 
import javassist.bytecode.LocalVariableAttribute; 
import javassist.bytecode.MethodInfo; 
import org.apache.log4j.Logger; 
 
/** 
 * Created by IDEA 
 * User: mashaohua 
 * Date: 2016-09-28 18:39 
 * Desc: 
 */ 
public class ReflectParamNames { 
    private static Logger log = Logger.getLogger(ReflectParamNames.class); 
    private  static ClassPool pool = ClassPool.getDefault(); 
 
    static{ 
        ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class); 
        pool.insertClassPath(classPath); 
    } 
 
    public static String[] getNames(String className,String methodName) { 
        CtClass cc = null; 
        try { 
            cc = pool.get(className); 
            CtMethod cm = cc.getDeclaredMethod(methodName); 
            // 使用javaassist的反射方法获取方法的参数名 
            MethodInfo methodInfo = cm.getMethodInfo(); 
            CodeAttribute codeAttribute = methodInfo.getCodeAttribute(); 
            LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag); 
            if (attr == null) return new String[0]; 
 
            int begin = 0; 
 
            String[] paramNames = new String[cm.getParameterTypes().length]; 
            int count = 0; 
            int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1; 
 
            for (int i = 0; i < attr.tableLength(); i++){ 
                //  为什么 加这个判断,发现在windows 跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的 
                if (attr.variableName(i).equals("this")){ 
                    begin = i; 
                    break; 
                } 
            } 
 
            for (int i = begin+1; i <= begin+paramNames.length; i++){ 
                paramNames[count] = attr.variableName(i); 
                count++; 
            } 
            return paramNames; 
        } catch (Exception e) { 
            e.printStackTrace(); 
        }finally{ 
            try { 
                if(cc != null) cc.detach(); 
            } catch (Exception e2) { 
                log.error(e2.getMessage()); 
            } 
 
 
        } 
        return new String[0]; 
    } 
}
 
 
在需要使用分布式锁的地方添加注解
/** 
 * 抽奖接口 
 * 添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式 
 * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId 
 * @param orderId 订单id 
 * @return 抽中的奖品信息 
 */ 
@RedisLockable(key = {"#orderId"}, expiration = 120) 
@Override 
public BonusConvertBean drawBonus(Integer orderId) throws BonusException{ 
    // 业务逻辑 
}

第三种方案:基于Zookeeper的分布式锁

利用节点名称的唯一性来实现独占锁

ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。

package com.shma.example.zookeeper.lock; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
 
import org.apache.zookeeper.*; 
import org.apache.zookeeper.data.Stat; 
 
/** 
 * Created by IDEA 
 * User: mashaohua 
 * Date: 2016-09-30 16:09 
 * Desc: 
 */ 
public class ZookeeperLock implements Lock, Watcher { 
    private ZooKeeper zk; 
    private String root = "/locks";//根 
    private String lockName;//竞争资源的标志 
    private String myZnode;//当前锁 
    private int sessionTimeout = 30000; 
    private List<Exception> exception = new ArrayList<Exception>(); 
 
    /** 
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 
     * @param config 127.0.0.1:2181 
     * @param lockName 竞争资源标志,lockName中不能包含单词lock 
     */ 
    public ZookeeperLock(String config, String lockName){ 
        this.lockName = lockName; 
        // 创建一个与服务器的连接 
        try { 
            zk = new ZooKeeper(config, sessionTimeout, this); 
            Stat stat = zk.exists(root, false); 
            if(stat == null){ 
                // 创建根节点 
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
            } 
        } catch (IOException e) { 
            exception.add(e); 
        } catch (KeeperException e) { 
            exception.add(e); 
        } catch (InterruptedException e) { 
            exception.add(e); 
        } 
    } 
 
    @Override 
    public void lock() { 
        if(exception.size() > 0){ 
            throw new LockException(exception.get(0)); 
        } 
        if(!tryLock()) { 
            throw new LockException("您的操作太频繁,请稍后再试"); 
        } 
    } 
 
    @Override 
    public void lockInterruptibly() throws InterruptedException { 
        this.lock(); 
    } 
 
    @Override 
    public boolean tryLock() { 
        try { 
            myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
            return true; 
        } catch (KeeperException e) { 
            e.printStackTrace(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        return false; 
    } 
 
    @Override 
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 
        return tryLock(); 
    } 
 
    @Override 
    public void unlock() { 
        try { 
            zk.delete(myZnode, -1); 
            myZnode = null; 
            zk.close(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } catch (KeeperException e) { 
            e.printStackTrace(); 
        } 
    } 
 
    @Override 
    public Condition newCondition() { 
        return null; 
    } 
 
    @Override 
    public void process(WatchedEvent watchedEvent) { 
        // 
    } 
 
} 
 
ZookeeperLock lock = null; 
try { 
    lock = new ZookeeperLock("127.0.0.1:2182","test1"); 
    lock.lock(); 
    //业务逻辑处理 
} catch (LockException e) { 
    throw e; 
} finally { 
    if(lock != null) 
        lock.unlock(); 
}

利用临时顺序节点控制时序实现

/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。
算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。
对于解锁操作,只需要将自身创建的节点删除即可。

package com.shma.example.zookeeper.lock; 
 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
 
import org.apache.zookeeper.CreateMode; 
import org.apache.zookeeper.KeeperException; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.ZooDefs; 
import org.apache.zookeeper.ZooKeeper; 
import org.apache.zookeeper.data.Stat; 
 
/** 
 * Created by IDEA 
 * User: mashaohua 
 * Date: 2016-09-30 16:09 
 * Desc: 
 */ 
public class DistributedLock implements Lock, Watcher{ 
    private ZooKeeper zk; 
    private String root = "/locks";//根 
    private String lockName;//竞争资源的标志 
    private String waitNode;//等待前一个锁 
    private String myZnode;//当前锁 
    private CountDownLatch latch;//计数器 
    private int sessionTimeout = 30000; 
    private List<Exception> exception = new ArrayList<Exception>(); 
 
    /** 
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用 
     * @param config 127.0.0.1:2181 
     * @param lockName 竞争资源标志,lockName中不能包含单词lock 
     */ 
    public DistributedLock(String config, String lockName){ 
        this.lockName = lockName; 
        // 创建一个与服务器的连接 
        try { 
            zk = new ZooKeeper(config, sessionTimeout, this); 
            Stat stat = zk.exists(root, false); 
            if(stat == null){ 
                // 创建根节点 
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            } 
        } catch (IOException e) { 
            exception.add(e); 
        } catch (KeeperException e) { 
            exception.add(e); 
        } catch (InterruptedException e) { 
            exception.add(e); 
        } 
    } 
 
    /** 
     * zookeeper节点的监视器 
     */ 
    public void process(WatchedEvent event) { 
        if(this.latch != null) { 
            this.latch.countDown(); 
        } 
    } 
 
    public void lock() { 
        if(exception.size() > 0){ 
            throw new LockException(exception.get(0)); 
        } 
        try { 
            if(this.tryLock()){ 
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); 
                return; 
            } 
            else{ 
                waitForLock(waitNode, sessionTimeout);//等待锁 
            } 
        } catch (KeeperException e) { 
            throw new LockException(e); 
        } catch (InterruptedException e) { 
            throw new LockException(e); 
        } 
    } 
 
    public boolean tryLock() { 
        try { 
            String splitStr = "_lock_"; 
            if(lockName.contains(splitStr)) 
                throw new LockException("lockName can not contains \\u000B"); 
            //创建临时子节点 
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); 
            System.out.println(myZnode + " is created "); 
            //取出所有子节点 
            List<String> subNodes = zk.getChildren(root, false); 
            //取出所有lockName的锁 
            List<String> lockObjNodes = new ArrayList<String>(); 
            for (String node : subNodes) { 
                String _node = node.split(splitStr)[0]; 
                if(_node.equals(lockName)){ 
                    lockObjNodes.add(node); 
                } 
            } 
            Collections.sort(lockObjNodes); 
            System.out.println(myZnode + "==" + lockObjNodes.get(0)); 
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ 
                //如果是最小的节点,则表示取得锁 
                return true; 
            } 
            //如果不是最小的节点,找到比自己小1的节点 
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); 
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); 
        } catch (KeeperException e) { 
            throw new LockException(e); 
        } catch (InterruptedException e) { 
            throw new LockException(e); 
        } 
        return false; 
    } 
 
    public boolean tryLock(long time, TimeUnit unit) { 
        try { 
            if(this.tryLock()){ 
                return true; 
            } 
            return waitForLock(waitNode,time); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
        return false; 
    } 
 
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { 
        Stat stat = zk.exists(root + "/" + lower,true); 
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 
        if(stat != null){ 
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); 
            this.latch = new CountDownLatch(1); 
            this.latch.await(waitTime, TimeUnit.MILLISECONDS); 
            this.latch = null; 
        } 
        return true; 
    } 
 
    public void unlock() { 
        try { 
            System.out.println("unlock " + myZnode); 
            zk.delete(myZnode,-1); 
            myZnode = null; 
            zk.close(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } catch (KeeperException e) { 
            e.printStackTrace(); 
        } 
    } 
 
    public void lockInterruptibly() throws InterruptedException { 
        this.lock(); 
    } 
 
    public Condition newCondition() { 
        return null; 
    } 
 
    public class LockException extends RuntimeException { 
        private static final long serialVersionUID = 1L; 
        public LockException(String e){ 
            super(e); 
        } 
        public LockException(Exception e){ 
            super(e); 
        } 
    } 
 
}

原文地址:Java分布式锁三种实现方案 - steven.木子 - 博客园


评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

深入理解TransactionTemplate编程式事务