diff --git a/.env.example b/.env.example index 0f886f0..4aa0aa1 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,9 @@ # --- Groq API (free at https://console.groq.com) --- GROQ_API_KEY= +# --- NVIDIA NIM API (alternative to Groq — set one or the other) --- +# NVIDIA_API_KEY= + # --- PostgreSQL + pgvector (docker-compose handles this if using defaults) --- DATABASE_URL=postgresql://user:pass@localhost:5432/clawrity @@ -23,4 +26,5 @@ SLACK_SIGNING_SECRET= TAVILY_API_KEY= # --- Slack Webhook for digest delivery --- +# Create at: Slack App → Incoming Webhooks → Add New Webhook to Workspace ACME_SLACK_WEBHOOK= diff --git a/.gitignore b/.gitignore index 9422ce3..7de4bb6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,10 @@ # === Environment & Secrets === .env *.env +.env.local +.env.production -# === Dataset files — never commit raw or processed data === +# === Dataset files === data/raw/ data/processed/ @@ -12,24 +14,34 @@ __pycache__/ *$py.class *.so *.egg-info/ +*.egg dist/ build/ -*.egg +eggs/ +*.whl +pip-log.txt +pip-delete-this-directory.txt # === Virtual Environment === venv/ .venv/ env/ +ENV/ # === IDE === .vscode/ .idea/ *.swp *.swo +*~ +.project +.settings/ # === OS === .DS_Store Thumbs.db +ehthumbs.db +Desktop.ini # === Logs === logs/ @@ -39,5 +51,25 @@ logs/ # === Docker === pg_data/ -# === Model Cache === +# === Model / Embedding Cache === .cache/ +.cache/huggingface/ +.cache/torch/ + +# === Ruff === +.ruff_cache/ + +# === Jupyter === +.ipynb_checkpoints/ +*.ipynb + +# === Testing === +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml + +# === Misc === +codebase_structure.txt +*.bak +*.tmp diff --git a/README.md b/README.md index a59f470..59ef8de 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,30 @@ # Clawrity -**Multi-channel AI business intelligence agent.** Enterprise clients interact via Slack (or Teams) and get data-grounded answers, daily digests, budget recommendations, ROI forecasts, and competitor/sector intelligence — all specific to their business data. +**Multi-channel AI business intelligence agent.** Ask questions in natural language via Slack or REST API and get data-grounded answers with specific numbers, daily digests, budget recommendations, ROI forecasts, and competitor intelligence. --- ## Architecture -Built on the **OpenClaw pattern**: -- **ProtocolAdapter** — normalises messages from any channel (Slack, Teams, etc.) -- **SOUL.md** — per-client personality, rules, and business context +``` +User (Slack/API) → ProtocolAdapter → Orchestrator → NL-to-SQL → PostgreSQL + ↓ + Gen Agent (LLM) → QA Agent → Response + ↑ + RAG Retriever (pgvector) + ↑ + Scout Agent (web search) +``` + +- **Orchestrator** — coordinates the full pipeline with retry logic +- **Gen Agent** — generates data-grounded responses with specific figures +- **QA Agent** — validates responses for hallucinations (branch names, numbers) +- **Scout Agent** — fetches competitor/sector news via Tavily +- **RAG Retriever** — semantic search over historical business data (pgvector) +- **SOUL.md** — per-client personality and rules - **HEARTBEAT.md** — autonomous daily digest scheduling -All intelligence lives in the Clawrity backend. OpenClaw layer has zero business logic. +--- ## Tech Stack @@ -19,29 +32,60 @@ All intelligence lives in the Clawrity backend. 
OpenClaw layer has zero business |---|---| | Language | Python 3.11 | | API Framework | FastAPI + uvicorn | -| LLM | Groq API — llama-3.3-70b-versatile | -| Embeddings | sentence-transformers all-MiniLM-L6-v2 (CPU, 384d) | +| LLM | Groq (llama-3.3-70b-versatile) or NVIDIA NIM | +| Embeddings | sentence-transformers all-MiniLM-L6-v2 (384d) | | Database | PostgreSQL + pgvector | -| Channel (dev) | Slack Bolt SDK (Socket Mode) | -| Channel (demo) | Microsoft Teams Bot Framework SDK | -| Scheduler | APScheduler AsyncIOScheduler | +| Channel | Slack Bolt SDK (Socket Mode) | +| Scheduler | APScheduler | | Web Search | Tavily API + DuckDuckGo fallback | | Forecasting | Prophet | -## Quick Start +--- -### 1. Prerequisites +## Quick Start (From Scratch) + +### Prerequisites - Python 3.11+ - Docker & Docker Compose -- Groq API key (free: https://console.groq.com) -- Tavily API key (free: https://app.tavily.com) +- [Groq API key](https://console.groq.com) (free) +- [Tavily API key](https://app.tavily.com) (free) -### 2. Environment Setup +### 1. Clone & Setup + +```bash +git clone +cd clawrity + +# Create virtual environment +python3 -m venv venv +source venv/bin/activate # Linux/Mac +# venv\Scripts\activate # Windows + +# Install dependencies +pip install -r requirements.txt +``` + +### 2. Configure Environment ```bash cp .env.example .env -# Fill in your API keys in .env +``` + +Edit `.env` and fill in your keys: + +```env +GROQ_API_KEY=gsk_... # from console.groq.com +DATABASE_URL=postgresql://user:pass@localhost:5432/clawrity +TAVILY_API_KEY=tvly-... # from app.tavily.com + +# Slack (optional — for Slack integration) +SLACK_BOT_TOKEN=xoxb-... +SLACK_APP_TOKEN=xapp-... +SLACK_SIGNING_SECRET=... + +# Digest webhook (optional) +ACME_SLACK_WEBHOOK=https://hooks.slack.com/services/... ``` ### 3. Start PostgreSQL + pgvector @@ -50,27 +94,26 @@ cp .env.example .env docker compose up -d postgres ``` -### 4. Install Dependencies +Wait ~10 seconds for PostgreSQL to initialize, then verify: ```bash -python -m venv venv -source venv/bin/activate -pip install -r requirements.txt +docker compose ps +# postgres should show "healthy" ``` -### 5. Download Kaggle Datasets +### 4. Download Datasets -Download these two datasets and place them in `data/raw/`: +Download these two Kaggle datasets and place the files in `data/raw/`: 1. **Global Superstore**: https://kaggle.com/datasets/apoorvaappz/global-super-store-dataset 2. **Marketing Campaign Performance**: https://kaggle.com/datasets/manishabhatt22/marketing-campaign-performance-dataset ```bash mkdir -p data/raw data/processed -# Place downloaded files in data/raw/ +# Place Global_Superstore2.csv and marketing_campaign_dataset.csv in data/raw/ ``` -### 6. Seed Demo Data +### 5. Seed Demo Data ```bash python scripts/seed_demo_data.py --client_id acme_corp \ @@ -78,64 +121,86 @@ python scripts/seed_demo_data.py --client_id acme_corp \ --marketing data/raw/marketing_campaign_dataset.csv ``` -### 7. Run RAG Pipeline +### 6. Run RAG Pipeline ```bash python scripts/run_rag_pipeline.py --client_id acme_corp ``` -### 8. Start the API +### 7. Start the Server ```bash uvicorn main:app --reload --port 8000 ``` +Server runs at `http://localhost:8000`. 
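For a scripted version of the same checks, a minimal smoke test along these lines should work (a sketch: it assumes the `/health` and `/chat` request shapes shown in this README, and uses `httpx`, which the scheduler already depends on):

```python
import httpx

BASE = "http://localhost:8000"

# Liveness: /health reports database, scheduler, and Slack status.
print(httpx.get(f"{BASE}/health").json())

# One real question through the full pipeline. Allow a generous timeout:
# NL-to-SQL, generation, and QA validation each cost an LLM round trip.
resp = httpx.post(
    f"{BASE}/chat",
    json={
        "client_id": "acme_corp",
        "message": "What is the total revenue for the Seattle branch?",
    },
    timeout=120.0,
)
resp.raise_for_status()
print(resp.json())
```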
Health check: `http://localhost:8000/health` + +--- + +## Test the API + +```bash +# Simple question +curl -X POST http://localhost:8000/chat \ + -H "Content-Type: application/json" \ + -d '{"client_id": "acme_corp", "message": "What is the total revenue for the Seattle branch?"}' + +# Recommendation question +curl -X POST http://localhost:8000/chat \ + -H "Content-Type: application/json" \ + -d '{"client_id": "acme_corp", "message": "How can we improve revenue for the Seattle branch?"}' + +# Trigger digest +curl -X POST http://localhost:8000/digest \ + -H "Content-Type: application/json" \ + -d '{"client_id": "acme_corp"}' +``` + --- ## Slack Bot Setup (Socket Mode) -### Step 1: Create Slack App +### 1. Create Slack App 1. Go to https://api.slack.com/apps 2. Click **Create New App** → **From scratch** 3. Name it `Clawrity` and select your workspace -### Step 2: Enable Socket Mode +### 2. Enable Socket Mode -1. In the left sidebar, click **Socket Mode** -2. Toggle **Enable Socket Mode** to ON -3. Click **Generate Token** — name it `clawrity-socket` -4. Copy the `xapp-...` token → paste into `.env` as `SLACK_APP_TOKEN` +1. Left sidebar → **Socket Mode** → Toggle ON +2. Generate Token → name it `clawrity-socket` +3. Copy the `xapp-...` token → paste into `.env` as `SLACK_APP_TOKEN` -### Step 3: Configure Bot Token +### 3. Configure Bot Permissions -1. Go to **OAuth & Permissions** -2. Under **Bot Token Scopes**, add: +1. **OAuth & Permissions** → **Bot Token Scopes**, add: - `app_mentions:read` - `chat:write` - `channels:history` - `channels:read` -3. Click **Install to Workspace** -4. Copy the `xoxb-...` token → paste into `.env` as `SLACK_BOT_TOKEN` + - `im:history` + - `im:read` + - `im:write` +2. Click **Install to Workspace** +3. Copy the `xoxb-...` token → paste into `.env` as `SLACK_BOT_TOKEN` -### Step 4: Enable Events +### 4. Enable Events -1. Go to **Event Subscriptions** -2. Toggle **Enable Events** to ON (no Request URL needed in Socket Mode) -3. Under **Subscribe to bot events**, add: +1. **Event Subscriptions** → Toggle ON +2. Under **Subscribe to bot events**, add: - `app_mention` - `message.channels` -4. Click **Save Changes** + - `message.im` +3. Click **Save Changes** -### Step 5: Get Signing Secret +### 5. Get Signing Secret -1. Go to **Basic Information** -2. Under **App Credentials**, copy **Signing Secret** -3. Paste into `.env` as `SLACK_SIGNING_SECRET` +1. **Basic Information** → **App Credentials** +2. Copy **Signing Secret** → paste into `.env` as `SLACK_SIGNING_SECRET` -### Step 6: Invite Bot to Channel +### 6. 
Invite Bot to Channel

-In Slack, go to your desired channel and type:
 ```
 /invite @Clawrity
 ```
@@ -146,19 +211,40 @@ In Slack, go to your desired channel and type:

 ## API Endpoints

 | Method | Path | Description |
 |--------|------|-------------|
-| POST | `/chat` | Send message → get AI response |
-| POST | `/slack/events` | Slack webhook fallback |
-| POST | `/compare` | Side-by-side RAG vs no-RAG |
-| POST | `/forecast/run/{client_id}` | Trigger Prophet forecasting |
-| GET | `/forecast/{client_id}/{branch}` | Get cached forecast |
-| GET | `/admin/stats/{client_id}` | RAG monitoring stats |
-| GET | `/health` | System status |
+| `POST` | `/chat` | Send message → get AI response |
+| `POST` | `/compare` | Side-by-side RAG vs no-RAG comparison |
+| `POST` | `/scout` | Targeted competitor/market intelligence search |
+| `POST` | `/scout/digest` | Full scout agent digest for a client |
+| `POST` | `/digest` | Manually trigger daily digest pipeline |
+| `GET` | `/admin/stats/{client_id}` | RAG monitoring stats |
+| `POST` | `/forecast/run/{client_id}` | Trigger Prophet forecasting |
+| `GET` | `/forecast/{client_id}/{branch}` | Get cached forecast |
+| `GET` | `/health` | System health check |
+
+---
+
+## Example Questions to Ask
+
+| Category | Question |
+|----------|----------|
+| Simple data | "What is the total revenue for the Seattle branch?" |
+| Channel analysis | "Show me revenue by channel for Seattle" |
+| Rankings | "What are the top 5 branches by revenue?" |
+| ROI | "What is the ROI for New York City?" |
+| Country drill-down | "Show me total revenue by country for Australia" |
+| Recommendations | "How can we improve revenue for the Seattle branch?" |
+| Strategy | "What strategy would you recommend for the London branch?" |
+| Trends | "What is the revenue trend from 2011 to 2014?" |
+| Channel comparison | "Which channel has the highest ROI overall?" |
+| Bottom performers | "What are the bottom 10 performing branches?" |
+
+---

 ## Adding a New Client

-1. Create `config/clients/client_newclient.yaml` (copy from `client_acme.yaml`)
-2. Create `soul/newclient_soul.md`
-3. Create `heartbeat/newclient_heartbeat.md`
+1. Create `config/clients/client_<client_id>.yaml` (copy from `client_acme.yaml`)
+2. Create `soul/<client_id>_soul.md` with personality/rules
+3. Create `heartbeat/<client_id>_heartbeat.md` with schedule
 4. Place data in `data/raw/` and run seed + RAG scripts
 5. Restart — zero code changes required
@@ -168,46 +254,69 @@ In Slack, go to your desired channel and type:

 ```
 clawrity/
-├── main.py                      # FastAPI application
-├── config/                      # Configuration
-│   ├── settings.py              # pydantic-settings from .env
-│   ├── client_loader.py         # YAML client config loader
-│   └── clients/client_acme.yaml # Per-client config
-├── soul/                        # Per-client personality
-│   ├── soul_loader.py
-│   └── acme_soul.md
-├── heartbeat/                   # Autonomous digest scheduling
-│   ├── heartbeat_loader.py
-│   ├── scheduler.py
-│   └── acme_heartbeat.md
-├── agents/                      # AI agents
-│   ├── gen_agent.py             # Response generation
-│   ├── qa_agent.py              # Quality assurance
-│   ├── orchestrator.py          # Pipeline coordinator
+├── main.py                      # FastAPI application + lifespan
+├── agents/
+│   ├── orchestrator.py          # Pipeline coordinator (retry loop)
+│   ├── gen_agent.py             # LLM response generation
+│   ├── qa_agent.py              # Hallucination checker
 │   └── scout_agent.py           # Competitor intelligence
-├── skills/                      # Capabilities
-│   ├── postgres_connector.py    # DB connection pool
-│   ├── nl_to_sql.py             # Natural language → SQL
-│   └── web_search.py            # Tavily + DuckDuckGo
-├── channels/                    # Message channels
-│   ├── protocol_adapter.py      # OpenClaw normalisation
+├── config/
+│   ├── settings.py              # pydantic-settings from .env
+│   ├── llm_client.py            # LLM factory (Groq/NVIDIA) with retry
+│   ├── client_loader.py         # YAML client config loader
+│   └── clients/client_acme.yaml
+├── channels/
+│   ├── protocol_adapter.py      # Message normalisation
 │   ├── slack_handler.py         # Slack Socket Mode
 │   └── teams_handler.py         # Teams stub
-├── rag/                         # Retrieval-augmented generation
-│   ├── preprocessor.py
-│   ├── chunker.py
-│   ├── vector_store.py
-│   ├── retriever.py
-│   ├── evaluator.py
-│   └── monitoring.py
+├── skills/
+│   ├── nl_to_sql.py             # Natural language → SQL
+│   ├── postgres_connector.py    # PostgreSQL + pgvector
+│   └── web_search.py            # Tavily + DuckDuckGo
+├── rag/
+│   ├── preprocessor.py          # Data cleaning
+│   ├── chunker.py               # Semantic chunking
+│   ├── vector_store.py          # Embed + pgvector store
+│   ├── retriever.py             # Intent-based retrieval
+│   ├── evaluator.py             # RAG quality metrics
+│   └── monitoring.py            # JSONL interaction logging
+├── soul/
+│   ├── soul_loader.py
+│   └── acme_soul.md
+├── heartbeat/
+│   ├── heartbeat_loader.py
+│   ├── scheduler.py             # APScheduler digest jobs
+│   └── acme_heartbeat.md
 ├── forecasting/
-│   └── prophet_engine.py
+│   └── prophet_engine.py        # Prophet time series
 ├── connectors/
 │   ├── base_connector.py
 │   └── csv_connector.py
 ├── etl/
 │   └── normaliser.py
-└── scripts/
-    ├── seed_demo_data.py
-    └── run_rag_pipeline.py
+├── scripts/
+│   ├── seed_demo_data.py        # Seed PostgreSQL from CSV
+│   └── run_rag_pipeline.py      # Preprocess → chunk → embed
+├── docker-compose.yml
+├── Dockerfile
+└── requirements.txt
 ```
+
+---
+
+## Troubleshooting
+
+| Issue | Fix |
+|-------|-----|
+| `Connection refused` on /chat | PostgreSQL not running — `docker compose up -d postgres` |
+| `Rate limited (429)` | LLM API throttling — system auto-retries with backoff |
+| `No module named 'X'` | Activate venv: `source venv/bin/activate` |
+| Slack bot not responding | Check `SLACK_BOT_TOKEN` and `SLACK_APP_TOKEN` in `.env` |
+| `Clawrity digest unavailable` | Set valid `ACME_SLACK_WEBHOOK` in `.env` |
+| Embeddings slow on first run | MiniLM downloads ~80MB on first use — subsequent runs are cached |
+
+---
+
+## License
+
+Private — internal use only.
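A note on the `gen_agent.py` hunk that follows: the new `_compute_summary` helper pre-computes totals, overall ROI, and best/worst rows so the model has exact figures to cite. A quick sanity-check sketch (the DataFrame here is made up; the expected output assumes the helper exactly as defined in the hunk):

```python
import pandas as pd

from agents.gen_agent import GenAgent

# A made-up frame shaped like a typical /chat SQL result.
df = pd.DataFrame({
    "channel": ["Email", "Facebook"],
    "revenue": [29941.0, 18000.0],
    "spend": [13075.0, 8738.0],
})

# The helper never reads self, so it can be exercised unbound:
print(GenAgent._compute_summary(None, df))
# Total revenue: $47,941.00 | Total spend: $21,813.00
# Overall ROI: 2.20
# Highest revenue: Email ($29,941.00)
# Lowest revenue: Facebook ($18,000.00)
```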
diff --git a/agents/gen_agent.py b/agents/gen_agent.py index c828626..3d24c04 100644 --- a/agents/gen_agent.py +++ b/agents/gen_agent.py @@ -12,7 +12,7 @@ from typing import List, Optional, Dict import pandas as pd -from config.llm_client import get_llm_client, get_model_name +from config.llm_client import get_llm_client, get_model_name, chat_with_retry logger = logging.getLogger(__name__) @@ -35,6 +35,7 @@ class GenAgent: retry_count: int = 0, strict_data_instruction: Optional[str] = None, supplementary_context: Optional[pd.DataFrame] = None, + sql: Optional[str] = None, ) -> str: """ Generate a data-grounded response. @@ -46,6 +47,7 @@ class GenAgent: rag_chunks: Retrieved chunks with similarity scores (Phase 2) retry_issues: QA Agent issues from previous attempt retry_count: Current retry number (0-2) + sql: The SQL query that produced the data context Returns: Markdown-formatted response string @@ -53,12 +55,19 @@ class GenAgent: temperature = max(0.1, self.base_temperature - (retry_count * 0.2)) prompt = self._build_prompt( - question, soul_content, data_context, rag_chunks, retry_issues, - strict_data_instruction, supplementary_context, + question, + soul_content, + data_context, + rag_chunks, + retry_issues, + strict_data_instruction, + supplementary_context, + sql, ) try: - response = self.client.chat.completions.create( + response = chat_with_retry( + self.client, model=self.model, messages=[ {"role": "system", "content": soul_content}, @@ -108,7 +117,8 @@ class GenAgent: Use bullet points, bold key numbers, and keep it concise.""" try: - response = self.client.chat.completions.create( + response = chat_with_retry( + self.client, model=self.model, messages=[ {"role": "system", "content": soul_content}, @@ -131,18 +141,28 @@ Use bullet points, bold key numbers, and keep it concise.""" retry_issues: Optional[List[str]], strict_data_instruction: Optional[str] = None, supplementary_context: Optional[pd.DataFrame] = None, + sql: Optional[str] = None, ) -> str: """Build the augmented prompt for response generation.""" parts = [] - # Strict data instruction (on retry — prevents hallucination) + # Strict data instruction — prevents hallucination if strict_data_instruction: parts.append(f"## ⚠️ STRICT REQUIREMENT\n{strict_data_instruction}\n") - # Data context + # SQL query that produced the data (so the model knows what filters were applied) + if sql: + parts.append(f"## SQL Query Used\n```sql\n{sql}\n```\n") + + # Data context with computed summaries if data_context is not None and len(data_context) > 0: parts.append("## Data Context (query results for the user's question)") parts.append(data_context.to_markdown(index=False)) + + # Compute summary statistics to help the LLM cite precise numbers + summary = self._compute_summary(data_context) + if summary: + parts.append(f"\n### Computed Summary\n{summary}") else: parts.append("## Data Context\nNo query results available.") @@ -150,22 +170,31 @@ Use bullet points, bold key numbers, and keep it concise.""" if supplementary_context is not None and len(supplementary_context) > 0: parts.append("\n## Benchmark Data (top-performing branches for comparison)") parts.append(supplementary_context.to_markdown(index=False)) + + bench_summary = self._compute_summary(supplementary_context) + if bench_summary: + parts.append(f"\n### Benchmark Summary\n{bench_summary}") + parts.append( - "\nUse this benchmark data to compare the queried branch's performance " - "against top performers. 
Identify which channels and strategies work " - "best, and recommend specific, actionable improvements based on what " - "top-performing branches are doing differently." + "\n### How to use benchmark data\n" + "Compare the queried branch's metrics against these top performers:\n" + "- If the queried branch's ROI is lower than benchmarks, recommend shifting budget to higher-ROI channels\n" + "- If a channel underperforms vs benchmarks, suggest reducing spend or optimizing it\n" + "- Cite SPECIFIC numbers: 'Your Email ROI is 2.29 vs the top performer's 2.50'\n" + "- Be concrete: 'Shift $X from Facebook to Email based on the ROI difference'" ) # RAG chunks (Phase 2) if rag_chunks: - parts.append("\n## Historical Business Context (retrieved from intelligence layer)") - if strict_data_instruction: - parts.append("⚠️ ONLY use historical context that is about branches/entities in the Data Context above. IGNORE any historical context about other branches.") + parts.append( + "\n## Historical Business Context (retrieved from intelligence layer)" + ) + parts.append( + "⚠️ ONLY use historical context that is about branches/entities in the Data Context above. IGNORE any historical context about other branches." + ) for i, chunk in enumerate(rag_chunks, 1): sim = chunk.get("similarity", 0) parts.append(f"{i}. {chunk['text']} (relevance: {sim:.2f})") - parts.append("\nBase suggestions on historical context. Cite specific data points.") # Retry instructions if retry_issues: @@ -173,12 +202,82 @@ Use bullet points, bold key numbers, and keep it concise.""" parts.append("Your previous response had these problems. Fix them:") for issue in retry_issues: parts.append(f"- {issue}") - parts.append("Be more precise. Only state facts supported by the data above.") - parts.append("Do NOT introduce any new branches, cities, or figures that are not in the Data Context.") + parts.append( + "Be more precise. Only state facts supported by the data above." + ) + parts.append( + "Do NOT introduce any new branches, cities, or figures that are not in the Data Context." + ) # User question parts.append(f"\n## User Question\n{question}") - parts.append("\nProvide a professional, data-grounded response. Cite specific numbers from the data.") + # Response quality instructions + parts.append( + "\n## Response Quality Rules\n" + "1. ALWAYS cite specific numbers from the Data Context (e.g., '$29,941 revenue', 'ROI of 2.29')\n" + "2. When comparing channels or branches, use EXACT figures from the data — never round unless using ~\n" + "3. For recommendations, reference specific metrics: 'Email has ROI 2.29 vs Facebook's 2.06 — consider reallocating budget'\n" + "4. Structure your answer with clear sections: Data Summary → Analysis → Recommendations\n" + "5. Do NOT give generic advice — every recommendation must tie to a specific data point\n" + "6. Do NOT mention branches, cities, or figures that are not in the Data Context above\n" + "7. 
Keep the response concise but data-dense — prefer bullet points over paragraphs" + ) return "\n".join(parts) + + def _compute_summary(self, df: pd.DataFrame) -> str: + """Compute summary statistics from a DataFrame to help the LLM cite precise numbers.""" + if df is None or len(df) == 0: + return "" + + lines = [] + numeric_cols = df.select_dtypes(include=["number"]).columns.tolist() + + # Total row + totals = {} + for col in numeric_cols: + total = df[col].sum() + if total != 0: + totals[col] = total + + if totals: + total_parts = [] + for col, val in totals.items(): + if val >= 1_000_000: + total_parts.append(f"Total {col}: ${val / 1_000_000:.2f}M") + elif val >= 1_000: + total_parts.append(f"Total {col}: ${val:,.2f}") + else: + total_parts.append(f"Total {col}: {val:,.0f}") + lines.append(" | ".join(total_parts)) + + # ROI if revenue and spend columns exist + rev_col = next((c for c in numeric_cols if "revenue" in c.lower()), None) + spend_col = next((c for c in numeric_cols if "spend" in c.lower()), None) + if rev_col and spend_col: + total_rev = df[rev_col].sum() + total_spend = df[spend_col].sum() + if total_spend > 0: + lines.append(f"Overall ROI: {total_rev / total_spend:.2f}") + + # Per-row highlights (top/bottom) + if rev_col and len(df) > 1: + idx_max = df[rev_col].idxmax() + idx_min = df[rev_col].idxmin() + label_col = None + for candidate in ["branch", "channel", "country", "name"]: + if candidate in df.columns: + label_col = candidate + break + if label_col: + top = df.loc[idx_max] + bot = df.loc[idx_min] + lines.append( + f"Highest {rev_col}: {top[label_col]} (${top[rev_col]:,.2f})" + ) + lines.append( + f"Lowest {rev_col}: {bot[label_col]} (${bot[rev_col]:,.2f})" + ) + + return "\n".join(lines) if lines else "" diff --git a/agents/orchestrator.py b/agents/orchestrator.py index 33356c0..bb952d2 100644 --- a/agents/orchestrator.py +++ b/agents/orchestrator.py @@ -118,7 +118,9 @@ class Orchestrator: qa_threshold = client_config.hallucination_threshold if supplementary_context is not None and len(supplementary_context) > 0: qa_threshold = min(qa_threshold, 0.5) - logger.info(f"Using relaxed QA threshold ({qa_threshold}) for enriched context") + logger.info( + f"Using relaxed QA threshold ({qa_threshold}) for enriched context" + ) best_response = None best_score = 0.0 @@ -128,23 +130,23 @@ class Orchestrator: for attempt in range(MAX_RETRIES + 1): retry_issues = qa_result["issues"] if attempt > 0 else None - # On retry, add explicit data-only instruction to prevent hallucination - strict_data_instruction = None - if attempt > 0: - if supplementary_context is not None and len(supplementary_context) > 0: - strict_data_instruction = ( - "CRITICAL: Only use data from the Data Context and Benchmark Data " - "sections provided. Do NOT invent figures or branch names that are " - "not present in either of those sections. You MAY reference benchmark " - "branches for comparison and recommendations." - ) - else: - strict_data_instruction = ( - "CRITICAL: Do NOT mention any branches, figures, or historical data " - "that are not in the SQL query result provided. Stick strictly to the " - "data. If historical context from RAG is about different branches than " - "what the query returned, IGNORE that context entirely." - ) + # Always provide strict data grounding instruction to prevent + # the Gen Agent from hallucinating branch/figure data from RAG + # chunks that don't match the actual SQL query results. 
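+            # (Before this patch the instruction was only injected on
+            # retries; injecting it on the first attempt as well gives the
+            # QA agent fewer hallucinations to catch in the first place.)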
+ if supplementary_context is not None and len(supplementary_context) > 0: + strict_data_instruction = ( + "CRITICAL: Only use data from the Data Context and Benchmark Data " + "sections provided. Do NOT invent figures or branch names that are " + "not present in either of those sections. You MAY reference benchmark " + "branches for comparison and recommendations." + ) + else: + strict_data_instruction = ( + "CRITICAL: Do NOT mention any branches, figures, or historical data " + "that are not in the SQL query result provided. Stick strictly to the " + "data. If historical context from RAG is about different branches than " + "what the query returned, IGNORE that context entirely." + ) response = self.gen_agent.generate( question=message.text, @@ -155,6 +157,7 @@ class Orchestrator: retry_count=attempt, strict_data_instruction=strict_data_instruction, supplementary_context=supplementary_context, + sql=sql, ) qa_result = self.qa_agent.evaluate( @@ -163,6 +166,7 @@ class Orchestrator: threshold=qa_threshold, supplementary_context=supplementary_context, user_question=message.text, + sql=sql, ) # Track best response (prefer longer, richer responses over "no data" stubs) @@ -256,7 +260,9 @@ class Orchestrator: top_performers = db.execute_query(enrichment_sql, (client_id,)) if top_performers is not None and len(top_performers) > 0: - logger.info(f"Enrichment: fetched {len(top_performers)} top performer rows") + logger.info( + f"Enrichment: fetched {len(top_performers)} top performer rows" + ) return top_performers except Exception as e: @@ -273,6 +279,7 @@ class Orchestrator: """Log interaction for monitoring.""" try: from rag.monitoring import log_interaction + log_interaction( client_id=client_config.client_id, query=message.text, diff --git a/agents/qa_agent.py b/agents/qa_agent.py index 3200787..1eda335 100644 --- a/agents/qa_agent.py +++ b/agents/qa_agent.py @@ -9,11 +9,12 @@ Threshold from client YAML hallucination_threshold (default 0.75). import json import logging +import re from typing import Optional, List, Dict import pandas as pd -from config.llm_client import get_llm_client, get_model_name +from config.llm_client import get_llm_client, get_model_name, chat_with_retry logger = logging.getLogger(__name__) @@ -32,7 +33,9 @@ Your job: verify that the response ONLY contains claims supported by the provide ### 1. Branch Name Validation (CRITICAL) - Extract ALL branch/city names mentioned in the response - Compare against the branch names in the Data Context above -- If ANY branch name appears in the response but NOT in the Data Context, this is a HALLUCINATION +- Branch/entity names listed under "Valid Entities from User Question" are VALID even if not listed in query results +- Branch/entity names listed under "Branches/entities filtered in SQL WHERE clause" are VALID even if not in result rows (e.g., if SQL has WHERE branch = 'X', then 'X' is valid context) +- If ANY branch name appears in the response but NOT in the Data Context, the valid-entities list, or the SQL WHERE clause filters, this is a HALLUCINATION - Deduct 0.3 from score for EACH unrelated branch mentioned ### 2. Numerical Accuracy (CRITICAL) @@ -83,6 +86,7 @@ class QAAgent: threshold: float = 0.75, supplementary_context: Optional[pd.DataFrame] = None, user_question: str = "", + sql: Optional[str] = None, ) -> Dict: """ Evaluate a response for faithfulness. 
@@ -93,6 +97,7 @@ class QAAgent: threshold: Minimum score to pass (from client YAML) supplementary_context: Benchmark data (top performers) that is also valid ground truth user_question: The user's original question (entities mentioned here are valid context) + sql: The SQL query that produced the data context (branch/entity filters are valid context) Returns: Dict with score (float), passed (bool), issues (list[str]) @@ -103,6 +108,20 @@ class QAAgent: else: data_str = "No structured data available." + # Include the SQL query so QA understands what filters were applied + # (e.g., branch names in WHERE clause are valid context even if not in result rows) + if sql: + data_str += ( + f"\n\n### SQL Query (defines the data scope)\n```sql\n{sql}\n```" + ) + # Extract branch/entity filters from SQL WHERE clause + where_branches = self._extract_where_entities(sql) + if where_branches: + data_str += ( + f"\nBranches/entities filtered in SQL WHERE clause (VALID context): " + f"{', '.join(sorted(where_branches))}" + ) + # Include supplementary (benchmark) context as valid ground truth if supplementary_context is not None and len(supplementary_context) > 0: data_str += "\n\n### Benchmark Data (also valid ground truth)\n" @@ -110,7 +129,16 @@ class QAAgent: # Include user question so QA knows which entities are valid context if user_question: - data_str += f"\n\n### User Question Context\nThe user asked: \"{user_question}\"\nBranch/entity names mentioned in the user's question are valid to reference in the response." + entities = self._extract_entities(user_question) + if entities: + entity_list = ", ".join(sorted(entities)) + else: + entity_list = "(none)" + data_str += ( + "\n\n### User Question Context\n" + f'The user asked: "{user_question}"\n' + f"Valid Entities from User Question: {entity_list}" + ) prompt = EVAL_PROMPT.format( data_context=data_str, @@ -119,10 +147,14 @@ class QAAgent: ) try: - result = self.client.chat.completions.create( + result = chat_with_retry( + self.client, model=self.model, messages=[ - {"role": "system", "content": "You are a strict QA evaluator. Return only valid JSON. Pay special attention to branch names and figures that appear in the response but NOT in the data context — these are hallucinations."}, + { + "role": "system", + "content": "You are a strict QA evaluator. Return only valid JSON. Pay special attention to branch names and figures that appear in the response but NOT in the data context — these are hallucinations.", + }, {"role": "user", "content": prompt}, ], temperature=0.1, @@ -140,7 +172,11 @@ class QAAgent: except Exception as e: logger.error(f"QA evaluation failed: {e}") # On failure, pass with warning - return {"score": 0.5, "passed": True, "issues": [f"QA evaluation error: {str(e)}"]} + return { + "score": 0.5, + "passed": True, + "issues": [f"QA evaluation error: {str(e)}"], + } def _parse_response(self, raw: str, threshold: float) -> Dict: """Parse JSON response from QA LLM call.""" @@ -162,4 +198,82 @@ class QAAgent: } except (json.JSONDecodeError, ValueError) as e: logger.warning(f"Could not parse QA response: {e}. 
Raw: {raw[:200]}") - return {"score": 0.5, "passed": True, "issues": ["QA response parsing failed"]} + return { + "score": 0.5, + "passed": True, + "issues": ["QA response parsing failed"], + } + + def _extract_where_entities(self, sql: str) -> List[str]: + """Extract branch/city entity names from SQL WHERE clause filters.""" + if not sql: + return [] + entities = set() + # Match patterns like: branch = 'Seattle', city = 'Toronto' + for match in re.finditer( + r"(?:branch|city|country)\s*=\s*'([^']+)'", + sql, + re.IGNORECASE, + ): + val = match.group(1).strip() + if val and len(val) > 1: + entities.add(val) + # Also handle IN ('val1', 'val2') patterns + for match in re.finditer( + r"(?:branch|city|country)\s+IN\s*\(([^)]+)\)", + sql, + re.IGNORECASE, + ): + for val in re.findall(r"'([^']+)'", match.group(1)): + if val and len(val) > 1: + entities.add(val) + return list(entities) + + def _extract_entities(self, text: str) -> List[str]: + """Extract likely branch/city entities from a user question.""" + if not text: + return [] + + lowered = text.lower() + patterns = [ + r"\bbranch\s+([a-z][a-z\s\-']{1,60})", + r"\bin\s+([a-z][a-z\s\-']{1,60})", + r"\bfor\s+the\s+([a-z][a-z\s\-']{1,60})\s+branch", + ] + + stops = { + "the", + "a", + "an", + "my", + "our", + "this", + "that", + "these", + "those", + "branch", + "branches", + "revenue", + "sales", + "roi", + "profit", + "performance", + } + + entities = set() + for pattern in patterns: + for match in re.findall(pattern, lowered): + candidate = match.strip(" .,!?:;\"'") + candidate = " ".join(candidate.split()) + if not candidate: + continue + if candidate in stops: + continue + if any(word in stops for word in candidate.split()): + candidate = " ".join(w for w in candidate.split() if w not in stops) + candidate = candidate.strip() + if len(candidate) < 2: + continue + entities.add(candidate.title()) + + return list(entities) diff --git a/agents/scout_agent.py b/agents/scout_agent.py index cbe55e4..cf8ee29 100644 --- a/agents/scout_agent.py +++ b/agents/scout_agent.py @@ -12,7 +12,7 @@ import logging from datetime import datetime from typing import Optional -from config.llm_client import get_llm_client, get_model_name +from config.llm_client import get_llm_client, get_model_name, chat_with_retry from config.client_loader import ClientConfig from config.settings import get_settings from skills.web_search import web_search @@ -99,8 +99,7 @@ class ScoutAgent: # Format results for LLM results_text = "\n\n".join( - f"**{r['title']}** ({r['url']})\n{r['content']}" - for r in all_results + f"**{r['title']}** ({r['url']})\n{r['content']}" for r in all_results ) # Summarize with Groq @@ -113,10 +112,14 @@ class ScoutAgent: ) try: - response = self.client.chat.completions.create( + response = chat_with_retry( + self.client, model=self.model, messages=[ - {"role": "system", "content": "You are a business intelligence scout."}, + { + "role": "system", + "content": "You are a business intelligence scout.", + }, {"role": "user", "content": prompt}, ], temperature=0.3, @@ -126,11 +129,15 @@ class ScoutAgent: result = response.choices[0].message.content.strip() if result == "NO_RELEVANT_NEWS": - logger.info(f"[{client_config.client_id}] Scout: no relevant news found") + logger.info( + f"[{client_config.client_id}] Scout: no relevant news found" + ) return None section = f"## 🔭 Market Intelligence\n\n{result}" - logger.info(f"[{client_config.client_id}] Scout: generated intelligence section") + logger.info( + f"[{client_config.client_id}] Scout: generated 
intelligence section" + ) return section except Exception as e: @@ -157,12 +164,18 @@ class ScoutAgent: scout_config = client_config.scout # Search with the user's query directly - results = web_search(query, max_results=5, lookback_days=scout_config.news_lookback_days) + results = web_search( + query, max_results=5, lookback_days=scout_config.news_lookback_days + ) # Also search with competitor names if they appear in the query for competitor in scout_config.competitors: if competitor.lower() in query.lower(): - extra = web_search(f"{competitor} latest news", max_results=3, lookback_days=scout_config.news_lookback_days) + extra = web_search( + f"{competitor} latest news", + max_results=3, + lookback_days=scout_config.news_lookback_days, + ) results.extend(extra) if not results: @@ -179,8 +192,7 @@ class ScoutAgent: # Format results for LLM results_text = "\n\n".join( - f"**{r['title']}** ({r['url']})\n{r['content']}" - for r in unique_results + f"**{r['title']}** ({r['url']})\n{r['content']}" for r in unique_results ) prompt = QUERY_PROMPT.format( @@ -192,10 +204,14 @@ class ScoutAgent: ) try: - response = self.client.chat.completions.create( + response = chat_with_retry( + self.client, model=self.model, messages=[ - {"role": "system", "content": "You are a business intelligence scout."}, + { + "role": "system", + "content": "You are a business intelligence scout.", + }, {"role": "user", "content": prompt}, ], temperature=0.3, diff --git a/channels/slack_handler.py b/channels/slack_handler.py index adbab4c..735177e 100644 --- a/channels/slack_handler.py +++ b/channels/slack_handler.py @@ -31,8 +31,8 @@ logger = logging.getLogger(__name__) # Thread pool for processing LLM pipeline without blocking event handlers _executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="clawrity-slack") -# Module-level guard: only one SlackHandler should be active at a time -_active_handler: Optional["SlackHandler"] = None +# Module-level reference to prevent multiple handlers +_active_handler = None class SlackHandler: @@ -51,32 +51,25 @@ class SlackHandler: settings = get_settings() - # --------------------------------------------------------------- - # Bot Token (xoxb-...) — from .env SLACK_BOT_TOKEN - # This is the OAuth token installed to your workspace. - # --------------------------------------------------------------- self.bot_token = settings.slack_bot_token - - # --------------------------------------------------------------- - # App-Level Token (xapp-...) — from .env SLACK_APP_TOKEN - # Required for Socket Mode. Generated in Slack app settings. - # --------------------------------------------------------------- self.app_token = settings.slack_app_token - - # --------------------------------------------------------------- - # Signing Secret — from .env SLACK_SIGNING_SECRET - # Used to verify incoming requests from Slack. - # --------------------------------------------------------------- self.signing_secret = settings.slack_signing_secret self.app = None self.handler = None - # Deduplication: track recently processed event timestamps - # Slack retries events if handler is slow — this prevents duplicates - self._processed_events: Set[str] = set() + # Deduplication: track recently processed message timestamps. + # Slack Socket Mode retries deliver different envelope_ids but + # the underlying message "ts" stays the same. 
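+        # A Slack retry re-delivers the same message with an unchanged "ts",
+        # and the app_mention and message events raised by a single user
+        # message share it as well, so one set covers both delivery paths.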
+ self._processed_ts: Set[str] = set() self._processed_lock = threading.Lock() + # Per-user processing lock: prevents duplicate responses when + # Slack delivers the same event multiple times before dedup catches it. + # Only one message per user is processed at a time. + self._busy_users: Set[str] = set() + self._busy_lock = threading.Lock() + def _validate_tokens(self) -> bool: """Check that all required Slack tokens are configured.""" if not self.bot_token: @@ -93,47 +86,51 @@ class SlackHandler: return False return True - def _is_duplicate_event(self, event: dict) -> bool: - """Check if we've already processed this event (Slack retry dedup).""" - # Use multiple fields to build a robust dedup key. - # client_msg_id is unique per user message (present on message events, - # but NOT on app_mention events). event_ts is present on both. - # We store keys for all strategies so cross-event-type dedup works. - msg_id = event.get("client_msg_id") - event_ts = event.get("event_ts") or event.get("ts", "") - user = event.get("user", "") + def _is_duplicate(self, event: dict) -> bool: + """ + De-duplicate events using the message 'ts' field. - # Build candidate keys - keys = set() - if msg_id: - keys.add(f"msg:{msg_id}") - if event_ts: - keys.add(f"ts:{event_ts}") - # Fallback: combine event type + ts + user for events without client_msg_id - event_type = event.get("type", "") - if event_ts and user: - keys.add(f"evt:{event_type}:{event_ts}:{user}") - - if not keys: + When Slack retries an event via Socket Mode, it delivers a new + envelope with a different envelope_id/event_ts, but the underlying + message timestamp ('ts') is identical. We key on 'ts' to catch retries. + """ + ts = event.get("ts", "") + if not ts: + logger.info(f"DEDUP: No ts in event, skipping dedup check") return False with self._processed_lock: - # Check ALL keys — if any match, it's a duplicate - for key in keys: - if key in self._processed_events: - logger.debug(f"Skipping duplicate event (matched key: {key})") - return True + if ts in self._processed_ts: + logger.info(f"DEDUP: Duplicate detected ts={ts}") + return True + self._processed_ts.add(ts) + logger.info(f"DEDUP: New event registered ts={ts}") - # Register ALL keys so cross-event-type dedup works - # (app_mention and message for the same user message share event_ts) - self._processed_events.update(keys) - - # Prune old entries (keep set from growing indefinitely) - if len(self._processed_events) > 500: - self._processed_events = set(list(self._processed_events)[-200:]) + # Prune old entries + if len(self._processed_ts) > 500: + self._processed_ts = set(list(self._processed_ts)[-200:]) return False + def _acquire_user(self, user_id: str) -> bool: + """ + Try to acquire the per-user processing lock. + Returns True if acquired (caller should process), False if already busy. 
+        """
+ """ + with self._busy_lock: + if user_id in self._busy_users: + logger.info(f"DEDUP: User {user_id} already being processed, skipping") + return False + self._busy_users.add(user_id) + logger.info(f"DEDUP: Acquired user {user_id}") + return True + + def _release_user(self, user_id: str): + """Release the per-user processing lock.""" + with self._busy_lock: + self._busy_users.discard(user_id) + logger.info(f"DEDUP: Released user {user_id}") + def _setup_app(self): """Initialize Slack Bolt App and register event handlers.""" from slack_bolt import App @@ -156,15 +153,24 @@ class SlackHandler: # --- Event: Bot mentioned in a channel --- @self.app.event("app_mention") def handle_mention(event, say, context): - # Return IMMEDIATELY so Slack gets ack — process in background - if self._is_duplicate_event(event): + user_id = event.get("user", "") + ts = event.get("ts", "") + text = event.get("text", "")[:120] + channel = event.get("channel", "") + logger.info( + f"[app_mention] ts={ts} user={user_id} channel={channel} text={text}" + ) + if self._is_duplicate(event): return - _executor.submit(self._handle_event, event, say, context) + if not self._acquire_user(user_id): + return + logger.info(f"[app_mention] Submitting to thread pool for user={user_id}") + _executor.submit(self._handle_event_safe, event, say, context) # --- Event: Direct message to bot --- @self.app.event("message") def handle_message(event, say, context): - # Ignore bot's own messages and message_changed events + # Ignore bot's own messages and subtypes if event.get("subtype") in ( "bot_message", "message_changed", @@ -173,56 +179,113 @@ class SlackHandler: return if event.get("bot_id"): return - # Ignore if this is from the bot itself if self._bot_user_id and event.get("user") == self._bot_user_id: return - # Skip channel messages that contain a bot mention — - # those are handled by the app_mention handler above. - # Only process DMs here (channel_type == "im"). - channel_type = event.get("channel_type", "") - if channel_type != "im": + # Only DMs — channel mentions are handled by app_mention + if event.get("channel_type", "") != "im": return - if self._is_duplicate_event(event): + + user_id = event.get("user", "") + ts = event.get("ts", "") + text = event.get("text", "")[:120] + logger.info(f"[message/DM] ts={ts} user={user_id} text={text}") + if self._is_duplicate(event): return - # Return IMMEDIATELY — process in background - _executor.submit(self._handle_event, event, say, context) + if not self._acquire_user(user_id): + return + logger.info(f"[message/DM] Submitting to thread pool for user={user_id}") + _executor.submit(self._handle_event_safe, event, say, context) self.handler = SocketModeHandler(self.app, self.app_token) + def _handle_event_safe(self, event: dict, say, context): + """Wrapper that catches all exceptions and releases user lock.""" + user_id = event.get("user", "") + event_ts = event.get("ts", "") + text_preview = event.get("text", "")[:80] + logger.info( + f"[handle_event_safe] START user={user_id} ts={event_ts} text={text_preview}" + ) + try: + self._handle_event(event, say, context) + logger.info(f"[handle_event_safe] DONE user={user_id} ts={event_ts}") + except Exception as e: + logger.error( + f"[handle_event_safe] UNHANDLED ERROR user={user_id}: {e}", + exc_info=True, + ) + try: + say( + "❌ I encountered an error processing your request. Please try again." 
+ ) + except Exception as say_err: + logger.error( + f"[handle_event_safe] Failed to send error to Slack: {say_err}" + ) + finally: + self._release_user(user_id) + def _handle_event(self, event: dict, say, context): """Process an incoming Slack event (runs in background thread).""" + team_id = context.get("team_id", None) if context else None + message = self.adapter.normalise_slack(event, team_id=team_id) + logger.info( + f"[handle_event] normalised: client_id={message.client_id} " + f"text={message.text[:60] if message.text else '(empty)'}" + ) + + if not message.text: + logger.info("[handle_event] empty text, returning") + return + + if message.client_id == "unknown": + say("⚠️ Could not identify your workspace. Please contact support.") + return + + client_config = self.client_configs.get(message.client_id) + if not client_config: + say(f"⚠️ No configuration found for client: {message.client_id}") + return + + logger.info("[handle_event] calling orchestrator...") + loop = asyncio.new_event_loop() try: - team_id = context.get("team_id", None) if context else None - message = self.adapter.normalise_slack(event, team_id=team_id) + result = loop.run_until_complete( + self.orchestrator.process(message, client_config) + ) + response_text = result.get("response", "") + if not response_text: + response_text = "I wasn't able to generate a response. Please try rephrasing your question." - if not message.text: - return - - if message.client_id == "unknown": - say("⚠️ Could not identify your workspace. Please contact support.") - return - - client_config = self.client_configs.get(message.client_id) - if not client_config: - say(f"⚠️ No configuration found for client: {message.client_id}") - return - - # Run the orchestrator pipeline (async in sync context) - loop = asyncio.new_event_loop() - try: - result = loop.run_until_complete( - self.orchestrator.process(message, client_config) - ) - say(result["response"]) - finally: - loop.close() + logger.info( + f"[handle_event] orchestrator done, response={len(response_text)} chars, " + f"qa_score={result.get('qa_score', 0):.2f}, retries={result.get('retries', 0)}" + ) + # Slack has a 4000 char limit for messages; split if needed + if len(response_text) > 3900: + chunks = [ + response_text[i : i + 3900] + for i in range(0, len(response_text), 3900) + ] + for i, chunk in enumerate(chunks): + say(chunk) + logger.info(f"[handle_event] sent chunk {i + 1}/{len(chunks)}") + else: + say(response_text) + logger.info("[handle_event] say() called successfully") except Exception as e: logger.error(f"Slack event handler error: {e}", exc_info=True) - say( + error_msg = ( "❌ I encountered an error processing your request. " "Please try again or contact support." 
) + try: + say(error_msg) + except Exception as say_err: + logger.error(f"Failed to send error message to Slack: {say_err}") + finally: + loop.close() def start(self): """Start the Slack bot in a background thread.""" diff --git a/config/llm_client.py b/config/llm_client.py index 5d72fab..dbd4caf 100644 --- a/config/llm_client.py +++ b/config/llm_client.py @@ -11,10 +11,10 @@ Auto-detects provider from settings: """ import logging +import time from functools import lru_cache -from openai import OpenAI - +from openai import OpenAI, RateLimitError, APIStatusError from config.settings import get_settings logger = logging.getLogger(__name__) @@ -31,6 +31,9 @@ _PROVIDERS = { }, } +MAX_RETRIES = 4 +BASE_DELAY = 1.0 # seconds + def get_llm_client() -> OpenAI: """Get the configured LLM client (NVIDIA NIM or Groq).""" @@ -54,12 +57,53 @@ def get_llm_client() -> OpenAI: client = OpenAI( api_key=api_key, base_url=config["base_url"], + max_retries=0, # We handle retries ourselves for better control ) logger.info(f"LLM client: {provider} ({config['base_url']})") return client +def chat_with_retry(client: OpenAI, **kwargs): + """ + Call client.chat.completions.create with exponential backoff on 429 errors. + + Args: + client: OpenAI client instance + **kwargs: Arguments passed to chat.completions.create + + Returns: + The completion response + + Raises: + RateLimitError: After all retries exhausted + APIStatusError: For non-429 API errors + """ + for attempt in range(MAX_RETRIES + 1): + try: + return client.chat.completions.create(**kwargs) + except RateLimitError as e: + if attempt == MAX_RETRIES: + logger.error(f"Rate limit: all {MAX_RETRIES} retries exhausted") + raise + delay = BASE_DELAY * (2**attempt) + logger.warning( + f"Rate limited (429), retrying in {delay:.1f}s " + f"(attempt {attempt + 1}/{MAX_RETRIES})" + ) + time.sleep(delay) + except APIStatusError as e: + if e.status_code == 429 and attempt < MAX_RETRIES: + delay = BASE_DELAY * (2**attempt) + logger.warning( + f"Rate limited (429), retrying in {delay:.1f}s " + f"(attempt {attempt + 1}/{MAX_RETRIES})" + ) + time.sleep(delay) + else: + raise + + def get_model_name() -> str: """Get the model name for the active provider.""" settings = get_settings() diff --git a/heartbeat/scheduler.py b/heartbeat/scheduler.py index 86f3a6f..468131a 100644 --- a/heartbeat/scheduler.py +++ b/heartbeat/scheduler.py @@ -125,6 +125,7 @@ async def run_digest( scout_section = None try: from agents.scout_agent import ScoutAgent + scout = ScoutAgent() scout_section = await scout.gather_intelligence(client_config) except Exception as e: @@ -140,25 +141,36 @@ async def run_digest( # Step 5: Push to Slack webhook webhook_url = client_config.channels.get("slack_webhook", "") - if webhook_url: + if webhook_url and webhook_url.startswith(("http://", "https://")): await _push_to_slack(webhook_url, full_digest) + elif webhook_url: + logger.warning( + f"[{client_id}] Slack webhook URL is malformed (missing http/https protocol): " + f"{webhook_url[:50]}..." 
+ ) else: logger.warning(f"[{client_id}] No Slack webhook configured") # Step 6: Log success to JSONL - _log_digest_event(client_id, "success", { - "qa_score": qa_result["score"], - "qa_passed": qa_result["passed"], - "scout_included": scout_section is not None, - "digest_length": len(full_digest), - }) + _log_digest_event( + client_id, + "success", + { + "qa_score": qa_result["score"], + "qa_passed": qa_result["passed"], + "scout_included": scout_section is not None, + "digest_length": len(full_digest), + }, + ) logger.info(f"[{client_id}] Digest completed successfully") return full_digest except Exception as e: logger.error(f"[{client_id}] Digest failed: {e}", exc_info=True) - _log_digest_event(client_id, "failure", {"error": str(e), "attempt": retry_count + 1}) + _log_digest_event( + client_id, "failure", {"error": str(e), "attempt": retry_count + 1} + ) heartbeat = load_heartbeat(client_config) @@ -171,19 +183,25 @@ async def run_digest( await asyncio.sleep(delay_minutes * 60) return await run_digest(client_config, orchestrator, retry_count + 1) else: - logger.error(f"[{client_id}] Digest failed after {heartbeat.max_retries + 1} attempts") + logger.error( + f"[{client_id}] Digest failed after {heartbeat.max_retries + 1} attempts" + ) # Post failure notification to Slack webhook_url = client_config.channels.get("slack_webhook", "") - if webhook_url: + if webhook_url and webhook_url.startswith(("http://", "https://")): await _push_to_slack( - webhook_url, - "Clawrity digest unavailable. Backend may be offline." + webhook_url, "Clawrity digest unavailable. Backend may be offline." ) return None async def _push_to_slack(webhook_url: str, message: str): """Push a message to a Slack incoming webhook.""" + if not webhook_url or not webhook_url.startswith(("http://", "https://")): + logger.error( + f"Invalid Slack webhook URL: {webhook_url[:50] if webhook_url else '(empty)'}" + ) + return try: async with httpx.AsyncClient() as client: response = await client.post( @@ -194,7 +212,9 @@ async def _push_to_slack(webhook_url: str, message: str): if response.status_code == 200: logger.info("Digest pushed to Slack successfully") else: - logger.error(f"Slack webhook returned {response.status_code}: {response.text}") + logger.error( + f"Slack webhook returned {response.status_code}: {response.text}" + ) except Exception as e: logger.error(f"Failed to push digest to Slack: {e}") @@ -252,8 +272,7 @@ def start_scheduler( replace_existing=True, ) logger.info( - f"Scheduled digest for {client_id}: " - f"{heartbeat.time} {heartbeat.timezone}" + f"Scheduled digest for {client_id}: {heartbeat.time} {heartbeat.timezone}" ) # ETL sync at 02:00 (placeholder) @@ -290,6 +309,7 @@ async def _rag_reindex_placeholder(client_id: str): logger.info(f"[{client_id}] RAG re-index triggered (placeholder)") try: from scripts.run_rag_pipeline import run_pipeline + run_pipeline(client_id) except Exception as e: logger.warning(f"RAG re-index failed: {e}") diff --git a/main.py b/main.py index 885b634..39cf135 100644 --- a/main.py +++ b/main.py @@ -7,11 +7,13 @@ starts Slack bot, and exposes REST endpoints. 
import asyncio import logging +import traceback from contextlib import asynccontextmanager from typing import Dict, Optional -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse from pydantic import BaseModel from agents.orchestrator import Orchestrator @@ -48,20 +50,34 @@ async def lifespan(app: FastAPI): logger.info("=== Clawrity starting up ===") # 1. Init database schema - db = get_connector() - db.init_schema() - logger.info("Database schema ready") + try: + db = get_connector() + db.init_schema() + logger.info("Database schema ready") + except Exception as e: + logger.error(f"Database init failed: {e}") + logger.warning("Starting in degraded mode — database unavailable") # 2. Load client configs - client_configs = load_client_configs() - logger.info(f"Loaded {len(client_configs)} client(s): {list(client_configs.keys())}") + try: + client_configs = load_client_configs() + logger.info( + f"Loaded {len(client_configs)} client(s): {list(client_configs.keys())}" + ) + except Exception as e: + logger.error(f"Client config loading failed: {e}") + client_configs = {} # 3. Init orchestrator - orchestrator = Orchestrator() + try: + orchestrator = Orchestrator() + except Exception as e: + logger.error(f"Orchestrator init failed: {e}") # 4. Try to attach RAG retriever try: from rag.retriever import Retriever + retriever = Retriever() orchestrator.set_retriever(retriever) logger.info("RAG retriever attached to orchestrator") @@ -69,15 +85,22 @@ async def lifespan(app: FastAPI): logger.info(f"RAG retriever not available (Phase 2): {e}") # 5. Init protocol adapter - protocol_adapter = ProtocolAdapter(client_configs) + try: + protocol_adapter = ProtocolAdapter(client_configs) + except Exception as e: + logger.error(f"Protocol adapter init failed: {e}") # 6. Start Slack bot - slack_handler = SlackHandler(protocol_adapter, client_configs, orchestrator) - slack_handler.start() + try: + slack_handler = SlackHandler(protocol_adapter, client_configs, orchestrator) + slack_handler.start() + except Exception as e: + logger.warning(f"Slack bot not started: {e}") # 7. 
Start scheduler try: from heartbeat.scheduler import start_scheduler + scheduler = start_scheduler(client_configs, orchestrator) logger.info("HEARTBEAT scheduler started") except Exception as e: @@ -89,11 +112,20 @@ async def lifespan(app: FastAPI): # Shutdown logger.info("=== Clawrity shutting down ===") - if slack_handler: - slack_handler.stop() - if scheduler: - scheduler.shutdown(wait=False) - db.close() + try: + if slack_handler: + slack_handler.stop() + except Exception as e: + logger.warning(f"Slack handler stop error: {e}") + try: + if scheduler: + scheduler.shutdown(wait=False) + except Exception as e: + logger.warning(f"Scheduler stop error: {e}") + try: + db.close() + except Exception: + pass # --------------------------------------------------------------------------- @@ -115,6 +147,23 @@ app.add_middleware( ) +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """Catch-all exception handler to prevent process crashes.""" + logger.error( + f"Unhandled exception on {request.method} {request.url.path}: {exc}\n" + f"{traceback.format_exc()}" + ) + return JSONResponse( + status_code=500, + content={ + "error": "Internal server error", + "detail": str(exc), + "path": str(request.url.path), + }, + ) + + # --------------------------------------------------------------------------- # Request/Response Models # --------------------------------------------------------------------------- @@ -157,11 +206,18 @@ class ClientRequest(BaseModel): # Endpoints # --------------------------------------------------------------------------- + @app.post("/chat", response_model=ChatResponse) async def chat(request: ChatRequest): """Send a message and get an AI response.""" if request.client_id not in client_configs: - raise HTTPException(status_code=404, detail=f"Client not found: {request.client_id}") + raise HTTPException( + status_code=404, detail=f"Client not found: {request.client_id}" + ) + if not orchestrator or not protocol_adapter: + raise HTTPException( + status_code=503, detail="Service not fully initialized. Check /health." 
+ ) config = client_configs[request.client_id] message = protocol_adapter.normalise_api(request.client_id, request.message) @@ -174,7 +230,9 @@ async def chat(request: ChatRequest): async def compare(request: CompareRequest): """Side-by-side comparison: with RAG vs without RAG.""" if request.client_id not in client_configs: - raise HTTPException(status_code=404, detail=f"Client not found: {request.client_id}") + raise HTTPException( + status_code=404, detail=f"Client not found: {request.client_id}" + ) config = client_configs[request.client_id] message = protocol_adapter.normalise_api(request.client_id, request.message) @@ -198,17 +256,23 @@ async def compare(request: CompareRequest): async def scout(request: ScoutRequest): """Run a targeted scout search for competitor/market intelligence.""" if request.client_id not in client_configs: - raise HTTPException(status_code=404, detail=f"Client not found: {request.client_id}") + raise HTTPException( + status_code=404, detail=f"Client not found: {request.client_id}" + ) config = client_configs[request.client_id] try: from agents.scout_agent import ScoutAgent + scout_agent = ScoutAgent() result = await scout_agent.search_query(config, request.query) if result is None: - return {"response": "No relevant competitor or market news found for this query.", "has_results": False} + return { + "response": "No relevant competitor or market news found for this query.", + "has_results": False, + } return {"response": result, "has_results": True} except Exception as e: @@ -220,17 +284,23 @@ async def scout(request: ScoutRequest): async def scout_digest(request: ClientRequest): """Run full scout agent digest for a client.""" if request.client_id not in client_configs: - raise HTTPException(status_code=404, detail=f"Client not found: {request.client_id}") + raise HTTPException( + status_code=404, detail=f"Client not found: {request.client_id}" + ) config = client_configs[request.client_id] try: from agents.scout_agent import ScoutAgent + scout_agent = ScoutAgent() result = await scout_agent.gather_intelligence(config) if result is None: - return {"response": "No relevant market intelligence found.", "has_results": False} + return { + "response": "No relevant market intelligence found.", + "has_results": False, + } return {"response": result, "has_results": True} except Exception as e: @@ -242,16 +312,21 @@ async def scout_digest(request: ClientRequest): async def trigger_digest(request: ClientRequest): """Manually trigger the daily digest pipeline (same as scheduled job).""" if request.client_id not in client_configs: - raise HTTPException(status_code=404, detail=f"Client not found: {request.client_id}") + raise HTTPException( + status_code=404, detail=f"Client not found: {request.client_id}" + ) config = client_configs[request.client_id] try: from heartbeat.scheduler import run_digest + digest_text = await run_digest(config, orchestrator) if digest_text is None: - raise HTTPException(status_code=500, detail="Digest generation failed after all retries") + raise HTTPException( + status_code=500, detail="Digest generation failed after all retries" + ) return {"response": digest_text, "status": "success"} except HTTPException: @@ -269,6 +344,7 @@ async def admin_stats(client_id: str): try: from rag.monitoring import get_stats + return get_stats(client_id) except Exception as e: return {"error": str(e), "message": "Monitoring not yet configured"} @@ -282,6 +358,7 @@ async def run_forecast(client_id: str): try: from forecasting.prophet_engine import ProphetEngine + 
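+        # NOTE: imported lazily so the API can start even if Prophet is not installed.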
engine = ProphetEngine() results = engine.train_and_forecast(client_id) return {"status": "success", "branches_forecast": len(results)} @@ -297,10 +374,13 @@ async def get_forecast(client_id: str, branch: str): try: from forecasting.prophet_engine import ProphetEngine + engine = ProphetEngine() forecast = engine.get_cached_forecast(client_id, branch) if not forecast: - raise HTTPException(status_code=404, detail=f"No forecast found for {branch}") + raise HTTPException( + status_code=404, detail=f"No forecast found for {branch}" + ) return forecast except HTTPException: raise @@ -320,7 +400,7 @@ async def health(): pass scheduled_jobs = [] - if scheduler and hasattr(scheduler, 'get_jobs'): + if scheduler and hasattr(scheduler, "get_jobs"): try: scheduled_jobs = [ {"id": job.id, "name": job.name, "next_run": str(job.next_run_time)} @@ -333,7 +413,9 @@ async def health(): "status": "healthy" if db_connected else "degraded", "database": "connected" if db_connected else "disconnected", "clients": list(client_configs.keys()), - "scheduler_running": scheduler is not None and scheduler.running if scheduler else False, + "scheduler_running": scheduler is not None and scheduler.running + if scheduler + else False, "scheduled_jobs": scheduled_jobs, "slack_active": slack_handler is not None and slack_handler._thread is not None, } @@ -342,4 +424,6 @@ async def health(): @app.post("/slack/events") async def slack_events(): """Slack webhook endpoint (HTTP mode fallback). Socket Mode is primary.""" - return {"message": "Slack events are handled via Socket Mode. This endpoint is a fallback."} + return { + "message": "Slack events are handled via Socket Mode. This endpoint is a fallback." + } diff --git a/skills/nl_to_sql.py b/skills/nl_to_sql.py index 59b3e10..3275e4d 100644 --- a/skills/nl_to_sql.py +++ b/skills/nl_to_sql.py @@ -10,14 +10,14 @@ import re import logging from typing import Optional -from config.llm_client import get_llm_client, get_model_name +from config.llm_client import get_llm_client, get_model_name, chat_with_retry logger = logging.getLogger(__name__) # Dangerous SQL patterns — reject anything that isn't a SELECT UNSAFE_PATTERNS = re.compile( r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE|EXEC)\b", - re.IGNORECASE + re.IGNORECASE, ) SYSTEM_PROMPT = """You are a PostgreSQL SQL generator. Generate ONLY a valid SELECT query. @@ -90,7 +90,8 @@ class NLToSQL: ) try: - response = self.client.chat.completions.create( + response = chat_with_retry( + self.client, model=self.model, messages=[ {"role": "system", "content": system},
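Every completion call in this patch now funnels through `chat_with_retry`. For reference, a usage sketch of the wrapper (illustrative only: it assumes a configured `GROQ_API_KEY` or `NVIDIA_API_KEY` in `.env`, and mirrors the call shape used throughout the hunks above):

```python
from config.llm_client import chat_with_retry, get_llm_client, get_model_name

client = get_llm_client()  # Groq or NVIDIA NIM, auto-detected from settings

response = chat_with_retry(
    client,  # passed positionally; remaining kwargs go to chat.completions.create
    model=get_model_name(),
    messages=[
        {"role": "system", "content": "You are a PostgreSQL SQL generator."},
        {"role": "user", "content": "Total revenue for the Seattle branch?"},
    ],
    temperature=0.1,
)
print(response.choices[0].message.content)

# On repeated HTTP 429s the wrapper sleeps 1s, 2s, 4s, then 8s
# (BASE_DELAY * 2**attempt) before re-raising on the fifth failure.
```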