From dc9ec40f2acf950371a0f0067e7c38e49120577b Mon Sep 17 00:00:00 2001 From: zhangPinkdolphin <2308717915@qq.com> Date: Sat, 9 May 2026 14:58:35 +0800 Subject: [PATCH 1/4] feat: add SmartRouter for task-aware LLM auto-routing Add SmartRouter class that classifies user queries into simple/complex based on keywords, query length, and code patterns. --- llmcore.py | 1075 +++------------------------------------------------- 1 file changed, 55 insertions(+), 1020 deletions(-) diff --git a/llmcore.py b/llmcore.py index 6468291e..ec73e6e5 100644 --- a/llmcore.py +++ b/llmcore.py @@ -1,1023 +1,58 @@ -import os, json, re, time, requests, sys, threading, urllib3, base64, importlib, uuid -from datetime import datetime -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) -_RESP_CACHE_KEY = str(uuid.uuid4()) - -def _load_mykeys(): - global _mykey_path - try: - import mykey; importlib.reload(mykey); _mykey_path = mykey.__file__ - return {k: v for k, v in vars(mykey).items() if not k.startswith('_')} - except ImportError: pass - _mykey_path = p = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mykey.json') - if not os.path.exists(p): raise Exception('[ERROR] mykey.py or mykey.json not found, please create one from mykey_template.') - with open(p, encoding='utf-8') as f: return json.load(f) - -_mykey_path = _mykey_mtime = None -def reload_mykeys(): - global _mykey_mtime - mt = os.stat(_mykey_path).st_mtime_ns if _mykey_path else -1 - if mt == _mykey_mtime: return globals().get('mykeys', {}), False - mk = _load_mykeys(); _mykey_mtime = os.stat(_mykey_path).st_mtime_ns - print(f'[Info] Load mykeys from {_mykey_path}') - globals().update(mykeys=mk) - if mk.get('langfuse_config'): - try: from plugins import langfuse_tracing - except Exception: pass - return mk, True - -def __getattr__(name): # once guard in PEP 562 - if name == 'mykeys': return reload_mykeys()[0] - raise AttributeError(f"module 'llmcore' has no attribute {name}") - -def 
compress_history_tags(messages, keep_recent=10, max_len=800, force=False): - """Compress // tags in older messages to save tokens.""" - compress_history_tags._cd = getattr(compress_history_tags, '_cd', 0) + 1 - if force: compress_history_tags._cd = 0 - if compress_history_tags._cd % 5 != 0: return messages - _before = sum(len(json.dumps(m, ensure_ascii=False)) for m in messages) - _pats = {tag: re.compile(rf'(<{tag}>)([\s\S]*?)()') for tag in ('thinking', 'think', 'tool_use', 'tool_result')} - _hist_pat = re.compile(r'<(history|key_info|earlier_context)>[\s\S]*?') - def _trunc_str(s): return s[:max_len//2] + '\n...[Truncated]...\n' + s[-max_len//2:] if isinstance(s, str) and len(s) > max_len else s - def _trunc(text): - text = _hist_pat.sub(lambda m: f'<{m.group(1)}>[...]', text) - for pat in _pats.values(): text = pat.sub(lambda m: m.group(1) + _trunc_str(m.group(2)) + m.group(3), text) - return text - for i, msg in enumerate(messages): - if i >= len(messages) - keep_recent: break - c = msg['content'] - if isinstance(c, str): msg['content'] = _trunc(c) - elif isinstance(c, list): - for b in c: - if not isinstance(b, dict): continue - t = b.get('type') - if t == 'text' and isinstance(b.get('text'), str): b['text'] = _trunc(b['text']) - elif t == 'tool_result': - tc = b.get('content') - if isinstance(tc, str): b['content'] = _trunc_str(tc) - elif isinstance(tc, list): - for sub in tc: - if isinstance(sub, dict) and sub.get('type') == 'text': sub['text'] = _trunc_str(sub.get('text')) - elif t == 'tool_use' and isinstance(b.get('input'), dict): - for k, v in b['input'].items(): b['input'][k] = _trunc_str(v) - print(f"[Cut] {_before} -> {sum(len(json.dumps(m, ensure_ascii=False)) for m in messages)}") - return messages - -def _sanitize_leading_user_msg(msg): - """把 user 消息里的 tool_result 块改写成纯文本,避免孤立引用。 - history 统一使用 Claude content-block 格式:content 是 list of blocks。""" - msg = dict(msg) # 浅拷贝外层 dict - content = msg.get('content') - if not isinstance(content, list): 
return msg - texts = [] - for block in content: - if not isinstance(block, dict): continue - if block.get('type') == 'tool_result': - c = block.get('content', '') - if isinstance(c, list): # content 本身也可能是 list[{type:text,text:...}] - texts.extend(b.get('text', '') for b in c if isinstance(b, dict)) - else: texts.append(str(c)) - elif block.get('type') == 'text': texts.append(block.get('text', '')) - msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}] - return msg - -_oldprint = print -def safeprint(*argv): - try: _oldprint(*argv) - except OSError: pass -print = safeprint - -def trim_messages_history(history, context_win): - compress_history_tags(history) - cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) - print(f'[Debug] Current context: {cost} chars, {len(history)} messages.') - if cost > context_win * 3: - compress_history_tags(history, keep_recent=4, force=True) # trim breaks cache, so compress more btw - target = context_win * 3 * 0.6 - while len(history) > 5 and cost > target: - history.pop(0) - while history and history[0].get('role') != 'user': history.pop(0) - if history and history[0].get('role') == 'user': history[0] = _sanitize_leading_user_msg(history[0]) - cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) - print(f'[Debug] Trimmed context, current: {cost} chars, {len(history)} messages.') - -def auto_make_url(base, path): - b, p = base.rstrip('/'), path.strip('/') - if b.endswith('$'): return b[:-1].rstrip('/') - if b.endswith(p): return b - return f"{b}/{p}" if re.search(r'/v\d+(/|$)', b) else f"{b}/v1/{p}" - -def _parse_claude_json(data): - content_blocks = data.get("content", []) - _record_usage(data.get("usage", {}), "messages") - for b in content_blocks: - if b.get("type") == "text": yield b.get("text", "") - elif b.get("type") == "thinking": yield "" - return content_blocks - -def _parse_claude_sse(resp_lines): - """Parse Anthropic SSE stream. 
Yields text chunks, returns list[content_block].""" - content_blocks = []; current_block = None; tool_json_buf = "" - stop_reason = None; got_message_stop = False; warn = None - for line in resp_lines: - if not line: continue - line = line.decode('utf-8') if isinstance(line, bytes) else line - if not line.startswith("data:"): continue - data_str = line[5:].lstrip() - if data_str == "[DONE]": break - try: evt = json.loads(data_str) - except Exception as e: - print(f"[SSE] JSON parse error: {e}, line: {data_str[:200]}") - continue - evt_type = evt.get("type", "") - if evt_type == "message_start": - usage = evt.get("message", {}).get("usage", {}) - _record_usage(usage, "messages") - elif evt_type == "content_block_start": - block = evt.get("content_block", {}) - if block.get("type") == "text": current_block = {"type": "text", "text": ""} - elif block.get("type") == "thinking": current_block = {"type": "thinking", "thinking": "", "signature": ""} - elif block.get("type") == "tool_use": - current_block = {"type": "tool_use", "id": block.get("id", ""), "name": block.get("name", ""), "input": {}} - tool_json_buf = "" - elif evt_type == "content_block_delta": - delta = evt.get("delta", {}) - if delta.get("type") == "text_delta": - text = delta.get("text", "") - if current_block and current_block.get("type") == "text": current_block["text"] += text - if text: yield text - elif delta.get("type") == "thinking_delta": - if current_block and current_block.get("type") == "thinking": current_block["thinking"] += delta.get("thinking", "") - elif delta.get("type") == "signature_delta": - if current_block and current_block.get("type") == "thinking": - current_block["signature"] = current_block.get("signature", "") + delta.get("signature", "") - elif delta.get("type") == "input_json_delta": tool_json_buf += delta.get("partial_json", "") - elif evt_type == "content_block_stop": - if current_block: - if current_block["type"] == "tool_use": - try: current_block["input"] = 
json.loads(tool_json_buf) if tool_json_buf else {} - except: current_block["input"] = {"_raw": tool_json_buf} - content_blocks.append(current_block) - current_block = None - elif evt_type == "message_delta": - delta = evt.get("delta", {}) - stop_reason = delta.get("stop_reason", stop_reason) - out_usage = evt.get("usage", {}) - out_tokens = out_usage.get("output_tokens", 0) - if out_tokens: print(f"[Output] tokens={out_tokens} stop_reason={stop_reason}") - elif evt_type == "message_stop": got_message_stop = True - elif evt_type == "error": - err = evt.get("error", {}) - emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) - warn = f"\n\n!!!Error: SSE {emsg}"; break - if not warn: - if not got_message_stop and not stop_reason: warn = "\n\n[!!! 流异常中断,未收到完整响应 !!!]" - elif stop_reason == "max_tokens": warn = "\n\n[!!! Response truncated: max_tokens !!!]" - if current_block: - if current_block["type"] == "tool_use": - try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {} - except: current_block["input"] = {"_raw": tool_json_buf} - content_blocks.append(current_block); current_block = None - if warn: - print(f"[WARN] {warn.strip()}") - content_blocks.append({"type": "text", "text": warn}); yield warn - return content_blocks - -def _try_parse_tool_args(raw): - """Parse tool args string; split concatenated JSON objects like {..}{..} if needed. - Returns list of parsed dicts.""" - if not raw: return [{}] - try: return [json.loads(raw)] - except: pass - parts = re.split(r'(?<=\})(?=\{)', raw) - if len(parts) > 1: - parsed = [] - for p in parts: - try: parsed.append(json.loads(p)) - except: return [{"_raw": raw}] - return parsed - return [{"_raw": raw}] - -def _parse_openai_sse(resp_lines, api_mode="chat_completions"): - """Parse OpenAI SSE stream (chat_completions or responses API). - Yields text chunks, returns list[content_block]. 
- content_block: {type:'text', text:str} | {type:'tool_use', id:str, name:str, input:dict} - """ - content_text = "" - if api_mode == "responses": - seen_delta = False; fc_buf = {}; current_fc_idx = None - for line in resp_lines: - if not line: continue - line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line - if not line.startswith("data:"): continue - data_str = line[5:].lstrip() - if data_str == "[DONE]": break - try: evt = json.loads(data_str) - except: continue - etype = evt.get("type", "") - if etype == "response.output_text.delta": - delta = evt.get("delta", "") - if delta: seen_delta = True; content_text += delta; yield delta - elif etype == "response.output_text.done" and not seen_delta: - text = evt.get("text", "") - if text: content_text += text; yield text - elif etype == "response.output_item.added": - item = evt.get("item", {}) - if item.get("type") == "function_call": - idx = evt.get("output_index", 0) - fc_buf[idx] = {"id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "args": ""} - current_fc_idx = idx - elif etype == "response.function_call_arguments.delta": - idx = evt.get("output_index", current_fc_idx or 0) - if idx in fc_buf: fc_buf[idx]["args"] += evt.get("delta", "") - elif etype == "response.function_call_arguments.done": - idx = evt.get("output_index", current_fc_idx or 0) - if idx in fc_buf: fc_buf[idx]["args"] = evt.get("arguments", fc_buf[idx]["args"]) - elif etype == "error": - err = evt.get("error", {}) - emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) - if emsg: content_text += f"!!!Error: {emsg}"; yield f"!!!Error: {emsg}" - break - elif etype == "response.completed": - usage = evt.get("response", {}).get("usage", {}) - _record_usage(usage, api_mode) - break - blocks = [] - if content_text: blocks.append({"type": "text", "text": content_text}) - for idx in sorted(fc_buf): - fc = fc_buf[idx] - inps = _try_parse_tool_args(fc["args"]) - for i, inp in 
enumerate(inps): - bid = fc["id"] or '' - if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}" - blocks.append({"type": "tool_use", "id": bid, "name": fc["name"], "input": inp}) - return blocks - else: - tc_buf = {} # index -> {id, name, args} - reasoning_text = "" - for line in resp_lines: - if not line: continue - line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line - if not line.startswith("data:"): continue - data_str = line[5:].lstrip() - if data_str == "[DONE]": break - try: evt = json.loads(data_str) - except: continue - ch = (evt.get("choices") or [{}])[0] - delta = ch.get("delta") or {} - if delta.get("reasoning_content"): - reasoning_text += delta["reasoning_content"] - if delta.get("content"): - text = delta["content"]; content_text += text; yield text - for tc in (delta.get("tool_calls") or []): - idx = tc.get("index", 0) - has_name = bool(tc.get("function", {}).get("name")) - if idx not in tc_buf: - if has_name or not tc_buf: tc_buf[idx] = {"id": tc.get("id") or '', "name": "", "args": ""} - else: idx = max(tc_buf) - if has_name: tc_buf[idx]["name"] = tc["function"]["name"] - if tc.get("function", {}).get("arguments"): tc_buf[idx]["args"] += tc["function"]["arguments"] - if tc.get("id") and not tc_buf[idx]["id"]: tc_buf[idx]["id"] = tc["id"] - usage = evt.get("usage") - if usage: _record_usage(usage, api_mode) - blocks = [] - if reasoning_text: blocks.append({"type": "thinking", "thinking": reasoning_text}) - if content_text: blocks.append({"type": "text", "text": content_text}) - for idx in sorted(tc_buf): - tc = tc_buf[idx] - inps = _try_parse_tool_args(tc["args"]) - for i, inp in enumerate(inps): - bid = tc["id"] or '' - if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}" - blocks.append({"type": "tool_use", "id": bid, "name": tc["name"], "input": inp}) - return blocks - -def _record_usage(usage, api_mode): - if not usage: return - if api_mode == 'responses': - cached = 
(usage.get("input_tokens_details") or {}).get("cached_tokens", 0) - inp = usage.get("input_tokens", 0) - print(f"[Cache] input={inp} cached={cached}") - elif api_mode == 'chat_completions': - cached = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0) - inp = usage.get("prompt_tokens", 0) - print(f"[Cache] input={inp} cached={cached}") - elif api_mode == 'messages': - ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0) - print(f"[Cache] input={inp} creation={ci} read={cr}") +6468291e0cb02c75450648842b818ca439c121bc +class SmartRouter: + """根据用户查询自动分类任务复杂度,推荐 simple(flash) 或 complex(pro) 模型组。""" + + COMPLEX_KEYWORDS = [ + '代码', '框架', '实现', '架构', '设计模式', '重构', '调试', '算法', + '数据库', 'SQL', 'API', '接口', '部署', '性能', '优化', '爬虫', + 'web', 'flask', 'django', 'git', 'docker', 'k8s', 'kubernetes', + '分析', '设计', '方案', '架构设计', '系统设计', '单元测试', 'CI/CD', + 'pytest', 'unittest', '异步', '多线程', '并发', '缓存', 'redis', + 'nginx', 'linux', 'bash', 'shell', '正则', 'ORM', 'REST', 'graphql', + ] + + def __init__(self): + self._enabled = False + self._last_classification = None + + def enable(self): + self._enabled = True + + def disable(self): + self._enabled = False -def _parse_openai_json(data, api_mode="chat_completions"): - blocks = [] - if api_mode == "responses": - _record_usage(data.get("usage") or {}, api_mode) - for item in (data.get("output") or []): - if item.get("type") == "message": - for p in (item.get("content") or []): - if p.get("type") in ("output_text", "text") and p.get("text"): - blocks.append({"type": "text", "text": p["text"]}); yield p["text"] - elif item.get("type") == "function_call": - try: args = json.loads(item.get("arguments", "")) if item.get("arguments") else {} - except: args = {"_raw": item.get("arguments", "")} - blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")), - "name": item.get("name", ""), "input": args}) - else: - 
_record_usage(data.get("usage") or {}, api_mode) - msg = (data.get("choices") or [{}])[0].get("message", {}) - reasoning = msg.get("reasoning_content", "") - if reasoning: - blocks.append({"type": "thinking", "thinking": reasoning}) - content = msg.get("content", "") - if content: - blocks.append({"type": "text", "text": content}); yield content - for tc in (msg.get("tool_calls") or []): - fn = tc.get("function", {}) - try: args = json.loads(fn.get("arguments", "")) if fn.get("arguments") else {} - except: args = {"_raw": fn.get("arguments", "")} - blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": args}) - return blocks - -def _stamp_oai_cache_markers(messages, model): - """Add cache_control to last 2 user messages for Anthropic models via OAI-compatible relay.""" - ml = model.lower() - if not any(k in ml for k in ('claude', 'anthropic')): return - user_idxs = [i for i, m in enumerate(messages) if m.get('role') == 'user'] - for idx in user_idxs[-2:]: - c = messages[idx].get('content') - if isinstance(c, str): - messages[idx] = {**messages[idx], 'content': [{'type': 'text', 'text': c, 'cache_control': {'type': 'ephemeral'}}]} - elif isinstance(c, list) and c: - c = list(c); c[-1] = dict(c[-1], cache_control={'type': 'ephemeral'}) - messages[idx] = {**messages[idx], 'content': c} - -def _stream_with_retry(sess, url, headers, payload, parse_fn): - _RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504, 529} - def _delay(resp, attempt): - try: ra = float((resp.headers or {}).get("retry-after")) - except: ra = None - return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt))) - for attempt in range(sess.max_retries + 1): - streamed = False - try: - with requests.post(url, headers=headers, json=payload, stream=sess.stream, - timeout=(sess.connect_timeout, sess.read_timeout), proxies=sess.proxies, verify=sess.verify) as r: - if r.status_code >= 400: - if r.status_code in _RETRYABLE and attempt < sess.max_retries: 
- d = _delay(r, attempt) - print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{sess.max_retries+1})") - time.sleep(d); continue - try: body = r.text.strip()[:500] - except: body = "" - err = f"!!!Error: HTTP {r.status_code}" + (f": {body}" if body else "") - yield err; return [{"type": "text", "text": err}] - gen = parse_fn(r) - try: - while True: streamed = True; yield next(gen) - except StopIteration as e: return e.value or [] - except (requests.Timeout, requests.ConnectionError) as e: - err = f"!!!Error: {type(e).__name__}" - if attempt < sess.max_retries: - d = _delay(None, attempt) - print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{sess.max_retries+1})") - yield err; time.sleep(d); continue - yield err; return [{"type": "text", "text": err}] - except Exception as e: - err = f"\n\n[!!! 流异常中断 {type(e).__name__}: {e} !!!]" if streamed else f"!!!Error: {type(e).__name__}: {e}" - yield err; return [{"type": "text", "text": err}] - -def _openai_stream(sess, messages): - model, api_mode = sess.model, sess.api_mode - ml = model.lower() - temperature = sess.temperature - if 'kimi' in ml or 'moonshot' in ml: temperature = 1 - elif 'minimax' in ml: temperature = max(0.01, min(temperature, 1.0)) # MiniMax requires temp in (0, 1] - headers = {"Authorization": f"Bearer {sess.api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"} - if api_mode == "responses": - url = auto_make_url(sess.api_base, "responses") - payload = {"model": model, "input": _to_responses_input(messages), "stream": sess.stream, - "prompt_cache_key": _RESP_CACHE_KEY, "instructions": sess.system or "You are an Omnipotent Executor."} - if sess.reasoning_effort: payload["reasoning"] = {"effort": sess.reasoning_effort} - if sess.max_tokens: payload["max_output_tokens"] = sess.max_tokens - else: - url = auto_make_url(sess.api_base, "chat/completions") - if sess.system: messages = [{"role": "system", "content": sess.system}] + messages - 
_stamp_oai_cache_markers(messages, model) - payload = {"model": model, "messages": messages, "stream": sess.stream} - if sess.stream: payload["stream_options"] = {"include_usage": True} - if temperature != 1: payload["temperature"] = temperature - if sess.max_tokens: payload["max_completion_tokens" if ml.startswith(("gpt-5", "o1", "o2", "o3", "o4")) else "max_tokens"] = sess.max_tokens - if sess.reasoning_effort: payload["reasoning_effort"] = sess.reasoning_effort - tools = getattr(sess, 'tools', None) - if tools: payload["tools"] = _prepare_oai_tools(tools, api_mode) - if sess.service_tier: payload["service_tier"] = sess.service_tier - parse_fn = (lambda r: _parse_openai_sse(r.iter_lines(), api_mode)) if sess.stream else (lambda r: _parse_openai_json(r.json(), api_mode)) - return (yield from _stream_with_retry(sess, url, headers, payload, parse_fn)) - -def _prepare_oai_tools(tools, api_mode="chat_completions"): - if api_mode == "responses": - resp_tools = [] - for t in tools: - if t.get("type") == "function" and "function" in t: - rt = {"type": "function"}; rt.update(t["function"]) - resp_tools.append(rt) - else: resp_tools.append(t) - return resp_tools - return tools - -def _to_responses_input(messages): - result, pending = [], [] - for msg in messages: - role = str(msg.get("role", "user")).lower() - if role == "tool": - cid = msg.get("tool_call_id") or (pending.pop(0) if pending else f"call_{uuid.uuid4().hex[:8]}") - result.append({"type": "function_call_output", "call_id": cid, "output": msg.get("content", "")}) - continue - if role not in ["user", "assistant", "system", "developer"]: role = "user" - if role == "system": role = "developer" # Responses API uses 'developer' instead of 'system' - content = msg.get("content", "") - text_type = "output_text" if role == "assistant" else "input_text" - parts = [] - if isinstance(content, str): - if content: parts.append({"type": text_type, "text": content}) - elif isinstance(content, list): - for part in content: - if 
not isinstance(part, dict): continue - ptype = part.get("type") - if ptype == "text": - text = part.get("text", "") - if text: parts.append({"type": text_type, "text": text}) - elif ptype == "image_url": - url = (part.get("image_url") or {}).get("url", "") - if url and role != "assistant": parts.append({"type": "input_image", "image_url": url}) - if len(parts) == 0: parts = [{"type": text_type, "text": str(content) if not isinstance(content, list) else '[empty]'}] - result.append({"role": role, "content": parts}) - pending = [] - for tc in (msg.get("tool_calls") or []): - f = tc.get("function", {}) - cid = tc.get("id") or f"call_{uuid.uuid4().hex[:8]}" - pending.append(cid) - result.append({"type": "function_call", "call_id": cid, "name": f.get("name", ""), "arguments": f.get("arguments", "")}) - return result - - -def _msgs_claude2oai(messages): - result = [] - for msg in messages: - role = msg.get("role", "user") - content = msg.get("content", "") - blocks = content if isinstance(content, list) else [{"type": "text", "text": str(content)}] - if role == "assistant": - text_parts, tool_calls, reasoning = [], [], "" - for b in blocks: - if not isinstance(b, dict): continue - if b.get("type") == "thinking" and b.get("thinking"): reasoning = b["thinking"] - elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) - elif b.get("type") == "tool_use": - tool_calls.append({ - "id": b.get("id") or '', "type": "function", - "function": {"name": b.get("name", ""), "arguments": json.dumps(b.get("input", {}), ensure_ascii=False)} - }) - m = {"role": "assistant"} - if reasoning: m["reasoning_content"] = reasoning - if text_parts: m["content"] = text_parts - else: m["content"] = "" - if tool_calls: m["tool_calls"] = tool_calls - if not text_parts and not tool_calls and reasoning: m["content"] = "." 
- result.append(m) - elif role == "user": - text_parts = [] - for b in blocks: - if not isinstance(b, dict): continue - if b.get("type") == "tool_result": - if text_parts: - result.append({"role": "user", "content": text_parts}) - text_parts = [] - tr = b.get("content", "") - if isinstance(tr, list): - tr = "\n".join(x.get("text", "") for x in tr if isinstance(x, dict) and x.get("type") == "text") - result.append({"role": "tool", "tool_call_id": b.get("tool_use_id") or '', "content": tr if isinstance(tr, str) else str(tr)}) - elif b.get("type") == "image": - src = b.get("source") or {} - if src.get("type") == "base64" and src.get("data"): - text_parts.append({"type": "image_url", "image_url": {"url": f"data:{src.get('media_type', 'image/png')};base64,{src.get('data', '')}"}}) - elif b.get("type") == "image_url": text_parts.append(b) - elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) - if text_parts: result.append({"role": "user", "content": text_parts}) - else: result.append(msg) - return result - - -class BaseSession: - def __init__(self, cfg): - self.api_key = cfg['apikey'] - self.api_base = cfg['apibase'].rstrip('/') - self.model = cfg.get('model', '') - self.context_win = cfg.get('context_win', 28000) - self.history = [] - self.lock = threading.Lock() - self.system = "" - self.name = cfg.get('name', self.model) - proxy = cfg.get('proxy') - self.proxies = {"http": proxy, "https": proxy} if proxy else None - self.max_retries = max(0, int(cfg.get('max_retries', 4))) - self.verify = cfg.get('verify', True) - self.stream = cfg.get('stream', True) - default_ct, default_rt = (5, 30) if self.stream else (10, 240) - self.connect_timeout = max(1, int(cfg.get('timeout', default_ct))) - self.read_timeout = max(5, int(cfg.get('read_timeout', default_rt))) - def _enum(key, valid): - v = cfg.get(key); v = None if v is None else str(v).strip().lower() - return v if not v or v in valid else print(f"[WARN] Invalid 
{key} {v!r}, ignored.") - self.reasoning_effort = _enum('reasoning_effort', {'none', 'minimal', 'low', 'medium', 'high', 'xhigh'}) - self.service_tier = _enum('service_tier', {'auto', 'default', 'priority', 'flex'}) - self.thinking_type = _enum('thinking_type', {'adaptive', 'enabled', 'disabled'}) - self.thinking_budget_tokens = cfg.get('thinking_budget_tokens') - mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_') - self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions' - self.temperature = cfg.get('temperature', 1) - self.max_tokens = cfg.get('max_tokens') - def _apply_claude_thinking(self, payload): - if self.thinking_type: - thinking = {"type": self.thinking_type} - if self.thinking_type == 'enabled': - if self.thinking_budget_tokens is None: print("[WARN] thinking_type='enabled' requires thinking_budget_tokens, ignored.") - else: - thinking["budget_tokens"] = self.thinking_budget_tokens; payload["thinking"] = thinking - else: payload["thinking"] = thinking - if self.reasoning_effort: - effort = {'low': 'low', 'medium': 'medium', 'high': 'high', 'xhigh': 'max'}.get(self.reasoning_effort) - if effort: payload["output_config"] = {"effort": effort} - else: print(f"[WARN] reasoning_effort {self.reasoning_effort!r} is unsupported for Claude output_config.effort, ignored.") - def ask(self, prompt): - def _ask_gen(): - with self.lock: - self.history.append({"role": "user", "content": [{"type": "text", "text": prompt}]}) - trim_messages_history(self.history, self.context_win) - messages = self.make_messages(self.history) - content_blocks = None; content = '' - gen = self.raw_ask(messages) - try: - while True: chunk = next(gen); content += chunk; yield chunk - except StopIteration as e: content_blocks = e.value or [] - if len(content_blocks) > 1: print(f"[DEBUG BaseSession.ask] content_blocks: {content_blocks}") - for block in (content_blocks or []): - if block.get('type', '') == 'tool_use': - tu = 
{'name': block.get('name', ''), 'arguments': block.get('input', {})} - yield f'{json.dumps(tu, ensure_ascii=False)}' - if not content.startswith("!!!Error:"): self.history.append({"role": "assistant", "content": [{"type": "text", "text": content}]}) - return _ask_gen() if self.stream else ''.join(list(_ask_gen())) - -def _keep_claude_block(b): return not isinstance(b, dict) or b.get("type") != "thinking" or b.get("signature") -def _drop_unsigned_thinking(messages): - for m in messages: - c = m.get("content") - if isinstance(c, list): m["content"] = [b for b in c if _keep_claude_block(b)] - return messages - -def _ensure_thinking_blocks(messages, model): - """deepseek needs thinking in history!""" - if 'deepseek' not in model.lower(): return messages - for m in messages: - if m.get("role") != "assistant": continue - c = m.get("content") - if not isinstance(c, list): continue - has_thinking = any(isinstance(b, dict) and b.get("type") == "thinking" for b in c) - if not has_thinking: m["content"] = [{"type": "thinking", "thinking": "...", "signature": "placeholder"}, *c] - return messages - -class ClaudeSession(BaseSession): - def raw_ask(self, messages): - if self.max_tokens is None: self.max_tokens = 8192 - headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"} - payload = {"model": self.model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream} - if self.temperature != 1: payload["temperature"] = self.temperature - self._apply_claude_thinking(payload) - if self.system: payload["system"] = [{"type": "text", "text": self.system, "cache_control": {"type": "persistent"}}] - url = auto_make_url(self.api_base, "messages") - parse_fn = (lambda r: _parse_claude_sse(r.iter_lines())) if self.stream else (lambda r: _parse_claude_json(r.json())) - return (yield from _stream_with_retry(self, url, headers, payload, parse_fn)) - def make_messages(self, 
raw_list): - msgs = _drop_unsigned_thinking([{"role": m['role'], "content": list(m['content'])} for m in raw_list]) - user_idxs = [i for i, m in enumerate(msgs) if m['role'] == 'user'] - for idx in user_idxs[-2:]: - msgs[idx]["content"][-1] = dict(msgs[idx]["content"][-1], cache_control={"type": "ephemeral"}) - return msgs - -class LLMSession(BaseSession): - def raw_ask(self, messages): return (yield from _openai_stream(self, messages)) - def make_messages(self, raw_list): return _msgs_claude2oai(raw_list) - -def _fix_messages(messages): - """修复 messages 符合 Claude API:交替、tool_use/tool_result 配对""" - if not messages: return messages - _wrap = lambda c: c if isinstance(c, list) else [{"type": "text", "text": str(c)}] - fixed = [] - for m in messages: - if fixed and m['role'] == fixed[-1]['role']: - fixed[-1] = {**fixed[-1], 'content': _wrap(fixed[-1]['content']) + [{"type": "text", "text": "\n"}] + _wrap(m['content'])}; continue - if fixed and fixed[-1]['role'] == 'assistant' and m['role'] == 'user': - uses = [b.get('id') for b in fixed[-1].get('content', []) if isinstance(b, dict) and b.get('type') == 'tool_use' and b.get('id')] - has = {b.get('tool_use_id') for b in _wrap(m['content']) if isinstance(b, dict) and b.get('type') == 'tool_result'} - miss = [uid for uid in uses if uid not in has] - if miss: m = {**m, 'content': [{"type": "tool_result", "tool_use_id": uid, "content": "(error)"} for uid in miss] + _wrap(m['content'])} - orphan = has - set(uses) - if orphan: m = {**m, 'content': [{"type":"text","text":str(b.get('content',''))} if isinstance(b,dict) and b.get('type')=='tool_result' and b.get('tool_use_id') in orphan else b for b in _wrap(m['content'])]} - fixed.append(m) - while fixed and fixed[0]['role'] != 'user': fixed.pop(0) - return fixed - -class NativeClaudeSession(BaseSession): - def __init__(self, cfg): - super().__init__(cfg) - self.fake_cc_system_prompt = cfg.get("fake_cc_system_prompt", False) - self.user_agent = cfg.get("user_agent", 
"claude-cli/2.1.113 (external, cli)") - self._session_id = str(uuid.uuid4()) - self._account_uuid = str(uuid.uuid4()) - self._device_id = uuid.uuid4().hex + uuid.uuid4().hex[:32] - self.tools = None - def raw_ask(self, messages): - messages = _ensure_thinking_blocks(_drop_unsigned_thinking(_fix_messages(messages)), self.model) - if self.max_tokens is None: self.max_tokens = 8192 - model = self.model - beta_parts = ["claude-code-20250219", "interleaved-thinking-2025-05-14", "redact-thinking-2026-02-12", "prompt-caching-scope-2026-01-05"] - if "[1m]" in model.lower(): - beta_parts.insert(1, "context-1m-2025-08-07"); model = model.replace("[1m]", "").replace("[1M]", "") - headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01", - "anthropic-beta": ",".join(beta_parts), "anthropic-dangerous-direct-browser-access": "true", - "user-agent": self.user_agent, "x-app": "cli"} - if self.api_key.startswith("sk-ant-"): headers["x-api-key"] = self.api_key - else: headers["authorization"] = f"Bearer {self.api_key}" - payload = {"model": model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream} - if self.temperature != 1: payload["temperature"] = self.temperature - self._apply_claude_thinking(payload) - payload["metadata"] = {"user_id": json.dumps({"device_id": self._device_id, "account_uuid": self._account_uuid, "session_id": self._session_id}, separators=(',', ':'))} - if self.tools: - claude_tools = openai_tools_to_claude(self.tools) - tools = [dict(t) for t in claude_tools]; tools[-1]["cache_control"] = {"type": "ephemeral"} - payload["tools"] = tools - else: print("[ERROR] No tools provided for this session.") - payload['system'] = [{"type": "text", "text": "You are Claude Code, Anthropic's official CLI for Claude.", "cache_control": {"type": "ephemeral"}}] - if self.system: - if self.fake_cc_system_prompt: messages[0]["content"].insert(0, {"type": "text", "text": self.system}) - else: payload["system"] = [{"type": "text", 
"text": self.system}] - user_idxs = [i for i, m in enumerate(messages) if m['role'] == 'user'] - for idx in user_idxs[-2:]: - messages[idx] = {**messages[idx], "content": list(messages[idx]["content"])} - messages[idx]["content"][-1] = dict(messages[idx]["content"][-1], cache_control={"type": "ephemeral"}) - url = auto_make_url(self.api_base, "messages") + '?beta=true' - parse_fn = (lambda r: _parse_claude_sse(r.iter_lines())) if self.stream else (lambda r: _parse_claude_json(r.json())) - return (yield from _stream_with_retry(self, url, headers, payload, parse_fn)) - - def ask(self, msg): - assert type(msg) is dict - with self.lock: - self.history.append(msg) - trim_messages_history(self.history, self.context_win) - messages = [{"role": m["role"], "content": list(m["content"])} for m in self.history] - content_blocks = None - gen = self.raw_ask(messages) - try: - while True: yield next(gen) - except StopIteration as e: content_blocks = e.value or [] - if content_blocks and (_injected := _ensure_text_block(content_blocks)): yield _injected - if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("!!!Error:")): - self.history.append({"role": "assistant", "content": content_blocks}) - text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"] - content = "\n".join(text_parts).strip() - tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"] - if not tool_calls: tool_calls, content = _parse_text_tool_calls(content) - thinking_parts = [b["thinking"] for b in content_blocks if b.get("type") == "thinking"] - thinking = "\n".join(thinking_parts).strip() - if not thinking: - think_pattern = r"(.*?)" - think_match = re.search(think_pattern, content, re.DOTALL) - if think_match: - thinking = think_match.group(1).strip() - content = re.sub(think_pattern, "", content, flags=re.DOTALL) - return MockResponse(thinking, content, tool_calls, 
str(content_blocks)) - -class NativeOAISession(NativeClaudeSession): - def raw_ask(self, messages): - messages = _fix_messages(messages) - messages = _ensure_thinking_blocks(messages, self.model) - return (yield from _openai_stream(self, _msgs_claude2oai(messages))) - -def openai_tools_to_claude(tools): - """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}].""" - result = [] - for t in tools: - if 'input_schema' in t: result.append(t); continue # 已是claude格式 - fn = t.get('function', t) - result.append({'name': fn['name'], 'description': fn.get('description', ''), - 'input_schema': fn.get('parameters', {'type': 'object', 'properties': {}})}) - return result - -class MockFunction: - def __init__(self, name, arguments): self.name, self.arguments = name, arguments - -class MockToolCall: - def __init__(self, name, args, id=''): - arg_str = json.dumps(args, ensure_ascii=False) if isinstance(args, (dict, list)) else (args or '{}') - self.function = MockFunction(name, arg_str); self.id = id - -class MockResponse: - def __init__(self, thinking, content, tool_calls, raw, stop_reason='end_turn'): - self.thinking = thinking; self.content = content - self.tool_calls = tool_calls; self.raw = raw - self.stop_reason = 'tool_use' if tool_calls else stop_reason - def __repr__(self): - return f"" - -class ToolClient: - def __init__(self, backend, auto_save_tokens=True): - self.backend = backend - self.auto_save_tokens = auto_save_tokens - self.last_tools = '' - self.name = self.backend.name - self.total_cd_tokens = 0 - self.log_path = None - - def chat(self, messages, tools=None): - tools = json.loads(json.dumps(tools, ensure_ascii=False)) if tools else tools - for t in tools or []: - f = t.get('function', {}) - if f.get('name') == 'file_write': - props = f.get('parameters', {}).get('properties', {}) - props.pop('content', None) - extra = '. 
Content must be placed in tags in reply body, not in args' - if extra not in f.get('description', ''): f['description'] = f.get('description', '') + extra - break - full_prompt = self._build_protocol_prompt(messages, tools) - print("Full prompt length:", len(full_prompt), 'chars') - gen = self.backend.ask(full_prompt) - _write_llm_log('Prompt', full_prompt, self.log_path) - raw_text = '' - for chunk in gen: - raw_text += chunk; yield chunk - _write_llm_log('Response', raw_text, self.log_path) - return self._parse_mixed_response(raw_text) - - def _prepare_tool_instruction(self, tools): - tool_instruction = "" - if not tools: return tool_instruction - tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':')) - _en = os.environ.get('GA_LANG') == 'en' - if _en: - tool_instruction = f""" -### Interaction Protocol (must follow strictly, always in effect) -Follow these steps to think and act: -1. **Think**: Analyze the current situation and strategy inside `` tags. -2. **Summarize**: Output a minimal one-line (<30 words) physical snapshot in ``: new info from last tool result + current tool call intent. This goes into long-term working memory. Must contain real information, no filler. -3. **Act**: If you need to call tools, output one or more ** blocks** after your reply, then stop. -""" - else: - tool_instruction = f""" -### 交互协议 (必须严格遵守,持续有效) -请按照以下步骤思考并行动: -1. **思考**: 在 `` 标签中先进行思考,分析现状和策略。 -2. **总结**: 在 `` 中输出*极为简短*的高度概括的单行(<30字)物理快照,包括上次工具调用结果产生的新信息+本次工具调用意图。此内容将进入长期工作记忆,记录关键信息,严禁输出无实际信息增量的描述。 -3. **行动**: 如需调用工具,请在回复正文之后输出一个(或多个)**块**,然后结束。 -""" - tool_instruction += f'\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n\n### Tools (mounted, always in effect):\n{tools_json}\n' - if self.auto_save_tokens and self.last_tools == tools_json: - tool_instruction = "\n### Tools: still active, **ready to call**. 
Protocol unchanged.\n" if _en else "\n### 工具库状态:持续有效(code_run/file_read等),**可正常调用**。调用协议沿用。\n" - else: self.total_cd_tokens = 0 - self.last_tools = tools_json - return tool_instruction - - def _build_protocol_prompt(self, messages, tools): - system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "") - history_msgs = [m for m in messages if m['role'].lower() != 'system'] - tool_instruction = self._prepare_tool_instruction(tools) - system = ""; user = "" - if system_content: system += f"{system_content}\n" - system += f"{tool_instruction}" - for m in history_msgs: - role = "USER" if m['role'] == 'user' else "ASSISTANT" - user += f"=== {role} ===\n" - for tr in m.get('tool_results', []): user += f'{tr["content"]}\n' - user += str(m['content']) + "\n" - self.total_cd_tokens += len(user) // 3 - if self.total_cd_tokens > 9000: self.last_tools = '' - user += "=== ASSISTANT ===\n" - return system + user - - def _parse_mixed_response(self, text): - remaining_text = text; thinking = '' - think_match = re.search(r"(.*?)", text, re.DOTALL) - if think_match: - thinking = think_match.group(1).strip() - remaining_text = re.sub(r"(.*?)", "", remaining_text, flags=re.DOTALL) - tool_calls, remaining_text = _parse_text_tool_calls(remaining_text) - if not tool_calls: - json_strs = []; errors = [] - if '' in remaining_text: - weaktoolstr = remaining_text.split('')[-1].strip().strip('><') - json_str = weaktoolstr if weaktoolstr.endswith('}') else '' - if json_str == '' and '```' in weaktoolstr and weaktoolstr.split('```')[0].strip().endswith('}'): - json_str = weaktoolstr.split('```')[0].strip() - if json_str: json_strs.append(json_str) - remaining_text = remaining_text.replace(''+weaktoolstr, "") - elif '"name":' in remaining_text and '"arguments":' in remaining_text: - json_match = re.search(r'\{.*"name":.*\}', remaining_text, re.DOTALL) - if json_match: - json_strs.append(json_match.group(0).strip()) - remaining_text = 
remaining_text.replace(json_match.group(0), "").strip() - for json_str in json_strs: - try: - data = tryparse(json_str) - func_name = data.get('name') or data.get('function') or data.get('tool') - args = data.get('arguments') or data.get('args') or data.get('params') or data.get('parameters') - if args is None: args = data - if func_name: tool_calls.append(MockToolCall(func_name, args)) - except json.JSONDecodeError: - errors.append(f'Failed to parse tool_use JSON: {json_str[:200]}') - self.last_tools = '' - except: pass - if not tool_calls: - for e in errors: - print(f"[Warn] {e}"); tool_calls.append(MockToolCall('bad_json', {'msg': e})) - return MockResponse(thinking, remaining_text.strip(), tool_calls, text) - -def _parse_text_tool_calls(content): - """Fallback: extract tool calls from text when model doesn't use native tool_use blocks.""" - tcs = [] - # try JSON array: [{"type":"tool_use", "name":..., "input":...}] - _jp = next((p for p in ['[{"type":"tool_use"', '[{"type": "tool_use"'] if p in content), None) - if _jp and content.endswith('}]'): - try: - idx = content.index(_jp); raw = json.loads(content[idx:]) - tcs = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"] - return tcs, content[:idx].strip() - except: pass - # try XML tags: {"name":..., "arguments":...} - _xp = r"<(?:tool_use|tool_call)>((?:(?!<(?:tool_use|tool_call)>).){15,}?)" - for s in re.findall(_xp, content, re.DOTALL): - try: - d = tryparse(s.strip()); name = d.get('name') - args = d.get('arguments') or d.get('args') or d.get('input') or {} - if name: tcs.append(MockToolCall(name, args)) - except: pass - if tcs: content = re.sub(_xp, "", content, flags=re.DOTALL).strip() - return tcs, content - -def _ensure_text_block(blocks): - """If response has thinking but no text block, inject a synthetic summary from thinking's first line.""" - if any(b.get("type") == "text" for b in blocks): return None - th = next((b.get("thinking", "") for 
b in blocks if b.get("type") == "thinking"), "") - if not th: return None - line = th.strip().split('\n', 1)[0] - txt = "" + (line[:60] + '...' if len(line) > 60 else line) + "" - blocks.insert(1, {"type": "text", "text": txt}) - return txt - -def _write_llm_log(label, content, log_path=None): - if not log_path: - log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'temp/model_responses/model_responses_{os.getpid()}.txt') - os.makedirs(os.path.dirname(os.path.abspath(log_path)), exist_ok=True) - ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - with open(log_path, 'a', encoding='utf-8', errors='replace') as f: - f.write(f"=== {label} === {ts}\n{content}\n\n") - -def tryparse(json_str): - try: return json.loads(json_str) - except: pass - json_str = json_str.strip().strip('`').replace('json\n', '', 1).strip() - try: return json.loads(json_str) - except: pass - try: return json.loads(json_str[:-1]) - except: pass - if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1] - return json.loads(json_str) - -class MixinSession: - """Multi-session fallback with spring-back to primary.""" - def __init__(self, all_sessions, cfg): - self._retries, self._base_delay = cfg.get('max_retries', 3), cfg.get('base_delay', 1.5) - self._spring_sec = cfg.get('spring_back', 300) - self._sessions = [all_sessions[i].backend if isinstance(i, int) else - next(s.backend for s in all_sessions if type(s) is not dict and s.backend.name == i) for i in cfg.get('llm_nos', [])] - is_native = lambda s: 'Native' in s.__class__.__name__ - groups = {is_native(s) for s in self._sessions} - assert len(groups) == 1, f"MixinSession: sessions must be in same group (Native or non-Native), got {[type(s).__name__ for s in self._sessions]}" - self.name = '|'.join(s.name for s in self._sessions) - import copy; self._sessions = [copy.copy(s) for s in self._sessions] - for s in self._sessions: s.max_retries = 0 - self._orig_raw_asks = [s.raw_ask for s in self._sessions] - 
self._sessions[0].raw_ask = self._raw_ask - self.model = getattr(self._sessions[0], 'model', None) - self._cur_idx, self._switched_at = 0, 0.0 - def __getattr__(self, name): return getattr(self._sessions[0], name) - _BROADCAST_ATTRS = frozenset({'system', 'tools', 'temperature', 'max_tokens', 'reasoning_effort', 'history'}) - def __setattr__(self, name, value): - if name in self._BROADCAST_ATTRS: - for s in self._sessions: - v = openai_tools_to_claude(value) if name == 'tools' and type(s) is NativeClaudeSession else value - setattr(s, name, v) - else: object.__setattr__(self, name, value) @property - def primary(self): return self._sessions[0] - def _pick(self): - if self._cur_idx and time.time() - self._switched_at > self._spring_sec: self._cur_idx = 0 - return self._cur_idx - def _raw_ask(self, *args, **kwargs): - base, n = self._pick(), len(self._sessions) - test_error = lambda x: isinstance(x, str) and x.lstrip().startswith(('!!!Error:', '[Error:')) - for attempt in range(self._retries + 1): - idx = (base + attempt) % n - gen = self._orig_raw_asks[idx](*args, **kwargs) - print(f'[MixinSession] Using session ({self._sessions[idx].name})') - last_chunk, return_val, yielded = None, [], False - try: - while True: - chunk = next(gen); last_chunk = chunk - if not yielded and test_error(chunk): continue - yield chunk; yielded = True - except StopIteration as e: return_val = e.value or [] - is_err = test_error(last_chunk) - if not is_err: - if attempt > 0: self._cur_idx = idx; self._switched_at = time.time() - elif isinstance(last_chunk, str) and '[!!! 
流异常中断' in last_chunk and n > 1: - self._cur_idx = (idx + 1) % n; self._switched_at = time.time() - print(f'[MixinSession] Partial failure, next call → s{self._cur_idx} ({self._sessions[self._cur_idx].name})') - return return_val - if attempt >= self._retries: - yield last_chunk; return return_val - nxt = (base + attempt + 1) % n - if nxt == base: # full round failed, delay before next - rnd = (attempt + 1) // n - delay = min(30, self._base_delay * (1.5 ** rnd)) - print(f'[MixinSession] {last_chunk[:80]}, round {rnd} exhausted, retry in {delay:.1f}s') - time.sleep(delay) - else: print(f'[MixinSession] {last_chunk[:80]}, retry {attempt+1}/{self._retries} (s{idx}→s{nxt})') - -THINKING_PROMPT_ZH = """ -### 行动规范(持续有效) -每次回复(含工具调用轮)都先在回复文字中包含一个 中输出极简单行(<30字)物理快照:上次结果新信息+本次意图。此内容进入长期工作记忆。 -\n**若用户需求未完成,必须进行工具调用!** -""".strip() -THINKING_PROMPT_EN = """ -### Action Protocol (always in effect) -The reply body should first include a minimal one-line (<30 words) physical snapshot in : new info from last result + current intent. This goes into long-term working memory. 
-\n**If the user's request is not yet complete, tool calls are required!** -""".strip() - -class NativeToolClient: - @staticmethod - def _thinking_prompt(): return THINKING_PROMPT_EN if os.environ.get('GA_LANG') == 'en' else THINKING_PROMPT_ZH - def __init__(self, backend): - self.backend = backend - self.backend.system = self._thinking_prompt() - self.name = self.backend.name - self._pending_tool_ids = [] - self.log_path = None - def set_system(self, extra_system): - combined = f"{extra_system}\n\n{self._thinking_prompt()}" if extra_system else self._thinking_prompt() - if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.") - self.backend.system = combined - def chat(self, messages, tools=None): - if tools: self.backend.tools = tools - if not self.backend.history: self._pending_tool_ids = [] - combined_content = []; resp = None; tool_results = [] - for msg in messages: - c = msg.get('content', '') - if msg['role'] == 'system': - self.set_system(c); continue - if isinstance(c, str): combined_content.append({"type": "text", "text": c}) - elif isinstance(c, list): combined_content.extend(c) - if msg['role'] == 'user' and msg.get('tool_results'): tool_results.extend(msg['tool_results']) - tr_id_set = set(); tool_result_blocks = [] - for tr in tool_results: - tool_use_id, content = tr.get("tool_use_id", ""), tr.get("content", "") - tr_id_set.add(tool_use_id) - if tool_use_id: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tool_use_id, "content": tr.get("content", "")}) - else: combined_content = [{"type": "text", "text": f'{content}'}] + combined_content - for tid in self._pending_tool_ids: - if tid not in tr_id_set: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tid, "content": ""}) - self._pending_tool_ids = [] - # Filter whitespace-only text blocks that cause 400 on strict API proxies - merged = {"role": "user", "content": tool_result_blocks + [c for c in combined_content if 
c.get("text", "").strip()]} - _write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2), self.log_path) - gen = self.backend.ask(merged) - try: - while True: - chunk = next(gen); yield chunk - except StopIteration as e: resp = e.value - if resp: _write_llm_log('Response', resp.raw, self.log_path) - if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls] - return resp - -def resolve_session(cfg_name): - cfg = reload_mykeys()[0].get(cfg_name) - if not cfg: raise ValueError(f"Config '{cfg_name}' not in mykey") - if 'native' in cfg_name: return (NativeClaudeSession if 'claude' in cfg_name else NativeOAISession)(cfg=cfg) - if 'claude' in cfg_name: return ClaudeSession(cfg=cfg) - return LLMSession(cfg=cfg) if 'oai' in cfg_name else None - -def resolve_client(cfg_name): - s = resolve_session(cfg_name) - return (NativeToolClient(s) if isinstance(s, (NativeClaudeSession, NativeOAISession)) else ToolClient(s)) if s else None + def enabled(self): + return self._enabled + + def classify(self, query: str) -> str: + """返回 'simple' 或 'complex'""" + q = query.strip() + if not q: + return 'simple' + # 短查询(<=8字符)且无关键词 → simple + if len(q) <= 8: + self._last_classification = 'simple' + return 'simple' + # 关键词命中 → complex + q_lower = q.lower() + for kw in self.COMPLEX_KEYWORDS: + if kw.lower() in q_lower: + self._last_classification = 'complex' + return 'complex' + # 含代码特征 → complex + if any(c in q for c in ['{', '}', ';', '==', '!=', '=>', '->', 'def ', 'class ', 'import ']): + self._last_classification = 'complex' + return 'complex' + self._last_classification = 'simple' + return 'simple' + + def get_last(self): + return self._last_classification + + def get_route(self, query: str) -> str: + """返回 'simple' 或 'complex' based on query and enabled state""" + if not self._enabled: + return None # 未启用,由外部决定 + return self.classify(query) -def fast_ask(prompt, cfg_name): - sess = resolve_session(cfg_name) - if 
not sess: raise ValueError(f"fast_ask: '{cfg_name}' unsupported") - return "".join(sess.raw_ask([{"role": "user", "content": prompt}])) From 5a381a06339c7e3086bd9dee44a9c0fe771cf8b3 Mon Sep 17 00:00:00 2001 From: zhangPinkdolphin <2308717915@qq.com> Date: Sat, 9 May 2026 14:59:06 +0800 Subject: [PATCH 2/4] feat: integrate SmartRouter into GenericAgent Add /llm auto command, auto_route property, and automatic model routing in gen_answer based on task complexity. From c444038572ef6674781208edf556ba9df9d3b5ee Mon Sep 17 00:00:00 2001 From: zhangPinkdolphin <2308717915@qq.com> Date: Sat, 9 May 2026 15:00:39 +0800 Subject: [PATCH 3/4] fix: restore llmcore.py content and add SmartRouter class --- llmcore.py | 1090 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 1054 insertions(+), 36 deletions(-) diff --git a/llmcore.py b/llmcore.py index ec73e6e5..ff191702 100644 --- a/llmcore.py +++ b/llmcore.py @@ -1,58 +1,1076 @@ -6468291e0cb02c75450648842b818ca439c121bc +import os, json, re, time, requests, sys, threading, urllib3, base64, importlib, uuid +from datetime import datetime +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +_RESP_CACHE_KEY = str(uuid.uuid4()) + +def _load_mykeys(): + global _mykey_path + try: + import mykey; importlib.reload(mykey); _mykey_path = mykey.__file__ + return {k: v for k, v in vars(mykey).items() if not k.startswith('_')} + except ImportError: pass + _mykey_path = p = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mykey.json') + if not os.path.exists(p): raise Exception('[ERROR] mykey.py or mykey.json not found, please create one from mykey_template.') + with open(p, encoding='utf-8') as f: return json.load(f) + +_mykey_path = _mykey_mtime = None +def reload_mykeys(): + global _mykey_mtime + mt = os.stat(_mykey_path).st_mtime_ns if _mykey_path else -1 + if mt == _mykey_mtime: return globals().get('mykeys', {}), False + mk = _load_mykeys(); _mykey_mtime = os.stat(_mykey_path).st_mtime_ns + 
print(f'[Info] Load mykeys from {_mykey_path}') + globals().update(mykeys=mk) + if mk.get('langfuse_config'): + try: from plugins import langfuse_tracing + except Exception: pass + return mk, True + +def __getattr__(name): # once guard in PEP 562 + if name == 'mykeys': return reload_mykeys()[0] + raise AttributeError(f"module 'llmcore' has no attribute {name}") + +def compress_history_tags(messages, keep_recent=10, max_len=800, force=False): + """Compress // tags in older messages to save tokens.""" + compress_history_tags._cd = getattr(compress_history_tags, '_cd', 0) + 1 + if force: compress_history_tags._cd = 0 + if compress_history_tags._cd % 5 != 0: return messages + _before = sum(len(json.dumps(m, ensure_ascii=False)) for m in messages) + _pats = {tag: re.compile(rf'(<{tag}>)([\s\S]*?)()') for tag in ('thinking', 'think', 'tool_use', 'tool_result')} + _hist_pat = re.compile(r'<(history|key_info|earlier_context)>[\s\S]*?') + def _trunc_str(s): return s[:max_len//2] + '\n...[Truncated]...\n' + s[-max_len//2:] if isinstance(s, str) and len(s) > max_len else s + def _trunc(text): + text = _hist_pat.sub(lambda m: f'<{m.group(1)}>[...]', text) + for pat in _pats.values(): text = pat.sub(lambda m: m.group(1) + _trunc_str(m.group(2)) + m.group(3), text) + return text + for i, msg in enumerate(messages): + if i >= len(messages) - keep_recent: break + c = msg['content'] + if isinstance(c, str): msg['content'] = _trunc(c) + elif isinstance(c, list): + for b in c: + if not isinstance(b, dict): continue + t = b.get('type') + if t == 'text' and isinstance(b.get('text'), str): b['text'] = _trunc(b['text']) + elif t == 'tool_result': + tc = b.get('content') + if isinstance(tc, str): b['content'] = _trunc_str(tc) + elif isinstance(tc, list): + for sub in tc: + if isinstance(sub, dict) and sub.get('type') == 'text': sub['text'] = _trunc_str(sub.get('text')) + elif t == 'tool_use' and isinstance(b.get('input'), dict): + for k, v in b['input'].items(): b['input'][k] = 
_trunc_str(v) + print(f"[Cut] {_before} -> {sum(len(json.dumps(m, ensure_ascii=False)) for m in messages)}") + return messages + +def _sanitize_leading_user_msg(msg): + """把 user 消息里的 tool_result 块改写成纯文本,避免孤立引用。 + history 统一使用 Claude content-block 格式:content 是 list of blocks。""" + msg = dict(msg) # 浅拷贝外层 dict + content = msg.get('content') + if not isinstance(content, list): return msg + texts = [] + for block in content: + if not isinstance(block, dict): continue + if block.get('type') == 'tool_result': + c = block.get('content', '') + if isinstance(c, list): # content 本身也可能是 list[{type:text,text:...}] + texts.extend(b.get('text', '') for b in c if isinstance(b, dict)) + else: texts.append(str(c)) + elif block.get('type') == 'text': texts.append(block.get('text', '')) + msg['content'] = [{"type": "text", "text": '\n'.join(t for t in texts if t)}] + return msg + +_oldprint = print +def safeprint(*argv): + try: _oldprint(*argv) + except OSError: pass +print = safeprint + +def trim_messages_history(history, context_win): + compress_history_tags(history) + cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) + print(f'[Debug] Current context: {cost} chars, {len(history)} messages.') + if cost > context_win * 3: + compress_history_tags(history, keep_recent=4, force=True) # trim breaks cache, so compress more btw + target = context_win * 3 * 0.6 + while len(history) > 5 and cost > target: + history.pop(0) + while history and history[0].get('role') != 'user': history.pop(0) + if history and history[0].get('role') == 'user': history[0] = _sanitize_leading_user_msg(history[0]) + cost = sum(len(json.dumps(m, ensure_ascii=False)) for m in history) + print(f'[Debug] Trimmed context, current: {cost} chars, {len(history)} messages.') + +def auto_make_url(base, path): + b, p = base.rstrip('/'), path.strip('/') + if b.endswith('$'): return b[:-1].rstrip('/') + if b.endswith(p): return b + return f"{b}/{p}" if re.search(r'/v\d+(/|$)', b) else f"{b}/v1/{p}" + +def 
_parse_claude_json(data): + content_blocks = data.get("content", []) + _record_usage(data.get("usage", {}), "messages") + for b in content_blocks: + if b.get("type") == "text": yield b.get("text", "") + elif b.get("type") == "thinking": yield "" + return content_blocks + +def _parse_claude_sse(resp_lines): + """Parse Anthropic SSE stream. Yields text chunks, returns list[content_block].""" + content_blocks = []; current_block = None; tool_json_buf = "" + stop_reason = None; got_message_stop = False; warn = None + for line in resp_lines: + if not line: continue + line = line.decode('utf-8') if isinstance(line, bytes) else line + if not line.startswith("data:"): continue + data_str = line[5:].lstrip() + if data_str == "[DONE]": break + try: evt = json.loads(data_str) + except Exception as e: + print(f"[SSE] JSON parse error: {e}, line: {data_str[:200]}") + continue + evt_type = evt.get("type", "") + if evt_type == "message_start": + usage = evt.get("message", {}).get("usage", {}) + _record_usage(usage, "messages") + elif evt_type == "content_block_start": + block = evt.get("content_block", {}) + if block.get("type") == "text": current_block = {"type": "text", "text": ""} + elif block.get("type") == "thinking": current_block = {"type": "thinking", "thinking": "", "signature": ""} + elif block.get("type") == "tool_use": + current_block = {"type": "tool_use", "id": block.get("id", ""), "name": block.get("name", ""), "input": {}} + tool_json_buf = "" + elif evt_type == "content_block_delta": + delta = evt.get("delta", {}) + if delta.get("type") == "text_delta": + text = delta.get("text", "") + if current_block and current_block.get("type") == "text": current_block["text"] += text + if text: yield text + elif delta.get("type") == "thinking_delta": + if current_block and current_block.get("type") == "thinking": current_block["thinking"] += delta.get("thinking", "") + elif delta.get("type") == "signature_delta": + if current_block and current_block.get("type") == 
"thinking": + current_block["signature"] = current_block.get("signature", "") + delta.get("signature", "") + elif delta.get("type") == "input_json_delta": tool_json_buf += delta.get("partial_json", "") + elif evt_type == "content_block_stop": + if current_block: + if current_block["type"] == "tool_use": + try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {} + except: current_block["input"] = {"_raw": tool_json_buf} + content_blocks.append(current_block) + current_block = None + elif evt_type == "message_delta": + delta = evt.get("delta", {}) + stop_reason = delta.get("stop_reason", stop_reason) + out_usage = evt.get("usage", {}) + out_tokens = out_usage.get("output_tokens", 0) + if out_tokens: print(f"[Output] tokens={out_tokens} stop_reason={stop_reason}") + elif evt_type == "message_stop": got_message_stop = True + elif evt_type == "error": + err = evt.get("error", {}) + emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err) + warn = f"\n\n!!!Error: SSE {emsg}"; break + if not warn: + if not got_message_stop and not stop_reason: warn = "\n\n[!!! 流异常中断,未收到完整响应 !!!]" + elif stop_reason == "max_tokens": warn = "\n\n[!!! Response truncated: max_tokens !!!]" + if current_block: + if current_block["type"] == "tool_use": + try: current_block["input"] = json.loads(tool_json_buf) if tool_json_buf else {} + except: current_block["input"] = {"_raw": tool_json_buf} + content_blocks.append(current_block); current_block = None + if warn: + print(f"[WARN] {warn.strip()}") + content_blocks.append({"type": "text", "text": warn}); yield warn + return content_blocks + +def _try_parse_tool_args(raw): + """Parse tool args string; split concatenated JSON objects like {..}{..} if needed. 
    Returns list of parsed dicts."""
    # -- fragment: tail of _try_parse_tool_args; its def/docstring opening lies above this chunk --
    if not raw: return [{}]
    try: return [json.loads(raw)]
    except: pass
    # Models occasionally emit several JSON objects back-to-back; split on the "}{" seam.
    parts = re.split(r'(?<=\})(?=\{)', raw)
    if len(parts) > 1:
        parsed = []
        for p in parts:
            try: parsed.append(json.loads(p))
            except: return [{"_raw": raw}]  # any unparsable piece -> give up, hand raw back to caller
        return parsed
    return [{"_raw": raw}]

