一、环境准备与连接配置
1.1 安装Redis客户端库
pip install redis
1.2 连接Redis服务器
import redis
# 基础连接
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password='yourpassword', # 无密码可省略
decode_responses=True # 自动解码为字符串
)
# 连接池方式(推荐生产环境使用)
pool = redis.ConnectionPool(
max_connections=50,
**config # 包含host/port等参数
)
r_pool = redis.Redis(connection_pool=pool)
![图片[1]_Python操作Redis键值对完全指南_知途无界](https://zhituwujie.com/wp-content/uploads/2025/07/d2b5ca33bd20250720093417.png)
二、键值对基础操作
2.1 添加/修改键值对
# 简单设置
r.set('username', '张三')
# 带过期时间(30秒)
r.setex('temp_token', 30, 'a1b2c3')
# 批量设置
r.mset({
'email': 'user@example.com',
'age': '28'
})
# 不存在时才设置(原子操作)
r.setnx('unique_id', '10086')
2.2 删除操作
# 单个删除
r.delete('username')
# 批量删除
r.delete('email', 'age')
# 模式匹配删除(先查询再删除)
keys = r.keys('temp_*')
if keys:
r.delete(*keys) # 注意解包操作
三、高级数据结构操作
3.1 哈希类型(Hash)
# 添加哈希字段
r.hset('user:1001', mapping={
'name': '李四',
'points': '1500'
})
# 获取单个字段
name = r.hget('user:1001', 'name')
# 获取全部字段
user_data = r.hgetall('user:1001')
# 删除指定字段
r.hdel('user:1001', 'points')
3.2 列表类型(List)
# 左端插入
r.lpush('messages', 'msg1', 'msg2')
# 右端插入
r.rpush('messages', 'msg3')
# 范围获取
msgs = r.lrange('messages', 0, -1)
# 弹出元素
first = r.lpop('messages')
四、事务与管道操作
4.1 事务处理
with r.pipeline() as pipe:
try:
# 开启事务
pipe.multi()
pipe.set('counter', '100')
pipe.incr('counter')
pipe.expire('counter', 60)
# 执行事务
pipe.execute()
except redis.exceptions.WatchError:
print("事务执行失败")
4.2 管道批量操作
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.get('key2')
pipe.delete('key3')
results = pipe.execute() # 返回[True, 'value2', 1]
五、过期时间管理
5.1 设置过期时间
# 设置键时直接指定
r.setex('session:1', 3600, 'data')
# 对已存在键设置
r.expire('user:1001', 7200) # 2小时
# 指定到期时间戳
import time
r.expireat('temp_data', int(time.time()) + 1800)
5.2 查询剩余时间
ttl = r.ttl('session:1') # 返回剩余秒数
if ttl == -2:
print("键不存在")
elif ttl == -1:
print("键存在但未设置过期")
六、安全删除模式
6.1 安全删除流程
sequenceDiagram
客户端->>Redis: WATCH key
客户端->>Redis: GET key
客户端->>Redis: MULTI
客户端->>Redis: DEL key
客户端->>Redis: EXEC
Redis-->>客户端: 返回删除结果
6.2 代码实现
def safe_delete(key):
with r.pipeline() as pipe:
while True:
try:
pipe.watch(key)
if not pipe.exists(key):
pipe.unwatch()
return False
pipe.multi()
pipe.delete(key)
if pipe.execute()[0]:
return True
except redis.WatchError:
continue
七、性能优化技巧
7.1 批量操作对比
| 操作方式 | 网络开销 | 原子性 | 代码复杂度 |
|---|---|---|---|
| 单命令循环 | 高 | 无 | 低 |
| 事务(MULTI) | 中 | 有 | 中 |
| 管道(Pipeline) | 低 | 无 | 高 |
7.2 内存优化建议
# 使用哈希压缩小对象
r.hset('small:obj', mapping={
'f1': 'v1',
'f2': 'v2'
})
# 使用列表分片
for i in range(0, 1000, 100):
r.rpush('big_list', *range(i, i+100))
八、异常处理与监控
8.1 常见异常类型
try:
r.get('nonexistent_key')
except redis.exceptions.ConnectionError:
print("连接失败")
except redis.exceptions.ResponseError:
print("命令执行错误")
except Exception as e:
print(f"未知错误: {str(e)}")
8.2 健康检查
def check_redis_health():
try:
return r.ping() and r.info()['connected_clients'] < 1000
except:
return False
九、实际应用案例
9.1 用户会话管理
class SessionManager:
def __init__(self, redis_conn):
self.r = redis_conn
def create_session(self, user_id, data, ttl=1800):
key = f"session:{user_id}"
self.r.hset(key, mapping=data)
self.r.expire(key, ttl)
return key
def get_session(self, user_id):
return self.r.hgetall(f"session:{user_id}")
def destroy_session(self, user_id):
return bool(self.r.delete(f"session:{user_id}"))
9.2 简易消息队列
class SimpleQueue:
def __init__(self, name, redis_conn):
self.key = f"queue:{name}"
self.r = redis_conn
def put(self, item):
self.r.rpush(self.key, json.dumps(item))
def get(self, timeout=30):
_, data = self.r.blpop(self.key, timeout=timeout)
return json.loads(data) if data else None
十、Redis键命名规范
10.1 推荐命名结构
graph LR
A[业务模块] --> B[数据类型]
B --> C[ID/名称]
B --> D[字段]
示例:
user:1001:profileorder:20230715:itemsconfig:payment:timeout
10.2 键名设计原则
- 使用冒号分层(
:) - 避免过长(<256字节)
- 不含空格和特殊字符
- 版本控制(如
v1:user:1001)
完整示例代码
import redis
import json
class RedisManager:
def __init__(self, **config):
self.pool = redis.ConnectionPool(**config)
self.r = redis.Redis(connection_pool=self.pool)
def add_data(self, key, value, ttl=None):
"""添加/更新键值对"""
if ttl:
return self.r.setex(key, ttl, value)
return self.r.set(key, value)
def batch_add(self, items, ttl=None):
"""批量添加数据"""
with self.r.pipeline() as pipe:
for key, value in items.items():
if ttl:
pipe.setex(key, ttl, value)
else:
pipe.set(key, value)
return pipe.execute()
def delete(self, *keys):
"""删除键值对"""
return self.r.delete(*keys)
def safe_delete(self, pattern):
"""模式匹配安全删除"""
keys = self.r.keys(pattern)
if not keys:
return 0
return self.r.delete(*keys)
def get_with_fallback(self, key, fallback_func, ttl=300):
"""缓存穿透保护"""
value = self.r.get(key)
if value is not None:
return json.loads(value)
# 执行回退函数获取数据
data = fallback_func()
self.r.setex(key, ttl, json.dumps(data))
return data
# 使用示例
if __name__ == '__main__':
config = {
'host': 'localhost',
'port': 6379,
'db': 0,
'decode_responses': False # 保留原始bytes数据
}
manager = RedisManager(**config)
# 添加数据
manager.add_data('test:str', 'hello', ttl=60)
# 批量操作
items = {
'user:1001:name': '张三',
'user:1001:age': '28'
}
manager.batch_add(items, ttl=3600)
# 安全删除
manager.safe_delete('temp:*')
# 带缓存的查询
def fetch_from_db():
return {'data': '从数据库获取的原始数据'}
data = manager.get_with_fallback(
'cache:some_key',
fetch_from_db
)
print(data)
关键要点总结
- 连接管理:生产环境务必使用连接池
- 原子操作:利用
setnx等实现分布式锁 - 批量处理:管道(pipeline)提升吞吐量
- 内存优化:小对象使用Hash压缩存储
- 异常处理:网络抖动时自动重试机制
- 命名规范:层次化键名便于维护
- 安全删除:复杂场景使用WATCH+MULTI
通过本指南,您已掌握Python操作Redis键值对的完整技能栈。建议根据实际业务需求:
- 高频访问数据设置合理过期时间
- 大Value考虑分片存储
- 敏感操作添加事务保障
- 定期监控内存使用情况
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容