Adding Backpressure to Python’s ProcessPoolExecutor

Adding Backpressure to Python’s ProcessPoolExecutor

User: Jesús Leganés-Combarro (jesus.leganes.combarro@gmail.com)
Created: 2025/9/25 17:42:45
Updated: 2025/10/1 6:52:09
Exported: 2025/10/1 11:30:33
Link: https://chatgpt.com/g/g-p-68d8ee08302c819183dad298118b8161/c/68d562f4-7b40-8332-8b23-fe9db40c297f

Prompt:

Estoy usando un process pool executor de Python y el caso es de que funciona muy bien pero lo estoy saturando de tantas tareas que no es capaz de procesarlas todas. Entonces se satura y empieza a meter trabajos dentro de su cola interna de WorkIDs hasta que termina consumiéndose toda la memoria y cada vez de hecho hay una mayor latencia. He pensado en sobreescribir el WorkID que está utilizando internamente por uno indicando un límite propio para de esa forma cuando se llame a submit si se supera el límite de la cola se bloquean las peticiones y de esa forma pues tenga una especie como de backpressor y de control de congestión. Pero no sé qué valor poner. La opción por defecto sería poner el mismo número en la cola que elementos que están por el número de CPUs pero no sé si eso será muy limitante o si por el contrario lo que haría sería que estuviera ya un único elemento disponible para disponible siempre. Quizás poner el doble el número de CPUs. No sé qué valor podría ser adecuado en este caso. Estamos hablando de decenas de miles de objetos a procesar y están siendo alimentados desde posiblemente más de 100 hilos distintos a un único process pool executor que en mi máquina tiene 8 CPUs, entonces 8 tareas simultáneas.

Response:

Pensado

¡Buena pregunta! Con ProcessPoolExecutor no hay API pública para cambiar el tamaño de su cola interna, y “tocar” WorkItem/WorkQueue por dentro te va a meter en zona frágil. La forma estable de conseguir backpressure es limitar tú mismo cuántos “futures” pueden estar en vuelo (pendientes) a la vez y/o hacer batching (procesar en lotes) para reducir la granularidad.

Aquí tienes patrones que funcionan bien y cómo elegir los números.


  1. Límite de “futures” pendientes (backpressure real) =====================================================

La idea: solo dejas N tareas en vuelo; si N se alcanza, quien llama a submit() se bloquea hasta que alguna termine. Esto evita que la cola interna crezca sin control, incluso con decenas de miles de elementos y >100 productores.

Regla práctica para N (pendientes)
Para 8 CPUs y trabajo CPU-bound:

  • Empieza con N = 4 × nº_workers → con 8 workers: N = 32.
  • Si tu carga tiene varianza alta en tiempos (algunas tareas tardan mucho más), sube a 6–8× (48–64).
  • Evita miles: solo aumenta la latencia y el uso de memoria.

Patrón con Semaphore (seguro con muchos hilos productores):

from concurrent.futures import ProcessPoolExecutor, as_completed
from threading import Semaphore

MAX_WORKERS = 8
MAX_IN_FLIGHT = MAX_WORKERS * 4  # empieza con 32
sem = Semaphore(MAX_IN_FLIGHT)

def submit_with_backpressure(executor, fn, *args, **kwargs):
    sem.acquire()  # se bloquea si ya hay muchos pendientes
    fut = executor.submit(fn, *args, **kwargs)
    # Cuando termine, liberamos un hueco:
    fut.add_done_callback(lambda f: sem.release())
    return fut

# Uso (con muchos productores):
executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
futures = []
for item in stream_de_decenas_de_miles():
    futures.append(submit_with_backpressure(executor, trabajo_cpu_bound, item))

# Consumir resultados sin acumular en memoria:
for f in as_completed(futures):
    res = f.result()
    consume(res)

Si ya tienes >100 hilos enviando, este patrón es ideal: todos competirán por el Semaphore y no sobrepasarán el umbral.


  1. Un “feeder” con cola acotada ===============================

Otra variante: pones una cola Queue(maxsize=M) entre productores y un único feeder que hace submit(). Es conveniente cuando quieres centralizar métricas/reintentos y tener aún más control.

from queue import Queue
from threading import Thread
from concurrent.futures import ProcessPoolExecutor, as_completed

MAX_WORKERS = 8
MAX_IN_FLIGHT = 32
q = Queue(maxsize=MAX_IN_FLIGHT)

def feeder(q, executor, futures):
    while True:
        item = q.get()
        if item is None: break
        fut = executor.submit(trabajo_cpu_bound, item)
        futures.append(fut)
        q.task_done()

executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
futures = []
t = Thread(target=feeder, args=(q, executor, futures), daemon=True)
t.start()

# Productores: se bloquearán al hacer put() si la cola está llena
for item in stream_de_decenas_de_miles():
    q.put(item)

# Señal de fin
q.put(None); q.join()

for f in as_completed(futures):
    consume(f.result())

  1. Batching / chunksize (reduce overhead) ===========================================

Si tus tareas son cortas (milisegundos), el overhead de IPC + pickling te mata. Solución:

  • Agrupa varios ítems en un lote y que cada proceso consuma un lote.
  • O usa executor.map(fn, iterable, chunksize=K) (en ProcessPoolExecutor.map el chunksize sí existe).

Reglas para chunksize:

  • Empieza con K = 50–200 si cada ítem es rápido (<10–20 ms).
  • Si cada ítem tarda más (≥100 ms), K pequeño (1–10) suele ir bien.
  • Objetivo: que cada tarea dure 50–500 ms como orden de magnitud; así amortizas overhead sin hacer la cola granítica.

Ejemplo (map con backpressure implícito y chunks):

from concurrent.futures import ProcessPoolExecutor

def procesa_lote(lote):
    return [trabajo_cpu_bound(x) for x in lote]