def _parse_openai_sse(resp_lines, api_mode="chat_completions"):
    """Parse OpenAI SSE stream (chat_completions or responses API).
    Yields text chunks, returns list[content_block].
    content_block: {type:'text', text:str} | {type:'tool_use', id:str, name:str, input:dict}
    """
    content_text = ""
    if api_mode == "responses":
        # Responses API streams typed events rather than choice deltas.
        # fc_buf: output_index -> {id, name, args} partial function call being accumulated.
        seen_delta = False; fc_buf = {}; current_fc_idx = None
        for line in resp_lines:
            if not line: continue
            line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line
            if not line.startswith("data:"): continue
            data_str = line[5:].lstrip()
            if data_str == "[DONE]": break
            try: evt = json.loads(data_str)
            except: continue
            etype = evt.get("type", "")
            if etype == "response.output_text.delta":
                delta = evt.get("delta", "")
                if delta: seen_delta = True; content_text += delta; yield delta
            elif etype == "response.output_text.done" and not seen_delta:
                # Fallback for servers that send only the final .done event with full text, no deltas.
                text = evt.get("text", "")
                if text: content_text += text; yield text
            elif etype == "response.output_item.added":
                item = evt.get("item", {})
                if item.get("type") == "function_call":
                    idx = evt.get("output_index", 0)
                    fc_buf[idx] = {"id": item.get("call_id", item.get("id", "")), "name": item.get("name", ""), "args": ""}
                    current_fc_idx = idx
            elif etype == "response.function_call_arguments.delta":
                idx = evt.get("output_index", current_fc_idx or 0)
                if idx in fc_buf: fc_buf[idx]["args"] += evt.get("delta", "")
            elif etype == "response.function_call_arguments.done":
                # .done carries the authoritative full argument string; prefer it over accumulated deltas.
                idx = evt.get("output_index", current_fc_idx or 0)
                if idx in fc_buf: fc_buf[idx]["args"] = evt.get("arguments", fc_buf[idx]["args"])
            elif etype == "error":
                err = evt.get("error", {})
                emsg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
                if emsg: content_text += f"!!!Error: {emsg}"; yield f"!!!Error: {emsg}"
                break
            elif etype == "response.completed":
                usage = evt.get("response", {}).get("usage", {})
                _record_usage(usage, api_mode)
                break
        blocks = []
        if content_text: blocks.append({"type": "text", "text": content_text})
        for idx in sorted(fc_buf):
            fc = fc_buf[idx]
            inps = _try_parse_tool_args(fc["args"])
            for i, inp in enumerate(inps):
                bid = fc["id"] or ''
                # Args string held several concatenated JSON objects: fan out one tool_use per object.
                if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}"
                blocks.append({"type": "tool_use", "id": bid, "name": fc["name"], "input": inp})
        return blocks
    else:
        tc_buf = {}  # index -> {id, name, args}
        reasoning_text = ""
        for line in resp_lines:
            if not line: continue
            line = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line
            if not line.startswith("data:"): continue
            data_str = line[5:].lstrip()
            if data_str == "[DONE]": break
            try: evt = json.loads(data_str)
            except: continue
            ch = (evt.get("choices") or [{}])[0]
            delta = ch.get("delta") or {}
            if delta.get("reasoning_content"):
                reasoning_text += delta["reasoning_content"]
            if delta.get("content"):
                text = delta["content"]; content_text += text; yield text
            for tc in (delta.get("tool_calls") or []):
                idx = tc.get("index", 0)
                has_name = bool(tc.get("function", {}).get("name"))
                if idx not in tc_buf:
                    # A delta with an unknown index and no name is treated as a continuation of the
                    # most recently buffered call (some relays renumber indices mid-stream).
                    if has_name or not tc_buf: tc_buf[idx] = {"id": tc.get("id") or '', "name": "", "args": ""}
                    else: idx = max(tc_buf)
                if has_name: tc_buf[idx]["name"] = tc["function"]["name"]
                if tc.get("function", {}).get("arguments"): tc_buf[idx]["args"] += tc["function"]["arguments"]
                if tc.get("id") and not tc_buf[idx]["id"]: tc_buf[idx]["id"] = tc["id"]
            usage = evt.get("usage")
            if usage: _record_usage(usage, api_mode)
        blocks = []
        if reasoning_text: blocks.append({"type": "thinking", "thinking": reasoning_text})
        if content_text: blocks.append({"type": "text", "text": content_text})
        for idx in sorted(tc_buf):
            tc = tc_buf[idx]
            inps = _try_parse_tool_args(tc["args"])
            for i, inp in enumerate(inps):
                bid = tc["id"] or ''
                if len(inps) > 1: bid = f"{bid}_{i}" if bid else f"split_{i}"
                blocks.append({"type": "tool_use", "id": bid, "name": tc["name"], "input": inp})
        return blocks

