The Fix
pip install redis==7.1.0
Based on the closed redis/redis-py issue #1572 and its fix, PR #1574.
Production note: Most teams hit this during upgrades or environment changes. Roll out with a canary and smoke-test critical endpoints (health, OpenAPI/docs) before rolling out to 100%.
@@ -1259,7 +1259,10 @@ def parse_response(self, block=True, timeout=0):
self.check_health()
- if not block and not conn.can_read(timeout=timeout):
+ if(
+ not block
Re-run the minimal reproduction on your broken version, then apply the fix and re-run.
Option A: Upgrade to fixed release
pip install redis==7.1.0
When NOT to use: This fix should not be used if the application requires strict connection handling without automatic retries.
Why This Fix Works in Production
- Trigger: calling pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time) after the server has closed the connection, for example after a Redis restart.
- Mechanism: in non-blocking mode, parse_response called conn.can_read(timeout=timeout) directly, so the ConnectionError raised on the closed socket propagated to the caller and the connection was not re-established.
- Why the fix works: get_message now auto-reconnects when it hits a connection error (first fixed release: 7.1.0).
- If left unfixed, a subscriber can keep failing with ConnectionError after a Redis restart, so messages go unprocessed until the process itself is restarted.
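The reconnect-and-retry idea behind the bullets above can be illustrated with a small sketch. It is not redis-py's implementation: `FakeConnection` and `execute_with_reconnect` are hypothetical names, and Python's built-in `ConnectionError` stands in for `redis.exceptions.ConnectionError`.

```python
class FakeConnection:
    """Hypothetical stand-in for a socket-backed connection (not redis-py)."""

    def __init__(self):
        self.server_dropped = False  # simulates the server closing the socket
        self.reconnects = 0

    def can_read(self, timeout=0):
        if self.server_dropped:
            raise ConnectionError("Connection closed by server.")
        return False  # nothing is waiting to be read in this sketch

    def disconnect(self):
        pass  # a real connection would close its socket here

    def connect(self):
        self.server_dropped = False
        self.reconnects += 1


def execute_with_reconnect(conn, command, *args, **kwargs):
    """Run command; on a connection error, reconnect and retry once."""
    try:
        return command(*args, **kwargs)
    except ConnectionError:
        conn.disconnect()
        conn.connect()
        return command(*args, **kwargs)


conn = FakeConnection()
conn.server_dropped = True  # e.g. the server was restarted

# Calling conn.can_read() directly would raise here; the wrapper recovers.
readable = execute_with_reconnect(conn, conn.can_read, timeout=1)
print(readable, conn.reconnects)  # prints: False 1
```

The retry is deliberately limited to one attempt, so a server that stays down still surfaces an error to the caller.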
Why This Breaks in Prod
- Shows up under Python 3.9.6 in real deployments (not just unit tests).
- get_message did not reconnect after a connection error, so the failure persisted once the server had closed the socket.
- Surfaces as: redis.exceptions.ConnectionError: Connection closed by server.
Proof / Evidence
- GitHub issue: #1572
- Fix PR: https://github.com/redis/redis-py/pull/1574
- First fixed release: 7.1.0
- Reproduced locally: No (not executed)
- Last verified: 2026-02-09
- Confidence: 0.85
- Did this fix it?: Yes (upstream fix exists)
Discussion
High-signal excerpts from the issue thread (symptoms, repros, edge-cases).
“Thanks @luhn for the great write up, test case, and fix. Much appreciated! Merging.”
Failure Signature (Search String)
- pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
- redis.exceptions.ConnectionError: Connection closed by server.
Error Message
Stack trace
Traceback (most recent call last):
  File "/Users/luhn/Code/redis-py/test.py", line 24, in run
    pubsub.get_message(ignore_subscribe_messages=True,
  File "/Users/luhn/Code/redis-py/redis/client.py", line 3617, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/Users/luhn/Code/redis-py/redis/client.py", line 3503, in parse_response
    if not block and not conn.can_read(timeout=timeout):
  File "/Users/luhn/Code/redis-py/redis/connection.py", line 734, in can_read
    return self._parser.can_read(timeout)
  File "/Users/luhn/Code/redis-py/redis/connection.py", line 321, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "/Users/luhn/Code/redis-py/redis/connection.py", line 230, in can_read
    self._read_from_socket(timeout=timeout,
  File "/Users/luhn/Code/redis-py/redis/connection.py", line 201, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
Minimal Reproduction
import itertools
import threading
from time import sleep

import redis

# Named `client` so the `redis` module is not shadowed.
client = redis.Redis()
pubsub = client.pubsub()
# Register `print` as the handler for the channel named "topic".
pubsub.subscribe(topic=print)


class PubSubWorkerThread(threading.Thread):
    def __init__(self, pubsub, sleep_time, daemon=False):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self._running = threading.Event()

    def run(self):
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                pubsub.get_message(ignore_subscribe_messages=True,
                                   timeout=sleep_time)
            except Exception:
                # Reached when the connection drops, e.g. on a server restart.
                print('get_message failed')
                sleep(2)
        pubsub.close()

    def stop(self):
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()


thread = PubSubWorkerThread(pubsub, 1)
thread.start()

# Publish once per second; restart the Redis server while this runs.
for i in itertools.count():
    try:
        client.publish('topic', str(i))
    except Exception:
        print('publish failed')
    sleep(1)
Environment
- Python: 3.9.6
What Broke
After the Redis server restarts, get_message raises ConnectionError instead of reconnecting, so the subscriber stops retrieving messages.
Why It Broke
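PubSub's get_message did not handle connection errors: the non-blocking read in parse_response called conn.can_read() directly, and the resulting ConnectionError reached the caller without any reconnect attempt.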
Fix Options (Details)
Option A: Upgrade to fixed release (safe default, recommended)
pip install redis==7.1.0
Use when you can deploy the upstream fix. It is usually lower-risk than long-lived workarounds.
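If an upgrade has to wait, a caller-side workaround can be sketched as below. This is an assumption-laden illustration, not part of the upstream fix: `poll_with_resubscribe` and its parameters are hypothetical names, and the only PubSub methods it relies on are `subscribe`, `get_message`, and `close`.

```python
import time


def poll_with_resubscribe(make_pubsub, channels, retry_on, max_polls,
                          timeout=1.0, backoff=0.0):
    """Poll for messages; rebuild the PubSub object after a connection error.

    make_pubsub: zero-argument callable returning a new PubSub-like object.
    retry_on: exception class (or tuple of classes) treated as a dropped
              connection.
    """
    pubsub = make_pubsub()
    pubsub.subscribe(*channels)
    received = []
    for _ in range(max_polls):
        try:
            message = pubsub.get_message(
                ignore_subscribe_messages=True, timeout=timeout)
        except retry_on:
            # Drop the broken object, wait, then subscribe again from scratch.
            pubsub.close()
            time.sleep(backoff)
            pubsub = make_pubsub()
            pubsub.subscribe(*channels)
            continue
        if message is not None:
            received.append(message)
    pubsub.close()
    return received
```

In a real application the call would look something like `poll_with_resubscribe(client.pubsub, ["topic"], redis.exceptions.ConnectionError, max_polls=100, backoff=2.0)`, where `client` is a `redis.Redis` instance. Messages published while the subscriber is disconnected are not replayed, since Redis pub/sub does not buffer them.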
Option D: Guard side-effects with OnceOnly (guardrail for side-effects)
Mitigate duplicate external side-effects under retries/timeouts/agent loops by gating the operation before calling external systems.
- Place OnceOnly between your code/agent and real side-effects (Stripe, emails, CRM, APIs).
- Use a stable key per side-effect (e.g., customer_id + action + idempotency_key).
- Fail-safe: configure fail-open vs fail-closed based on blast radius and spend risk.
Example snippet (optional):
import os

from onceonly import OnceOnly

once = OnceOnly(api_key=os.environ["ONCEONLY_API_KEY"], fail_open=True)


def process_event(event_id):
    # Wrapped in a function (hypothetical name) so the early return is valid.
    # Stable idempotency key per real side-effect.
    # Use a request id / job id / webhook delivery id / Stripe event id, etc.
    key = f"stripe:webhook:{event_id}"
    res = once.check_lock(key=key, ttl=3600)
    if res.duplicate:
        return {"status": "already_processed"}
    # Safe to execute the side-effect exactly once.
    # handle_event is a placeholder for your own handler.
    handle_event(event_id)
    return {"status": "processed"}


event_id = "evt_..."  # replace
process_event(event_id)
Fix reference: https://github.com/redis/redis-py/pull/1574
First fixed release: 7.1.0
Last verified: 2026-02-09. Validate in your environment.
When NOT to Use This Fix
- This fix should not be used if the application requires strict connection handling without automatic retries.
- Do not use the OnceOnly guard (Option D) to hide logic bugs or data corruption. Use it to block duplicate external side-effects and enforce tool permissions/spend caps.
Verify Fix
Re-run the minimal reproduction on your broken version, then apply the fix and re-run.
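Before re-running the reproduction, it can help to confirm which redis-py version the environment actually imports. The sketch below uses only the standard library; `FIRST_FIXED` is copied from this page, and the helper names are hypothetical.

```python
from importlib import metadata

FIRST_FIXED = "7.1.0"  # taken from this page; confirm against upstream releases


def version_tuple(version):
    """Turn '7.1.0' into (7, 1, 0); suffixes such as 'rc1' are ignored."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def has_fix(installed, first_fixed=FIRST_FIXED):
    """True when the installed version is at or above the first fixed one."""
    return version_tuple(installed) >= version_tuple(first_fixed)


def installed_redis_version():
    """Return the installed redis-py version string, or None if absent."""
    try:
        return metadata.version("redis")
    except metadata.PackageNotFoundError:
        return None


version = installed_redis_version()
if version is None:
    print("redis is not installed in this environment")
else:
    print(version, "has the fix" if has_fix(version) else "predates the fix")
```

Because pre-release suffixes are ignored, a release candidate of the fixed version is reported as fixed; treat the result as a quick sanity check rather than proof.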
Prevention
- Make timeouts explicit and test them (unit + integration) to avoid silent behavior changes.
- Instrument retries (attempt count + reason) and alert on spikes to catch dependency slowdowns.
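Instrumenting retries, as suggested in the list above, might look like this minimal sketch. `call_with_retries` and `retry_reasons` are hypothetical names; in practice the counter would feed whatever metrics system is already in place.

```python
import logging
from collections import Counter

log = logging.getLogger("retry")
retry_reasons = Counter()  # failure reason -> number of failed attempts


def call_with_retries(func, retry_on, max_attempts=3):
    """Call func(); retry on retry_on, recording attempt number and reason."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on as exc:
            reason = type(exc).__name__
            retry_reasons[reason] += 1
            log.warning("attempt %d/%d failed: %s",
                        attempt, max_attempts, reason)
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
```

Alerting on a spike in `retry_reasons` (for example, many `ConnectionError` entries within a short window) gives early warning of a restarted or slow dependency.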
Version Compatibility Table
| Version | Status |
|---|---|
| 7.1.0 | Fixed |
Sources
We don’t republish the full GitHub discussion text. Use the links above for context.