适合放入缓存的数据

  1. 即时性、数据一致性要求不高的。

  2. 访问量大且更新频率不高的数据(读多,写少)。

    举例:

    1.电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定)。

    2.后台如果发布一个商品,买家需要5分钟才能看到新的商品一般还是可以接受的。

    3.物流信息。

读模式缓存使用流程

1635512623631

1635512538828

本地缓存与局限性

集群情况下,每个节点的本地缓存可能会不一致(数据一致性)

1635513182709

分布式缓存

使用缓存中间件:redis(集群、分片)

1635513259851

缓存失效

读模式,会存在缓存失效问题:缓存穿透、雪崩、击穿

缓存穿透

缓存穿透:查询一个一定不存在的数据,导致一定会查询缓存+查询DB,缓存失去意义(大并发过来时任然会查询db)。

风险:利用不存在的数据进行攻击,数据库顺时压力增大。

最终导致崩溃解决:

​ 方法1:将null结果缓存,并加入短暂过期时间 弊端:查询条件使用UUID生成,仍然出现缓存穿透问题,并且redis存满了null。

​ 方法2:布隆过滤器,不放行不存在的查询,在redis维护id的hash表过滤掉id不存在的查询(不到达DB层查询)。

缓存雪崩

缓存雪崩:高并发状态下,大面积redis数据失效,导致所有查询到达DB,DB瞬时压力过重雪崩。

解决方法:

​ 方法1:规避雪崩,设置随机的有效时间(实际上无需设置随机时间,因为每个缓存放入库中的时间本身就不固定)让每一个缓存过期时间重复率降低。

​ 方法2:永不失效。

​ 方法3:

​ 事前:尽量保证整个 redis 集群的高可用性,发现机器宕机尽快补上。选择合适的内存淘汰策略。

​ 事中:本地ehcache缓存 + hystrix限流&降级,避免MySQL崩掉。

​ 事后:利用 redis 持久化机制保存的数据尽快恢复缓存 。

问题:如果已经出现了缓存雪崩,如何解决?

​ 方法1:熔断、降级。

缓存击穿

缓存击穿:高并发状态下,一条数据过期,所有请求到达DB

解决方法:

​ 方法1:加分布式锁,例原子操作(Redis的SETNX或者Memcache的ADD)。

​ 流程:查询cache失败,竞争锁,竞争成功查询cache,查询成功返回释放锁,查询失败则查询DB,并set缓存,并释放锁。

​ 方法2:永不失效

分布式锁

实现原理

1
2
文档1:http://redisdoc.com/string/set.html
文档2:http://www.redis.cn/commands/set.html

1635664198425

1635670158689

1635673350495

原理演示SETNX

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1.打开多个sh框

2.打开xshell撰写栏(查看->撰写->撰写栏)

3.编辑命令,发送给多个窗口,同时连接redis客户端
docker exec -it redis redis-cli

4.编辑命令,发送给多个窗口,同时占锁
set key value NX
返回OK表示占锁成功,返回nill占锁失败

5.设置锁过期时间
set key value EX 300 NX

6.查看锁过期时间
ttl lock

发送命令至全部会话:

1635669533421

锁值:

1635670061333

问题合集

问题1:(删除锁)未执行删除锁逻辑,会导致其他线程无法获得锁,出现死锁

问题2:(设置过期时间)锁释放操作可能失败(服务宕机),所以需要设置过期时间

问题3:(设置过期时间的原子性)设置过期时间的代码必须在setnx抢占锁的同时设置,保证原子性

问题4:(仅可以删除当前线程占用的锁)删除锁时,可能锁已过期删除了其他线程的锁,占锁时设置值为uuid,删除时判断当前uuid是否相等,并且需要使用lua脚本执行原子删除操作

分布式锁类型

可重入锁

1
2
// redisson实现了JUC包下的可重入锁
RLock lock = redissonClient.getLock("redisson_lock");

公平锁

1
2
// 有顺序进行加锁操作,按照请求的顺序
RLock lock = redisson.getFairLock("fair-lock");

读写锁

1
2
3
4
5
6
7
8
// 写+读:读阻塞
// 写+写:阻塞
// 读+写:写阻塞
RReadWriteLock rwlock = redisson.getReadWriteLock("lock");
// 读锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 写锁
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