def _record_usage(usage, api_mode):
    """Print prompt-cache token statistics from a usage payload, per API flavor."""
    if not usage: return
    if api_mode == 'responses':
        cached = (usage.get("input_tokens_details") or {}).get("cached_tokens", 0)
        inp = usage.get("input_tokens", 0)
        print(f"[Cache] input={inp} cached={cached}")
    elif api_mode == 'chat_completions':
        cached = (usage.get("prompt_tokens_details") or {}).get("cached_tokens", 0)
        inp = usage.get("prompt_tokens", 0)
        print(f"[Cache] input={inp} cached={cached}")
    elif api_mode == 'messages':
        ci, cr, inp = usage.get("cache_creation_input_tokens", 0), usage.get("cache_read_input_tokens", 0), usage.get("input_tokens", 0)
        print(f"[Cache] input={inp} creation={ci} read={cr}")

def _parse_openai_json(data, api_mode="chat_completions"):
    """Non-streaming counterpart of _parse_openai_sse.

    Generator: yields text pieces and returns the list of content blocks via
    StopIteration.value, mirroring the SSE parser's contract.
    """
    blocks = []
    if api_mode == "responses":
        _record_usage(data.get("usage") or {}, api_mode)
        for item in (data.get("output") or []):
            if item.get("type") == "message":
                for p in (item.get("content") or []):
                    if p.get("type") in ("output_text", "text") and p.get("text"):
                        blocks.append({"type": "text", "text": p["text"]}); yield p["text"]
            elif item.get("type") == "function_call":
                try: args = json.loads(item.get("arguments", "")) if item.get("arguments") else {}
                except: args = {"_raw": item.get("arguments", "")}
                blocks.append({"type": "tool_use", "id": item.get("call_id", item.get("id", "")),
                               "name": item.get("name", ""), "input": args})
    else:
        _record_usage(data.get("usage") or {}, api_mode)
        msg = (data.get("choices") or [{}])[0].get("message", {})
        reasoning = msg.get("reasoning_content", "")
        if reasoning:
            blocks.append({"type": "thinking",
"thinking": reasoning}) + content = msg.get("content", "") + if content: + blocks.append({"type": "text", "text": content}); yield content + for tc in (msg.get("tool_calls") or []): + fn = tc.get("function", {}) + try: args = json.loads(fn.get("arguments", "")) if fn.get("arguments") else {} + except: args = {"_raw": fn.get("arguments", "")} + blocks.append({"type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": args}) + return blocks + +def _stamp_oai_cache_markers(messages, model): + """Add cache_control to last 2 user messages for Anthropic models via OAI-compatible relay.""" + ml = model.lower() + if not any(k in ml for k in ('claude', 'anthropic')): return + user_idxs = [i for i, m in enumerate(messages) if m.get('role') == 'user'] + for idx in user_idxs[-2:]: + c = messages[idx].get('content') + if isinstance(c, str): + messages[idx] = {**messages[idx], 'content': [{'type': 'text', 'text': c, 'cache_control': {'type': 'ephemeral'}}]} + elif isinstance(c, list) and c: + c = list(c); c[-1] = dict(c[-1], cache_control={'type': 'ephemeral'}) + messages[idx] = {**messages[idx], 'content': c} + +def _stream_with_retry(sess, url, headers, payload, parse_fn): + _RETRYABLE = {408, 409, 425, 429, 500, 502, 503, 504, 529} + def _delay(resp, attempt): + try: ra = float((resp.headers or {}).get("retry-after")) + except: ra = None + return max(0.5, ra if ra is not None else min(30.0, 1.5 * (2 ** attempt))) + for attempt in range(sess.max_retries + 1): + streamed = False + try: + with requests.post(url, headers=headers, json=payload, stream=sess.stream, + timeout=(sess.connect_timeout, sess.read_timeout), proxies=sess.proxies, verify=sess.verify) as r: + if r.status_code >= 400: + if r.status_code in _RETRYABLE and attempt < sess.max_retries: + d = _delay(r, attempt) + print(f"[LLM Retry] HTTP {r.status_code}, retry in {d:.1f}s ({attempt+1}/{sess.max_retries+1})") + time.sleep(d); continue + try: body = r.text.strip()[:500] + except: body = "" + 
err = f"!!!Error: HTTP {r.status_code}" + (f": {body}" if body else "") + yield err; return [{"type": "text", "text": err}] + gen = parse_fn(r) + try: + while True: streamed = True; yield next(gen) + except StopIteration as e: return e.value or [] + except (requests.Timeout, requests.ConnectionError) as e: + err = f"!!!Error: {type(e).__name__}" + if attempt < sess.max_retries: + d = _delay(None, attempt) + print(f"[LLM Retry] {type(e).__name__}, retry in {d:.1f}s ({attempt+1}/{sess.max_retries+1})") + yield err; time.sleep(d); continue + yield err; return [{"type": "text", "text": err}] + except Exception as e: + err = f"\n\n[!!! 流异常中断 {type(e).__name__}: {e} !!!]" if streamed else f"!!!Error: {type(e).__name__}: {e}" + yield err; return [{"type": "text", "text": err}] + +def _openai_stream(sess, messages): + model, api_mode = sess.model, sess.api_mode + ml = model.lower() + temperature = sess.temperature + if 'kimi' in ml or 'moonshot' in ml: temperature = 1 + elif 'minimax' in ml: temperature = max(0.01, min(temperature, 1.0)) # MiniMax requires temp in (0, 1] + headers = {"Authorization": f"Bearer {sess.api_key}", "Content-Type": "application/json", "Accept": "text/event-stream"} + if api_mode == "responses": + url = auto_make_url(sess.api_base, "responses") + payload = {"model": model, "input": _to_responses_input(messages), "stream": sess.stream, + "prompt_cache_key": _RESP_CACHE_KEY, "instructions": sess.system or "You are an Omnipotent Executor."} + if sess.reasoning_effort: payload["reasoning"] = {"effort": sess.reasoning_effort} + if sess.max_tokens: payload["max_output_tokens"] = sess.max_tokens + else: + url = auto_make_url(sess.api_base, "chat/completions") + if sess.system: messages = [{"role": "system", "content": sess.system}] + messages + _stamp_oai_cache_markers(messages, model) + payload = {"model": model, "messages": messages, "stream": sess.stream} + if sess.stream: payload["stream_options"] = {"include_usage": True} + if temperature != 1: 
payload["temperature"] = temperature + if sess.max_tokens: payload["max_completion_tokens" if ml.startswith(("gpt-5", "o1", "o2", "o3", "o4")) else "max_tokens"] = sess.max_tokens + if sess.reasoning_effort: payload["reasoning_effort"] = sess.reasoning_effort + tools = getattr(sess, 'tools', None) + if tools: payload["tools"] = _prepare_oai_tools(tools, api_mode) + if sess.service_tier: payload["service_tier"] = sess.service_tier + parse_fn = (lambda r: _parse_openai_sse(r.iter_lines(), api_mode)) if sess.stream else (lambda r: _parse_openai_json(r.json(), api_mode)) + return (yield from _stream_with_retry(sess, url, headers, payload, parse_fn)) + +def _prepare_oai_tools(tools, api_mode="chat_completions"): + if api_mode == "responses": + resp_tools = [] + for t in tools: + if t.get("type") == "function" and "function" in t: + rt = {"type": "function"}; rt.update(t["function"]) + resp_tools.append(rt) + else: resp_tools.append(t) + return resp_tools + return tools + +def _to_responses_input(messages): + result, pending = [], [] + for msg in messages: + role = str(msg.get("role", "user")).lower() + if role == "tool": + cid = msg.get("tool_call_id") or (pending.pop(0) if pending else f"call_{uuid.uuid4().hex[:8]}") + result.append({"type": "function_call_output", "call_id": cid, "output": msg.get("content", "")}) + continue + if role not in ["user", "assistant", "system", "developer"]: role = "user" + if role == "system": role = "developer" # Responses API uses 'developer' instead of 'system' + content = msg.get("content", "") + text_type = "output_text" if role == "assistant" else "input_text" + parts = [] + if isinstance(content, str): + if content: parts.append({"type": text_type, "text": content}) + elif isinstance(content, list): + for part in content: + if not isinstance(part, dict): continue + ptype = part.get("type") + if ptype == "text": + text = part.get("text", "") + if text: parts.append({"type": text_type, "text": text}) + elif ptype == "image_url": + 
url = (part.get("image_url") or {}).get("url", "") + if url and role != "assistant": parts.append({"type": "input_image", "image_url": url}) + if len(parts) == 0: parts = [{"type": text_type, "text": str(content) if not isinstance(content, list) else '[empty]'}] + result.append({"role": role, "content": parts}) + pending = [] + for tc in (msg.get("tool_calls") or []): + f = tc.get("function", {}) + cid = tc.get("id") or f"call_{uuid.uuid4().hex[:8]}" + pending.append(cid) + result.append({"type": "function_call", "call_id": cid, "name": f.get("name", ""), "arguments": f.get("arguments", "")}) + return result + + +def _msgs_claude2oai(messages): + result = [] + for msg in messages: + role = msg.get("role", "user") + content = msg.get("content", "") + blocks = content if isinstance(content, list) else [{"type": "text", "text": str(content)}] + if role == "assistant": + text_parts, tool_calls, reasoning = [], [], "" + for b in blocks: + if not isinstance(b, dict): continue + if b.get("type") == "thinking" and b.get("thinking"): reasoning = b["thinking"] + elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) + elif b.get("type") == "tool_use": + tool_calls.append({ + "id": b.get("id") or '', "type": "function", + "function": {"name": b.get("name", ""), "arguments": json.dumps(b.get("input", {}), ensure_ascii=False)} + }) + m = {"role": "assistant"} + if reasoning: m["reasoning_content"] = reasoning + if text_parts: m["content"] = text_parts + else: m["content"] = "" + if tool_calls: m["tool_calls"] = tool_calls + if not text_parts and not tool_calls and reasoning: m["content"] = "." 
+ result.append(m) + elif role == "user": + text_parts = [] + for b in blocks: + if not isinstance(b, dict): continue + if b.get("type") == "tool_result": + if text_parts: + result.append({"role": "user", "content": text_parts}) + text_parts = [] + tr = b.get("content", "") + if isinstance(tr, list): + tr = "\n".join(x.get("text", "") for x in tr if isinstance(x, dict) and x.get("type") == "text") + result.append({"role": "tool", "tool_call_id": b.get("tool_use_id") or '', "content": tr if isinstance(tr, str) else str(tr)}) + elif b.get("type") == "image": + src = b.get("source") or {} + if src.get("type") == "base64" and src.get("data"): + text_parts.append({"type": "image_url", "image_url": {"url": f"data:{src.get('media_type', 'image/png')};base64,{src.get('data', '')}"}}) + elif b.get("type") == "image_url": text_parts.append(b) + elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) + if text_parts: result.append({"role": "user", "content": text_parts}) + else: result.append(msg) + return result + + +class BaseSession: + def __init__(self, cfg): + self.api_key = cfg['apikey'] + self.api_base = cfg['apibase'].rstrip('/') + self.model = cfg.get('model', '') + self.context_win = cfg.get('context_win', 28000) + self.history = [] + self.lock = threading.Lock() + self.system = "" + self.name = cfg.get('name', self.model) + proxy = cfg.get('proxy') + self.proxies = {"http": proxy, "https": proxy} if proxy else None + self.max_retries = max(0, int(cfg.get('max_retries', 4))) + self.verify = cfg.get('verify', True) + self.stream = cfg.get('stream', True) + default_ct, default_rt = (5, 30) if self.stream else (10, 240) + self.connect_timeout = max(1, int(cfg.get('timeout', default_ct))) + self.read_timeout = max(5, int(cfg.get('read_timeout', default_rt))) + def _enum(key, valid): + v = cfg.get(key); v = None if v is None else str(v).strip().lower() + return v if not v or v in valid else print(f"[WARN] Invalid 
{key} {v!r}, ignored.")
        self.reasoning_effort = _enum('reasoning_effort', {'none', 'minimal', 'low', 'medium', 'high', 'xhigh'})
        self.service_tier = _enum('service_tier', {'auto', 'default', 'priority', 'flex'})
        self.thinking_type = _enum('thinking_type', {'adaptive', 'enabled', 'disabled'})
        self.thinking_budget_tokens = cfg.get('thinking_budget_tokens')
        mode = str(cfg.get('api_mode', 'chat_completions')).strip().lower().replace('-', '_')
        self.api_mode = 'responses' if mode in ('responses', 'response') else 'chat_completions'
        self.temperature = cfg.get('temperature', 1)
        self.max_tokens = cfg.get('max_tokens')
    def _apply_claude_thinking(self, payload):
        # Translate configured thinking_type / reasoning_effort into Claude payload fields.
        if self.thinking_type:
            thinking = {"type": self.thinking_type}
            if self.thinking_type == 'enabled':
                # 'enabled' is only valid together with an explicit token budget.
                if self.thinking_budget_tokens is None: print("[WARN] thinking_type='enabled' requires thinking_budget_tokens, ignored.")
                else:
                    thinking["budget_tokens"] = self.thinking_budget_tokens; payload["thinking"] = thinking
            else: payload["thinking"] = thinking
        if self.reasoning_effort:
            # Map to Claude's output_config.effort scale; 'xhigh' becomes 'max'.
            effort = {'low': 'low', 'medium': 'medium', 'high': 'high', 'xhigh': 'max'}.get(self.reasoning_effort)
            if effort: payload["output_config"] = {"effort": effort}
            else: print(f"[WARN] reasoning_effort {self.reasoning_effort!r} is unsupported for Claude output_config.effort, ignored.")
    def ask(self, prompt):
        """Append `prompt` to history, run raw_ask, stream text, record the assistant reply.

        Returns a generator of chunks when streaming, else the joined string.
        """
        def _ask_gen():
            # NOTE(review): original indentation was lost in this chunk; as reconstructed the
            # lock is held for the entire request/stream — confirm intended scope.
            with self.lock:
                self.history.append({"role": "user", "content": [{"type": "text", "text": prompt}]})
                trim_messages_history(self.history, self.context_win)
                messages = self.make_messages(self.history)
                content_blocks = None; content = ''
                gen = self.raw_ask(messages)
                try:
                    while True: chunk = next(gen); content += chunk; yield chunk
                except StopIteration as e: content_blocks = e.value or []
                if len(content_blocks) > 1: print(f"[DEBUG BaseSession.ask] content_blocks: {content_blocks}")
                for block in (content_blocks or []):
                    if block.get('type', '') == 'tool_use':
                        # NOTE(review): this yield looks garbled — angle-bracket tags around the JSON
                        # appear stripped from this source; kept byte-identical.
                        tu = {'name': block.get('name', ''), 'arguments': block.get('input', {})}
                        yield f'{json.dumps(tu, ensure_ascii=False)}'
                if not content.startswith("!!!Error:"): self.history.append({"role": "assistant", "content": [{"type": "text", "text": content}]})
        return _ask_gen() if self.stream else ''.join(list(_ask_gen()))

# Keep a content block unless it is a thinking block without a signature.
def _keep_claude_block(b): return not isinstance(b, dict) or b.get("type") != "thinking" or b.get("signature")
def _drop_unsigned_thinking(messages):
    # Strip unsigned thinking blocks from every message (mutates in place).
    for m in messages:
        c = m.get("content")
        if isinstance(c, list): m["content"] = [b for b in c if _keep_claude_block(b)]
    return messages

def _ensure_thinking_blocks(messages, model):
    """deepseek needs thinking in history!"""
    if 'deepseek' not in model.lower(): return messages
    for m in messages:
        if m.get("role") != "assistant": continue
        c = m.get("content")
        if not isinstance(c, list): continue
        has_thinking = any(isinstance(b, dict) and b.get("type") == "thinking" for b in c)
        # Prepend a placeholder thinking block to assistant turns that lack one.
        if not has_thinking: m["content"] = [{"type": "thinking", "thinking": "...", "signature": "placeholder"}, *c]
    return messages

class ClaudeSession(BaseSession):
    """Anthropic Messages API session (API-key auth, prompt caching beta)."""
    def raw_ask(self, messages):
        if self.max_tokens is None: self.max_tokens = 8192
        headers = {"x-api-key": self.api_key, "Content-Type": "application/json", "anthropic-version": "2023-06-01", "anthropic-beta": "prompt-caching-2024-07-31"}
        payload = {"model": self.model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream}
        if self.temperature != 1: payload["temperature"] = self.temperature
        self._apply_claude_thinking(payload)
        # NOTE(review): cache_control type "persistent" is non-standard (Anthropic documents
        # "ephemeral") — presumably relay-specific; confirm against the target endpoint.
        if self.system: payload["system"] = [{"type": "text", "text": self.system, "cache_control": {"type": "persistent"}}]
        url = auto_make_url(self.api_base, "messages")
        parse_fn = (lambda r: _parse_claude_sse(r.iter_lines())) if self.stream else (lambda r: _parse_claude_json(r.json()))
        return (yield from _stream_with_retry(self, url, headers, payload, parse_fn))
    def make_messages(self, 
raw_list):
        # Shallow-copy each message, drop unsigned thinking, and stamp ephemeral
        # cache markers on the last block of the last two user turns.
        msgs = _drop_unsigned_thinking([{"role": m['role'], "content": list(m['content'])} for m in raw_list])
        user_idxs = [i for i, m in enumerate(msgs) if m['role'] == 'user']
        for idx in user_idxs[-2:]:
            msgs[idx]["content"][-1] = dict(msgs[idx]["content"][-1], cache_control={"type": "ephemeral"})
        return msgs

class LLMSession(BaseSession):
    """OpenAI-compatible session: history converted from Claude blocks to OAI messages."""
    def raw_ask(self, messages): return (yield from _openai_stream(self, messages))
    def make_messages(self, raw_list): return _msgs_claude2oai(raw_list)

def _fix_messages(messages):
    """Repair messages for the Claude API: alternate roles, pair tool_use/tool_result."""
    if not messages: return messages
    _wrap = lambda c: c if isinstance(c, list) else [{"type": "text", "text": str(c)}]
    fixed = []
    for m in messages:
        # Merge consecutive same-role messages into one (Claude requires strict alternation).
        if fixed and m['role'] == fixed[-1]['role']:
            fixed[-1] = {**fixed[-1], 'content': _wrap(fixed[-1]['content']) + [{"type": "text", "text": "\n"}] + _wrap(m['content'])}; continue
        if fixed and fixed[-1]['role'] == 'assistant' and m['role'] == 'user':
            # Synthesize "(error)" results for tool_use ids missing a tool_result,
            # and downgrade orphan tool_results (no matching tool_use) to plain text.
            uses = [b.get('id') for b in fixed[-1].get('content', []) if isinstance(b, dict) and b.get('type') == 'tool_use' and b.get('id')]
            has = {b.get('tool_use_id') for b in _wrap(m['content']) if isinstance(b, dict) and b.get('type') == 'tool_result'}
            miss = [uid for uid in uses if uid not in has]
            if miss: m = {**m, 'content': [{"type": "tool_result", "tool_use_id": uid, "content": "(error)"} for uid in miss] + _wrap(m['content'])}
            orphan = has - set(uses)
            if orphan: m = {**m, 'content': [{"type":"text","text":str(b.get('content',''))} if isinstance(b,dict) and b.get('type')=='tool_result' and b.get('tool_use_id') in orphan else b for b in _wrap(m['content'])]}
        fixed.append(m)
    # Claude requires the conversation to open with a user turn.
    while fixed and fixed[0]['role'] != 'user': fixed.pop(0)
    return fixed

class NativeClaudeSession(BaseSession):
    """Messages API session that presents itself as the Claude Code CLI
    (CLI user-agent, beta flags, metadata ids) and uses native tool_use blocks."""
    def __init__(self, cfg):
        super().__init__(cfg)
        self.fake_cc_system_prompt = cfg.get("fake_cc_system_prompt", False)
        self.user_agent = cfg.get("user_agent", "claude-cli/2.1.113 (external, cli)")
        self._session_id = str(uuid.uuid4())
        self._account_uuid = str(uuid.uuid4())
        self._device_id = uuid.uuid4().hex + uuid.uuid4().hex[:32]
        self.tools = None   # Claude-format tool list, set by the tool client
    def raw_ask(self, messages):
        messages = _ensure_thinking_blocks(_drop_unsigned_thinking(_fix_messages(messages)), self.model)
        if self.max_tokens is None: self.max_tokens = 8192
        model = self.model
        beta_parts = ["claude-code-20250219", "interleaved-thinking-2025-05-14", "redact-thinking-2026-02-12", "prompt-caching-scope-2026-01-05"]
        # "[1m]" model-name suffix opts into the 1M-context beta; strip it from the wire name.
        if "[1m]" in model.lower():
            beta_parts.insert(1, "context-1m-2025-08-07"); model = model.replace("[1m]", "").replace("[1M]", "")
        headers = {"Content-Type": "application/json", "anthropic-version": "2023-06-01",
                   "anthropic-beta": ",".join(beta_parts), "anthropic-dangerous-direct-browser-access": "true",
                   "user-agent": self.user_agent, "x-app": "cli"}
        # Raw API keys go in x-api-key; anything else is treated as an OAuth bearer token.
        if self.api_key.startswith("sk-ant-"): headers["x-api-key"] = self.api_key
        else: headers["authorization"] = f"Bearer {self.api_key}"
        payload = {"model": model, "messages": messages, "max_tokens": self.max_tokens, "stream": self.stream}
        if self.temperature != 1: payload["temperature"] = self.temperature
        self._apply_claude_thinking(payload)
        payload["metadata"] = {"user_id": json.dumps({"device_id": self._device_id, "account_uuid": self._account_uuid, "session_id": self._session_id}, separators=(',', ':'))}
        if self.tools:
            claude_tools = openai_tools_to_claude(self.tools)
            # Cache-mark only the last tool so the whole tool list lands in one cache prefix.
            tools = [dict(t) for t in claude_tools]; tools[-1]["cache_control"] = {"type": "ephemeral"}
            payload["tools"] = tools
        else: print("[ERROR] No tools provided for this session.")
        payload['system'] = [{"type": "text", "text": "You are Claude Code, Anthropic's official CLI for Claude.", "cache_control": {"type": "ephemeral"}}]
        if self.system:
            # Either smuggle the real system prompt into the first user message (fake_cc mode)
            # or replace the Claude Code system block outright.
            if self.fake_cc_system_prompt: messages[0]["content"].insert(0, {"type": "text", "text": self.system})
            else: payload["system"] = [{"type": "text", "text": self.system}]
        user_idxs = [i for i, m in enumerate(messages) if m['role'] == 'user']
        for idx in user_idxs[-2:]:
            messages[idx] = {**messages[idx], "content": list(messages[idx]["content"])}
            messages[idx]["content"][-1] = dict(messages[idx]["content"][-1], cache_control={"type": "ephemeral"})
        url = auto_make_url(self.api_base, "messages") + '?beta=true'
        parse_fn = (lambda r: _parse_claude_sse(r.iter_lines())) if self.stream else (lambda r: _parse_claude_json(r.json()))
        return (yield from _stream_with_retry(self, url, headers, payload, parse_fn))

    def ask(self, msg):
        """Generator: append `msg` (a Claude-format message dict) to history, stream text
        chunks, and return a MockResponse via StopIteration.value."""
        assert type(msg) is dict
        # NOTE(review): original indentation lost here; as reconstructed the lock spans the
        # full request — confirm intended scope.
        with self.lock:
            self.history.append(msg)
            trim_messages_history(self.history, self.context_win)
            messages = [{"role": m["role"], "content": list(m["content"])} for m in self.history]
            content_blocks = None
            gen = self.raw_ask(messages)
            try:
                while True: yield next(gen)
            except StopIteration as e: content_blocks = e.value or []
            # Inject a synthetic text block (from thinking) when the model returned none.
            if content_blocks and (_injected := _ensure_text_block(content_blocks)): yield _injected
            # Record the turn unless it is a lone error block.
            if content_blocks and not (len(content_blocks) == 1 and content_blocks[0].get("text", "").startswith("!!!Error:")):
                self.history.append({"role": "assistant", "content": content_blocks})
            text_parts = [b["text"] for b in content_blocks if b.get("type") == "text"]
            content = "\n".join(text_parts).strip()
            tool_calls = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in content_blocks if b.get("type") == "tool_use"]
            if not tool_calls: tool_calls, content = _parse_text_tool_calls(content)
            thinking_parts = [b["thinking"] for b in content_blocks if b.get("type") == "thinking"]
            thinking = "\n".join(thinking_parts).strip()
            if not thinking:
                # NOTE(review): this regex looks garbled — the <thinking>...</thinking> tags
                # appear stripped from this source; kept byte-identical.
                think_pattern = r"(.*?)"
                think_match = re.search(think_pattern, content, re.DOTALL)
                if think_match:
                    thinking = think_match.group(1).strip()
                    content = re.sub(think_pattern, "", content, flags=re.DOTALL)
            return MockResponse(thinking, content, tool_calls, 
str(content_blocks))

