您好,欢迎来到华佗小知识。
搜索
您的当前位置:首页​【五一创作】基于mysql关系型实现分布式锁

​【五一创作】基于mysql关系型实现分布式锁

来源:华佗小知识


0.写在前面

在多线程高并发场景下,为了保证资源的线程安全问题,jdk为我们提供了synchronized关键字和
ReentrantLock可重入锁,但是它们只能保证一个jvm内的线程安全。在分布式集群、微服务、云原生横行的当下,如何保证不同进程、不同服务、不同机器的线程安全问题,jdk并没有给我们提供既有的解决方案。此时,我们就必须借助于相关技术手动实现了。目前主流的实现有三种方式:
1. 基于mysql关系型实现
2. 基于redis非关系型数据实现
3. 基于zookeeper实现

这篇文章主要讲解的是基于基于mysql关系型实现分布式锁

1. 从减库存聊起

库存在并发量较大情况下很容易发生超卖现象,一旦发生超卖现象,就会出现多成交了订单而发不了货的情况。

场景:
        商品S库存余量为5时,用户A和B同时来购买一个商品S,此时查询库存数都为5,库存充足则开始减库存:
用户A:update db_stock set stock = stock - 1 where id = 1
用户B:update db_stock set stock = stock - 1 where id = 1
并发情况下,更新后的结果可能是4,而实际的最终库存量应该是3才对

1.1. 环境准备

为了模拟具体场景我们需要准备开发环境

首先需要在mysql数据库中准备一张表:

