Jesús Leganés-Combarro 'piranna'
To do what anyone can do, leave it to anyone else
Adding Backpressure to Python’s ProcessPoolExecutor
Adding Backpressure to Python’s ProcessPoolExecutor
User: Jesús Leganés-Combarro (jesus.leganes.combarro@gmail.com)
Created: 2025/9/25 17:42:45
Updated: 2025/10/1 6:52:09
Exported: 2025/10/1 11:30:33
Link: https://chatgpt.com/g/g-p-68d8ee08302c819183dad298118b8161/c/68d562f4-7b40-8332-8b23-fe9db40c297f
Prompt:
Estoy usando un process pool executor de Python y el caso es de que funciona muy bien pero lo estoy saturando de tantas tareas que no es capaz de procesarlas todas. Entonces se satura y empieza a meter trabajos dentro de su cola interna de WorkIDs hasta que termina consumiéndose toda la memoria y cada vez de hecho hay una mayor latencia. He pensado en sobreescribir el WorkID que está utilizando internamente por uno indicando un límite propio para de esa forma cuando se llame a submit si se supera el límite de la cola se bloquean las peticiones y de esa forma pues tenga una especie como de backpressor y de control de congestión. Pero no sé qué valor poner. La opción por defecto sería poner el mismo número en la cola que elementos que están por el número de CPUs pero no sé si eso será muy limitante o si por el contrario lo que haría sería que estuviera ya un único elemento disponible para disponible siempre. Quizás poner el doble el número de CPUs. No sé qué valor podría ser adecuado en este caso. Estamos hablando de decenas de miles de objetos a procesar y están siendo alimentados desde posiblemente más de 100 hilos distintos a un único process pool executor que en mi máquina tiene 8 CPUs, entonces 8 tareas simultáneas.
Response:
Pensado
¡Buena pregunta! Con ProcessPoolExecutor
no hay API pública para cambiar el tamaño de su cola interna, y “tocar” WorkItem/WorkQueue
por dentro te va a meter en zona frágil. La forma estable de conseguir backpressure es limitar tú mismo cuántos “futures” pueden estar en vuelo (pendientes) a la vez y/o hacer batching (procesar en lotes) para reducir la granularidad.
Aquí tienes patrones que funcionan bien y cómo elegir los números.
- Límite de “futures” pendientes (backpressure real) =====================================================
La idea: solo dejas N tareas en vuelo; si N se alcanza, quien llama a submit()
se bloquea hasta que alguna termine. Esto evita que la cola interna crezca sin control, incluso con decenas de miles de elementos y >100 productores.
Regla práctica para N (pendientes)
Para 8 CPUs y trabajo CPU-bound:
- Empieza con N = 4 × nº_workers → con 8 workers: N = 32.
- Si tu carga tiene varianza alta en tiempos (algunas tareas tardan mucho más), sube a 6–8× (48–64).
- Evita miles: solo aumenta la latencia y el uso de memoria.
Patrón con Semaphore
(seguro con muchos hilos productores):
from concurrent.futures import ProcessPoolExecutor, as_completed
from threading import Semaphore
MAX_WORKERS = 8
MAX_IN_FLIGHT = MAX_WORKERS * 4 # empieza con 32
sem = Semaphore(MAX_IN_FLIGHT)
def submit_with_backpressure(executor, fn, *args, **kwargs):
sem.acquire() # se bloquea si ya hay muchos pendientes
fut = executor.submit(fn, *args, **kwargs)
# Cuando termine, liberamos un hueco:
fut.add_done_callback(lambda f: sem.release())
return fut
# Uso (con muchos productores):
executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
futures = []
for item in stream_de_decenas_de_miles():
futures.append(submit_with_backpressure(executor, trabajo_cpu_bound, item))
# Consumir resultados sin acumular en memoria:
for f in as_completed(futures):
res = f.result()
consume(res)
Si ya tienes >100 hilos enviando, este patrón es ideal: todos competirán por el Semaphore
y no sobrepasarán el umbral.
- Un “feeder” con cola acotada ===============================
Otra variante: pones una cola Queue(maxsize=M)
entre productores y un único feeder que hace submit()
. Es conveniente cuando quieres centralizar métricas/reintentos y tener aún más control.
from queue import Queue
from threading import Thread
from concurrent.futures import ProcessPoolExecutor, as_completed
MAX_WORKERS = 8
MAX_IN_FLIGHT = 32
q = Queue(maxsize=MAX_IN_FLIGHT)
def feeder(q, executor, futures):
while True:
item = q.get()
if item is None: break
fut = executor.submit(trabajo_cpu_bound, item)
futures.append(fut)
q.task_done()
executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
futures = []
t = Thread(target=feeder, args=(q, executor, futures), daemon=True)
t.start()
# Productores: se bloquearán al hacer put() si la cola está llena
for item in stream_de_decenas_de_miles():
q.put(item)
# Señal de fin
q.put(None); q.join()
for f in as_completed(futures):
consume(f.result())
- Batching /
chunksize
(reduce overhead) ===========================================
Si tus tareas son cortas (milisegundos), el overhead de IPC + pickling te mata. Solución:
- Agrupa varios ítems en un lote y que cada proceso consuma un lote.
- O usa
executor.map(fn, iterable, chunksize=K)
(enProcessPoolExecutor.map
elchunksize
sí existe).
Reglas para chunksize
:
- Empieza con K = 50–200 si cada ítem es rápido (<10–20 ms).
- Si cada ítem tarda más (≥100 ms), K pequeño (1–10) suele ir bien.
- Objetivo: que cada tarea dure 50–500 ms como orden de magnitud; así amortizas overhead sin hacer la cola granítica.
Ejemplo (map con backpressure implícito y chunks):
from concurrent.futures import ProcessPoolExecutor
def procesa_lote(lote):
return [trabajo_cpu_bound(x) for x in lote]
def lotes(iterable, k):
buf = []
for x in iterable:
buf.append(x)
if len(buf) == k:
yield buf; buf = []
if buf: yield buf
MAX_WORKERS = 8
CHUNK = 100 # ajusta
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as ex:
for resultados in ex.map(procesa_lote, lotes(stream_de_decenas_de_miles(), CHUNK)):
for r in resultados:
consume(r)
Nota:
Executor.map
ya limita los envíos (no “dispara” infinito a la vez), pero controlar elchunksize
es clave para que no se acumulen demasiados “work items” ultracortos.
- Otros ajustes que ayudan ===========================
-
maxtasksperchild
(si usasmultiprocessing.Pool
) o reciclar procesos periódicamente si hay crecimiento de memoria por cachés/liberadores lentos. EnProcessPoolExecutor
no está expuesto, pero puedes rediseñar para que cada proceso haga “mini-lotes” y evitar fugas/cachés grandes. - Evita compartir objetos enormes: pasa IDs/rutas y que el proceso los cargue. El pickling de objetos gigantes infla memoria y latencia.
-
Medir: registra in-flight, latencias por tarea, RSS de procesos, tamaño de cola. Con eso afinas
MAX_IN_FLIGHT
ychunksize
.
Resumen de números para tu caso (8 CPUs, >100 productores, decenas de miles de ítems)
-
max_workers = 8
(ok para CPU-bound). - Límite de pendientes: empieza con 32 (4×), sube a 64 si notas ociosidad por varianza.
-
Si tareas son cortas: usa
chunksize
50–200 o procesa lotes de ese tamaño dentro del worker. -
No intentes sobreescribir
WorkID
interno: mejor aplica los patrones deSemaphore
/cola acotada; es estable y portable.
Si quieres, dime cuánto tarda de media una tarea individual y si es CPU puro o mezcla E/S, y te propongo MAX_IN_FLIGHT
y chunksize
más finos.
Prompt:
No quiero sobreescribir WorkID interno, sino la queue interna _work_id para que en vez de ser unlimited, tenga un limite máximo, así cuando submit() haga push, si dicho queue está completo, submit esperaría y el hilo que llama a submit se bloquearía, actuando de backpressure
Response:
Pensado
Tiene sentido lo que buscas (que submit()
se bloquee cuando haya demasiados trabajos en cola), pero en ProcessPoolExecutor
eso no se puede conseguir de forma estable cambiando la cola interna:
- Lo que llamas _work_id no es una cola pública; internamente hay varias estructuras privadas (
_pending_work_items
,call_queue
, etc.) y, en versiones recientes, la cola de envíos es unmultiprocessing.SimpleQueue
sin límite creado dentro de hilos/funciones privadas. No hay un “hook” soportado para reemplazarla conQueue(maxsize=...)
sin tocar código interno (frágil entre versiones y plataformas).
La forma segura de obtener el mismo efecto (backpressure en submit
) es envolver el ejecutor y bloquear justo antes de llamar a submit()
usando un semáforo que represente “capacidad restante”. Es funcionalmente equivalente a tener una cola acotada y, a diferencia de tocar lo interno, es estable y portable.
Aquí tienes un “drop-in” que bloquea en submit()
:
from concurrent.futures import ProcessPoolExecutor, Future
from threading import Semaphore
from typing import Any, Callable
class BoundedProcessPool:
def __init__(self, max_workers: int, max_in_flight: int):
self._ex = ProcessPoolExecutor(max_workers=max_workers)
self._sem = Semaphore(max_in_flight)
@property
def _executor(self):
return self._ex
def submit(self, fn: Callable, *args, **kwargs) -> Future:
# Bloquea aquí igual que una Queue(maxsize) haría en put()
self._sem.acquire()
fut = self._ex.submit(fn, *args, **kwargs)
# Cuando termina, liberamos capacidad
fut.add_done_callback(lambda _: self._sem.release())
return fut
# Passthroughs útiles
def map(self, *args, **kwargs):
return self._ex.map(*args, **kwargs)
def shutdown(self, wait=True, cancel_futures=False):
return self._ex.shutdown(wait=wait, cancel_futures=cancel_futures)
¿Qué valor poner?
Para CPU-bound con 8 procesos:
- Empieza con
max_in_flight = 4 × workers
⇒ 32. - Si hay mucha varianza en duración de tareas, sube a 6–8× (48–64).
- Evita cientos/miles: solo aumentas memoria y colas internas sin ganar throughput.
Esto te da exactamente el comportamiento que describes: si hay 32–64 “futures” en vuelo, submit()
se bloquea y ejerces backpressure sobre esos >100 hilos productores.
Alternativas/afinados
-
Executor.map(..., chunksize=K)
: si tus tareas son muy cortas, subechunksize
(p. ej. 50–200) para amortizar el overhead de IPC/pickling y reducir el ritmo de envíos. -
Feeder con
Queue(maxsize=M)
: un hilo único que hacesubmit()
desde unaQueue
acotada también te da bloqueo enput()
y centraliza métricas/reintentos.
Nota: intentar sustituir la cola interna del
ProcessPoolExecutor
(p. ej., cambiar unSimpleQueue
porQueue(maxsize)
) implica reimplementar o “monkey-patch” de clases privadas y puede romperse con una actualización menor de Python o cambiar de plataforma. El wrapper de arriba te da la misma semántica con cero deuda técnica.
Si me dices la latencia media de cada tarea y su dispersión (p50/p95) te sugiero un max_in_flight
y, si procede, un chunksize
todavía más ajustados para tu caso (decenas de miles de ítems, 8 procesos, 100+ productores).
Prompt:
El código fuente de ProcessPoolExecutor está en https://github.com/python/cpython/blob/3.13/Lib/concurrent/futures/process.py. Sigo pensando que lo más sencillo es sobreescribir la queue _work_ids, quizás en una clase hija. De hecho es la última instrucción del constructor, así que se puede cambiar en el constructor de la clase hija, y nadie tendría ninguna referencia al queue original, e incluso se puede comprobar su existencia para detectar cambios en la implementación. Puedo usar _max_workers para calcular el tamaño del nuevo queue de _work_ids, pero no sé qué valor usar. Veo que está usando el número de CPUs + 1 en la cola interna de calls ids, no he visto si bloquea, pero si lo hace, a la API publica no le afecta
Response:
Pensado
¡Totalmente viable! En 3.13 ProcessPoolExecutor
crea _work_ids = queue.Queue()
sin límite y el submit()
mete ahí el id de trabajo; un queue manager thread lo va vaciando hacia _call_queue
, que sí tiene límite _max_workers + EXTRA_QUEUED_CALLS
(y EXTRA_QUEUED_CALLS = 1
). Por tanto, si sustituyes _work_ids
por una Queue(maxsize=K)
, submit()
bloqueará cuando haya K ids pendientes, que es justo el backpressure que buscas. Esto se ve en el propio (derivado) fuente: creación de colas y el submit()
que hace self._work_ids.put(self._queue_count)
; además, el queue manager se arranca después y recibe self._work_ids
por parámetro, así que si lo reemplazas en __init__
antes de arrancarlo, no hay referencias colgantes. natashabatalha.github.io
¿Qué valor poner para maxsize=K
?
Piensa que el throughput lo limita _call_queue
≈ workers + 1
(porque una vez ahí ya no se puede cancelar), y _work_ids
es un buffer adicional que solo añade memoria y latencia si es muy grande. Reglas prácticas para CPU-bound:
-
Base:
K = 4 × workers
→ con 8 procesos, K = 32. - Alta varianza de tiempos (colas desbalanceadas): sube a 6–8× → K = 48–64.
-
Evita cientos/miles: no aumentan el throughput (el cuello es
call_queue
) y sí engordanpending_work_items
consumiendo RAM. - Si tus tareas son muy cortas, además de limitar K, usa
map(..., chunksize=...)
para que cada envío valga la pena y no inundes las colas con micro-tareas. La doc demap(chunksize)
lo recomienda explícitamente. Python documentation
Implementación segura (subclase)
import queue
from concurrent.futures import ProcessPoolExecutor
class BoundedWorkIdsProcessPool(ProcessPoolExecutor):
def __init__(self, max_workers=None, *, max_work_ids=None, **kw):
super().__init__(max_workers=max_workers, **kw)
# Asegúrate de reemplazar ANTES de que arranque el queue manager
# (aún no ha llamado a _start_queue_management_thread en __init__).
if max_work_ids is None:
# Regla por defecto: 4× procesos
max_work_ids = self._max_workers * 4
# Comprobación defensiva: en 3.13 _work_ids es queue.Queue()
if not isinstance(self._work_ids, queue.Queue):
raise RuntimeError("Impl. inesperada: _work_ids no es queue.Queue")
self._work_ids = queue.Queue(maxsize=max_work_ids)
Por qué funciona: en 3.13 el constructor crea:
self._call_queue = _SafeQueue(max_size=self._max_workers + EXTRA_QUEUED_CALLS, ...)
self._result_queue = mp_context.SimpleQueue()
-
self._work_ids = queue.Queue()
(sin límite por defecto)
y más tarde el queue manager se arranca conargs=(..., self._work_ids, self._call_queue, ...)
. Cambiar_work_ids
en tu subclase antes de que arranque ese hilo hace que todo el pool use tu cola acotada. natashabatalha.github.io+1
Cálculo rápido del máximo de “aceptados” antes de bloquear
Con workers = W
, call_queue ≈ W+1
y tu K
para _work_ids
, el número máximo de futures aceptados (pendientes + en vuelo) antes de que submit()
bloquee es del orden de K + (W+1) + W
(ids en _work_ids
+ elementos en _call_queue
+ tareas corriendo). Por eso K moderado (32–64) limita memoria sin afectar throughput, ya que el cuello de botella real es W
y la cola de llamadas W+1
. natashabatalha.github.io
Extra: por qué no más grande
El propio comentario del fuente mantiene _call_queue
pequeña porque lo que entra ahí ya no se puede cancelar; subir demasiado K aumenta latencia y memoria sin rendimiento adicional. natashabatalha.github.io
Si me dices p50/p95 de duración de tarea, te afino K (y, si procede, un chunksize
) para tus “decenas de miles” con 8 procesos y >100 productores.
Prompt:
Suelen ejecutarse 5-10 tareas por segundo, a veces incluso menos
Response:
Pensado
Perfecto: con ese ritmo (≈ 5–10 tareas/seg en total y 8 workers), tus tareas duran en media ~0.8–1.6 s. Para tareas “largas” así, no necesitas un búfer grande: un límite pequeño mantiene la latencia baja sin estrangular el throughput.
Recomendación de maxsize
para _work_ids
-
Base:
K = 2 × W
→ con W=8, K = 16. - Si ves ociosidad por varianza (algunas tareas tardan bastante más): sube a K = 3 × W ( 24 ).
-
Tope razonable: K = 4 × W ( 32 ). Más de eso sólo añade memoria y latencia, no más rendimiento (el cuello sigue siendo
W
y_call_queue ≈ W+1
).
Intuición (tiempo de espera en cola)
Antes de bloquear submit()
, podrás acumular aprox.
K + (W+1)
llamadas encoladas + W
en ejecución.
Con K=16: total ≈ 33 pendientes/ejecución. Si procesas 5–10/s, esa cola extra representa ~2–6 s de espera máxima antes de que una nueva tarea empiece a ejecutarse. Si te parece bien ese “horizonte”, K=16–24 es perfecto; si quieres aún menos espera, baja a 12.
chunksize
Como cada tarea dura ~1 s, déjalo en 1 (o 2–4 si de verdad amortiza algo de setup). El batching grande ayuda cuando las tareas son de milisegundos, no aquí.
Notas prácticas para evitar picos de memoria
- No acumules todos los
Future
en una lista enorme. Consume conas_completed()
y libera referencias pronto. - Si tienes >100 productores, tu enfoque de cola acotada en
_work_ids
o el wrapper conSemaphore
son equivalentes en semántica de backpressure; elige la que prefieras, pero mantén K en 16–24 para empezar y observa métricas (ocupación, ociosidad, p95 de latencia).
¿Mi sugerencia concreta para tu máquina (8 CPUs, 5–10/s)?
Empieza con K = 16. Si detectas ociosidad (CPU < 90%) por varianza, prueba K = 24. Mantente por debajo de 32.
Prompt:
¿Qué valor es adecuado según el número de CPUs? Por no poner un valor fijo
Response:
Pensado
Si quieres que dependa solo de los CPUs (workers), usa esta regla:
-
Por defecto (CPU-bound, tareas ~0.5–2 s):
K = 2 × W
-
Si hay varianza notable en duración:
K = 3 × W
-
Límite superior razonable:
K ≤ 4 × W
Donde W
= max_workers
(normalmente nº de CPUs).
Por qué
El throughput real lo fija el pool (W tareas en ejecución) y la _call_queue
(~W+1). _work_ids
solo debe ser un buffer pequeño para absorber rachas desde muchos productores; más allá de 4×W solo añades memoria y latencia sin ganar rendimiento.
Tabla rápida
CPUs (W) | K (base = 2×W) | K (varianza = 3×W) | K (tope = 4×W) |
---|---|---|---|
4 | 8 | 12 | 16 |
8 | 16 | 24 | 32 |
16 | 32 | 48 | 64 |
32 | 64 | 96 | 128 |
Si quieres afinar un pelín (sin métricas)
- Tareas más largas (≥2 s): tiende a
K = 2×W
. - Tareas irregulares (colas desbalanceadas): sube a
3×W
. - Tareas muy cortas (<<0.2 s): mejor aumentar
chunksize
que K; si aun así quieres buffer,K = 6–8×W
.
Con tus cifras (8 CPUs, 5–10 tareas/s → ~1 s por tarea):
Empieza con K = 16
, si ves ociosidad por varianza pasa a K = 24
, y no superes 32.
Prompt:
class queue.Queue(maxsize=0) Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
¿que nombre queda mejor para la clase, LimitedProcessPoolExecutor o BoundedProcessPoolExecutor? ¿Alguna otra alternativa?
Response:
Muy buena pregunta 🙂. Ambos nombres son entendibles, pero transmiten matices distintos:
-
BoundedProcessPoolExecutor
→ Se inspira en la jerga de colas en Python/Java (queue.Queue
tiene bounded queue).
→ Indica que ahora el ejecutor tiene un límite superior en tareas pendientes.
→ Suena más técnico y “canon”. -
LimitedProcessPoolExecutor
→ Se entiende rápido en castellano/inglés: “ejecutor limitado”.
→ Pero es más ambiguo: ¿limitado en qué? ¿workers, memoria, tareas?
Alternativas claras
-
BlockingProcessPoolExecutor
→ enfatiza quesubmit()
bloquea al alcanzar el límite. -
ThrottledProcessPoolExecutor
→ resalta la idea de control de caudal/backpressure. -
BackpressuredProcessPoolExecutor
→ muy explícito, aunque un poco largo. -
CappedProcessPoolExecutor
→ variante corta de “bounded/limited”.
Recomendación
Si lo que quieres es alinear con la terminología de colas (queue.Queue(maxsize)
→ bounded queue), el nombre más limpio y familiar es:
👉 BoundedProcessPoolExecutor
Transmite que es un ProcessPoolExecutor
pero con cola acotada, y cualquiera que haya usado queue.Queue
lo asociará enseguida.
¿Quieres que te muestre un ejemplo de docstring corto para esa clase, que explique claramente el comportamiento de “submit bloqueante al alcanzar maxsize”?
Prompt:
descarta BackpressuredProcessPoolExecutor y CappedProcessPoolExecutor
Response:
Perfecto 👍, entonces descartamos BackpressuredProcessPoolExecutor y CappedProcessPoolExecutor.
Con lo que te queda en la mesa:
-
BoundedProcessPoolExecutor
✔️ Más técnico, consistente conqueue.Queue(maxsize=…)
(bounded queue).
✔️ Comunica con precisión: es un pool igual que el estándar, pero con cola acotada.
✔️ Es el término más familiar para quien venga de Python/Java. -
LimitedProcessPoolExecutor
✔️ Claro y legible.
❌ Ambiguo: no dice qué está limitado (workers, memoria, tareas…). -
BlockingProcessPoolExecutor
✔️ Pone el foco en el efecto observable:submit()
se bloquea si la cola está llena.
❌ Describe el síntoma más que la causa; puede sonar demasiado genérico. -
ThrottledProcessPoolExecutor
✔️ Evoca control de caudal.
❌ Menos usado en Python estándar, más en entornos de redes / APIs.
👉 Mi recomendación fuerte es BoundedProcessPoolExecutor
, porque:
- Se alinea con la terminología estándar de estructuras de datos en Python.
- Es claro para cualquiera que haya leído el docstring de
queue.Queue
. - Evita la ambigüedad de “Limited” y es más conciso que “Throttled/Blocking”.
¿Quieres que te prepare un docstring para esa clase que explique explícitamente el nuevo comportamiento de submit()
(bloqueante al alcanzar el límite), de manera similar a como lo documenta queue.Queue
?
Prompt:
descarta Limited
Response:
Perfecto 🚀, entonces nos quedamos con:
BoundedProcessPoolExecutor
BlockingProcessPoolExecutor
ThrottledProcessPoolExecutor
De esos tres, el más claro y canónico en el ecosistema Python es BoundedProcessPoolExecutor
, porque conecta directamente con la idea de bounded queue (queue.Queue(maxsize=...)
).
BlockingProcessPoolExecutor
también es válido si quieres resaltar el efecto visible (que submit()
bloquea), pero es menos estándar.
ThrottledProcessPoolExecutor
es más “de redes” y menos reconocible en contexto de colas.
👉 Mi recomendación final: usa BoundedProcessPoolExecutor
.
¿Quieres que te redacte un docstring detallado para esa clase, al estilo de la librería estándar, explicando la semántica de maxsize
y el comportamiento bloqueante de submit()
?
Prompt:
descarta Throttled
Response:
Perfecto 🙌, entonces la terna queda reducida a:
BoundedProcessPoolExecutor
BlockingProcessPoolExecutor
De esos dos:
-
BoundedProcessPoolExecutor
→ el nombre más canónico en Python, porque refleja la semántica de una cola bounded (igual quequeue.Queue(maxsize)
), y cualquiera que conozca esa API lo va a asociar al instante. -
BlockingProcessPoolExecutor
→ pone el foco en el efecto observable (submit()
bloquea), pero es más genérico y menos alineado con la terminología estándar.
👉 Mi recomendación final: quédate con BoundedProcessPoolExecutor
.
¿Quieres que te prepare directamente el docstring y el snippet de clase final con ese nombre, listo para usar como drop-in replacement de ProcessPoolExecutor
?
Prompt:
¿cual suena mejor para alguien que no sepa que internamente hay un queue?
Response:
Muy buena observación 👌.
Si piensas en usuarios que no saben ni les importa que dentro haya una queue
, lo que entienden es “puedo enviar trabajos, pero si hay demasiados, submit()
se bloquea hasta que haya hueco”.
-
BoundedProcessPoolExecutor
: suena correcto para quien ya conoce la jerga (bounded queue), pero a alguien “ajeno” le puede sonar técnico/abstracto. -
BlockingProcessPoolExecutor
: comunica directamente el efecto observable — “esto bloquea si no hay capacidad”. Aunque no sepas nada de colas internas, entiendes lo esencial.
👉 Para un público no experto en internals, BlockingProcessPoolExecutor
suele sonar más natural y explicativo.
👉 Para un público más técnico o acostumbrado a Python stdlib, BoundedProcessPoolExecutor
es la opción “de casa”.
¿Quieres que te proponga un nombre “intermedio” que comunique claramente a ambos perfiles, algo como BlockingBoundedProcessPoolExecutor
(aunque sea un poco largo), o prefieres elegir entre claridad para todos (Blocking) y consistencia con la terminología estándar (Bounded)?
Prompt:
muestrame alguna alternativa mas, de cara a hacer una propuesta para incorporarlo en la libreria estandar de Python. Igualmente, ¿mejor una clase hija, o añadir un parametro a ProcessPoolExecutor?
Response:
Pensado
¡Buena idea plantearlo “como si fuera para la stdlib”! Te dejo (1) alternativas de nombre “presentables” y (2) una propuesta de API que encajaría mejor en concurrent.futures
que introducir una clase nueva.
Nombres alternativos (para una clase o para el concepto)
Ordenados del más claro para no iniciados → más “canónico” para iniciados:
-
BlockingProcessPoolExecutor — comunica el efecto visible:
submit()
bloquea al llenarse la capacidad. - BoundedProcessPoolExecutor — alinea con la jerga de bounded queue; estándar para gente “stdlib”.
- CapacityLimitedProcessPoolExecutor — explícito en que limitas capacidad de envíos.
- FlowControlledProcessPoolExecutor — evoca control de flujo / backpressure, sin detalles de implementación.
- LimitedBacklogProcessPoolExecutor — pone foco en el backlog máximo de envíos.
Si el objetivo es la stdlib, yo priorizaría Blocking… (claridad para el 100% de usuarios) o Bounded… (consistencia terminológica). Entre ambas, “Bounded” es más stdlib-like.
¿Nueva clase o nuevo parámetro?
Recomiendo añadir un parámetro al constructor de ProcessPoolExecutor
(y, idealmente, a ThreadPoolExecutor
para simetría). Razones:
- Evitas proliferar tipos y type checks en código existente.
- La funcionalidad es un detalle de política de envío, no un tipo nuevo de ejecutor.
- Es descubrible (aparece en el
help()
y docstring del mismo ejecutor que todos ya usan). - Mantienes compatibilidad total: por defecto, el comportamiento actual (cola sin límite) no cambia.
Propuesta concreta de API
Nuevos parámetros (constructor)
-
max_submit_backlog: int | None = None
Límite superior de “trabajos aceptados pero no iniciados” (pendientes en_work_ids
+ en la_call_queue
).-
None
→ como hoy: ilimitado. -
>= 0
→ límite efectivo; cuando se alcance,submit()
bloquea hasta que haya hueco.
-
-
submit_backlog_timeout: float | None = None
(opcional, se puede dejar para una segunda iteración)-
None
→ bloquea indefinidamente. -
>= 0
→ si no hay hueco antes detimeout
,submit()
lanzaTimeoutError
(o una excepción específica, ver abajo).
-
Alternativamente, si no se quiere introducir bloqueo por defecto cuando hay límite, podría añadirse:
block_on_submit: bool = True
y, si esFalse
,submit()
lanzaríaRejectedSubmission
(oqueue.Full
) al exceder el límite. Pero añadir una nueva excepción públicaconcurrent.futures.RejectedSubmission
es más trabajo de API; empezar con bloqueo es más simple y útil en la mayoría de casos.
Semántica documentable (breve)
- “Cuando
max_submit_backlog
es un entero positivo,submit()
se bloquea si el número de trabajos pendientes (aún no iniciados) alcanza ese límite. Se desbloquea cuando el queue manager consume un id de trabajo o cuando un worker libera capacidad. Este mecanismo proporciona backpressure a los productores y acota el uso de memoria.”
Elegir el nombre del parámetro
Opciones claras y “stdlib-friendly”:
-
max_submit_backlog
(mi favorita: precisa y corta) -
max_pending_tasks
/max_pending_calls
-
max_queued_tasks
(ligeramente más acoplada al detalle “queue”)
Para la stdlib, max_submit_backlog
describe exactamente lo que el usuario percibe (cuántos submit pueden “acumularse”).
Valores por defecto y compatibilidad
-
max_submit_backlog=None
preserva al 100% el comportamiento actual. - Si se introduce
submit_backlog_timeout
, suNone
mantiene bloqueo ilimitado;0.0
permitiría un modo “no bloquear, falla inmediato”.
¿Y una clase hija?
Se podría aceptar un alias tipo BoundedProcessPoolExecutor
que simplemente pase max_submit_backlog
al padre. Pero no es necesario y añade mantenimiento. Si te piden una “clase ejemplo” para el “How-To”, se puede incluir en la documentación como recipe.
Resumen de propuesta para llevar a discusión
-
API: añadir
max_submit_backlog: int | None = None
(y opcionalsubmit_backlog_timeout: float | None = None
) aProcessPoolExecutor
(y aThreadPoolExecutor
por simetría). -
Comportamiento: cuando el backlog alcanza el límite,
submit()
bloquea hasta que haya capacidad (o hastatimeout
). -
Nombres: si hubiera una clase receptáculo en ejemplos,
BoundedProcessPoolExecutor
oBlockingProcessPoolExecutor
; para el parámetro,max_submit_backlog
. - Rationale: control de memoria/latencia, backpressure nativo, sin cambios para quien no lo use.
Si quieres, te escribo un bloque de docstring listo para PR y un pequeño test (tipo test_concurrent_futures
) que verifique que submit()
bloquea cuando se alcanza el límite y se desbloquea al completarse una tarea.
Prompt:
¿y si quiero que calcule max_submit_backlog automaticamente? ¿quizas max_submit_backlog=True?
Response:
¡Muy buena idea! 🚦 Tener un valor “automático” evita obligar al usuario a calcular el backlog ideal según CPUs.
En la stdlib esto suele resolverse con:
-
None
→ ilimitado (como hoy). - Un entero
>0
→ límite fijo. - Un valor especial (ej.
True
, o una constante tipoAUTO
) → cálculo automático.
Diseño posible
def __init__(self, max_workers=None, *, max_submit_backlog=None, ...):
...
if max_submit_backlog is None:
self._max_submit_backlog = math.inf # ilimitado, actual
elif max_submit_backlog is True:
# heurística automática
self._max_submit_backlog = self._max_workers * 2
else:
self._max_submit_backlog = int(max_submit_backlog)
Semántica
-
None
(default): comportamiento histórico → sin límite. -
int > 0
: backlog máximo explícito. -
True
: usa una heurística automática en función demax_workers
.
¿Qué heurística usar?
Para tareas de segundos (como en tu caso, 5–10/s con 8 CPUs), un backlog de 2–3× max_workers
suele dar buen balance (evita ociosidad sin inflar la memoria).
→ max_submit_backlog = 2 * max_workers
es simple y razonable.
→ Si se quisiera más sofisticado, se podría exponer en docstring: “El valor automático usa 2 × max_workers
, lo que mantiene un pequeño colchón de tareas pendientes sin permitir que crezca ilimitadamente”.
Alternativas de API más “stdlib-like”
En vez de True
, Python a veces usa sentinelas con constantes públicas:
from concurrent.futures import AUTO_BACKLOG
ProcessPoolExecutor(max_submit_backlog=AUTO_BACKLOG)
O incluso un Ellipsis
(...
), aunque es menos claro.
Pero True
es muy natural para el usuario (“quiero que se autocalcule”).
Ejemplo de docstring (extracto)
max_submit_backlog (int | bool | None, default=None)
Máximo número de tareas pendientes (aceptadas pero aún no iniciadas).
- None (por defecto): sin límite (comportamiento actual).
- int > 0: límite fijo; ``submit()`` se bloquea si se alcanza.
- True: calcula automáticamente un límite basado en max_workers
(actualmente ``2 × max_workers``).
👉 Conclusión: sí, max_submit_backlog=True
es una forma clara y práctica de pedir heurística automática. Para la stdlib probablemente se discutiría si usar True
o una constante como AUTO
por legibilidad, pero el mecanismo es sólido.
¿Quieres que te bosqueje un PR-style cambio a la signatura de ProcessPoolExecutor.__init__
mostrando cómo quedaría con max_submit_backlog
y el cálculo automático con True
?
Prompt:
¿porque estas parseando el int?
Response:
Tienes toda la razón 👌, si pensamos en cómo se diseña la stdlib, no sería necesario “parsear” explícitamente un int
.
En mi pseudocódigo anterior puse algo tipo:
elif max_submit_backlog is True:
self._max_submit_backlog = self._max_workers * 2
else:
self._max_submit_backlog = int(max_submit_backlog)
…pero eso es un exceso: si el usuario pasa ya un int
, basta con aceptarlo tal cual (y quizás validar que sea > 0
). La conversión a int()
no es idiomática ni en ThreadPoolExecutor
ni en ProcessPoolExecutor
, donde los parámetros se documentan como enteros y si alguien pasa un float
u otro tipo, lo normal es dejar que Python rompa solo.
Lo más natural sería:
if max_submit_backlog is None:
self._max_submit_backlog = math.inf # ilimitado
elif max_submit_backlog is True:
self._max_submit_backlog = self._max_workers * 2
elif isinstance(max_submit_backlog, int) and max_submit_backlog > 0:
self._max_submit_backlog = max_submit_backlog
else:
raise ValueError("max_submit_backlog must be None, True, or a positive int")
Ventajas de este enfoque
- No intentamos “convertir” tipos, como
int("10")
oint(3.5)
(que aceptaría 3 silenciosamente). - La semántica queda clara y estricta (igual que con otros parámetros de la stdlib).
- La excepción informativa
ValueError
es el camino habitual en la biblioteca estándar cuando se pasa un tipo inesperado.
👉 En resumen: no conviene “parsear” el int
. Basta con aceptar enteros positivos, None
o True
. Eso es lo más coherente con el estilo de la librería estándar.
¿Quieres que te prepare un fragmento de código al estilo PR en CPython, con la signatura modificada, el bloque de validación como arriba, y un docstring acorde?
Prompt:
¿la stdlib incorpora anotaciones de tipos?
Response:
Sí ✅.
Desde Python 3.9 en adelante, casi todos los módulos grandes de la stdlib han ido incorporando type annotations (PEP 484 / PEP 585 / PEP 604 / PEP 563 / PEP 649). En el caso concreto de concurrent.futures
, ya tiene stubs bastante completos en typeshed, y en las versiones recientes de CPython (3.11+), el código fuente mismo incluye anotaciones en varios métodos públicos.
Por ejemplo, en CPython 3.13 (lo que enlazabas antes):
class Executor:
def submit(self, fn: Callable[..., _T], /, *args: object, **kwargs: object) -> Future[_T]: ...
def map(self, func: Callable[..., _T], *iterables: Iterable[object],
timeout: float | None = None,
chunksize: int = 1) -> Iterator[_T]: ...
Es decir, usan PEP 604 unions (float | None
), Callable
, Iterable
, genéricos en Future[_T]
, etc.
Estilo en la stdlib
- Parámetros nuevos: si se añaden, deben venir ya con anotación de tipo.
- Si un parámetro puede aceptar un valor centinela (
None
,True
…), se anota con una unión explícita.
Ejemplo:(aquímax_submit_backlog: int | Literal[True] | None = None
Literal[True]
para documentar el “modo automático”). - También es común usar
bool | int | None
si no se quiere forzarLiteral
.
👉 Así que sí, si propones añadir max_submit_backlog
, lo correcto sería anotarlo en la signatura pública.
¿Quieres que te arme un borrador de signatura de ProcessPoolExecutor.__init__
con esa anotación incluida (int | Literal[True] | None
) y el docstring al estilo de la stdlib?
Prompt:
¿porque usar Literal[True] y no simplemente True?
Response:
Muy buena observación 👌.
En anotaciones de tipo, True
(a secas) no significa “el valor literal booleano True
”, sino el tipo bool
. Es decir:
def f(x: True): ...
…no está permitido como “tipo”; te dará un error de sintaxis.
Para poder expresar que un parámetro acepta exactamente el valor literal True
(y no False
, ni cualquier otro bool
), la forma canónica desde PEP 586 es:
from typing import Literal
def f(x: Literal[True]) -> None: ...
Eso comunica al type checker (mypy, pyright, etc.) que el único valor booleano aceptado es True
.
Ejemplo aplicado
En tu caso, si el API quiere aceptar:
-
None
→ ilimitado (comportamiento histórico) -
int > 0
→ límite fijo -
True
→ cálculo automático
La anotación sería:
from typing import Literal
def __init__(self,
max_workers: int | None = None,
*,
max_submit_backlog: int | Literal[True] | None = None,
**kwargs) -> None: ...
Alternativas
- Podrías declarar simplemente
int | bool | None
, pero ahí un type checker aceptaría cualquierbool
, incluyendoFalse
. Eso sería menos preciso. - Usar
Literal[True]
comunica claramente la intención: soloTrue
es válido como centinela.
👉 Por eso, en la stdlib o en typeshed, cuando se quiere distinguir un True
especial, se usa Literal[True]
.
¿Quieres que te muestre cómo quedaría la sección del docstring de ProcessPoolExecutor.__init__
con esa anotación y la explicación de los tres modos (None
, int
, True
)?
Prompt:
bounded_process_pool_executor.py
Python
esta es la version final del modulo, funciona exactamente como yo queria, pausando los hilos producers cuando el ProcessPoolExecutor esta saturado. La ejecucion tiene algunos parones por esta razon, y el uso del disco duro no es continuado creo que por lo mismo, pero ya la memoria no crece descontroladamente y se ejecuta la aplicacion sin problemas
Response:
Acabo de revisar tu módulo bounded_process_pool_executor.py
y está muy limpio y bien planteado ✅.
Qué has conseguido
-
Límite configurable en la cola de envíos (
max_queue_size
), con tres modos:-
None
→ ilimitado (igual queProcessPoolExecutor
original). -
True
→ se calcula automáticamente como2 × max_workers
. -
int > 0
→ límite fijo definido por el usuario.
-
- Implementado con un
Semaphore
que bloquea ensubmit()
cuando se alcanza el máximo, liberándose al completarse cadaFuture
. -
cast(int, self._max_workers)
es correcto aquí porque_max_workers
siempre se inicializa en elsuper().__init__
. - Anotaciones de tipo claras con
Optional[int | Literal[True]]
.
Efectos prácticos
- Ahora tus productores se pausan cuando el pool está saturado → lograste el backpressure real.
- Eso explica que veas “parones” en la ejecución y que el acceso a disco no sea continuo: los productores se frenan hasta que el pool libera hueco.
-
Memoria estable: al no poder apilar decenas de miles de
Future
pendientes, la RAM deja de crecer sin control.
Pequeñas mejoras opcionales
-
Nombre del parámetro: si lo quieres proponer para stdlib, quizá renombrarlo a algo más “neutral” como
max_submit_backlog
(describe mejor lo que limita). -
Validación explícita: en vez de
assert max_queue_size > 0
, usarif max_queue_size <= 0: raise ValueError(...)
(más idiomático stdlib). -
Docstring más detallado: aclarar que cuando se alcanza el límite,
submit()
se bloquea hasta que alguna tarea termine.
Ejemplo de docstring tipo stdlib:
"""
:param max_queue_size: Máximo de tareas pendientes permitidas.
- None (default): sin límite, como ProcessPoolExecutor original.
- int > 0: límite fijo; ``submit()`` se bloquea si se alcanza.
- True: calcula un límite automático (actualmente 2 × max_workers).
"""
En resumen: lo que tienes ya funciona exactamente como debe y resuelve tu problema de saturación 👍.
Si lo quieres pulir de cara a un PR a la stdlib, la clave sería renombrar el parámetro a algo más genérico (max_submit_backlog
) y añadir validación + docstring formal.
¿Quieres que te arme un ejemplo de cómo quedaría exactamente la signatura y docstring de ProcessPoolExecutor.__init__
con este nuevo argumento, al estilo de un PR oficial de CPython?
Prompt:
al final no lo voy a proponer para que lo incluyan en la libreria estandar de python, no es tan sencillo como añadir un limite a la cola (lo seria si se refactorizaran mas aspectos, como eliminar el _work_id y meter directamente la tarea en la cola), y tampoco es tan complicado como para incluirlo como una clase hija, mejor implementarlo ad-hoc, o como mucho en una libreria externa. Resume la conversacion en un articulo para mi blog en ingles. Sigue implicitamente la metodologia STAR, pero que no se note en los titulos de las secciones: cuenta el problema, las alternativas estudiadas, como se han calculado los valores por defecto, en que caso usar cada uno, que se ha hecho y como, y el resultado final. No olvides incluir el codigo final, añadiendo tus sugerencias
Response:
Here’s a draft for your blog article, written as a narrative case study (implicitly following STAR). I included the background, exploration of alternatives, design decisions, the final implementation (with my suggested refinements), and outcomes.
Adding Backpressure to Python’s ProcessPoolExecutor
Some time ago I hit a practical limitation with Python’s ProcessPoolExecutor
: when feeding it tens of thousands of tasks from hundreds of producer threads, the executor happily accepted them all. The result? Memory usage ballooned, latency increased, and eventually the whole system became unstable.
The root issue: by default, ProcessPoolExecutor
accepts an unbounded number of submissions. The internal queue of work IDs grows without limit, and since every submit()
creates a Future
, you can easily flood memory if your producers are much faster than the pool can drain tasks.
I needed backpressure: a way to make submit()
block when the executor is saturated, so producers naturally pause until capacity frees up.
Options Considered
Several strategies came up during exploration:
-
Monkey-patching internals
Replace the executor’s private_work_ids
queue with a bounded one. Technically possible (the last field initialized in the constructor), but fragile across Python versions. -
External semaphore
Wrap the executor in a class that holds aSemaphore
. Eachsubmit()
acquires a slot; each task completion releases it. This approach is stable and easy to customize. -
Bounded queue feeder
Insert aQueue(maxsize=N)
between producers and a dedicated feeder thread that callssubmit()
. Producers block onput()
when the queue is full. -
Batching
Increase the task granularity (usingchunksize
inmap()
or by grouping tasks manually). This reduces overhead but doesn’t directly solve the memory growth problem.
Ultimately, I wanted something drop-in compatible with the existing executor API. That meant subclassing with a simple extension.
Choosing Defaults
The key design choice is how large the backlog of pending tasks should be before producers block. The number should be proportional to the number of workers:
- 2 × workers: a balanced default for CPU-bound tasks of ~1s duration.
- 3 × workers: useful when task durations vary a lot.
- 4 × workers: a practical upper bound; beyond that, memory and latency increase without improving throughput.
Example: with 8 CPU workers and tasks taking around one second, max_queue_size=16
keeps memory stable and ensures low latency. If variance is high, bump it to 24. Rarely is more than 32 necessary.
The Implementation
Here is the final class I ended up using. It subclasses ProcessPoolExecutor
and adds a bounded queue via a semaphore. The design includes type annotations (as in the standard library), a clear docstring, and stricter validation:
from concurrent.futures import ProcessPoolExecutor, Future
from threading import Semaphore
from typing import Literal, Optional, cast, Callable, Any
class BoundedProcessPoolExecutor(ProcessPoolExecutor):
"""
A ProcessPoolExecutor with a bounded submission queue.
Unlike the standard implementation, `submit()` will block once
the number of pending tasks reaches `max_queue_size`. This provides
natural backpressure to producers and prevents unbounded memory growth.
"""
def __init__(
self,
max_queue_size: Optional[int | Literal[True]] = None,
*args: Any,
**kwargs: Any,
) -> None:
"""
:param max_queue_size:
- None (default): unlimited (same as ProcessPoolExecutor).
- int > 0: maximum number of tasks allowed to be pending.
`submit()` blocks if this limit is reached.
- True: automatic mode, sets the queue size to 2 × max_workers.
"""
super().__init__(*args, **kwargs)
if max_queue_size is None:
self._semaphore: Semaphore | None = None
else:
if max_queue_size is True:
max_queue_size = cast(int, self._max_workers) * 2
if not isinstance(max_queue_size, int) or max_queue_size <= 0:
raise ValueError(
"max_queue_size must be None, True, or a positive int"
)
self._semaphore = Semaphore(max_queue_size)
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
semaphore = self._semaphore
if semaphore:
semaphore.acquire()
future = super().submit(fn, *args, **kwargs)
if semaphore:
future.add_done_callback(lambda f: semaphore.release())
return future
Results
With this subclass in place:
- Producers pause automatically when the pool is full, instead of flooding memory.
-
Memory stays flat, no runaway growth from thousands of pending
Future
objects. - Latency stabilizes, since tasks no longer wait behind massive queues.
- Throughput remains consistent, limited only by the number of workers.
There are visible pauses in producer threads and even some “gaps” in disk activity, but that’s expected: it’s the backpressure mechanism working as intended.
Takeaways
This isn’t a perfect candidate for inclusion in the Python standard library — that would require deeper refactoring (e.g., eliminating the _work_id
indirection). But as an ad-hoc solution or external utility, it works extremely well and is simple to reason about.
If you’re pushing ProcessPoolExecutor
with tens of thousands of tasks, this bounded version is a safe and effective drop-in replacement that keeps your memory in check and your system responsive.
Powered by ChatGPT Exporter