使用 Django 查询数据库的分表
10月 18, 2019
业务背景 #
最近已经转到 Go 下面进行日常工作的开发,我的一个后台项目有一些内部轻量的数据查询需要,考虑需求的复杂度不高,就平常自己和同事几个人使用,没必要专门开发一个前端项目来进行数据查询,于是我想到了老本行 Django,由于 Django 出色的后台管理功能,我几乎不用做多少开发,只需要配置下 Model 层结构就可以了,唯一的问题就是这个 Go 项目的数据在数据库中是分表管理的。
Model 对象 #
如果数据库进行了分表,这时要在上层搭建 Django 应用,就也需要在 Django 的 Model 层实现能够根据一定规则访问不同分表号的功能。
class OrderModel(models.Model):
user_id = models.BigIntegerField()
order_sn = models.CharField(max_length=45)
class Meta:
db_table = 'table_00'
_user_model_dict = {}
@classmethod
def get_user_db_model(cls, user_id=None):
suffix = "00"
if not user_id:
table_name = "table_00"
else:
suffix = get_table_no(user_id)
table_name = 'table_%s' % suffix
if table_name in cls._user_model_dict:
return cls._user_model_dict[table_name]
class Meta:
db_table = table_name
abstract = False
user_db_model = type(str('table_%s' % suffix), (models.Model,), {
'Meta': Meta,
'__module__': cls.__module__,
'user_id': models.BigIntegerField(),
'order_sn': models.CharField(max_length=45),
})
cls._user_model_dict[table_name] = user_db_model
return user_db_model
如上代码,通过在 get_user_db_model
里根据 uid 计算实际的分表号,最后构建新的 model 对象缓存并返回。使用方法如下:
m = OrderModel.get_user_db_model(user_id=123)
qs = m.objects.get(order_sn="ABC")
Admin 查询 #
由于我的场景主要是在 Django Admin 里查询数据,所以也需要修改一下 admin.py
里的 ModelAdmin
对象,原来 admin.py
中一般是继承 admin.ModelAdmin
然后自定义查询字段:
from django.db import models
@admin.register(LogEntry)
class DjangoLogAdmin(admin.ModelAdmin):
list_display = ['id', 'content_type', 'user',
'object_repr', 'change_message', 'action_time']
search_fields = ['user_id']
而其中 admin.ModelAdmin
的实现中,get_queryset
方法的查询是不能查询分表的,所以我需要重写这个方法,由于我的业务分表是根据 uid 的,故在 admin 后台查询时也需要加上 uid 才能定位到表。
class DivisionAdmin(admin.ModelAdmin):
def get_queryset(self, request):
if request.GET.get('q'):
query = request.GET.copy()
params = [q.strip() for q in request.GET['q'].split(' ')]
user_id = params[0]
db_model = self.model.get_user_db_model(user_id)
if len(params) == 1:
if user_id.isdigit():
query['q'] = user_id
else:
messages.add_message(request, messages.WARNING, '查询参数错误,需要的格式为[user_id]+[空格]+[查询参数]')
return super(DivisionAdmin, self).get_queryset(request)
else:
query['q'] = params[1]
request.GET = query
if db_model.__name__.startswith("table_name"):
qs = db_model.objects.filter(buyer_id=user_id)
else:
qs = db_model.objects.filter(user_id=user_id)
ordering = self.get_ordering(request)
if ordering:
qs = qs.order_by(*ordering)
return qs
return super(DivisionAdmin, self).get_queryset(request)
然后在 admin.py
中用 DivisionAdmin
替换掉之前继承的 admin.ModelAdmin
即可了。
@admin.register(LogEntry)
class DjangoLogAdmin(DivisionAdmin):
list_display = ['id', 'content_type', 'user',
'object_repr', 'change_message', 'action_time']
search_fields = ['user_id']