队列
Pydantic 非常有助于验证进出队列的数据。下面,我们将探讨如何使用各种队列系统验证/序列化数据。
Redis 队列¶
Redis 是一种流行的内存数据结构存储。
为了在本地运行此示例,您首先需要安装 Redis并在本地启动服务器。
这是一个简单的示例,展示了如何使用 Pydantic 来:1. 序列化数据以推送到队列 2. 反序列化和验证从队列中弹出的数据
import redis
from pydantic import BaseModel, EmailStr
class User(BaseModel):
id: int
name: str
email: EmailStr
r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_NAME = 'user_queue'
def push_to_queue(user_data: User) -> None:
serialized_data = user_data.model_dump_json()
r.rpush(QUEUE_NAME, user_data.model_dump_json())
print(f'Added to queue: {serialized_data}')
user1 = User(id=1, name='John Doe', email='[email protected]')
user2 = User(id=2, name='Jane Doe', email='[email protected]')
push_to_queue(user1)
#> Added to queue: {"id":1,"name":"John Doe","email":"[email protected]"}
push_to_queue(user2)
#> Added to queue: {"id":2,"name":"Jane Doe","email":"[email protected]"}
def pop_from_queue() -> None:
data = r.lpop(QUEUE_NAME)
if data:
user = User.model_validate_json(data)
print(f'Validated user: {repr(user)}')
else:
print('Queue is empty')
pop_from_queue()
#> Validated user: User(id=1, name='John Doe', email='[email protected]')
pop_from_queue()
#> Validated user: User(id=2, name='Jane Doe', email='[email protected]')
pop_from_queue()
#> Queue is empty