The Fix
pip install celery==4.4.0rc5
Based on closed celery/celery issue #4733; the fix PR/commit is linked below. Patch excerpt (truncated):

@@ -58,6 +58,20 @@ def __init__(self, *args, **kwargs):
         self.subscribed_to = set()

+    def on_after_fork(self):
+        self.backend.client.connection_pool.reset()
+        if self._pubsub is not None:
Why This Fix Works in Production
- Trigger: calling AsyncResult.get() directly against the Redis result backend (without first polling task state); with many outstanding tasks, the get() calls slow down dramatically.
- Mechanism: The Redis backend was not resetting the connection pool after task state changes
- Why the fix works: Addresses performance issues with the Redis backend by resetting the connection pool and unsubscribing from channels after a task's state changes to ready. (first fixed release: 4.4.0rc5).
- If left unfixed, result fetching degrades badly at scale: get() calls stall, inflating end-to-end latency for anything that waits on task results.
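The mechanism can be illustrated with a minimal sketch. This is not Celery's actual code; `FakePool` and `FakeResultConsumer` are stand-ins showing why releasing pooled connections and pub/sub subscriptions once a result is ready keeps them from accumulating:

```python
class FakePool:
    """Stand-in for a redis-py connection pool."""
    def __init__(self):
        self.connections = 5

    def reset(self):
        # Discard all pooled connections.
        self.connections = 0


class FakeResultConsumer:
    """Stand-in for the result consumer: it holds a pool and a set of
    pub/sub channels it is subscribed to for pending results."""
    def __init__(self):
        self.pool = FakePool()
        self.subscribed_to = {"celery-task-meta-abc"}

    def on_state_change(self, state):
        # Before the fix, nothing was released here, so every fetched
        # result left its connections and subscription behind.
        if state == "SUCCESS":
            self.pool.reset()
            self.subscribed_to.clear()


consumer = FakeResultConsumer()
consumer.on_state_change("SUCCESS")
print(consumer.pool.connections, len(consumer.subscribed_to))  # prints: 0 0
```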
Why This Breaks in Prod
- Shows up under Python 3.6.3 in real deployments (not just unit tests).
- The Redis backend was not resetting the connection pool after task state changes
- Production symptom (often without a traceback): result fetches via get() take far longer than the tasks themselves; in the repro below, flipping `use_get_without_state_check` from `False` to `True` makes the slowdown visible.
Proof / Evidence
- GitHub issue: #4733
- Fix PR: https://github.com/celery/celery/pull/4666
- First fixed release: 4.4.0rc5
- Reproduced locally: No (not executed)
- Last verified: 2026-02-09
- Confidence: 0.85
- Did this fix it?: Yes (upstream fix exists)
Discussion
High-signal excerpts from the issue thread (symptoms, repros, edge-cases).
“Yep, thanks a lot for the advice! Closing...”
“Can you elaborate on the hardware specifications of the server? Generally increasing concurrency beyond the number of CPUs will result in excessive context switching, probably…”
“- Have you also hiredis installed? It improves efficiency by speeding up response message parsing”
“Hey George, We just installed hiredis and tried again, but it's not making any difference”
Failure Signature (Search String)
- Here is some code to reproduce the issue if anyone is interested (simply run the code and change the `use_get_without_state_check` variable from `False` to `True` to see it slow down.
- It appears that something was changed between 4.1.0 and the current version, and this issue no longer exists! (a later thread comment confirming the slowdown is gone in newer releases)
Error Message
-------------
Signature-only (no traceback captured); the problem surfaces as latency, not as an error string. Use the failure signature above for searching.
Minimal Reproduction
import multiprocessing
import time

from celery import Celery

# How many tasks to try
num_attempts = 200
# Concurrent worker child processes
cwcp = 24
# Collect AsyncResult objects in a list
tasks_list = []
# Use get() directly without checking task state first
use_get_without_state_check = False


# Function to be executed by the Celery worker
def sum_function(*args):
    time.sleep(1)
    return sum(args)


# Start the Celery worker process and register the task
def start_celery():
    # Assumes the server already has RabbitMQ and Redis listening
    app = Celery("my_app_name", broker="amqp://",
                 # backend="cache+memcached://")
                 backend="redis://")
    # Add and register the task
    task = app.task(name="tasks." + sum_function.__name__)(sum_function)
    app.tasks.register(task)
    # Start the worker in a child process
    p = multiprocessing.Process(
        target=app.worker_main,
        kwargs={'argv': ['worker', '-E', '-c', str(cwcp)]}
    )
    p.daemon = True
    p.start()
    return app


if __name__ == "__main__":
    app = start_celery()
    for i in range(num_attempts):
        tasks_list.append(
            app.send_task('tasks.' + sum_function.__name__,
                          args=[1, 2], kwargs={})
        )
    print("%s tasks submitted" % len(tasks_list))
    get_finished_timestamps = {}
    result_fetch_start_time = time.time()
    results_count = [0] * num_attempts
    while sum(results_count) < num_attempts:
        for i in range(len(tasks_list)):
            if results_count[i]:
                continue  # already fetched; avoid re-calling get()
            if use_get_without_state_check:
                results_count[i] = 1
                res = tasks_list[i].get()
                assert res == 3
                get_finished_timestamps[i] = time.time()
            elif tasks_list[i].state == "SUCCESS":
                results_count[i] = 1
                res = tasks_list[i].get()
                assert res == 3
                get_finished_timestamps[i] = time.time()
        # Sleep so the polling loop does not spin the CPU
        time.sleep(0.1)
    # Print the timestamp at which each get() call finished
    for k in sorted(get_finished_timestamps, key=get_finished_timestamps.get):
        print(k, get_finished_timestamps[k])
    # Display total get() time
    result_fetch_end_time = time.time()
    print("%s secs to fetch" %
          (result_fetch_end_time - result_fetch_start_time))
Environment
- Python: 3.6.3
Why It Broke
The Redis result backend did not reset its connection pool or unsubscribe from result pub/sub channels after a task's state changed to ready, so connections and subscriptions accumulated and result fetching slowed down.
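For deployments stuck before 4.4.0rc5, a hedged mitigation sketch follows. It mirrors what the upstream patch does by resetting the redis-py pool behind Celery's result backend; `periodic_backend_reset` is our own name, not a Celery API, and `app.backend.client` assumes the redis:// result backend:

```python
def periodic_backend_reset(app):
    """Discard pooled Redis connections held by Celery's result backend.

    app.backend.client is the underlying redis-py client when the result
    backend is redis://; connection_pool.reset() drops its idle connections.
    """
    app.backend.client.connection_pool.reset()
```

Call it from a periodic job or after large result-fetching bursts, and measure before adopting it: it only papers over the leak that the upstream fix removes.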
Fix Options (Details)
Option A — Upgrade to the fixed release (safe default, recommended)
pip install celery==4.4.0rc5
Use when you can deploy the upstream fix. It is usually lower-risk than long-lived workarounds.
Option D — Guard side-effects with OnceOnly (guardrail against duplicate side-effects)
Mitigate duplicate external side-effects under retries/timeouts/agent loops by gating the operation before calling external systems.
- Place OnceOnly between your code/agent and real side-effects (Stripe, emails, CRM, APIs).
- Use a stable key per side-effect (e.g., customer_id + action + idempotency_key).
- Fail-safe: configure fail-open vs fail-closed based on blast radius and spend risk.
- This does NOT fix data corruption; it only prevents duplicate side-effects.
Example snippet (hedged sketch; `handle_event` stands in for your own handler):

from onceonly import OnceOnly
import os

once = OnceOnly(api_key=os.environ["ONCEONLY_API_KEY"], fail_open=True)

def process_webhook(event_id):
    # Stable idempotency key per real side-effect.
    # Use a request id / job id / webhook delivery id / Stripe event id, etc.
    key = f"stripe:webhook:{event_id}"
    res = once.check_lock(key=key, ttl=3600)
    if res.duplicate:
        return {"status": "already_processed"}
    # Safe to execute the side-effect exactly once.
    return handle_event(event_id)

process_webhook("evt_...")  # replace "evt_..." with the real event id
Fix reference: https://github.com/celery/celery/pull/4666
First fixed release: 4.4.0rc5
Last verified: 2026-02-09. Validate in your environment.
When NOT to Use This Fix
- This fix is not applicable if using a different backend than Redis.
- Do not use this to hide logic bugs or data corruption. Use it to block duplicate external side-effects and enforce tool permissions/spend caps.
Verify Fix
Re-run the minimal reproduction on your broken version, then apply the fix and re-run; the total fetch time printed at the end should drop sharply.
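A small timing harness makes the before/after comparison concrete. `fetch_all` is our own helper, not a Celery API; it accepts any list of objects exposing a blocking get(), such as the `tasks_list` from the minimal reproduction:

```python
import time

def fetch_all(results):
    """Fetch every result via get() and report the total wall-clock time."""
    start = time.time()
    values = [r.get() for r in results]
    return values, time.time() - start
```

Run it against the repro's `tasks_list` on the broken version, then again after `pip install celery==4.4.0rc5`; on the fixed release the elapsed time should stay close to the tasks' own runtime.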
Prevention
- Capture the exact failing error string in logs and tests so you can reproduce via a minimal script.
- Pin production dependencies and upgrade only with a reproducible test that hits the failing path.
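The second bullet can be enforced with a small regression guard. `assert_fetch_within_budget` and `THRESHOLD_SECONDS` are our own names; wire `submit_and_fetch` to whatever path submits tasks and fetches their results in your test suite:

```python
import time

# Generous budget; tune to your workload so only the pathological
# slowdown (minutes instead of seconds) trips the guard.
THRESHOLD_SECONDS = 10.0

def assert_fetch_within_budget(submit_and_fetch, threshold=THRESHOLD_SECONDS):
    """Fail the test run if submitting and fetching results exceeds the budget."""
    start = time.time()
    submit_and_fetch()
    elapsed = time.time() - start
    assert elapsed < threshold, (
        "result fetch took %.1fs (budget %.1fs)" % (elapsed, threshold))
```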
Version Compatibility Table
| Version | Status |
|---|---|
| 4.4.0rc5 | Fixed |
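To check at runtime which side of the table you are on, a crude version gate can help. `release_tuple` and `carries_fix` are our own helpers; they treat anything on the 4.4.0 line or later as fixed, which matches 4.4.0rc5 but would also accept earlier 4.4.0 release candidates:

```python
def release_tuple(version):
    """Parse the leading numeric X.Y.Z of a version string into a tuple."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break  # stop at suffixes like 'rc5'
        parts.append(int(digits or 0))
    return tuple(parts)

def carries_fix(version):
    """True if the given Celery version is on the fixed 4.4.0+ line."""
    return release_tuple(version) >= (4, 4, 0)
```

Compare against `celery.__version__` in a startup check or deployment smoke test.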
Related Issues
No related fixes found.
Sources
We don’t republish the full GitHub discussion text. Use the links above for context.