写锁

1597734236213

读锁

1597734296729

读锁同时存入多个

1597735057026

信号量Semphore

先设置一个值:”park” 3。

  1. acquire:获取一个信号量,为0阻塞。
  2. release:释放一个信号量,+1。
  3. tryacquire:尝试获取一个信号量,不阻塞。

作用:【限流】
所有服务上来了去获取一个信号量,一个一个放行(最多只能n个线程同时执行)

1635689277138

闭锁CountDownLatch

等待一组操作执行完毕,统一执行

1635689625640

锁的粒度

锁的粒度一定要小,例如不应该锁整个商品操作,应该带上商品ID

锁时效问题

结果放入缓存的操作,应该放在同步代码块内,否则会造成重复查询DB的情况。

1635663356474

redis分布式锁版本

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 查询三级分类(原生版redis分布式锁版本)
*/
public Map<String, List<Catalog2VO>> getCatalogJsonFromDBWithRedisLock() {
// 1.抢占分布式锁,同时设置过期时间
String uuid = UUID.randomUUID().toString();
// 使用setnx占锁(setIfAbsent)
Boolean isLock = redisTemplate.opsForValue().setIfAbsent(CategoryConstant.LOCK_KEY_CATALOG_JSON, uuid, 300, TimeUnit.SECONDS);
if (isLock) {
// 2.抢占成功
Map<String, List<Catalog2VO>> result = null;
try {
// 查询DB
return getCatalogJsonFromDB();
} finally {
// 3.查询UUID是否是自己,是自己的lock就删除
// 封装lua脚本(原子操作解锁)
// 查询+删除(当前值与目标值是否相等,相等执行删除,不等返回0)
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call('del',KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
// 删除锁
redisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList(CategoryConstant.LOCK_KEY_CATALOG_JSON), uuid);
}
} else {
// 4.加锁失败,自旋重试
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonFromDBWithRedisLock();
}
}

Redisson

1
文档:https://github.com/redisson/redisson/wiki/Table-of-Content

概述

1.不推荐直接使用SETNX实现分布式锁,应该使用Redisson,因为根据锁的实现会分为:读写锁、可重入锁、闭锁、信号量。

2.封装了分布式Map、List等类型。

3.Redisson与lettuce、jedis一样都是redis的客户端,代替了redisTemplate。

使用原生redisson

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
步骤:
1.引入依赖
<!--redisson,redis客户端,封装了分布式锁实现,也可以使用springboot的方式,不需要自己配置-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.3</version>
</dependency>

2.配置类
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
@Configuration
public class MyRedissonConfig {

/**
* 注入客户端实例对象
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson(@Value("${spring.redis.host}") String host, @Value("${spring.redis.port}")String port) throws IOException {
// 1.创建配置
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port);// 单节点模式
// config.useSingleServer().setAddress("rediss://" + host + ":" + port);// 使用安全连接
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");// 集群模式
// 2.创建redisson客户端实例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}

redisson分布式锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 查询三级分类(redisson分布式锁版本)
*/
public Map<String, List<Catalog2VO>> getCatalogJsonFromDBWithRedissonLock() {
// 1.抢占分布式锁,同时设置过期时间
RLock lock = redisson.getLock(CategoryConstant.LOCK_KEY_CATALOG_JSON);
lock.lock(30, TimeUnit.SECONDS);
try {
// 2.查询DB
Map<String, List<Catalog2VO>> result = getCatalogJsonFromDB();
return result;
} finally {
// 3.释放锁
lock.unlock();
}
}

优点

自动续期

原理:

  1. 默认过期时间30S。
  2. 业务超长情况下,锁自动续期+30S,利redis看门狗实现。
  3. 如果线程宕机,看门狗不会自动续期,锁会自动过期。
  4. unlock使用lua脚本释放锁,不会出现误删锁。

指定超时不自动续期

查看源码

  1. 当不指定超时时间时,默认30S过期,且启动一个定时任务【自动续期任务】。
  2. 续期时间点=默认过期时间/3,每隔10S执行一次续期。
  3. 当指定超时时间时,不会自动续期

推荐设置过期时间

  1. 可以省略自动续期操作。
  2. 若真的超时未完成,则很有可能是数据库宕机,即使续期也无法完成,不应该无限续期下去。