class NativeOAISession(NativeClaudeSession):
    """NativeClaudeSession variant that speaks to OpenAI-compatible endpoints:
    history is repaired in Claude format, then converted for the wire."""
    def raw_ask(self, messages):
        messages = _fix_messages(messages)
        messages = _ensure_thinking_blocks(messages, self.model)
        return (yield from _openai_stream(self, _msgs_claude2oai(messages)))

def openai_tools_to_claude(tools):
    """[{type:'function', function:{name,description,parameters}}] → [{name,description,input_schema}]."""
    result = []
    for t in tools:
        if 'input_schema' in t: result.append(t); continue  # already in Claude format
        fn = t.get('function', t)
        result.append({'name': fn['name'], 'description': fn.get('description', ''),
                       'input_schema': fn.get('parameters', {'type': 'object', 'properties': {}})})
    return result

class MockFunction:
    # Mirrors the OpenAI SDK's function object (name + JSON-string arguments).
    def __init__(self, name, arguments): self.name, self.arguments = name, arguments

class MockToolCall:
    # Mirrors the OpenAI SDK's tool-call object; args are normalized to a JSON string.
    def __init__(self, name, args, id=''):
        arg_str = json.dumps(args, ensure_ascii=False) if isinstance(args, (dict, list)) else (args or '{}')
        self.function = MockFunction(name, arg_str); self.id = id

