使用 Django 查询数据库的分表

使用 Django 查询数据库的分表

10月 18, 2019
Python, MySQL, Django

业务背景 #

最近已经转到 Go 下面进行日常工作的开发,我的一个后台项目有一些内部轻量的数据查询需要,考虑需求的复杂度不高,就平常自己和同事几个人使用,没必要专门开发一个前端项目来进行数据查询,于是我想到了老本行 Django,由于 Django 出色的后台管理功能,我几乎不用做多少开发,只需要配置下 Model 层结构就可以了,唯一的问题就是这个 Go 项目的数据在数据库中是分表管理的。

Model 对象 #

如果数据库进行了分表,这时要在上层搭建 Django 应用,就也需要在 Django 的 Model 层实现能够根据一定规则访问不同分表号的功能。

class OrderModel(models.Model):
    user_id = models.BigIntegerField()
    order_sn = models.CharField(max_length=45)

    class Meta:
        db_table = 'table_00'

    _user_model_dict = {}

    @classmethod
    def get_user_db_model(cls, user_id=None):
        suffix = "00"
        if not user_id:
            table_name = "table_00"
        else:
            suffix = get_table_no(user_id)
            table_name = 'table_%s' % suffix
        if table_name in cls._user_model_dict:
            return cls._user_model_dict[table_name]

        class Meta:
            db_table = table_name
            abstract = False

        user_db_model = type(str('table_%s' % suffix), (models.Model,), {
            'Meta': Meta,
            '__module__': cls.__module__,
            'user_id': models.BigIntegerField(),
            'order_sn': models.CharField(max_length=45),
        })

        cls._user_model_dict[table_name] = user_db_model
        return user_db_model

如上代码,通过在 get_user_db_model 里根据 uid 计算实际的分表号,最后构建新的 model 对象缓存并返回。使用方法如下:

m = OrderModel.get_user_db_model(user_id=123)
qs = m.objects.get(order_sn="ABC")

Admin 查询 #

由于我的场景主要是在 Django Admin 里查询数据,所以也需要修改一下 admin.py 里的 ModelAdmin 对象,原来 admin.py 中一般是继承 admin.ModelAdmin 然后自定义查询字段:

from django.db import models

@admin.register(LogEntry)
class DjangoLogAdmin(admin.ModelAdmin):
    list_display = ['id', 'content_type', 'user',
                    'object_repr', 'change_message', 'action_time']
    search_fields = ['user_id']

而其中 admin.ModelAdmin 的实现中,get_queryset 方法的查询是不能查询分表的,所以我需要重写这个方法,由于我的业务分表是根据 uid 的,故在 admin 后台查询时也需要加上 uid 才能定位到表。

class DivisionAdmin(admin.ModelAdmin):
    def get_queryset(self, request):
        if request.GET.get('q'):
            query = request.GET.copy()
            params = [q.strip() for q in request.GET['q'].split(' ')]
            user_id = params[0]
            db_model = self.model.get_user_db_model(user_id)
            if len(params) == 1:
                if user_id.isdigit():
                    query['q'] = user_id
                else:
                    messages.add_message(request, messages.WARNING, '查询参数错误,需要的格式为[user_id]+[空格]+[查询参数]')
                    return super(DivisionAdmin, self).get_queryset(request)
            else:
                query['q'] = params[1]

            request.GET = query
            if db_model.__name__.startswith("table_name"):
                qs = db_model.objects.filter(buyer_id=user_id)
            else:
                qs = db_model.objects.filter(user_id=user_id)
            ordering = self.get_ordering(request)
            if ordering:
                qs = qs.order_by(*ordering)
            return qs
        return super(DivisionAdmin, self).get_queryset(request)

然后在 admin.py 中用 DivisionAdmin 替换掉之前继承的 admin.ModelAdmin 即可了。

@admin.register(LogEntry)
class DjangoLogAdmin(DivisionAdmin):
    list_display = ['id', 'content_type', 'user',
                    'object_repr', 'change_message', 'action_time']
    search_fields = ['user_id']

参考 #

本文共 669 字,上次修改于 Feb 25, 2024,以 CC 署名-非商业性使用-禁止演绎 4.0 国际 协议进行许可。

相关文章

» Ubuntu 下部署 Django 应用

» Django 中 N+1 查询问题优化

» Django 的中间件执行顺序

» Django 的软删除设计

» 浅谈 Django-REST-Framework 的设计与源码