def lotes(iterable, k):
    buf = []
    for x in iterable:
        buf.append(x)
        if len(buf) == k:
            yield buf; buf = []
    if buf: yield buf

MAX_WORKERS = 8
CHUNK = 100  # ajusta
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as ex:
    for resultados in ex.map(procesa_lote, lotes(stream_de_decenas_de_miles(), CHUNK)):
        for r in resultados:
            consume(r)

Nota: Executor.map ya limita los envíos (no “dispara” infinito a la vez), pero controlar el chunksize es clave para que no se acumulen demasiados “work items” ultracortos.


  1. Otros ajustes que ayudan ===========================
  • maxtasksperchild (si usas multiprocessing.Pool) o reciclar procesos periódicamente si hay crecimiento de memoria por cachés/liberadores lentos. En ProcessPoolExecutor no está expuesto, pero puedes rediseñar para que cada proceso haga “mini-lotes” y evitar fugas/cachés grandes.
  • Evita compartir objetos enormes: pasa IDs/rutas y que el proceso los cargue. El pickling de objetos gigantes infla memoria y latencia.
  • Medir: registra in-flight, latencias por tarea, RSS de procesos, tamaño de cola. Con eso afinas MAX_IN_FLIGHT y chunksize.

Resumen de números para tu caso (8 CPUs, >100 productores, decenas de miles de ítems)

  • max_workers = 8 (ok para CPU-bound).
  • Límite de pendientes: empieza con 32 (4×), sube a 64 si notas ociosidad por varianza.
  • Si tareas son cortas: usa chunksize 50–200 o procesa lotes de ese tamaño dentro del worker.
  • No intentes sobreescribir WorkID interno: mejor aplica los patrones de Semaphore/cola acotada; es estable y portable.

Si quieres, dime cuánto tarda de media una tarea individual y si es CPU puro o mezcla E/S, y te propongo MAX_IN_FLIGHT y chunksize más finos.

Prompt:

No quiero sobreescribir WorkID interno, sino la queue interna _work_id para que en vez de ser unlimited, tenga un limite máximo, así cuando submit() haga push, si dicho queue está completo, submit esperaría y el hilo que llama a submit se bloquearía, actuando de backpressure

Response:

Pensado

Tiene sentido lo que buscas (que submit() se bloquee cuando haya demasiados trabajos en cola), pero en ProcessPoolExecutor eso no se puede conseguir de forma estable cambiando la cola interna:

  • Lo que llamas _work_id no es una cola pública; internamente hay varias estructuras privadas (_pending_work_items, call_queue, etc.) y, en versiones recientes, la cola de envíos es un multiprocessing.SimpleQueue sin límite creado dentro de hilos/funciones privadas. No hay un “hook” soportado para reemplazarla con Queue(maxsize=...) sin tocar código interno (frágil entre versiones y plataformas).

La forma segura de obtener el mismo efecto (backpressure en submit) es envolver el ejecutor y bloquear justo antes de llamar a submit() usando un semáforo que represente “capacidad restante”. Es funcionalmente equivalente a tener una cola acotada y, a diferencia de tocar lo interno, es estable y portable.

Aquí tienes un “drop-in” que bloquea en submit():

from concurrent.futures import ProcessPoolExecutor, Future
from threading import Semaphore
from typing import Any, Callable

class BoundedProcessPool:
    def __init__(self, max_workers: int, max_in_flight: int):
        self._ex = ProcessPoolExecutor(max_workers=max_workers)
        self._sem = Semaphore(max_in_flight)

    @property
    def _executor(self):
        return self._ex

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        # Bloquea aquí igual que una Queue(maxsize) haría en put()
        self._sem.acquire()
        fut = self._ex.submit(fn, *args, **kwargs)
        # Cuando termina, liberamos capacidad
        fut.add_done_callback(lambda _: self._sem.release())
        return fut

    # Passthroughs útiles
    def map(self, *args, **kwargs):
        return self._ex.map(*args, **kwargs)

    def shutdown(self, wait=True, cancel_futures=False):
        return self._ex.shutdown(wait=wait, cancel_futures=cancel_futures)

¿Qué valor poner?

Para CPU-bound con 8 procesos:

  • Empieza con max_in_flight = 4 × workers ⇒ 32.
  • Si hay mucha varianza en duración de tareas, sube a 6–8× (48–64).
  • Evita cientos/miles: solo aumentas memoria y colas internas sin ganar throughput.

Esto te da exactamente el comportamiento que describes: si hay 32–64 “futures” en vuelo, submit() se bloquea y ejerces backpressure sobre esos >100 hilos productores.

Alternativas/afinados

  • Executor.map(..., chunksize=K): si tus tareas son muy cortas, sube chunksize (p. ej. 50–200) para amortizar el overhead de IPC/pickling y reducir el ritmo de envíos.
  • Feeder con Queue(maxsize=M): un hilo único que hace submit() desde una Queue acotada también te da bloqueo en put() y centraliza métricas/reintentos.

Nota: intentar sustituir la cola interna del ProcessPoolExecutor (p. ej., cambiar un SimpleQueue por Queue(maxsize)) implica reimplementar o “monkey-patch” de clases privadas y puede romperse con una actualización menor de Python o cambiar de plataforma. El wrapper de arriba te da la misma semántica con cero deuda técnica.

Si me dices la latencia media de cada tarea y su dispersión (p50/p95) te sugiero un max_in_flight y, si procede, un chunksize todavía más ajustados para tu caso (decenas de miles de ítems, 8 procesos, 100+ productores).

Prompt:

El código fuente de ProcessPoolExecutor está en https://github.com/python/cpython/blob/3.13/Lib/concurrent/futures/process.py. Sigo pensando que lo más sencillo es sobreescribir la queue _work_ids, quizás en una clase hija. De hecho es la última instrucción del constructor, así que se puede cambiar en el constructor de la clase hija, y nadie tendría ninguna referencia al queue original, e incluso se puede comprobar su existencia para detectar cambios en la implementación. Puedo usar _max_workers para calcular el tamaño del nuevo queue de _work_ids, pero no sé qué valor usar. Veo que está usando el número de CPUs + 1 en la cola interna de calls ids, no he visto si bloquea, pero si lo hace, a la API publica no le afecta