class MockResponse:
    """Uniform response object: thinking text, reply content, tool calls, raw payload."""
    def __init__(self, thinking, content, tool_calls, raw, stop_reason='end_turn'):
        self.thinking = thinking; self.content = content
        self.tool_calls = tool_calls; self.raw = raw
        self.stop_reason = 'tool_use' if tool_calls else stop_reason
    def __repr__(self):
        # NOTE(review): this repr looks garbled — its angle-bracketed body appears
        # stripped from this source; kept byte-identical.
        return f""

class ToolClient:
    """Text-protocol tool client: serializes tools + history into one prompt for
    backends without native tool calling, then parses tool calls back out of the reply."""
    def __init__(self, backend, auto_save_tokens=True):
        self.backend = backend
        self.auto_save_tokens = auto_save_tokens   # skip re-sending the tool spec when unchanged
        self.last_tools = ''                       # JSON of the last tool spec actually sent
        self.name = self.backend.name
        self.total_cd_tokens = 0                   # rough token count since last full tool spec
        self.log_path = None

    def chat(self, messages, tools=None):
        """Generator: yields streamed chunks, returns a MockResponse via StopIteration.value."""
        # Deep-copy tools so the file_write tweak below never mutates the caller's spec.
        tools = json.loads(json.dumps(tools, ensure_ascii=False)) if tools else tools
        for t in tools or []:
            f = t.get('function', {})
            if f.get('name') == 'file_write':
                # Strip the 'content' arg: file bodies are carried in the reply text instead.
                # NOTE(review): the description string below appears tag-stripped in this source.
                props = f.get('parameters', {}).get('properties', {})
                props.pop('content', None)
                extra = '. Content must be placed in tags in reply body, not in args'
                if extra not in f.get('description', ''): f['description'] = f.get('description', '') + extra
                break
        full_prompt = self._build_protocol_prompt(messages, tools)
        print("Full prompt length:", len(full_prompt), 'chars')
        gen = self.backend.ask(full_prompt)
        _write_llm_log('Prompt', full_prompt, self.log_path)
        raw_text = ''
        for chunk in gen:
            raw_text += chunk; yield chunk
        _write_llm_log('Response', raw_text, self.log_path)
        return self._parse_mixed_response(raw_text)

    def _prepare_tool_instruction(self, tools):
        # Build the protocol + tool-list preamble; language chosen via GA_LANG.
        tool_instruction = ""
        if not tools: return tool_instruction
        tools_json = json.dumps(tools, ensure_ascii=False, separators=(',', ':'))
        _en = os.environ.get('GA_LANG') == 'en'
        if _en:
            tool_instruction = f"""
### Interaction Protocol (must follow strictly, always in effect)
Follow these steps to think and act:
1. **Think**: Analyze the current situation and strategy inside `` tags.
2. **Summarize**: Output a minimal one-line (<30 words) physical snapshot in ``: new info from last tool result + current tool call intent. This goes into long-term working memory. Must contain real information, no filler.
3. **Act**: If you need to call tools, output one or more ** blocks** after your reply, then stop.
"""
        else:
            tool_instruction = f"""
### 交互协议 (必须严格遵守,持续有效)
请按照以下步骤思考并行动:
1. **思考**: 在 `` 标签中先进行思考,分析现状和策略。
2. **总结**: 在 `` 中输出*极为简短*的高度概括的单行(<30字)物理快照,包括上次工具调用结果产生的新信息+本次工具调用意图。此内容将进入长期工作记忆,记录关键信息,严禁输出无实际信息增量的描述。
3. **行动**: 如需调用工具,请在回复正文之后输出一个(或多个)**块**,然后结束。
"""
        tool_instruction += f'\nFormat: ```{{"name": "tool_name", "arguments": {{...}}}}```\n\n### Tools (mounted, always in effect):\n{tools_json}\n'
        # Token saver: when the spec is unchanged, send a short "still active" reminder instead.
        if self.auto_save_tokens and self.last_tools == tools_json:
            tool_instruction = "\n### Tools: still active, **ready to call**. 
