Tengo problemas para entender cómo abrir y cerrar adecuadamente las sesiones de base de datos de manera eficiente, como entendí por la documentación sqlalchemy, si uso scoped_session para construir mi objeto Session, y luego uso la Sesión devuelta objeto para crear sesiones, es seguro para el hilo, por lo que básicamente cada hilo tendrá su propia sesión y no habrá problemas con ella. Ahora el ejemplo siguiente funciona, lo puse en un ciclo infinito para ver si cierra las sesiones correctamente y si lo controlé correctamente (en mysql ejecutando "SHOW PROCESSLIST;"), las conexiones siguen creciendo, no las cierra , aunque utilicé session.close(), e incluso eliminé el objeto scoped_session al final de cada ejecución. ¿Qué estoy haciendo mal? Mi objetivo en una aplicación más grande es usar la cantidad mínima de conexiones de base de datos requeridas, porque mi implementación de trabajo actual crea una nueva sesión en cada método donde es necesaria y la cierra antes de regresar, lo que parece ineficaz.SQLAlchemy adecuada gestión de sesión en aplicaciones multi-hilo
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
Gracias por la información, que era muy útil. ¡Atentamente! – andrean