mirror of
https://github.com/Manoj-HV30/clawrity.git
synced 2026-05-16 19:35:21 +00:00
response redundancy fixed and proper backend communication
This commit is contained in:
+152
-89
@@ -31,8 +31,8 @@ logger = logging.getLogger(__name__)
|
||||
# Thread pool for processing LLM pipeline without blocking event handlers
|
||||
_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="clawrity-slack")
|
||||
|
||||
# Module-level guard: only one SlackHandler should be active at a time
|
||||
_active_handler: Optional["SlackHandler"] = None
|
||||
# Module-level reference to prevent multiple handlers
|
||||
_active_handler = None
|
||||
|
||||
|
||||
class SlackHandler:
|
||||
@@ -51,32 +51,25 @@ class SlackHandler:
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# Bot Token (xoxb-...) — from .env SLACK_BOT_TOKEN
|
||||
# This is the OAuth token installed to your workspace.
|
||||
# ---------------------------------------------------------------
|
||||
self.bot_token = settings.slack_bot_token
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# App-Level Token (xapp-...) — from .env SLACK_APP_TOKEN
|
||||
# Required for Socket Mode. Generated in Slack app settings.
|
||||
# ---------------------------------------------------------------
|
||||
self.app_token = settings.slack_app_token
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# Signing Secret — from .env SLACK_SIGNING_SECRET
|
||||
# Used to verify incoming requests from Slack.
|
||||
# ---------------------------------------------------------------
|
||||
self.signing_secret = settings.slack_signing_secret
|
||||
|
||||
self.app = None
|
||||
self.handler = None
|
||||
|
||||
# Deduplication: track recently processed event timestamps
|
||||
# Slack retries events if handler is slow — this prevents duplicates
|
||||
self._processed_events: Set[str] = set()
|
||||
# Deduplication: track recently processed message timestamps.
|
||||
# Slack Socket Mode retries deliver different envelope_ids but
|
||||
# the underlying message "ts" stays the same.
|
||||
self._processed_ts: Set[str] = set()
|
||||
self._processed_lock = threading.Lock()
|
||||
|
||||
# Per-user processing lock: prevents duplicate responses when
|
||||
# Slack delivers the same event multiple times before dedup catches it.
|
||||
# Only one message per user is processed at a time.
|
||||
self._busy_users: Set[str] = set()
|
||||
self._busy_lock = threading.Lock()
|
||||
|
||||
def _validate_tokens(self) -> bool:
|
||||
"""Check that all required Slack tokens are configured."""
|
||||
if not self.bot_token:
|
||||
@@ -93,47 +86,51 @@ class SlackHandler:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _is_duplicate_event(self, event: dict) -> bool:
|
||||
"""Check if we've already processed this event (Slack retry dedup)."""
|
||||
# Use multiple fields to build a robust dedup key.
|
||||
# client_msg_id is unique per user message (present on message events,
|
||||
# but NOT on app_mention events). event_ts is present on both.
|
||||
# We store keys for all strategies so cross-event-type dedup works.
|
||||
msg_id = event.get("client_msg_id")
|
||||
event_ts = event.get("event_ts") or event.get("ts", "")
|
||||
user = event.get("user", "")
|
||||
def _is_duplicate(self, event: dict) -> bool:
|
||||
"""
|
||||
De-duplicate events using the message 'ts' field.
|
||||
|
||||
# Build candidate keys
|
||||
keys = set()
|
||||
if msg_id:
|
||||
keys.add(f"msg:{msg_id}")
|
||||
if event_ts:
|
||||
keys.add(f"ts:{event_ts}")
|
||||
# Fallback: combine event type + ts + user for events without client_msg_id
|
||||
event_type = event.get("type", "")
|
||||
if event_ts and user:
|
||||
keys.add(f"evt:{event_type}:{event_ts}:{user}")
|
||||
|
||||
if not keys:
|
||||
When Slack retries an event via Socket Mode, it delivers a new
|
||||
envelope with a different envelope_id/event_ts, but the underlying
|
||||
message timestamp ('ts') is identical. We key on 'ts' to catch retries.
|
||||
"""
|
||||
ts = event.get("ts", "")
|
||||
if not ts:
|
||||
logger.info(f"DEDUP: No ts in event, skipping dedup check")
|
||||
return False
|
||||
|
||||
with self._processed_lock:
|
||||
# Check ALL keys — if any match, it's a duplicate
|
||||
for key in keys:
|
||||
if key in self._processed_events:
|
||||
logger.debug(f"Skipping duplicate event (matched key: {key})")
|
||||
return True
|
||||
if ts in self._processed_ts:
|
||||
logger.info(f"DEDUP: Duplicate detected ts={ts}")
|
||||
return True
|
||||
self._processed_ts.add(ts)
|
||||
logger.info(f"DEDUP: New event registered ts={ts}")
|
||||
|
||||
# Register ALL keys so cross-event-type dedup works
|
||||
# (app_mention and message for the same user message share event_ts)
|
||||
self._processed_events.update(keys)
|
||||
|
||||
# Prune old entries (keep set from growing indefinitely)
|
||||
if len(self._processed_events) > 500:
|
||||
self._processed_events = set(list(self._processed_events)[-200:])
|
||||
# Prune old entries
|
||||
if len(self._processed_ts) > 500:
|
||||
self._processed_ts = set(list(self._processed_ts)[-200:])
|
||||
|
||||
return False
|
||||
|
||||
def _acquire_user(self, user_id: str) -> bool:
|
||||
"""
|
||||
Try to acquire the per-user processing lock.
|
||||
Returns True if acquired (caller should process), False if already busy.
|
||||
"""
|
||||
with self._busy_lock:
|
||||
if user_id in self._busy_users:
|
||||
logger.info(f"DEDUP: User {user_id} already being processed, skipping")
|
||||
return False
|
||||
self._busy_users.add(user_id)
|
||||
logger.info(f"DEDUP: Acquired user {user_id}")
|
||||
return True
|
||||
|
||||
def _release_user(self, user_id: str):
|
||||
"""Release the per-user processing lock."""
|
||||
with self._busy_lock:
|
||||
self._busy_users.discard(user_id)
|
||||
logger.info(f"DEDUP: Released user {user_id}")
|
||||
|
||||
def _setup_app(self):
|
||||
"""Initialize Slack Bolt App and register event handlers."""
|
||||
from slack_bolt import App
|
||||
@@ -156,15 +153,24 @@ class SlackHandler:
|
||||
# --- Event: Bot mentioned in a channel ---
|
||||
@self.app.event("app_mention")
|
||||
def handle_mention(event, say, context):
|
||||
# Return IMMEDIATELY so Slack gets ack — process in background
|
||||
if self._is_duplicate_event(event):
|
||||
user_id = event.get("user", "")
|
||||
ts = event.get("ts", "")
|
||||
text = event.get("text", "")[:120]
|
||||
channel = event.get("channel", "")
|
||||
logger.info(
|
||||
f"[app_mention] ts={ts} user={user_id} channel={channel} text={text}"
|
||||
)
|
||||
if self._is_duplicate(event):
|
||||
return
|
||||
_executor.submit(self._handle_event, event, say, context)
|
||||
if not self._acquire_user(user_id):
|
||||
return
|
||||
logger.info(f"[app_mention] Submitting to thread pool for user={user_id}")
|
||||
_executor.submit(self._handle_event_safe, event, say, context)
|
||||
|
||||
# --- Event: Direct message to bot ---
|
||||
@self.app.event("message")
|
||||
def handle_message(event, say, context):
|
||||
# Ignore bot's own messages and message_changed events
|
||||
# Ignore bot's own messages and subtypes
|
||||
if event.get("subtype") in (
|
||||
"bot_message",
|
||||
"message_changed",
|
||||
@@ -173,56 +179,113 @@ class SlackHandler:
|
||||
return
|
||||
if event.get("bot_id"):
|
||||
return
|
||||
# Ignore if this is from the bot itself
|
||||
if self._bot_user_id and event.get("user") == self._bot_user_id:
|
||||
return
|
||||
# Skip channel messages that contain a bot mention —
|
||||
# those are handled by the app_mention handler above.
|
||||
# Only process DMs here (channel_type == "im").
|
||||
channel_type = event.get("channel_type", "")
|
||||
if channel_type != "im":
|
||||
# Only DMs — channel mentions are handled by app_mention
|
||||
if event.get("channel_type", "") != "im":
|
||||
return
|
||||
if self._is_duplicate_event(event):
|
||||
|
||||
user_id = event.get("user", "")
|
||||
ts = event.get("ts", "")
|
||||
text = event.get("text", "")[:120]
|
||||
logger.info(f"[message/DM] ts={ts} user={user_id} text={text}")
|
||||
if self._is_duplicate(event):
|
||||
return
|
||||
# Return IMMEDIATELY — process in background
|
||||
_executor.submit(self._handle_event, event, say, context)
|
||||
if not self._acquire_user(user_id):
|
||||
return
|
||||
logger.info(f"[message/DM] Submitting to thread pool for user={user_id}")
|
||||
_executor.submit(self._handle_event_safe, event, say, context)
|
||||
|
||||
self.handler = SocketModeHandler(self.app, self.app_token)
|
||||
|
||||
def _handle_event_safe(self, event: dict, say, context):
|
||||
"""Wrapper that catches all exceptions and releases user lock."""
|
||||
user_id = event.get("user", "")
|
||||
event_ts = event.get("ts", "")
|
||||
text_preview = event.get("text", "")[:80]
|
||||
logger.info(
|
||||
f"[handle_event_safe] START user={user_id} ts={event_ts} text={text_preview}"
|
||||
)
|
||||
try:
|
||||
self._handle_event(event, say, context)
|
||||
logger.info(f"[handle_event_safe] DONE user={user_id} ts={event_ts}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[handle_event_safe] UNHANDLED ERROR user={user_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
try:
|
||||
say(
|
||||
"❌ I encountered an error processing your request. Please try again."
|
||||
)
|
||||
except Exception as say_err:
|
||||
logger.error(
|
||||
f"[handle_event_safe] Failed to send error to Slack: {say_err}"
|
||||
)
|
||||
finally:
|
||||
self._release_user(user_id)
|
||||
|
||||
def _handle_event(self, event: dict, say, context):
|
||||
"""Process an incoming Slack event (runs in background thread)."""
|
||||
team_id = context.get("team_id", None) if context else None
|
||||
message = self.adapter.normalise_slack(event, team_id=team_id)
|
||||
logger.info(
|
||||
f"[handle_event] normalised: client_id={message.client_id} "
|
||||
f"text={message.text[:60] if message.text else '(empty)'}"
|
||||
)
|
||||
|
||||
if not message.text:
|
||||
logger.info("[handle_event] empty text, returning")
|
||||
return
|
||||
|
||||
if message.client_id == "unknown":
|
||||
say("⚠️ Could not identify your workspace. Please contact support.")
|
||||
return
|
||||
|
||||
client_config = self.client_configs.get(message.client_id)
|
||||
if not client_config:
|
||||
say(f"⚠️ No configuration found for client: {message.client_id}")
|
||||
return
|
||||
|
||||
logger.info("[handle_event] calling orchestrator...")
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
team_id = context.get("team_id", None) if context else None
|
||||
message = self.adapter.normalise_slack(event, team_id=team_id)
|
||||
result = loop.run_until_complete(
|
||||
self.orchestrator.process(message, client_config)
|
||||
)
|
||||
response_text = result.get("response", "")
|
||||
if not response_text:
|
||||
response_text = "I wasn't able to generate a response. Please try rephrasing your question."
|
||||
|
||||
if not message.text:
|
||||
return
|
||||
|
||||
if message.client_id == "unknown":
|
||||
say("⚠️ Could not identify your workspace. Please contact support.")
|
||||
return
|
||||
|
||||
client_config = self.client_configs.get(message.client_id)
|
||||
if not client_config:
|
||||
say(f"⚠️ No configuration found for client: {message.client_id}")
|
||||
return
|
||||
|
||||
# Run the orchestrator pipeline (async in sync context)
|
||||
loop = asyncio.new_event_loop()
|
||||
try:
|
||||
result = loop.run_until_complete(
|
||||
self.orchestrator.process(message, client_config)
|
||||
)
|
||||
say(result["response"])
|
||||
finally:
|
||||
loop.close()
|
||||
logger.info(
|
||||
f"[handle_event] orchestrator done, response={len(response_text)} chars, "
|
||||
f"qa_score={result.get('qa_score', 0):.2f}, retries={result.get('retries', 0)}"
|
||||
)
|
||||
|
||||
# Slack has a 4000 char limit for messages; split if needed
|
||||
if len(response_text) > 3900:
|
||||
chunks = [
|
||||
response_text[i : i + 3900]
|
||||
for i in range(0, len(response_text), 3900)
|
||||
]
|
||||
for i, chunk in enumerate(chunks):
|
||||
say(chunk)
|
||||
logger.info(f"[handle_event] sent chunk {i + 1}/{len(chunks)}")
|
||||
else:
|
||||
say(response_text)
|
||||
logger.info("[handle_event] say() called successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Slack event handler error: {e}", exc_info=True)
|
||||
say(
|
||||
error_msg = (
|
||||
"❌ I encountered an error processing your request. "
|
||||
"Please try again or contact support."
|
||||
)
|
||||
try:
|
||||
say(error_msg)
|
||||
except Exception as say_err:
|
||||
logger.error(f"Failed to send error message to Slack: {say_err}")
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
def start(self):
|
||||
"""Start the Slack bot in a background thread."""
|
||||
|
||||
Reference in New Issue
Block a user