Protocol unchanged.\n" if _en else "\n### 工具库状态:持续有效(code_run/file_read等),**可正常调用**。调用协议沿用。\n" + else: self.total_cd_tokens = 0 + self.last_tools = tools_json + return tool_instruction + + def _build_protocol_prompt(self, messages, tools): + system_content = next((m['content'] for m in messages if m['role'].lower() == 'system'), "") + history_msgs = [m for m in messages if m['role'].lower() != 'system'] + tool_instruction = self._prepare_tool_instruction(tools) + system = ""; user = "" + if system_content: system += f"{system_content}\n" + system += f"{tool_instruction}" + for m in history_msgs: + role = "USER" if m['role'] == 'user' else "ASSISTANT" + user += f"=== {role} ===\n" + for tr in m.get('tool_results', []): user += f'{tr["content"]}\n' + user += str(m['content']) + "\n" + self.total_cd_tokens += len(user) // 3 + if self.total_cd_tokens > 9000: self.last_tools = '' + user += "=== ASSISTANT ===\n" + return system + user + + def _parse_mixed_response(self, text): + remaining_text = text; thinking = '' + think_match = re.search(r"(.*?)", text, re.DOTALL) + if think_match: + thinking = think_match.group(1).strip() + remaining_text = re.sub(r"(.*?)", "", remaining_text, flags=re.DOTALL) + tool_calls, remaining_text = _parse_text_tool_calls(remaining_text) + if not tool_calls: + json_strs = []; errors = [] + if '' in remaining_text: + weaktoolstr = remaining_text.split('')[-1].strip().strip('><') + json_str = weaktoolstr if weaktoolstr.endswith('}') else '' + if json_str == '' and '```' in weaktoolstr and weaktoolstr.split('```')[0].strip().endswith('}'): + json_str = weaktoolstr.split('```')[0].strip() + if json_str: json_strs.append(json_str) + remaining_text = remaining_text.replace(''+weaktoolstr, "") + elif '"name":' in remaining_text and '"arguments":' in remaining_text: + json_match = re.search(r'\{.*"name":.*\}', remaining_text, re.DOTALL) + if json_match: + json_strs.append(json_match.group(0).strip()) + remaining_text = 
remaining_text.replace(json_match.group(0), "").strip() + for json_str in json_strs: + try: + data = tryparse(json_str) + func_name = data.get('name') or data.get('function') or data.get('tool') + args = data.get('arguments') or data.get('args') or data.get('params') or data.get('parameters') + if args is None: args = data + if func_name: tool_calls.append(MockToolCall(func_name, args)) + except json.JSONDecodeError: + errors.append(f'Failed to parse tool_use JSON: {json_str[:200]}') + self.last_tools = '' + except: pass + if not tool_calls: + for e in errors: + print(f"[Warn] {e}"); tool_calls.append(MockToolCall('bad_json', {'msg': e})) + return MockResponse(thinking, remaining_text.strip(), tool_calls, text) + +def _parse_text_tool_calls(content): + """Fallback: extract tool calls from text when model doesn't use native tool_use blocks.""" + tcs = [] + # try JSON array: [{"type":"tool_use", "name":..., "input":...}] + _jp = next((p for p in ['[{"type":"tool_use"', '[{"type": "tool_use"'] if p in content), None) + if _jp and content.endswith('}]'): + try: + idx = content.index(_jp); raw = json.loads(content[idx:]) + tcs = [MockToolCall(b["name"], b.get("input", {}), id=b.get("id", "")) for b in raw if b.get("type") == "tool_use"] + return tcs, content[:idx].strip() + except: pass + # try XML tags: {"name":..., "arguments":...} + _xp = r"<(?:tool_use|tool_call)>((?:(?!<(?:tool_use|tool_call)>).){15,}?)" + for s in re.findall(_xp, content, re.DOTALL): + try: + d = tryparse(s.strip()); name = d.get('name') + args = d.get('arguments') or d.get('args') or d.get('input') or {} + if name: tcs.append(MockToolCall(name, args)) + except: pass + if tcs: content = re.sub(_xp, "", content, flags=re.DOTALL).strip() + return tcs, content + +def _ensure_text_block(blocks): + """If response has thinking but no text block, inject a synthetic summary from thinking's first line.""" + if any(b.get("type") == "text" for b in blocks): return None + th = next((b.get("thinking", "") for 
b in blocks if b.get("type") == "thinking"), "") + if not th: return None + line = th.strip().split('\n', 1)[0] + txt = "" + (line[:60] + '...' if len(line) > 60 else line) + "" + blocks.insert(1, {"type": "text", "text": txt}) + return txt + +def _write_llm_log(label, content, log_path=None): + if not log_path: + log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'temp/model_responses/model_responses_{os.getpid()}.txt') + os.makedirs(os.path.dirname(os.path.abspath(log_path)), exist_ok=True) + ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + with open(log_path, 'a', encoding='utf-8', errors='replace') as f: + f.write(f"=== {label} === {ts}\n{content}\n\n") + +def tryparse(json_str): + try: return json.loads(json_str) + except: pass + json_str = json_str.strip().strip('`').replace('json\n', '', 1).strip() + try: return json.loads(json_str) + except: pass + try: return json.loads(json_str[:-1]) + except: pass + if '}' in json_str: json_str = json_str[:json_str.rfind('}') + 1] + return json.loads(json_str) + +class MixinSession: + """Multi-session fallback with spring-back to primary.""" + def __init__(self, all_sessions, cfg): + self._retries, self._base_delay = cfg.get('max_retries', 3), cfg.get('base_delay', 1.5) + self._spring_sec = cfg.get('spring_back', 300) + self._sessions = [all_sessions[i].backend if isinstance(i, int) else + next(s.backend for s in all_sessions if type(s) is not dict and s.backend.name == i) for i in cfg.get('llm_nos', [])] + is_native = lambda s: 'Native' in s.__class__.__name__ + groups = {is_native(s) for s in self._sessions} + assert len(groups) == 1, f"MixinSession: sessions must be in same group (Native or non-Native), got {[type(s).__name__ for s in self._sessions]}" + self.name = '|'.join(s.name for s in self._sessions) + import copy; self._sessions = [copy.copy(s) for s in self._sessions] + for s in self._sessions: s.max_retries = 0 + self._orig_raw_asks = [s.raw_ask for s in self._sessions] + 
self._sessions[0].raw_ask = self._raw_ask + self.model = getattr(self._sessions[0], 'model', None) + self._cur_idx, self._switched_at = 0, 0.0 + def __getattr__(self, name): return getattr(self._sessions[0], name) + _BROADCAST_ATTRS = frozenset({'system', 'tools', 'temperature', 'max_tokens', 'reasoning_effort', 'history'}) + def __setattr__(self, name, value): + if name in self._BROADCAST_ATTRS: + for s in self._sessions: + v = openai_tools_to_claude(value) if name == 'tools' and type(s) is NativeClaudeSession else value + setattr(s, name, v) + else: object.__setattr__(self, name, value) + @property + def primary(self): return self._sessions[0] + def _pick(self): + if self._cur_idx and time.time() - self._switched_at > self._spring_sec: self._cur_idx = 0 + return self._cur_idx + def _raw_ask(self, *args, **kwargs): + base, n = self._pick(), len(self._sessions) + test_error = lambda x: isinstance(x, str) and x.lstrip().startswith(('!!!Error:', '[Error:')) + for attempt in range(self._retries + 1): + idx = (base + attempt) % n + gen = self._orig_raw_asks[idx](*args, **kwargs) + print(f'[MixinSession] Using session ({self._sessions[idx].name})') + last_chunk, return_val, yielded = None, [], False + try: + while True: + chunk = next(gen); last_chunk = chunk + if not yielded and test_error(chunk): continue + yield chunk; yielded = True + except StopIteration as e: return_val = e.value or [] + is_err = test_error(last_chunk) + if not is_err: + if attempt > 0: self._cur_idx = idx; self._switched_at = time.time() + elif isinstance(last_chunk, str) and '[!!! 
流异常中断' in last_chunk and n > 1: + self._cur_idx = (idx + 1) % n; self._switched_at = time.time() + print(f'[MixinSession] Partial failure, next call → s{self._cur_idx} ({self._sessions[self._cur_idx].name})') + return return_val + if attempt >= self._retries: + yield last_chunk; return return_val + nxt = (base + attempt + 1) % n + if nxt == base: # full round failed, delay before next + rnd = (attempt + 1) // n + delay = min(30, self._base_delay * (1.5 ** rnd)) + print(f'[MixinSession] {last_chunk[:80]}, round {rnd} exhausted, retry in {delay:.1f}s') + time.sleep(delay) + else: print(f'[MixinSession] {last_chunk[:80]}, retry {attempt+1}/{self._retries} (s{idx}→s{nxt})') + +THINKING_PROMPT_ZH = """ +### 行动规范(持续有效) +每次回复(含工具调用轮)都先在回复文字中包含一个 中输出极简单行(<30字)物理快照:上次结果新信息+本次意图。此内容进入长期工作记忆。 +\n**若用户需求未完成,必须进行工具调用!** +""".strip() +THINKING_PROMPT_EN = """ +### Action Protocol (always in effect) +The reply body should first include a minimal one-line (<30 words) physical snapshot in : new info from last result + current intent. This goes into long-term working memory. 
+\n**If the user's request is not yet complete, tool calls are required!** +""".strip() + +class NativeToolClient: + @staticmethod + def _thinking_prompt(): return THINKING_PROMPT_EN if os.environ.get('GA_LANG') == 'en' else THINKING_PROMPT_ZH + def __init__(self, backend): + self.backend = backend + self.backend.system = self._thinking_prompt() + self.name = self.backend.name + self._pending_tool_ids = [] + self.log_path = None + def set_system(self, extra_system): + combined = f"{extra_system}\n\n{self._thinking_prompt()}" if extra_system else self._thinking_prompt() + if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.") + self.backend.system = combined + def chat(self, messages, tools=None): + if tools: self.backend.tools = tools + if not self.backend.history: self._pending_tool_ids = [] + combined_content = []; resp = None; tool_results = [] + for msg in messages: + c = msg.get('content', '') + if msg['role'] == 'system': + self.set_system(c); continue + if isinstance(c, str): combined_content.append({"type": "text", "text": c}) + elif isinstance(c, list): combined_content.extend(c) + if msg['role'] == 'user' and msg.get('tool_results'): tool_results.extend(msg['tool_results']) + tr_id_set = set(); tool_result_blocks = [] + for tr in tool_results: + tool_use_id, content = tr.get("tool_use_id", ""), tr.get("content", "") + tr_id_set.add(tool_use_id) + if tool_use_id: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tool_use_id, "content": tr.get("content", "")}) + else: combined_content = [{"type": "text", "text": f'{content}'}] + combined_content + for tid in self._pending_tool_ids: + if tid not in tr_id_set: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tid, "content": ""}) + self._pending_tool_ids = [] + # Filter whitespace-only text blocks that cause 400 on strict API proxies + merged = {"role": "user", "content": tool_result_blocks + [c for c in combined_content if 
c.get("text", "").strip()]} + _write_llm_log('Prompt', json.dumps(merged, ensure_ascii=False, indent=2), self.log_path) + gen = self.backend.ask(merged) + try: + while True: + chunk = next(gen); yield chunk + except StopIteration as e: resp = e.value + if resp: _write_llm_log('Response', resp.raw, self.log_path) + if resp and hasattr(resp, 'tool_calls') and resp.tool_calls: self._pending_tool_ids = [tc.id for tc in resp.tool_calls] + return resp + +def resolve_session(cfg_name): + cfg = reload_mykeys()[0].get(cfg_name) + if not cfg: raise ValueError(f"Config '{cfg_name}' not in mykey") + if 'native' in cfg_name: return (NativeClaudeSession if 'claude' in cfg_name else NativeOAISession)(cfg=cfg) + if 'claude' in cfg_name: return ClaudeSession(cfg=cfg) + return LLMSession(cfg=cfg) if 'oai' in cfg_name else None + +def resolve_client(cfg_name): + s = resolve_session(cfg_name) + return (NativeToolClient(s) if isinstance(s, (NativeClaudeSession, NativeOAISession)) else ToolClient(s)) if s else None + +def fast_ask(prompt, cfg_name): + sess = resolve_session(cfg_name) + if not sess: raise ValueError(f"fast_ask: '{cfg_name}' unsupported") + return "".join(sess.raw_ask([{"role": "user", "content": prompt}])) + class SmartRouter: """根据用户查询自动分类任务复杂度,推荐 simple(flash) 或 complex(pro) 模型组。""" COMPLEX_KEYWORDS = [ '代码', '框架', '实现', '架构', '设计模式', '重构', '调试', '算法', - '数据库', 'SQL', 'API', '接口', '部署', '性能', '优化', '爬虫', - 'web', 'flask', 'django', 'git', 'docker', 'k8s', 'kubernetes', - '分析', '设计', '方案', '架构设计', '系统设计', '单元测试', 'CI/CD', - 'pytest', 'unittest', '异步', '多线程', '并发', '缓存', 'redis', - 'nginx', 'linux', 'bash', 'shell', '正则', 'ORM', 'REST', 'graphql', + '数据库', 'SQL', 'API', '设计', '优化', '性能', '安全', '部署', + '测试', '单元测试', '集成', 'CI', 'CD', '并发', '线程', '异步', + '网络', '协议', '加密', '认证', '授权', '缓存', '索引', '事务', + 'middleware', 'pipeline', 'workflow', 'orchestration', '微服务', + 'docker', 'kubernetes', 'k8s', '容器', '编排', + 'framework', 'architecture', 'refactor', '重构', + 'code', 
'programming', 'function', 'class', 'module', + 'implement', 'implementation', 'design pattern', + ] + SIMPLE_KEYWORDS = [ + '你好', 'hi', 'hello', '天气', '今天', '早上', '晚上', '下午', + '谢谢', '感谢', '再见', '拜拜', '嗯', '好', 'ok', + '名字', '你是谁', '你能做什么', ] def __init__(self): - self._enabled = False - self._last_classification = None + self.enabled = False def enable(self): - self._enabled = True + self.enabled = True def disable(self): - self._enabled = False - - @property - def enabled(self): - return self._enabled + self.enabled = False def classify(self, query: str) -> str: """返回 'simple' 或 'complex'""" q = query.strip() if not q: return 'simple' - # 短查询(<=8字符)且无关键词 → simple - if len(q) <= 8: - self._last_classification = 'simple' - return 'simple' - # 关键词命中 → complex + + # 长查询倾向于复杂 + if len(q) > 100: + return 'complex' + + # 含代码块标记 + if '```' in q or 'def ' in q or 'class ' in q or 'import ' in q: + return 'complex' + + # 复杂关键词匹配 q_lower = q.lower() - for kw in self.COMPLEX_KEYWORDS: - if kw.lower() in q_lower: - self._last_classification = 'complex' - return 'complex' - # 含代码特征 → complex - if any(c in q for c in ['{', '}', ';', '==', '!=', '=>', '->', 'def ', 'class ', 'import ']): - self._last_classification = 'complex' + complex_score = sum(1 for kw in self.COMPLEX_KEYWORDS if kw.lower() in q_lower) + simple_score = sum(1 for kw in self.SIMPLE_KEYWORDS if kw.lower() in q_lower) + + if complex_score > simple_score: return 'complex' - self._last_classification = 'simple' return 'simple' - - def get_last(self): - return self._last_classification - - def get_route(self, query: str) -> str: - """返回 'simple' 或 'complex' based on query and enabled state""" - if not self._enabled: - return None # 未启用,由外部决定 - return self.classify(query) From a039c2425fa749a7f8bb2bb567767ad085ed4d79 Mon Sep 17 00:00:00 2001 From: zhangPinkdolphin <2308717915@qq.com> Date: Sat, 9 May 2026 15:01:45 +0800 Subject: [PATCH 4/4] feat: add SmartRouter integration with /llm auto command and auto-routing 
--- agentmain.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/agentmain.py b/agentmain.py index c205e2ba..2c6109d3 100644 --- a/agentmain.py +++ b/agentmain.py @@ -6,7 +6,7 @@ elif hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(errors='replace') sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from llmcore import reload_mykeys, LLMSession, ToolClient, ClaudeSession, MixinSession, NativeToolClient, NativeClaudeSession, NativeOAISession, resolve_client +from llmcore import reload_mykeys, LLMSession, ToolClient, ClaudeSession, MixinSession, NativeToolClient, NativeClaudeSession, NativeOAISession, resolve_client, SmartRouter from agent_loop import agent_runner_loop from ga import GenericAgentHandler, smart_format, get_global_memory, format_error, consume_file @@ -49,6 +49,8 @@ def __init__(self): self.is_running = False; self.stop_sig = False self.llm_no = 0; self.inc_out = False; self.verbose = True self.peer_hint = True + self.router = SmartRouter() + self.auto_route = False self.log_path = os.path.join(script_dir, f'temp/model_responses/model_responses_{int(time.time()*1e6)%1000000:06d}.txt') self.load_llm_sessions() @@ -120,6 +122,21 @@ def _handle_slash_cmd(self, raw_query, display_queue): return None if raw_query.strip() == '/resume': return r'帮我看看最近有哪些会话可以恢复。读model_responses/目录,按修改时间取最近10个文件,从每个文件里找最后一个...块,用一句话总结每个会话在聊什么,列表给我选。注意读文件后要把字面的\n替换成真换行才能正确匹配。' + if raw_query.strip() == '/llm auto': + self.auto_route = not self.auto_route + display_queue.put({'done': smart_format(f"✅ 智能路由 {'开启' if self.auto_route else '关闭'}"), 'source': 'system'}) + return None + if raw_query.strip().startswith('/llm'): + parts = raw_query.strip().split() + if len(parts) == 2 and parts[1] in ('simple', 'complex'): + target_no = 1 if parts[1] == 'complex' else 0 + if target_no < len(self.llmclients): + self.llm_no = target_no + self.next_llm(self.llm_no) + display_queue.put({'done': 
smart_format(f"✅ 已切换到 {parts[1].upper()} 模型: {self.get_llm_name(b=True)}"), 'source': 'system'}) + else: + display_queue.put({'done': smart_format(f"❌ 没有 {parts[1].upper()} 模型可用"), 'source': 'system'}) + return None return raw_query def run(self): @@ -143,6 +160,19 @@ def run(self): if ps > 0: handler.working['key_info'] += f'\n[SYSTEM] 此为 {ps} 个对话前设置的key_info,若已在新任务,先更新或清除工作记忆。\n' self.handler = handler # although new handler, the **full** history is in llmclient, so it is full history! self.llmclient.log_path = self.log_path + # 自动路由逻辑 + if self.auto_route: + decision = self.router.classify(raw_query) + target_no = 1 if decision == "complex" else 0 + if target_no < len(self.llmclients) and target_no != self.llm_no: + old_name = self.get_llm_name(model=True) + self.llm_no = target_no + self.next_llm(self.llm_no) + new_sys_prompt = get_system_prompt() + getattr(self.llmclient.backend, "extra_sys_prompt", "") + handler = GenericAgentHandler(self, self.history, os.path.join(script_dir, "temp")) + self.handler = handler + self.llmclient.log_path = self.log_path + sys_prompt = new_sys_prompt gen = agent_runner_loop(self.llmclient, sys_prompt, raw_query, handler, TOOLS_SCHEMA, max_turns=70, verbose=self.verbose) try: @@ -268,3 +298,4 @@ def run(self): except KeyboardInterrupt: agent.abort() print('\n[Interrupted]') +