Response:

Pensado

¡Totalmente viable! En 3.13 ProcessPoolExecutor crea _work_ids = queue.Queue() sin límite y el submit() mete ahí el id de trabajo; un queue manager thread lo va vaciando hacia _call_queue, que tiene límite _max_workers + EXTRA_QUEUED_CALLS (y EXTRA_QUEUED_CALLS = 1). Por tanto, si sustituyes _work_ids por una Queue(maxsize=K), submit() bloqueará cuando haya K ids pendientes, que es justo el backpressure que buscas. Esto se ve en el propio (derivado) fuente: creación de colas y el submit() que hace self._work_ids.put(self._queue_count); además, el queue manager se arranca después y recibe self._work_ids por parámetro, así que si lo reemplazas en __init__ antes de arrancarlo, no hay referencias colgantes. natashabatalha.github.io

¿Qué valor poner para maxsize=K?

Piensa que el throughput lo limita _call_queueworkers + 1 (porque una vez ahí ya no se puede cancelar), y _work_ids es un buffer adicional que solo añade memoria y latencia si es muy grande. Reglas prácticas para CPU-bound:

  • Base: K = 4 × workers → con 8 procesos, K = 32.
  • Alta varianza de tiempos (colas desbalanceadas): sube a 6–8×K = 48–64.
  • Evita cientos/miles: no aumentan el throughput (el cuello es call_queue) y sí engordan pending_work_items consumiendo RAM.
  • Si tus tareas son muy cortas, además de limitar K, usa map(..., chunksize=...) para que cada envío valga la pena y no inundes las colas con micro-tareas. La doc de map(chunksize) lo recomienda explícitamente. Python documentation

Implementación segura (subclase)

import queue
from concurrent.futures import ProcessPoolExecutor

class BoundedWorkIdsProcessPool(ProcessPoolExecutor):
    def __init__(self, max_workers=None, *, max_work_ids=None, **kw):
        super().__init__(max_workers=max_workers, **kw)
        # Asegúrate de reemplazar ANTES de que arranque el queue manager
        # (aún no ha llamado a _start_queue_management_thread en __init__).
        if max_work_ids is None:
            # Regla por defecto: 4× procesos
            max_work_ids = self._max_workers * 4
        # Comprobación defensiva: en 3.13 _work_ids es queue.Queue()
        if not isinstance(self._work_ids, queue.Queue):
            raise RuntimeError("Impl. inesperada: _work_ids no es queue.Queue")
        self._work_ids = queue.Queue(maxsize=max_work_ids)

Por qué funciona: en 3.13 el constructor crea:

  • self._call_queue = _SafeQueue(max_size=self._max_workers + EXTRA_QUEUED_CALLS, ...)
  • self._result_queue = mp_context.SimpleQueue()
  • self._work_ids = queue.Queue() (sin límite por defecto)
    y más tarde el queue manager se arranca con args=(..., self._work_ids, self._call_queue, ...). Cambiar _work_ids en tu subclase antes de que arranque ese hilo hace que todo el pool use tu cola acotada. natashabatalha.github.io+1

Cálculo rápido del máximo de “aceptados” antes de bloquear

Con workers = W, call_queue ≈ W+1 y tu K para _work_ids, el número máximo de futures aceptados (pendientes + en vuelo) antes de que submit() bloquee es del orden de K + (W+1) + W (ids en _work_ids + elementos en _call_queue + tareas corriendo). Por eso K moderado (32–64) limita memoria sin afectar throughput, ya que el cuello de botella real es W y la cola de llamadas W+1. natashabatalha.github.io

Extra: por qué no más grande

El propio comentario del fuente mantiene _call_queue pequeña porque lo que entra ahí ya no se puede cancelar; subir demasiado K aumenta latencia y memoria sin rendimiento adicional. natashabatalha.github.io

Si me dices p50/p95 de duración de tarea, te afino K (y, si procede, un chunksize) para tus “decenas de miles” con 8 procesos y >100 productores.

Prompt:

Suelen ejecutarse 5-10 tareas por segundo, a veces incluso menos

Response:

Pensado

Perfecto: con ese ritmo (≈ 5–10 tareas/seg en total y 8 workers), tus tareas duran en media ~0.8–1.6 s. Para tareas “largas” así, no necesitas un búfer grande: un límite pequeño mantiene la latencia baja sin estrangular el throughput.

Recomendación de maxsize para _work_ids

  • Base: K = 2 × W → con W=8, K = 16.
  • Si ves ociosidad por varianza (algunas tareas tardan bastante más): sube a K = 3 × W ( 24 ).
  • Tope razonable: K = 4 × W ( 32 ). Más de eso sólo añade memoria y latencia, no más rendimiento (el cuello sigue siendo W y _call_queue ≈ W+1).

Intuición (tiempo de espera en cola)

Antes de bloquear submit(), podrás acumular aprox.
K + (W+1) llamadas encoladas + W en ejecución.
Con K=16: total ≈ 33 pendientes/ejecución. Si procesas 5–10/s, esa cola extra representa ~2–6 s de espera máxima antes de que una nueva tarea empiece a ejecutarse. Si te parece bien ese “horizonte”, K=16–24 es perfecto; si quieres aún menos espera, baja a 12.

chunksize

Como cada tarea dura ~1 s, déjalo en 1 (o 2–4 si de verdad amortiza algo de setup). El batching grande ayuda cuando las tareas son de milisegundos, no aquí.