CREATE TABLE `db_stock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
`stock_code` varchar(255) DEFAULT NULL COMMENT '仓库编号',
`count` int(11) DEFAULT NULL COMMENT '库存量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

 表中数据如下:

 创建分布式锁demo工程:

 pom依赖文件:


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

 application.yml配置文件:

server:
  port: 6000
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://172.16.116.100:3306/test
    username: root
    password: root

DistributedLockApplication启动类:

@SpringBootApplication
@MapperScan("com.atguigu.distributedlock.mapper")

public class DistributedLockApplication {
    public static void main(String[] args) {
        SpringApplication.run(DistributedLockApplication.class, args);
   }
}

Stock实体类:

@Data
@TableName("db_stock")
public class Stock {
   @TableId
   private Long id;
   private String productCode;
   private String stockCode;
   private Integer count;
}

StockMapper接口:

public interface StockMapper extends BaseMapper<Stock> {
}

  1.2. 简单实现减库存

接下来咱们代码实操一下

StockController:

@RestController
public class StockController {
    @Autowired
    private StockService stockService;
    @GetMapping("check/lock")
    public String checkAndLock(){
        this.stockService.checkAndLock();
        return "验库存并锁库存成功!";
    }
}

StockService:

@Service
public class StockService {
    @Autowired
    private StockMapper stockMapper;

    public void checkAndLock() {
// 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
// 再减库存
        if (stock != null && stock.getCount() > 0) {
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
        }
    }
}

测试:

 查看数据库:

在浏览器中一个一个访问时,每访问一次,库存量减1,没有任何问题。

 1.3. 演示超卖现象

接下来咱们使用jmeter压力测试工具,高并发下压测一下,添加线程组:并发100循环50次,即5000次请求。

 

 给线程组添加HTTP Request请求:

填写测试接口路径如下:

再选择你想要的测试报表,例如这里选择聚合报告:

启动测试,查看压力测试报告:

测试结果:请求总数5000次,平均请求时间202ms,中位数(50%)请求是在173ms内完成的,90%请求是在344ms内完成的,最小耗时12ms,最大耗时1125ms,错误率0%,每秒钟平均473.8次。

查看mysql数据库剩余库存数:还有4870

此时如果还有人来下单,就会出现超卖现象(别人购买成功,而无货可发)。

1.4. jvm锁问题演示 

使用jvm锁(synchronized关键字或者ReetrantLock)试试:

 重启tomcat服务,再次使用jmeter压力测试,效果如下:

查看mysql数据库:

 并没有发生超卖现象,完美解决。  

1.4.2. 原理

添加synchronized关键字之后,StockService就具备了对象锁,由于添加了独占的排他锁,同一时刻只 有一个请求能够获取到锁,并减库存。此时,所有请求只会one-by-one执行下去,也就不会发生超卖现象。

1.5. 多服务问题 

 使用jvm锁在单工程单服务情况下确实没有问题,但是在集群情况下会怎样? 接下启动多个服务并使用nginx负载均衡,结构如下:

启动三个服务(端口号分别8000 8100 8200),如下:

1.5.1. 安装配置nginx

基于安装nginx:

# 拉取镜像

docker pull nginx:latest

# 创建nginx对应资源、日志及配置目录

mkdir -p /opt/nginx/logs /opt/nginx/conf /opt/nginx/html

# 先在conf目录下创建nginx.conf文件,配置内容参照下方

# 再运行容器

docker run -d -p 80:80 --name nginx -v /opt/nginx/html:/usr/share/nginx/html 

-v /opt/nginx/conf/nginx.conf:/etc/nginx/nginx.conf -v 
/opt/nginx/logs:/var/log/nginx nginx
user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    #include /etc/nginx/conf.d/*.conf;
	
	upstream distributed {
		server 172.16.116.1:8000;
		server 172.16.116.1:8100;
		server 172.16.116.1:8200;
	}
	
	server {
		listen       80;
        server_name  172.16.116.100;
		location / {
			proxy_pass http://distributed;
		}
	}
	
}

 经过测试,通过nginx访问服务一切正常。

1.5.2. 压力测试

 注意:先把数据库库存量还原到5000。

参照之前的测试用例,再创建一个新的测试组:参数给之前一样

 测试结果:性能只是略有提升。

 数据库库存剩余量如下:

 又出现了并发问题,即出现了超卖现象。

 1.6. mysql锁演示

除了使用jvm锁之外,还可以使用数据锁:悲观锁 或者 乐观锁

悲观锁:在读取数据时锁住那几行,其他对这几行的更新需要等到悲观锁结束时才能继续 。 乐观所:读取数据时不锁,更新时检查是否数据已经被更新过,如果是则取消当前更新,一般在悲观锁 的等待时间过长而不能接受时我们才会选择乐观锁。

1.6.1. mysql悲观锁

在MySQL的InnoDB中,预设的Tansaction isolation level 为REPEATABLE READ(可重读)

在SELECT 的读取锁定主要分为两种方式:

  • SELECT ... LOCK IN SHARE MODE (共享锁)
  • SELECT ... FOR UPDATE (悲观锁)

这两种方式在事务(Transaction) 进行当中SELECT 到同一个数据表时,都必须等待其它事务数据被提交(Commit)后才会执行。 而主要的不同在于LOCK IN SHARE MODE 在有一方事务要Update 同一个表单时很容易造成死锁。简单的说,如果SELECT 后面若要UPDATE 同一个表单,最好使用SELECT ... FOR UPDATE。

代码实现改造StockService:

在StockeMapper中定义selectStockForUpdate方法:

public interface StockMapper extends BaseMapper<Stock> {
    public Stock selectStockForUpdate(Long id);
}

在StockMapper.xml中定义对应的配置:  

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.distributedlock.mapper.StockMapper">
    <select id="selectStockForUpdate" 
resultType="com.atguigu.distributedlock.pojo.Stock">
       select * from db_stock where id = #{id} for update
    </select>
</mapper>

压力测试

注意:测试之前,需要把库存量改成5000。压测数据如下:比jvm性能高很多,比无锁要低将近1倍

mysql数据库存:

1.6.2. mysql乐观锁 

乐观锁( Optimistic Locking ) 相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所 以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则重试。那么 我们如何实现乐观锁呢?

使用数据版本(Version)记录机制实现,这是乐观锁最常用的实现 方式。一般是通过为数据库表增加 一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一 次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录 的当前版本信息与第一次取 出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新。

给db_stock表添加version字段:

 对应也需要给Stock实体类添加version属性。此处略。

代码实现

public void checkAndLock() {
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        // 获取版本号
        Long version = stock.getVersion();
        stock.setCount(stock.getCount() - 1);
        // 每次更新 版本号 + 1
        stock.setVersion(stock.getVersion() + 1);
        // 更新之前先判断是否是之前查询的那个版本,如果不是重试
        if (this.stockMapper.update(stock, new UpdateWrapper<Stock>
().eq("id", stock.getId()).eq("version", version)) == 0) {
            checkAndLock();
       }
   }
}

 重启后使用jmeter压力测试工具结果如下:

修改测试参数如下:

 测试结果如下:

说明乐观锁在并发量越大的情况下,性能越低(因为需要大量的重试);并发量越小,性能越高。

 1.6.3. mysql锁缺陷

在数据库集群情况下会导致数据库锁失效,并且很多数据库集群的中间件压根就不支持悲观锁。例如:mycat,在读写分离的场景下可能会导致乐观锁不可靠。 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

 2. 基于mysql实现分布式锁

 不管是jvm锁还是mysql锁,为了保证线程的并发安全,都提供了悲观独占排他锁。所以独占排他也是 分布式锁的基本要求。 可以利用唯一键索引不能重复插入的特点实现。设计表如下:

CREATE TABLE `db_lock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `lock_name` varchar(50) NOT NULL COMMENT '锁名',
  `class_name` varchar(100) DEFAULT NULL COMMENT '类名',
  `method_name` varchar(50) DEFAULT NULL COMMENT '方法名',
  `server_name` varchar(50) DEFAULT NULL COMMENT '服务器ip',
  `thread_name` varchar(50) DEFAULT NULL COMMENT '线程名',
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP 

