
The Fix

pip install celery==4.4.0rc5

Based on closed celery/celery issue #4733. Fix PR: https://github.com/celery/celery/pull/4666

@@ -58,6 +58,20 @@ def __init__(self, *args, **kwargs):
         self.subscribed_to = set()
 
+    def on_after_fork(self):
+        self.backend.client.connection_pool.reset()
+        if self._pubsub is not None:

Why This Fix Works in Production

  • Trigger: Calling `get()` on many results without checking task state first while using the Redis result backend (toggle `use_get_without_state_check` in the reproduction). The reporter later noted that something changed between 4.1.0 and the then-current version and the issue no longer occurs.
  • Mechanism: The Redis backend was not resetting the connection pool after task state changes.
  • Why the fix works: It addresses performance issues with the Redis backend by resetting the connection pool and unsubscribing from channels after a task's state changes to ready (first fixed release: 4.4.0rc5).
Production impact:
  • If left unfixed, fetching results through the Redis backend can stay slow when many tasks are in flight. The report describes a slowdown, not an error or traceback.
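
The mechanism above can be illustrated with a minimal sketch. It assumes a hypothetical `ToyPool` class (a stand-in, not Celery's or redis-py's API) and uses the standard-library `os.register_at_fork` hook (Python 3.7+, Unix-like platforms) to drop inherited connections in a forked child. This mirrors the idea behind the `on_after_fork` method in the diff, without claiming to reproduce Celery's implementation.

```python
import os


class ToyPool:
    """Hypothetical stand-in for a client connection pool."""

    def __init__(self):
        self.owner_pid = os.getpid()
        self.connections = []

    def get(self):
        # A real pool would open a socket; a placeholder object is enough here.
        conn = object()
        self.connections.append(conn)
        return conn

    def reset(self):
        # Forget every connection inherited from the parent process so the
        # current process opens its own connections on next use.
        self.owner_pid = os.getpid()
        self.connections = []


pool = ToyPool()

# Run reset() in every forked child, where the hook is available.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=pool.reset)
```

The design choice is to reset in the child rather than share sockets across processes, since two processes reading from one socket can interleave each other's responses.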

Why This Breaks in Prod

  • Shows up under Python 3.6.3 in real deployments (not just unit tests).
  • The Redis backend was not resetting the connection pool after task state changes
  • Production symptom (no traceback): `get()` calls slow down when results are fetched without checking task state first. The reporter later noted that something changed between 4.1.0 and the then-current version and the issue no longer occurs.

Proof / Evidence

  • GitHub issue: #4733
  • Fix PR: https://github.com/celery/celery/pull/4666
  • First fixed release: 4.4.0rc5
  • Reproduced locally: No (not executed)
  • Last verified: 2026-02-09
  • Confidence: 0.85
  • Did this fix it?: Yes (upstream fix exists)

Discussion

High-signal excerpts from the issue thread (symptoms, repros, edge-cases).

“Yep, thanks a lot for the advice! Closing...”
@taibende · 2018-05-17 · confirmation · source
“Can you elaborate on the hardware specifications of the server? Generally increasing concurrency beyond the number of CPUs will result in excessive context switching, probably…”
@georgepsarakis · 2018-05-15 · source
“- Have you also hiredis installed? It improves efficiency by speeding up response message parsing”
@georgepsarakis · 2018-05-16 · source
“Hey George, We just installed hiredis and tried again, but it's not making any difference”
@taibende · 2018-05-17 · source

Failure Signature (Search String)

  • It appears that something was changed between 4.1.0 and the current version, and this issue no longer exists!
  • Here is some code to reproduce the issue if anyone is interested (simply run the code and change the `use_get_without_state_check` variable from `False` to `True` to see it slow down).

Error Message

Signature-only (no traceback captured)

Minimal Reproduction

repro.py
import multiprocessing
from celery import Celery
import time

# How many tasks to try
num_attempts = 200

# concurrent worker child processes
cwcp = 24

# Collect AsyncResult objects in a list
tasks_list = []

# Use get() directly without checking for task state first
use_get_without_state_check = False


# Function to be executed by Celery worker
def sum_function(*args):
    time.sleep(1)
    return sum(args)


# Start Celery worker process and add task
def start_celery():
    # Assume server has RabbitMQ and Redis already installed and listening
    # Create a Celery worker
    app = Celery("my_app_name",
                 broker="amqp://",
                 #backend="cache+memcached://")
                 backend="redis://")

    # Add and register task
    task = app.task(name="tasks." + sum_function.__name__)(sum_function)
    app.tasks.register(task)

    # Start process
    p = multiprocessing.Process(
        target=app.worker_main,
        kwargs={
            'argv': ['worker', '-E', '-c', str(cwcp)]
        }
    )
    p.daemon = True
    p.start()
    return app


if __name__ == "__main__":
    app = start_celery()
    for i in range(num_attempts):
        tasks_list.append(
            app.send_task('tasks.' + sum_function.__name__,
                          args=[1, 2],
                          kwargs={})
        )
    print("%s tasks submitted" % len(tasks_list))

    get_finished_timestamps = {}
    result_fetch_start_time = time.time()
    results_count = [0]*num_attempts
    while sum(results_count) < num_attempts:
        for i in range(len(tasks_list)):
            if use_get_without_state_check:
                results_count[i] = 1
                res = tasks_list[i].get()
                assert(res == 3)
                get_finished_timestamps[i] = time.time()
            else:
                if tasks_list[i].state == "SUCCESS":
                    results_count[i] = 1
                    res = tasks_list[i].get()
                    assert(res == 3)
                    get_finished_timestamps[i] = time.time()
        # sleep to stop CPU spinning all the time
        time.sleep(0.1)

    # print out timestamps when get() calls were finished
    for k in sorted(get_finished_timestamps, key=get_finished_timestamps.get):
        print(k, get_finished_timestamps[k])

    # Display total get() time
    result_fetch_end_time = time.time()
    print("%s secs to fetch" % (result_fetch_end_time - result_fetch_start_time))

Environment

  • Python: 3.6.3

Why It Broke

The Redis backend was not resetting the connection pool after task state changes

Fix Options (Details)

Option A: Upgrade to fixed release (safe default, recommended)

pip install celery==4.4.0rc5

When NOT to use: This fix does not apply if you use a result backend other than Redis.

Use when you can deploy the upstream fix. It is usually lower-risk than long-lived workarounds.
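
After upgrading, it can help to confirm which version is actually installed in the environment the workers run in. The following is a minimal sketch using the standard-library `importlib.metadata` module (Python 3.8+); `installed_version` is a hypothetical helper name, not part of Celery.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Compare the printed value against the first fixed release, 4.4.0rc5.
    print("celery:", installed_version("celery"))
```

Run it with the same interpreter that starts the worker, since a virtual environment and the system Python may hold different versions.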

Fix reference: https://github.com/celery/celery/pull/4666

First fixed release: 4.4.0rc5

Last verified: 2026-02-09. Validate in your environment.


When NOT to Use This Fix

  • This fix does not apply if you use a result backend other than Redis.

Verify Fix

Re-run the minimal reproduction on your broken version, then apply the fix and re-run.
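
One way to compare the two runs is to time the fetch phase identically before and after the upgrade. A minimal sketch, where `timed` is a hypothetical helper and not part of Celery:

```python
import time


def timed(fn, *args, **kwargs):
    """Call fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    # Stand-in workload; in practice, wrap the result-fetch loop of repro.py.
    result, elapsed = timed(sum, [1, 2])
    print("result=%s elapsed=%.6f secs" % (result, elapsed))
```

Record the elapsed value with `use_get_without_state_check` set to `True` on both versions, keeping concurrency and task count the same so the numbers are comparable.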


Prevention

  • Capture the exact failing error string in logs and tests so you can reproduce via a minimal script.
  • Pin production dependencies and upgrade only with a reproducible test that hits the failing path.
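
The pinning advice above can be checked mechanically. This is a minimal sketch that flags requirement lines without an exact `==` pin; `unpinned_requirements` is a hypothetical helper, and it deliberately ignores option lines (those starting with `-`) and inline comments rather than parsing the full requirements syntax.

```python
def unpinned_requirements(lines):
    """Return requirement lines that lack an exact '==' version pin."""
    loose = []
    for raw in lines:
        # Drop inline comments and surrounding whitespace.
        line = raw.split("#", 1)[0].strip()
        # Skip blanks and pip option lines such as '-r base.txt'.
        if not line or line.startswith("-"):
            continue
        if "==" not in line:
            loose.append(line)
    return loose


if __name__ == "__main__":
    sample = ["celery==4.4.0rc5", "redis>=3.2", "# comment", "", "hiredis"]
    print(unpinned_requirements(sample))
```

A check like this could run in CI so that an unpinned dependency is caught before it reaches production.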

Version Compatibility Table

Version    Status
4.4.0rc5   Fixed


Sources

We don’t republish the full GitHub discussion text. Use the links above for context.