Notas prácticas para evitar picos de memoria

  • No acumules todos los Future en una lista enorme. Consume con as_completed() y libera referencias pronto.
  • Si tienes >100 productores, tu enfoque de cola acotada en _work_ids o el wrapper con Semaphore son equivalentes en semántica de backpressure; elige la que prefieras, pero mantén K en 16–24 para empezar y observa métricas (ocupación, ociosidad, p95 de latencia).

¿Mi sugerencia concreta para tu máquina (8 CPUs, 5–10/s)?
Empieza con K = 16. Si detectas ociosidad (CPU < 90%) por varianza, prueba K = 24. Mantente por debajo de 32.

Prompt:

¿Qué valor es adecuado según el número de CPUs? Por no poner un valor fijo

Response:

Pensado

Si quieres que dependa solo de los CPUs (workers), usa esta regla:

  • Por defecto (CPU-bound, tareas ~0.5–2 s):
    K = 2 × W
  • Si hay varianza notable en duración:
    K = 3 × W
  • Límite superior razonable:
    K ≤ 4 × W

Donde W = max_workers (normalmente nº de CPUs).

Por qué

El throughput real lo fija el pool (W tareas en ejecución) y la _call_queue (~W+1). _work_ids solo debe ser un buffer pequeño para absorber rachas desde muchos productores; más allá de 4×W solo añades memoria y latencia sin ganar rendimiento.

Tabla rápida

CPUs (W) K (base = 2×W) K (varianza = 3×W) K (tope = 4×W)
4 8 12 16
8 16 24 32
16 32 48 64
32 64 96 128

Si quieres afinar un pelín (sin métricas)

  • Tareas más largas (≥2 s): tiende a K = 2×W.
  • Tareas irregulares (colas desbalanceadas): sube a 3×W.
  • Tareas muy cortas (<<0.2 s): mejor aumentar chunksize que K; si aun así quieres buffer, K = 6–8×W.

Con tus cifras (8 CPUs, 5–10 tareas/s → ~1 s por tarea):
Empieza con K = 16, si ves ociosidad por varianza pasa a K = 24, y no superes 32.

Prompt:

class queue.Queue(maxsize=0) Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

¿que nombre queda mejor para la clase, LimitedProcessPoolExecutor o BoundedProcessPoolExecutor? ¿Alguna otra alternativa?

Response:

Muy buena pregunta 🙂. Ambos nombres son entendibles, pero transmiten matices distintos:

  • BoundedProcessPoolExecutor
    → Se inspira en la jerga de colas en Python/Java (queue.Queue tiene bounded queue).
    → Indica que ahora el ejecutor tiene un límite superior en tareas pendientes.
    → Suena más técnico y “canon”.
  • LimitedProcessPoolExecutor
    → Se entiende rápido en castellano/inglés: “ejecutor limitado”.
    → Pero es más ambiguo: ¿limitado en qué? ¿workers, memoria, tareas?

Alternativas claras

  • BlockingProcessPoolExecutor → enfatiza que submit() bloquea al alcanzar el límite.
  • ThrottledProcessPoolExecutor → resalta la idea de control de caudal/backpressure.
  • BackpressuredProcessPoolExecutor → muy explícito, aunque un poco largo.
  • CappedProcessPoolExecutor → variante corta de “bounded/limited”.

Recomendación

Si lo que quieres es alinear con la terminología de colas (queue.Queue(maxsize)bounded queue), el nombre más limpio y familiar es:

👉 BoundedProcessPoolExecutor

Transmite que es un ProcessPoolExecutor pero con cola acotada, y cualquiera que haya usado queue.Queue lo asociará enseguida.

¿Quieres que te muestre un ejemplo de docstring corto para esa clase, que explique claramente el comportamiento de “submit bloqueante al alcanzar maxsize”?

Prompt:

descarta BackpressuredProcessPoolExecutor y CappedProcessPoolExecutor

Response:

Perfecto 👍, entonces descartamos BackpressuredProcessPoolExecutor y CappedProcessPoolExecutor.

Con lo que te queda en la mesa:

  • BoundedProcessPoolExecutor
    ✔️ Más técnico, consistente con queue.Queue(maxsize=…) (bounded queue).
    ✔️ Comunica con precisión: es un pool igual que el estándar, pero con cola acotada.
    ✔️ Es el término más familiar para quien venga de Python/Java.
  • LimitedProcessPoolExecutor
    ✔️ Claro y legible.
    ❌ Ambiguo: no dice qué está limitado (workers, memoria, tareas…).
  • BlockingProcessPoolExecutor
    ✔️ Pone el foco en el efecto observable: submit() se bloquea si la cola está llena.
    ❌ Describe el síntoma más que la causa; puede sonar demasiado genérico.
  • ThrottledProcessPoolExecutor
    ✔️ Evoca control de caudal.
    ❌ Menos usado en Python estándar, más en entornos de redes / APIs.

👉 Mi recomendación fuerte es BoundedProcessPoolExecutor, porque:

  • Se alinea con la terminología estándar de estructuras de datos en Python.
  • Es claro para cualquiera que haya leído el docstring de queue.Queue.
  • Evita la ambigüedad de “Limited” y es más conciso que “Throttled/Blocking”.

¿Quieres que te prepare un docstring para esa clase que explique explícitamente el nuevo comportamiento de submit() (bloqueante al alcanzar el límite), de manera similar a como lo documenta queue.Queue?

Prompt:

descarta Limited

Response:

Perfecto 🚀, entonces nos quedamos con:

  • BoundedProcessPoolExecutor
  • BlockingProcessPoolExecutor
  • ThrottledProcessPoolExecutor

De esos tres, el más claro y canónico en el ecosistema Python es BoundedProcessPoolExecutor, porque conecta directamente con la idea de bounded queue (queue.Queue(maxsize=...)).

BlockingProcessPoolExecutor también es válido si quieres resaltar el efecto visible (que submit() bloquea), pero es menos estándar.
ThrottledProcessPoolExecutor es más “de redes” y menos reconocible en contexto de colas.

