sql >> Database teknologi >  >> RDS >> PostgreSQL

Django ORM lækker forbindelser, når du bruger ThreadPoolExecutor

Mit gæt er, at ThreadPoolExecutor er ikke det, der skaber DB-forbindelsen, men de trådede job er dem, der holder forbindelsen. Jeg har allerede været nødt til at håndtere det her.

Jeg endte med at bygge denne wrapper for at sikre, at tråde lukkes manuelt, når der udføres job i en ThreadPoolExecutor. Dette burde være nyttigt for at sikre, at forbindelser ikke lækkes, indtil videre har jeg ikke set nogen læk, mens jeg brugte denne kode.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Så kan du bare bruge dette:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...og vær sikker på, at forbindelserne vil lukke.




  1. Postgresql SELECT tilfældig med unik værdi

  2. Hvordan abonnerer man ny bruger på notifikationer?

  3. Hvordan kan OpenShift-beholder lære sit billed-id?

  4. Spring Data Join med specifikationer