COMMENT '获取锁时间',
  `desc` varchar(100) DEFAULT NULL COMMENT '描述',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_unique` (`lock_name`)
) ENGINE=InnoDB AUTO_INCREMENT=13329824461455363 DEFAULT CHARSET=utf8;

Lock实体类:  

@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("db_lock")

public class Lock {
    private Long id;
    private String lockName;
    private String className;
    private String methodName;
    private String serverName;
    private String threadName;
    private Date createTime;
    private String desc;
}

LockMapper接口:

public interface LockMapper extends BaseMapper<Lock> {
}

2.1. 基本思路 

synchronized关键字和ReetrantLock锁都是独占排他锁,即多个线程争抢一个资源时,同一时刻只有 一个线程可以抢占该资源,其他线程只能阻塞等待,直到占有资源的线程释放该资源。

2.2. 代码实现

改造StockService:

@Service

public class StockService {
    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private LockMapper lockMapper;
    /**
     * 数据库分布式锁
     */
    public void checkAndLock() {
        // 加锁
        Lock lock = new Lock(null, "lock", this.getClass().getName(), new 
Date(), null);
        try {
            this.lockMapper.insert(lock);
       } catch (Exception ex) {
            // 获取锁失败,则重试
            try {
                Thread.sleep(50);
                this.checkAndLock();
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
       }
        // 释放锁
        this.lockMapper.deleteById(lock.getId());
   }
}

加锁:  

// 加锁
Lock lock = new Lock(null, "lock", this.getClass().getName(), new Date(), null);
try {
    this.lockMapper.insert(lock);
} catch (Exception ex) {
    // 获取锁失败,则重试
    try {
        Thread.sleep(50);
        this.checkAndLock();
   } catch (InterruptedException e) {
        e.printStackTrace();
   }
}

解锁:

// 释放锁
this.lockMapper.deleteById(lock.getId());

使用Jmeter压力测试结果:

 可以看到性能感人。mysql数据库库存余量为0,可以保证线程安全。 

2.3. 缺陷及解决方案 

1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

解决方案:给锁数据库 搭建主备

2. 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

解决方案:只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。

3. 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

解决方案:记录获取锁的主机信息和线程信息,如果相同线程要获取锁,直接重入。

4. 受制于数据库性能,并发能力有限。

解决方案:无法解决。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo0.cn 版权所有 湘ICP备2023017654号-2

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务