👉 Mi recomendación final: usa BoundedProcessPoolExecutor.

¿Quieres que te redacte un docstring detallado para esa clase, al estilo de la librería estándar, explicando la semántica de maxsize y el comportamiento bloqueante de submit()?

Prompt:

descarta Throttled

Response:

Perfecto 🙌, entonces la terna queda reducida a:

  • BoundedProcessPoolExecutor
  • BlockingProcessPoolExecutor

De esos dos:

  • BoundedProcessPoolExecutor → el nombre más canónico en Python, porque refleja la semántica de una cola bounded (igual que queue.Queue(maxsize)), y cualquiera que conozca esa API lo va a asociar al instante.
  • BlockingProcessPoolExecutor → pone el foco en el efecto observable (submit() bloquea), pero es más genérico y menos alineado con la terminología estándar.

👉 Mi recomendación final: quédate con BoundedProcessPoolExecutor.

¿Quieres que te prepare directamente el docstring y el snippet de clase final con ese nombre, listo para usar como drop-in replacement de ProcessPoolExecutor?

Prompt:

¿cual suena mejor para alguien que no sepa que internamente hay un queue?

Response:

Muy buena observación 👌.
Si piensas en usuarios que no saben ni les importa que dentro haya una queue, lo que entienden es “puedo enviar trabajos, pero si hay demasiados, submit() se bloquea hasta que haya hueco”.

  • BoundedProcessPoolExecutor: suena correcto para quien ya conoce la jerga (bounded queue), pero a alguien “ajeno” le puede sonar técnico/abstracto.
  • BlockingProcessPoolExecutor: comunica directamente el efecto observable — “esto bloquea si no hay capacidad”. Aunque no sepas nada de colas internas, entiendes lo esencial.

👉 Para un público no experto en internals, BlockingProcessPoolExecutor suele sonar más natural y explicativo.
👉 Para un público más técnico o acostumbrado a Python stdlib, BoundedProcessPoolExecutor es la opción “de casa”.


¿Quieres que te proponga un nombre “intermedio” que comunique claramente a ambos perfiles, algo como BlockingBoundedProcessPoolExecutor (aunque sea un poco largo), o prefieres elegir entre claridad para todos (Blocking) y consistencia con la terminología estándar (Bounded)?

Prompt:

muestrame alguna alternativa mas, de cara a hacer una propuesta para incorporarlo en la libreria estandar de Python. Igualmente, ¿mejor una clase hija, o añadir un parametro a ProcessPoolExecutor?

Response:

Pensado

¡Buena idea plantearlo “como si fuera para la stdlib”! Te dejo (1) alternativas de nombre “presentables” y (2) una propuesta de API que encajaría mejor en concurrent.futures que introducir una clase nueva.

Nombres alternativos (para una clase o para el concepto)

Ordenados del más claro para no iniciados → más “canónico” para iniciados:

  1. BlockingProcessPoolExecutor — comunica el efecto visible: submit() bloquea al llenarse la capacidad.
  2. BoundedProcessPoolExecutor — alinea con la jerga de bounded queue; estándar para gente “stdlib”.
  3. CapacityLimitedProcessPoolExecutor — explícito en que limitas capacidad de envíos.
  4. FlowControlledProcessPoolExecutor — evoca control de flujo / backpressure, sin detalles de implementación.
  5. LimitedBacklogProcessPoolExecutor — pone foco en el backlog máximo de envíos.

Si el objetivo es la stdlib, yo priorizaría Blocking… (claridad para el 100% de usuarios) o Bounded… (consistencia terminológica). Entre ambas, “Bounded” es más stdlib-like.

¿Nueva clase o nuevo parámetro?

Recomiendo añadir un parámetro al constructor de ProcessPoolExecutor (y, idealmente, a ThreadPoolExecutor para simetría). Razones:

  • Evitas proliferar tipos y type checks en código existente.
  • La funcionalidad es un detalle de política de envío, no un tipo nuevo de ejecutor.
  • Es descubrible (aparece en el help() y docstring del mismo ejecutor que todos ya usan).
  • Mantienes compatibilidad total: por defecto, el comportamiento actual (cola sin límite) no cambia.

Propuesta concreta de API

Nuevos parámetros (constructor)

  • max_submit_backlog: int | None = None
    Límite superior de “trabajos aceptados pero no iniciados” (pendientes en _work_ids + en la _call_queue).
    • None → como hoy: ilimitado.
    • >= 0 → límite efectivo; cuando se alcance, submit() bloquea hasta que haya hueco.
  • submit_backlog_timeout: float | None = None (opcional, se puede dejar para una segunda iteración)
    • None → bloquea indefinidamente.
    • >= 0 → si no hay hueco antes de timeout, submit() lanza TimeoutError (o una excepción específica, ver abajo).

Alternativamente, si no se quiere introducir bloqueo por defecto cuando hay límite, podría añadirse:

  • block_on_submit: bool = True
    y, si es False, submit() lanzaría RejectedSubmission (o queue.Full) al exceder el límite. Pero añadir una nueva excepción pública concurrent.futures.RejectedSubmission es más trabajo de API; empezar con bloqueo es más simple y útil en la mayoría de casos.

Semántica documentable (breve)

  • “Cuando max_submit_backlog es un entero positivo, submit() se bloquea si el número de trabajos pendientes (aún no iniciados) alcanza ese límite. Se desbloquea cuando el queue manager consume un id de trabajo o cuando un worker libera capacidad. Este mecanismo proporciona backpressure a los productores y acota el uso de memoria.”

Elegir el nombre del parámetro

Opciones claras y “stdlib-friendly”:

  • max_submit_backlog (mi favorita: precisa y corta)
  • max_pending_tasks / max_pending_calls
  • max_queued_tasks (ligeramente más acoplada al detalle “queue”)

