跳到内容

队列

Pydantic 非常有助于验证进出队列的数据。下面,我们将探讨如何使用各种队列系统验证/序列化数据。

Redis 队列

Redis 是一种流行的内存数据结构存储。

为了在本地运行此示例,您首先需要安装 Redis并在本地启动服务器。

这是一个简单的示例,展示了如何使用 Pydantic 来:1. 序列化数据以推送到队列 2. 反序列化和验证从队列中弹出的数据

import redis

from pydantic import BaseModel, EmailStr


class User(BaseModel):
    id: int
    name: str
    email: EmailStr


r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_NAME = 'user_queue'


def push_to_queue(user_data: User) -> None:
    serialized_data = user_data.model_dump_json()
    r.rpush(QUEUE_NAME, user_data.model_dump_json())
    print(f'Added to queue: {serialized_data}')


user1 = User(id=1, name='John Doe', email='[email protected]')
user2 = User(id=2, name='Jane Doe', email='[email protected]')

push_to_queue(user1)
#> Added to queue: {"id":1,"name":"John Doe","email":"[email protected]"}

push_to_queue(user2)
#> Added to queue: {"id":2,"name":"Jane Doe","email":"[email protected]"}


def pop_from_queue() -> None:
    data = r.lpop(QUEUE_NAME)

    if data:
        user = User.model_validate_json(data)
        print(f'Validated user: {repr(user)}')
    else:
        print('Queue is empty')


pop_from_queue()
#> Validated user: User(id=1, name='John Doe', email='[email protected]')

pop_from_queue()
#> Validated user: User(id=2, name='Jane Doe', email='[email protected]')

pop_from_queue()
#> Queue is empty