Para la stdlib, max_submit_backlog describe exactamente lo que el usuario percibe (cuántos submit pueden “acumularse”).

Valores por defecto y compatibilidad

  • max_submit_backlog=None preserva al 100% el comportamiento actual.
  • Si se introduce submit_backlog_timeout, su None mantiene bloqueo ilimitado; 0.0 permitiría un modo “no bloquear, falla inmediato”.

¿Y una clase hija?

Se podría aceptar un alias tipo BoundedProcessPoolExecutor que simplemente pase max_submit_backlog al padre. Pero no es necesario y añade mantenimiento. Si te piden una “clase ejemplo” para el “How-To”, se puede incluir en la documentación como recipe.

Resumen de propuesta para llevar a discusión

  • API: añadir max_submit_backlog: int | None = None (y opcional submit_backlog_timeout: float | None = None) a ProcessPoolExecutor (y a ThreadPoolExecutor por simetría).
  • Comportamiento: cuando el backlog alcanza el límite, submit() bloquea hasta que haya capacidad (o hasta timeout).
  • Nombres: si hubiera una clase receptáculo en ejemplos, BoundedProcessPoolExecutor o BlockingProcessPoolExecutor; para el parámetro, max_submit_backlog.
  • Rationale: control de memoria/latencia, backpressure nativo, sin cambios para quien no lo use.

Si quieres, te escribo un bloque de docstring listo para PR y un pequeño test (tipo test_concurrent_futures) que verifique que submit() bloquea cuando se alcanza el límite y se desbloquea al completarse una tarea.

Prompt:

¿y si quiero que calcule max_submit_backlog automaticamente? ¿quizas max_submit_backlog=True?

Response:

¡Muy buena idea! 🚦 Tener un valor “automático” evita obligar al usuario a calcular el backlog ideal según CPUs.

En la stdlib esto suele resolverse con:

  • None → ilimitado (como hoy).
  • Un entero >0 → límite fijo.
  • Un valor especial (ej. True, o una constante tipo AUTO) → cálculo automático.

Diseño posible

def __init__(self, max_workers=None, *, max_submit_backlog=None, ...):
    ...
    if max_submit_backlog is None:
        self._max_submit_backlog = math.inf   # ilimitado, actual
    elif max_submit_backlog is True:
        # heurística automática
        self._max_submit_backlog = self._max_workers * 2
    else:
        self._max_submit_backlog = int(max_submit_backlog)

Semántica

  • None (default): comportamiento histórico → sin límite.
  • int > 0: backlog máximo explícito.
  • True: usa una heurística automática en función de max_workers.

¿Qué heurística usar?

Para tareas de segundos (como en tu caso, 5–10/s con 8 CPUs), un backlog de 2–3× max_workers suele dar buen balance (evita ociosidad sin inflar la memoria).
max_submit_backlog = 2 * max_workers es simple y razonable.
→ Si se quisiera más sofisticado, se podría exponer en docstring: “El valor automático usa 2 × max_workers, lo que mantiene un pequeño colchón de tareas pendientes sin permitir que crezca ilimitadamente”.


Alternativas de API más “stdlib-like”

En vez de True, Python a veces usa sentinelas con constantes públicas:

from concurrent.futures import AUTO_BACKLOG
ProcessPoolExecutor(max_submit_backlog=AUTO_BACKLOG)

O incluso un Ellipsis (...), aunque es menos claro.

Pero True es muy natural para el usuario (“quiero que se autocalcule”).


Ejemplo de docstring (extracto)

max_submit_backlog (int | bool | None, default=None)
    Máximo número de tareas pendientes (aceptadas pero aún no iniciadas).
    - None (por defecto): sin límite (comportamiento actual).
    - int > 0: límite fijo; ``submit()`` se bloquea si se alcanza.
    - True: calcula automáticamente un límite basado en max_workers
      (actualmente ``2 × max_workers``).

👉 Conclusión: , max_submit_backlog=True es una forma clara y práctica de pedir heurística automática. Para la stdlib probablemente se discutiría si usar True o una constante como AUTO por legibilidad, pero el mecanismo es sólido.

¿Quieres que te bosqueje un PR-style cambio a la signatura de ProcessPoolExecutor.__init__ mostrando cómo quedaría con max_submit_backlog y el cálculo automático con True?

Prompt:

¿porque estas parseando el int?

Response:

Tienes toda la razón 👌, si pensamos en cómo se diseña la stdlib, no sería necesario “parsear” explícitamente un int.

En mi pseudocódigo anterior puse algo tipo:

elif max_submit_backlog is True:
    self._max_submit_backlog = self._max_workers * 2
else:
    self._max_submit_backlog = int(max_submit_backlog)

…pero eso es un exceso: si el usuario pasa ya un int, basta con aceptarlo tal cual (y quizás validar que sea > 0). La conversión a int() no es idiomática ni en ThreadPoolExecutor ni en ProcessPoolExecutor, donde los parámetros se documentan como enteros y si alguien pasa un float u otro tipo, lo normal es dejar que Python rompa solo.

Lo más natural sería:

if max_submit_backlog is None:
    self._max_submit_backlog = math.inf   # ilimitado
elif max_submit_backlog is True:
    self._max_submit_backlog = self._max_workers * 2
elif isinstance(max_submit_backlog, int) and max_submit_backlog > 0:
    self._max_submit_backlog = max_submit_backlog
else:
    raise ValueError("max_submit_backlog must be None, True, or a positive int")

Ventajas de este enfoque

  • No intentamos “convertir” tipos, como int("10") o int(3.5) (que aceptaría 3 silenciosamente).
  • La semántica queda clara y estricta (igual que con otros parámetros de la stdlib).
  • La excepción informativa ValueError es el camino habitual en la biblioteca estándar cuando se pasa un tipo inesperado.

👉 En resumen: no conviene “parsear” el int. Basta con aceptar enteros positivos, None o True. Eso es lo más coherente con el estilo de la librería estándar.

¿Quieres que te prepare un fragmento de código al estilo PR en CPython, con la signatura modificada, el bloque de validación como arriba, y un docstring acorde?

Prompt:

¿la stdlib incorpora anotaciones de tipos?

Response:

Sí ✅.

Desde Python 3.9 en adelante, casi todos los módulos grandes de la stdlib han ido incorporando type annotations (PEP 484 / PEP 585 / PEP 604 / PEP 563 / PEP 649). En el caso concreto de concurrent.futures, ya tiene stubs bastante completos en typeshed, y en las versiones recientes de CPython (3.11+), el código fuente mismo incluye anotaciones en varios métodos públicos.

Por ejemplo, en CPython 3.13 (lo que enlazabas antes):

class Executor:
    def submit(self, fn: Callable[..., _T], /, *args: object, **kwargs: object) -> Future[_T]: ...
    def map(self, func: Callable[..., _T], *iterables: Iterable[object],
            timeout: float | None = None,
            chunksize: int = 1) -> Iterator[_T]: ...

Es decir, usan PEP 604 unions (float | None), Callable, Iterable, genéricos en Future[_T], etc.

Estilo en la stdlib

  • Parámetros nuevos: si se añaden, deben venir ya con anotación de tipo.
  • Si un parámetro puede aceptar un valor centinela (None, True…), se anota con una unión explícita.
    Ejemplo:
    max_submit_backlog: int | Literal[True] | None = None
    
    (aquí Literal[True] para documentar el “modo automático”).
  • También es común usar bool | int | None si no se quiere forzar Literal.

👉 Así que sí, si propones añadir max_submit_backlog, lo correcto sería anotarlo en la signatura pública.

¿Quieres que te arme un borrador de signatura de ProcessPoolExecutor.__init__ con esa anotación incluida (int | Literal[True] | None) y el docstring al estilo de la stdlib?

Prompt:

¿porque usar Literal[True] y no simplemente True?

Response:

Muy buena observación 👌.

En anotaciones de tipo, True (a secas) no significa “el valor literal booleano True”, sino el tipo bool. Es decir:

def f(x: True): ...

…no está permitido como “tipo”; te dará un error de sintaxis.

Para poder expresar que un parámetro acepta exactamente el valor literal True (y no False, ni cualquier otro bool), la forma canónica desde PEP 586 es:

from typing import Literal

def f(x: Literal[True]) -> None: ...

Eso comunica al type checker (mypy, pyright, etc.) que el único valor booleano aceptado es True.


Ejemplo aplicado

En tu caso, si el API quiere aceptar:

  • None → ilimitado (comportamiento histórico)
  • int > 0 → límite fijo
  • True → cálculo automático

La anotación sería:

from typing import Literal

def __init__(self,
             max_workers: int | None = None,
             *,
             max_submit_backlog: int | Literal[True] | None = None,
             **kwargs) -> None: ...

Alternativas

  • Podrías declarar simplemente int | bool | None, pero ahí un type checker aceptaría cualquier bool, incluyendo False. Eso sería menos preciso.
  • Usar Literal[True] comunica claramente la intención: solo True es válido como centinela.

👉 Por eso, en la stdlib o en typeshed, cuando se quiere distinguir un True especial, se usa Literal[True].

¿Quieres que te muestre cómo quedaría la sección del docstring de ProcessPoolExecutor.__init__ con esa anotación y la explicación de los tres modos (None, int, True)?

Prompt:

bounded_process_pool_executor.py

Python

esta es la version final del modulo, funciona exactamente como yo queria, pausando los hilos producers cuando el ProcessPoolExecutor esta saturado. La ejecucion tiene algunos parones por esta razon, y el uso del disco duro no es continuado creo que por lo mismo, pero ya la memoria no crece descontroladamente y se ejecuta la aplicacion sin problemas

Response:

Acabo de revisar tu módulo bounded_process_pool_executor.py y está muy limpio y bien planteado ✅.

Qué has conseguido

  • Límite configurable en la cola de envíos (max_queue_size), con tres modos:
    • None → ilimitado (igual que ProcessPoolExecutor original).
    • True → se calcula automáticamente como 2 × max_workers.
    • int > 0 → límite fijo definido por el usuario.
  • Implementado con un Semaphore que bloquea en submit() cuando se alcanza el máximo, liberándose al completarse cada Future.
  • cast(int, self._max_workers) es correcto aquí porque _max_workers siempre se inicializa en el super().__init__.
  • Anotaciones de tipo claras con Optional[int | Literal[True]].

Efectos prácticos

  • Ahora tus productores se pausan cuando el pool está saturado → lograste el backpressure real.
  • Eso explica que veas “parones” en la ejecución y que el acceso a disco no sea continuo: los productores se frenan hasta que el pool libera hueco.
  • Memoria estable: al no poder apilar decenas de miles de Future pendientes, la RAM deja de crecer sin control.

Pequeñas mejoras opcionales

  • Nombre del parámetro: si lo quieres proponer para stdlib, quizá renombrarlo a algo más “neutral” como max_submit_backlog (describe mejor lo que limita).
  • Validación explícita: en vez de assert max_queue_size > 0, usar if max_queue_size <= 0: raise ValueError(...) (más idiomático stdlib).
  • Docstring más detallado: aclarar que cuando se alcanza el límite, submit() se bloquea hasta que alguna tarea termine.

Ejemplo de docstring tipo stdlib:

"""
:param max_queue_size: Máximo de tareas pendientes permitidas.
    - None (default): sin límite, como ProcessPoolExecutor original.
    - int > 0: límite fijo; ``submit()`` se bloquea si se alcanza.
    - True: calcula un límite automático (actualmente 2 × max_workers).
"""

En resumen: lo que tienes ya funciona exactamente como debe y resuelve tu problema de saturación 👍.
Si lo quieres pulir de cara a un PR a la stdlib, la clave sería renombrar el parámetro a algo más genérico (max_submit_backlog) y añadir validación + docstring formal.

¿Quieres que te arme un ejemplo de cómo quedaría exactamente la signatura y docstring de ProcessPoolExecutor.__init__ con este nuevo argumento, al estilo de un PR oficial de CPython?

Prompt:

al final no lo voy a proponer para que lo incluyan en la libreria estandar de python, no es tan sencillo como añadir un limite a la cola (lo seria si se refactorizaran mas aspectos, como eliminar el _work_id y meter directamente la tarea en la cola), y tampoco es tan complicado como para incluirlo como una clase hija, mejor implementarlo ad-hoc, o como mucho en una libreria externa. Resume la conversacion en un articulo para mi blog en ingles. Sigue implicitamente la metodologia STAR, pero que no se note en los titulos de las secciones: cuenta el problema, las alternativas estudiadas, como se han calculado los valores por defecto, en que caso usar cada uno, que se ha hecho y como, y el resultado final. No olvides incluir el codigo final, añadiendo tus sugerencias

Response:

Here’s a draft for your blog article, written as a narrative case study (implicitly following STAR). I included the background, exploration of alternatives, design decisions, the final implementation (with my suggested refinements), and outcomes.


Adding Backpressure to Python’s ProcessPoolExecutor

Some time ago I hit a practical limitation with Python’s ProcessPoolExecutor: when feeding it tens of thousands of tasks from hundreds of producer threads, the executor happily accepted them all. The result? Memory usage ballooned, latency increased, and eventually the whole system became unstable.

The root issue: by default, ProcessPoolExecutor accepts an unbounded number of submissions. The internal queue of work IDs grows without limit, and since every submit() creates a Future, you can easily flood memory if your producers are much faster than the pool can drain tasks.

I needed backpressure: a way to make submit() block when the executor is saturated, so producers naturally pause until capacity frees up.


Options Considered

Several strategies came up during exploration:

  1. Monkey-patching internals
    Replace the executor’s private _work_ids queue with a bounded one. Technically possible (the last field initialized in the constructor), but fragile across Python versions.
  2. External semaphore
    Wrap the executor in a class that holds a Semaphore. Each submit() acquires a slot; each task completion releases it. This approach is stable and easy to customize.
  3. Bounded queue feeder
    Insert a Queue(maxsize=N) between producers and a dedicated feeder thread that calls submit(). Producers block on put() when the queue is full.
  4. Batching
    Increase the task granularity (using chunksize in map() or by grouping tasks manually). This reduces overhead but doesn’t directly solve the memory growth problem.

Ultimately, I wanted something drop-in compatible with the existing executor API. That meant subclassing with a simple extension.


Choosing Defaults

The key design choice is how large the backlog of pending tasks should be before producers block. The number should be proportional to the number of workers:

  • 2 × workers: a balanced default for CPU-bound tasks of ~1s duration.
  • 3 × workers: useful when task durations vary a lot.
  • 4 × workers: a practical upper bound; beyond that, memory and latency increase without improving throughput.

Example: with 8 CPU workers and tasks taking around one second, max_queue_size=16 keeps memory stable and ensures low latency. If variance is high, bump it to 24. Rarely is more than 32 necessary.


The Implementation

Here is the final class I ended up using. It subclasses ProcessPoolExecutor and adds a bounded queue via a semaphore. The design includes type annotations (as in the standard library), a clear docstring, and stricter validation:

from concurrent.futures import ProcessPoolExecutor, Future
from threading import Semaphore
from typing import Literal, Optional, cast, Callable, Any

class BoundedProcessPoolExecutor(ProcessPoolExecutor):
    """
    A ProcessPoolExecutor with a bounded submission queue.
    
    Unlike the standard implementation, `submit()` will block once
    the number of pending tasks reaches `max_queue_size`. This provides
    natural backpressure to producers and prevents unbounded memory growth.
    """

    def __init__(
        self,
        max_queue_size: Optional[int | Literal[True]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        :param max_queue_size:
            - None (default): unlimited (same as ProcessPoolExecutor).
            - int > 0: maximum number of tasks allowed to be pending.
                       `submit()` blocks if this limit is reached.
            - True: automatic mode, sets the queue size to 2 × max_workers.
        """
        super().__init__(*args, **kwargs)

        if max_queue_size is None:
            self._semaphore: Semaphore | None = None
        else:
            if max_queue_size is True:
                max_queue_size = cast(int, self._max_workers) * 2
            if not isinstance(max_queue_size, int) or max_queue_size <= 0:
                raise ValueError(
                    "max_queue_size must be None, True, or a positive int"
                )
            self._semaphore = Semaphore(max_queue_size)

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
        semaphore = self._semaphore
        if semaphore:
            semaphore.acquire()

        future = super().submit(fn, *args, **kwargs)

        if semaphore:
            future.add_done_callback(lambda f: semaphore.release())

        return future

Results

With this subclass in place:

  • Producers pause automatically when the pool is full, instead of flooding memory.
  • Memory stays flat, no runaway growth from thousands of pending Future objects.
  • Latency stabilizes, since tasks no longer wait behind massive queues.
  • Throughput remains consistent, limited only by the number of workers.

There are visible pauses in producer threads and even some “gaps” in disk activity, but that’s expected: it’s the backpressure mechanism working as intended.


Takeaways

This isn’t a perfect candidate for inclusion in the Python standard library — that would require deeper refactoring (e.g., eliminating the _work_id indirection). But as an ad-hoc solution or external utility, it works extremely well and is simple to reason about.

If you’re pushing ProcessPoolExecutor with tens of thousands of tasks, this bounded version is a safe and effective drop-in replacement that keeps your memory in check and your system responsive.


Powered by ChatGPT Exporter