diff --git a/cookbook/client/server/megatron/entrypoint.sh b/cookbook/client/server/megatron/entrypoint.sh new file mode 100755 index 00000000..34a2bfa5 --- /dev/null +++ b/cookbook/client/server/megatron/entrypoint.sh @@ -0,0 +1,194 @@ +#!/bin/bash + +# Container entrypoint for Twinkle Megatron service. +# This process supervises run.sh and owns external health checks. + +set -u + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RUN_SCRIPT="$SCRIPT_DIR/run.sh" +TWINKLE_WORK_DIR="${TWINKLE_WORK_DIR:-/dashscope/caches/application/twinkle}" +TEMP_DIR="${TWINKLE_TEMP_DIR:-/dashscope/caches/application/ray_logs}" +LOG_FILE="$TWINKLE_WORK_DIR/run.log" +TWINKLE_HEALTH_URL="${TWINKLE_HEALTH_URL:-http://127.0.0.1:9000/api/v1/healthz}" +TWINKLE_WATCHDOG_INTERVAL_SECONDS="${TWINKLE_WATCHDOG_INTERVAL_SECONDS:-10}" +TWINKLE_WATCHDOG_FAILURE_THRESHOLD="${TWINKLE_WATCHDOG_FAILURE_THRESHOLD:-3}" +TWINKLE_RAY_GRACE_SECONDS="${TWINKLE_RAY_GRACE_SECONDS:-30}" +TWINKLE_HEALTH_GRACE_SECONDS="${TWINKLE_HEALTH_GRACE_SECONDS:-${TWINKLE_WATCHDOG_STARTUP_GRACE_SECONDS:-300}}" +RESTART_BACKOFF_SECONDS="${TWINKLE_ENTRYPOINT_RESTART_BACKOFF_SECONDS:-10}" + +CHILD_PID="" + +print_warning() { + echo -e "\033[33m[WARNING]\033[0m $1" +} + +print_error() { + echo -e "\033[31m[ERROR]\033[0m $1" +} + +require_non_negative_int() { + local name="$1" + local value="$2" + if ! [[ "$value" =~ ^[0-9]+$ ]]; then + print_error "$name 必须是非负整数,当前值: $value" + exit 1 + fi +} + +require_positive_int() { + local name="$1" + local value="$2" + if ! [[ "$value" =~ ^[1-9][0-9]*$ ]]; then + print_error "$name 必须是正整数,当前值: $value" + exit 1 + fi +} + +require_command() { + local command_name="$1" + if ! command -v "$command_name" &> /dev/null; then + print_error "缺少必需命令: $command_name" + exit 1 + fi +} + +validate_entrypoint_config() { + require_positive_int "TWINKLE_WATCHDOG_INTERVAL_SECONDS" "$TWINKLE_WATCHDOG_INTERVAL_SECONDS" + require_positive_int "TWINKLE_WATCHDOG_FAILURE_THRESHOLD" "$TWINKLE_WATCHDOG_FAILURE_THRESHOLD" + require_non_negative_int "TWINKLE_RAY_GRACE_SECONDS" "$TWINKLE_RAY_GRACE_SECONDS" + require_non_negative_int "TWINKLE_HEALTH_GRACE_SECONDS" "$TWINKLE_HEALTH_GRACE_SECONDS" + require_non_negative_int "TWINKLE_ENTRYPOINT_RESTART_BACKOFF_SECONDS" "$RESTART_BACKOFF_SECONDS" + + require_command timeout + require_command ray + require_command tail + + if ! command -v curl &> /dev/null && ! command -v wget &> /dev/null \ + && ! command -v python3 &> /dev/null && ! command -v python &> /dev/null; then + print_error "缺少 HTTP health 检查命令: curl, wget, python3 或 python" + exit 1 + fi +} + +check_http_health() { + if command -v curl &> /dev/null; then + curl -fsS --max-time 10 "$TWINKLE_HEALTH_URL" >/dev/null + return + fi + + if command -v wget &> /dev/null; then + wget -q --spider --timeout=10 "$TWINKLE_HEALTH_URL" + return + fi + + local python_bin="python3" + if ! command -v "$python_bin" &> /dev/null; then + python_bin="python" + fi + "$python_bin" - "$TWINKLE_HEALTH_URL" <<'PY' +import sys +import urllib.request + +url = sys.argv[1] +try: + with urllib.request.urlopen(url, timeout=10) as response: + sys.exit(0 if response.status == 200 else 1) +except Exception: + sys.exit(1) +PY +} + +print_watchdog_diagnostics() { + print_warning "EntryPoint watchdog 诊断信息:" + echo " - health url: $TWINKLE_HEALTH_URL" + echo " - run.sh pid: ${CHILD_PID:-unset}" + echo " - Ray logs: $TEMP_DIR/session_latest/logs" + + print_warning "ray status 输出:" + ray status 2>&1 || true + + if [ -f "$LOG_FILE" ]; then + print_warning "最近 100 行 Twinkle Server 日志:" + tail -n 100 "$LOG_FILE" || true + fi +} + +stop_child() { + if [ -n "$CHILD_PID" ] && kill -0 "$CHILD_PID" 2>/dev/null; then + kill -TERM "$CHILD_PID" 2>/dev/null || true + wait "$CHILD_PID" 2>/dev/null || true + fi + CHILD_PID="" +} + +stop_child_and_exit() { + stop_child + exit 143 +} + +trap stop_child_and_exit TERM INT + +case "${1:-}" in + --help|-h|--restart) + exec bash "$RUN_SCRIPT" "$@" + ;; +esac + +validate_entrypoint_config + +while true; do + TWINKLE_RUN_EXISTING_ACTION="${TWINKLE_RUN_EXISTING_ACTION:-restart}" bash "$RUN_SCRIPT" "$@" & + CHILD_PID=$! + + WATCHDOG_FAILURES=0 + WATCHDOG_STARTED_AT=$SECONDS + EXIT_CODE=0 + + while true; do + if ! kill -0 "$CHILD_PID" 2>/dev/null; then + wait "$CHILD_PID" + EXIT_CODE=$? + CHILD_PID="" + break + fi + + WATCHDOG_FAILURE_REASON="" + WATCHDOG_GRACE_SECONDS=0 + if ! timeout 10 ray status >/dev/null 2>&1; then + WATCHDOG_FAILURE_REASON="ray status failed" + WATCHDOG_GRACE_SECONDS="$TWINKLE_RAY_GRACE_SECONDS" + elif ! check_http_health; then + WATCHDOG_FAILURE_REASON="http health check failed: $TWINKLE_HEALTH_URL" + WATCHDOG_GRACE_SECONDS="$TWINKLE_HEALTH_GRACE_SECONDS" + fi + + if [ -z "$WATCHDOG_FAILURE_REASON" ]; then + WATCHDOG_FAILURES=0 + else + WATCHDOG_ELAPSED=$(( SECONDS - WATCHDOG_STARTED_AT )) + if [ "$WATCHDOG_ELAPSED" -lt "$WATCHDOG_GRACE_SECONDS" ]; then + print_warning "EntryPoint watchdog 启动宽限期内检查失败 (${WATCHDOG_ELAPSED}s/${WATCHDOG_GRACE_SECONDS}s): $WATCHDOG_FAILURE_REASON" + else + WATCHDOG_FAILURES=$((WATCHDOG_FAILURES + 1)) + print_warning "EntryPoint watchdog 检查失败 ($WATCHDOG_FAILURES/$TWINKLE_WATCHDOG_FAILURE_THRESHOLD): $WATCHDOG_FAILURE_REASON" + fi + fi + + if [ "$WATCHDOG_FAILURES" -ge "$TWINKLE_WATCHDOG_FAILURE_THRESHOLD" ]; then + print_error "EntryPoint watchdog 连续失败达到阈值,准备重启 run.sh" + print_watchdog_diagnostics + stop_child + EXIT_CODE=1 + break + fi + + sleep "$TWINKLE_WATCHDOG_INTERVAL_SECONDS" + done + + echo "[twinkle-entrypoint] run.sh exited with code $EXIT_CODE; restarting in ${RESTART_BACKOFF_SECONDS}s" + sleep "$RESTART_BACKOFF_SECONDS" & + CHILD_PID=$! + wait "$CHILD_PID" 2>/dev/null || true + CHILD_PID="" +done diff --git a/cookbook/client/server/megatron/run.sh b/cookbook/client/server/megatron/run.sh index 56ea111a..3c017b93 100644 --- a/cookbook/client/server/megatron/run.sh +++ b/cookbook/client/server/megatron/run.sh @@ -8,16 +8,28 @@ # 用法:./run.sh [选项] # # 选项: -# --head NODE Head 节点 GPU 配置,格式 "设备列表:数量" (默认: 0,1,2,3:4) -# --gpu-workers LIST GPU Worker 列表,分号分隔多个节点 (默认: 4,5,6,7:4) +# --restart 如果已有 run.sh 实例正在运行,请求其退出并由 entrypoint 重启服务 +# --head NODE Head 节点 GPU 设备列表,逗号分隔 (默认: 0,1,2,3) +# --gpu-workers LIST GPU Worker 列表,分号分隔多个节点 (默认: 4,5,6,7) # --cpu-workers N CPU Worker 数量 (默认: 1) # --temp-dir DIR Ray 临时目录 (默认: /dashscope/caches/application/ray_logs) # --save-dir DIR Twinkle 模型保存目录 (默认: /dashscope/caches/application/save) # --server-config FILE Twinkle 服务器配置文件路径 (默认: /twinkle/cookbook/client/server/megatron/server_config.yaml) # --help 显示帮助信息 # +# 环境变量: +# MODELSCOPE_CACHE 默认 /dashscope/caches/application/.cache +# TWINKLE_WORK_DIR 默认 /dashscope/caches/application/twinkle +# TWINKLE_RUN_EXISTING_ACTION 已有 run.sh 进程运行时的行为:exit 或 restart(默认 exit) +# TWINKLE_RUN_RESTART_TIMEOUT_SECONDS --restart 等待已有实例接收请求秒数(默认 120) +# # 示例: -# ./run.sh # 使用默认配置 +# bash /twinkle/cookbook/client/server/megatron/entrypoint.sh +# # 容器启动入口,负责 run.sh 保活和外部 health 检查 +# bash /twinkle/cookbook/client/server/megatron/run.sh +# # 直接启动服务,前台等待 server 子进程 +# bash /twinkle/cookbook/client/server/megatron/run.sh --restart +# # 更新代码后请求已有 run.sh 退出并由 entrypoint 重启 # ./run.sh --head "0,1,2,3" --gpu-workers "4,5,6,7" --cpu-workers 1 # ./run.sh --head "0,1,2,3" --gpu-workers "" --cpu-workers 0 # ./run.sh --head "" --cpu-workers 4 # 纯 CPU 模式 @@ -32,13 +44,13 @@ set -e # 遇到错误立即退出 # --- Ray 集群配置 --- # Head 节点(必须是第一个启动) -# 格式:"GPU设备列表:GPU数量",如 "0,1,2,3:4" +# 格式:"GPU设备列表",如 "0,1,2,3" # 如果不需要 GPU,设为空字符串 "" # 可通过命令行参数 $1 传入 # GPU Worker 节点列表(可以有多个) -# 格式:用分号分隔的 "GPU设备列表:GPU数量" -# 示例:"4,5,6,7:4" 或 "4,5,6,7:4;8,9,10,11:4" +# 格式:用分号分隔的 "GPU设备列表" +# 示例:"4,5,6,7" 或 "4,5,6,7;8,9,10,11" # 可通过命令行参数 $2 传入 # CPU Worker 数量 @@ -49,6 +61,8 @@ RAY_PORT=6379 RAY_ADDRESS="127.0.0.1:$RAY_PORT" # --- 路径配置 --- +export MODELSCOPE_CACHE="${MODELSCOPE_CACHE:-/dashscope/caches/application/.cache}" +TWINKLE_WORK_DIR="${TWINKLE_WORK_DIR:-/dashscope/caches/application/twinkle}" DEFAULT_TEMP_DIR="/dashscope/caches/application/ray_logs" LOG_FILE="run.log" REDIS_LOG_FILE="/twinkle/redis.log" @@ -65,6 +79,17 @@ PYROSCOPE_VERSION="${PYROSCOPE_VERSION:-v2.0.2}" OPENTELEMETRY_COLLECTOR_VERSION="${OPENTELEMETRY_COLLECTOR_VERSION:-v0.151.0}" OBI_VERSION="${OBI_VERSION:-v0.9.0}" +# --- 单实例与重启请求配置 --- +RUN_SCRIPT_PATH="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")" +TWINKLE_RUN_OWNER="${USER:-shared}" +TWINKLE_RUN_PID_FILE="${TWINKLE_RUN_PID_FILE:-/tmp/twinkle-megatron-run-${TWINKLE_RUN_OWNER}.pid}" +TWINKLE_RUN_RESTART_REQUEST_FILE="${TWINKLE_RUN_RESTART_REQUEST_FILE:-/tmp/twinkle-megatron-run-${TWINKLE_RUN_OWNER}.restart}" +TWINKLE_RUN_EXISTING_ACTION="${TWINKLE_RUN_EXISTING_ACTION:-exit}" +TWINKLE_RUN_RESTART_TIMEOUT_SECONDS="${TWINKLE_RUN_RESTART_TIMEOUT_SECONDS:-120}" +SERVER_PID="" +TAIL_PID="" +RESTART_REQUESTED_BY_SIGNAL=0 + # ============================================ # 参数解析(支持 --key=value 或 --key value 格式) # ============================================ @@ -77,9 +102,49 @@ TEMP_DIR="$DEFAULT_TEMP_DIR" SAVE_DIR="$DEFAULT_SAVE_DIR" SERVER_CONFIG_FILE="$DEFAULT_SERVER_CONFIG_FILE" +print_usage() { + cat </dev/null; then + return 1 + fi + + if [ -r "/proc/$pid/cmdline" ]; then + process_cwd="$(cd "/proc/$pid/cwd" 2>/dev/null && pwd || true)" + while IFS= read -r -d '' command_arg; do + if is_run_script_arg "$command_arg" "$process_cwd"; then + return 0 + fi + done < "/proc/$pid/cmdline" + return 1 + fi + + command_line="$(ps -p "$pid" -o command= 2>/dev/null || true)" + case "$command_line" in + *"$RUN_SCRIPT_PATH"*|*" ./$(basename "$RUN_SCRIPT_PATH")"*|*" $(basename "$RUN_SCRIPT_PATH")"*) + return 0 + ;; + *) + return 1 + ;; + esac +} + +is_run_script_arg() { + local command_arg="$1" + local process_cwd="$2" + local command_dir + + if [ "$command_arg" = "$RUN_SCRIPT_PATH" ]; then + return 0 + fi + + if [ "$(basename "$command_arg")" != "$(basename "$RUN_SCRIPT_PATH")" ]; then + return 1 + fi + + case "$command_arg" in + /*) + [ "$command_arg" = "$RUN_SCRIPT_PATH" ] + return + ;; + esac + + if [ -z "$process_cwd" ]; then + return 1 + fi + + command_dir="$(cd "$process_cwd/$(dirname "$command_arg")" 2>/dev/null && pwd || true)" + [ "$command_dir/$(basename "$RUN_SCRIPT_PATH")" = "$RUN_SCRIPT_PATH" ] +} + +find_existing_run_pid() { + local pid + + if [ -f "$TWINKLE_RUN_PID_FILE" ]; then + pid="$(cat "$TWINKLE_RUN_PID_FILE" 2>/dev/null || true)" + if is_run_script_process "$pid"; then + echo "$pid" + return 0 + fi + fi + + return 1 +} + +register_run_instance() { + local old_pid + + old_pid="$(find_existing_run_pid || true)" + if [ -z "$old_pid" ]; then + echo "$$" > "$TWINKLE_RUN_PID_FILE" + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + return 0 + fi + + if [ "$TWINKLE_RUN_EXISTING_ACTION" != "restart" ]; then + print_error "已有 run.sh 实例正在运行,退出以避免中断当前服务" + print_error "如需主动重启,请使用 --restart 或设置 TWINKLE_RUN_EXISTING_ACTION=restart" + exit 1 + fi + + print_warning "检测到已有 run.sh 实例 (PID: $old_pid),准备重启..." + echo "$$" > "$TWINKLE_RUN_RESTART_REQUEST_FILE" + if ! kill -USR1 "$old_pid" 2>/dev/null; then + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + print_error "无法向已有 run.sh 实例发送重启请求 (PID: $old_pid)" + exit 1 + fi + + for ((i=1; i<=TWINKLE_RUN_RESTART_TIMEOUT_SECONDS; i++)); do + if [ ! -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" ]; then + print_success "已有 run.sh 已接收重启请求,将退出并由 entrypoint 重启服务" + exit 0 + fi + if ! kill -0 "$old_pid" 2>/dev/null; then + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + print_error "已有 run.sh 实例在处理重启请求时退出 (PID: $old_pid)" + exit 1 + fi + sleep 1 + done + + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + print_error "等待已有 run.sh 接收重启请求超时 (${TWINKLE_RUN_RESTART_TIMEOUT_SECONDS}s)" + exit 1 +} + +cleanup_pid_file() { + if [ -f "$TWINKLE_RUN_PID_FILE" ] && [ "$(cat "$TWINKLE_RUN_PID_FILE" 2>/dev/null || true)" = "$$" ]; then + rm -f "$TWINKLE_RUN_PID_FILE" + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + fi +} + +require_non_negative_int() { + local name="$1" + local value="$2" + if ! [[ "$value" =~ ^[0-9]+$ ]]; then + print_error "$name 必须是非负整数,当前值: $value" + exit 1 + fi +} + +require_positive_int() { + local name="$1" + local value="$2" + if ! [[ "$value" =~ ^[1-9][0-9]*$ ]]; then + print_error "$name 必须是正整数,当前值: $value" + exit 1 + fi +} + +validate_runtime_config() { + case "$TWINKLE_RUN_EXISTING_ACTION" in + exit|restart) + ;; + *) + print_error "TWINKLE_RUN_EXISTING_ACTION 只能是 exit 或 restart,当前值: $TWINKLE_RUN_EXISTING_ACTION" + exit 1 + ;; + esac + + require_positive_int "TWINKLE_RUN_RESTART_TIMEOUT_SECONDS" "$TWINKLE_RUN_RESTART_TIMEOUT_SECONDS" + require_non_negative_int "CPU_WORKER_COUNT" "$CPU_WORKER_COUNT" +} + +require_command() { + local command_name="$1" + if ! command -v "$command_name" &> /dev/null; then + print_error "缺少必需命令: $command_name" + exit 1 + fi +} + +validate_runtime_dependencies() { + require_command ps + require_command tail + require_command python + require_command ray + require_command redis-server + require_command redis-cli + require_command pkill + require_command pgrep +} + wait_for_redis_ready() { local timeout="${1:-30}" @@ -243,6 +465,108 @@ wait_for_lgtm_ready() { return 1 } +start_log_tail() { + tail -F "$LOG_FILE" & + TAIL_PID=$! + print_info "日志 tail 已启动 (PID: $TAIL_PID)" +} + +stop_pid() { + local pid="$1" + local name="$2" + + if [ -z "$pid" ] || ! kill -0 "$pid" 2>/dev/null; then + return 0 + fi + + print_info "停止 $name (PID: $pid)..." + kill "$pid" 2>/dev/null || true + for _ in {1..10}; do + if ! kill -0 "$pid" 2>/dev/null; then + return 0 + fi + sleep 1 + done + print_warning "$name 未正常退出,强制终止..." + kill -9 "$pid" 2>/dev/null || true +} + +cleanup_existing_runtime() { + stop_pid "$TAIL_PID" "日志 tail" + stop_pid "$SERVER_PID" "Twinkle Server" + TAIL_PID="" + SERVER_PID="" + + print_info "停止已有的 Twinkle Server..." + pkill -f "twinkle.server" 2>/dev/null || true + + print_info "停止已有的 vLLM 进程..." + pkill -if "vLLM" 2>/dev/null || true + + sleep 2 + + if pgrep -f "twinkle.server" > /dev/null 2>&1; then + print_warning "Twinkle Server 未退出,强制终止..." + pkill -9 -f "twinkle.server" 2>/dev/null || true + fi + if pgrep -if "vLLM" > /dev/null 2>&1; then + print_warning "vLLM 进程未退出,强制终止..." + pkill -9 -i -f "vLLM" 2>/dev/null || true + fi + + print_info "停止已有的 Ray 集群..." + ray stop --force >/dev/null 2>&1 || true + + print_info "停止已有的 Redis..." + redis-cli -p 6380 shutdown nosave 2>/dev/null || pkill redis-server 2>/dev/null || true + if ! wait_for_redis_stopped 30; then + print_warning "Redis 未在 30 秒内退出,强制终止..." + pkill -9 redis-server 2>/dev/null || true + if ! wait_for_redis_stopped 10; then + print_error "Redis 端口 6380 未释放" + if command -v ss &> /dev/null; then + ss -lntp 2>/dev/null | grep ':6380 ' || true + fi + return 1 + fi + fi + + print_info "停止已有的 LGTM 观测栈..." + pkill -f "/otel-lgtm/run-all.sh" 2>/dev/null || true + pkill prometheus 2>/dev/null || true + pkill grafana 2>/dev/null || true + pkill otelcol 2>/dev/null || true + pkill loki 2>/dev/null || true + pkill tempo 2>/dev/null || true + pkill pyroscope 2>/dev/null || true +} + +cleanup_script_exit() { + trap - EXIT INT TERM + cleanup_existing_runtime || true + cleanup_pid_file +} + +handle_shutdown_signal() { + cleanup_script_exit + exit 143 +} + +handle_restart_signal() { + RESTART_REQUESTED_BY_SIGNAL=1 +} + +consume_restart_request() { + if [ "$RESTART_REQUESTED_BY_SIGNAL" -ne 1 ] && [ ! -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" ]; then + return 1 + fi + + RESTART_REQUESTED_BY_SIGNAL=0 + rm -f "$TWINKLE_RUN_RESTART_REQUEST_FILE" + print_warning "收到 run.sh 重启请求,准备退出并由 entrypoint 重启服务" + return 0 +} + # 解析节点配置 "devices" -> 返回 devices 和自动计算 _gpu_count # 示例: "0,1,2,3" -> devices="0,1,2,3", count=4 parse_node_config() { @@ -261,228 +585,204 @@ parse_node_config() { # ============================================ # 开始启动 # ============================================ -print_header "Twinkle Megatron 服务启动脚本" - -# 打印配置信息 -print_info "集群配置:" -echo "" - -# 解析并显示 Head 节点 -parse_node_config "$HEAD_NODE" -if [ -n "$_gpu_devices" ]; then - echo " [Head 节点]" - echo " - GPU 设备: $_gpu_devices" - echo " - GPU 数量: $_gpu_count" -else - echo " [Head 节点] CPU only" -fi +print_runtime_config() { + print_header "Twinkle Megatron 服务启动脚本" -# 显示 GPU Worker 节点 -if [ ${#GPU_WORKERS[@]} -gt 0 ]; then + print_info "集群配置:" echo "" - echo " [GPU Worker 节点] 共 ${#GPU_WORKERS[@]} 个" - for i in "${!GPU_WORKERS[@]}"; do - parse_node_config "${GPU_WORKERS[$i]}" - echo " Worker $((i+1)): GPU=$_gpu_devices, Count=$_gpu_count" - done -fi - -# 显示 CPU Worker -if [ "$CPU_WORKER_COUNT" -gt 0 ]; then - echo "" - echo " [CPU Worker 节点] $CPU_WORKER_COUNT 个" -fi -echo "" -print_info "运行参数:" -echo " - Ray 地址: $RAY_ADDRESS" -echo " - 临时目录: $TEMP_DIR" -echo " - 保存目录: $TWINKLE_DEFAULT_SAVE_DIR" -echo " - 服务配置: $SERVER_CONFIG_FILE" -echo " - 日志文件: $LOG_FILE" -echo "" - -# 检查临时目录 -if [ ! -d "$TEMP_DIR" ]; then - print_info "创建临时目录: $TEMP_DIR" - mkdir -p "$TEMP_DIR" -fi - -# ============================================ -# 停止已有服务(Redis / Ray / LGTM / Twinkle) -# ============================================ -print_header "清理环境" - -# 停止 Twinkle server.py(twinkle.server 模块) -print_info "停止已有的 Twinkle Server..." -pkill -f "twinkle.server" 2>/dev/null || true + parse_node_config "$HEAD_NODE" + if [ -n "$_gpu_devices" ]; then + echo " [Head 节点]" + echo " - GPU 设备: $_gpu_devices" + echo " - GPU 数量: $_gpu_count" + else + echo " [Head 节点] CPU only" + fi -# 停止 vLLM 进程 -print_info "停止已有的 vLLM 进程..." -pkill -if "vLLM" 2>/dev/null || true + if [ ${#GPU_WORKERS[@]} -gt 0 ]; then + echo "" + echo " [GPU Worker 节点] 共 ${#GPU_WORKERS[@]} 个" + for i in "${!GPU_WORKERS[@]}"; do + parse_node_config "${GPU_WORKERS[$i]}" + echo " Worker $((i+1)): GPU=$_gpu_devices, Count=$_gpu_count" + done + fi -# 等待上述进程退出 -sleep 2 + if [ "$CPU_WORKER_COUNT" -gt 0 ]; then + echo "" + echo " [CPU Worker 节点] $CPU_WORKER_COUNT 个" + fi -# 若仍有残留则强制 SIGKILL -if pgrep -f "twinkle.server" > /dev/null 2>&1; then - print_warning "Twinkle Server 未退出,强制终止..." - pkill -9 -f "twinkle.server" 2>/dev/null || true -fi -if pgrep -if "vLLM" > /dev/null 2>&1; then - print_warning "vLLM 进程未退出,强制终止..." - pkill -9 -i -f "vLLM" 2>/dev/null || true -fi + echo "" + print_info "运行参数:" + echo " - Ray 地址: $RAY_ADDRESS" + echo " - 工作目录: $TWINKLE_WORK_DIR" + echo " - ModelScope 缓存: $MODELSCOPE_CACHE" + echo " - 临时目录: $TEMP_DIR" + echo " - 保存目录: $TWINKLE_DEFAULT_SAVE_DIR" + echo " - 服务配置: $SERVER_CONFIG_FILE" + echo " - 日志文件: $LOG_FILE" + echo "" +} -print_info "停止已有的 Ray 集群..." -ray stop --force 2>/dev/null || true +prepare_runtime_dirs() { + mkdir -p "$MODELSCOPE_CACHE" "$TEMP_DIR" "$SAVE_DIR" +} -print_info "停止已有的 Redis..." -redis-cli -p 6380 shutdown nosave 2>/dev/null || pkill redis-server 2>/dev/null || true -if ! wait_for_redis_stopped 30; then - print_warning "Redis 未在 30 秒内退出,强制终止..." - pkill -9 redis-server 2>/dev/null || true - if ! wait_for_redis_stopped 10; then - print_error "Redis 端口 6380 未释放" - if command -v ss &> /dev/null; then - ss -lntp 2>/dev/null | grep ':6380 ' || true +start_redis() { + print_header "启动 Redis" + + if command -v redis-server &> /dev/null && command -v redis-cli &> /dev/null; then + print_info "启动 Redis..." + redis-server --daemonize yes --port 6380 --save "" --appendonly no --logfile "$REDIS_LOG_FILE" + if wait_for_redis_ready 30; then + print_success "Redis 已启动 (port 6380)" + else + print_error "Redis 未能在 30 秒内启动或响应 PING (port 6380)" + if [ -f "$REDIS_LOG_FILE" ]; then + tail -n 50 "$REDIS_LOG_FILE" + fi + exit 1 fi + else + print_error "未检测到 redis-server 或 redis-cli,但 server_config.yaml 使用 redis persistence" exit 1 fi -fi - -print_info "停止已有的 LGTM 观测栈..." -pkill -f "/otel-lgtm/run-all.sh" 2>/dev/null || true -pkill prometheus 2>/dev/null || true -pkill grafana 2>/dev/null || true -pkill otelcol 2>/dev/null || true -pkill loki 2>/dev/null || true -pkill tempo 2>/dev/null || true -pkill pyroscope 2>/dev/null || true - -# ============================================ -# 启动 Redis -# ============================================ -print_header "启动 Redis" +} -if command -v redis-server &> /dev/null && command -v redis-cli &> /dev/null; then - print_info "启动 Redis..." - redis-server --daemonize yes --port 6380 --save "" --appendonly no --logfile "$REDIS_LOG_FILE" - if wait_for_redis_ready 30; then - print_success "Redis 已启动 (port 6380)" +start_ray_cluster() { + print_header "启动 Ray 集群" + + parse_node_config "$HEAD_NODE" + if [ -n "$_gpu_devices" ]; then + print_info "启动 Head 节点 (GPU: $_gpu_devices)..." + CUDA_VISIBLE_DEVICES="$_gpu_devices" ray start --head \ + --port=$RAY_PORT \ + --num-gpus=$_gpu_count \ + --disable-usage-stats \ + --include-dashboard=true \ + --temp-dir="$TEMP_DIR" else - print_error "Redis 未能在 30 秒内启动或响应 PING (port 6380)" - if [ -f "$REDIS_LOG_FILE" ]; then - tail -n 50 "$REDIS_LOG_FILE" - fi - exit 1 + print_info "启动 Head 节点 (CPU only)..." + CUDA_VISIBLE_DEVICES="" ray start --head \ + --port=$RAY_PORT \ + --num-gpus=0 \ + --disable-usage-stats \ + --include-dashboard=true \ + --temp-dir="$TEMP_DIR" fi -else - print_error "未检测到 redis-server 或 redis-cli,但 server_config.yaml 使用 redis persistence" - exit 1 -fi - -# ============================================ -# 启动 Ray Head 节点 -# ============================================ -print_header "启动 Ray 集群" - -parse_node_config "$HEAD_NODE" -if [ -n "$_gpu_devices" ]; then - print_info "启动 Head 节点 (GPU: $_gpu_devices)..." - CUDA_VISIBLE_DEVICES="$_gpu_devices" ray start --head \ - --port=$RAY_PORT \ - --num-gpus=$_gpu_count \ - --disable-usage-stats \ - --include-dashboard=true \ - --temp-dir="$TEMP_DIR" -else - print_info "启动 Head 节点 (CPU only)..." - CUDA_VISIBLE_DEVICES="" ray start --head \ - --port=$RAY_PORT \ - --num-gpus=0 \ - --disable-usage-stats \ - --include-dashboard=true \ - --temp-dir="$TEMP_DIR" -fi -print_success "Head 节点启动成功!" - -# ============================================ -# 启动 GPU Worker 节点 -# ============================================ -for i in "${!GPU_WORKERS[@]}"; do - parse_node_config "${GPU_WORKERS[$i]}" - print_info "启动 GPU Worker $((i+1)) (GPU: $_gpu_devices)..." - CUDA_VISIBLE_DEVICES="$_gpu_devices" ray start \ - --address=$RAY_ADDRESS \ - --num-gpus=$_gpu_count - print_success "GPU Worker $((i+1)) 启动成功!" -done + print_success "Head 节点启动成功!" -# ============================================ -# 启动 CPU Worker 节点 -# ============================================ -if [ "$CPU_WORKER_COUNT" -gt 0 ]; then - print_info "启动 $CPU_WORKER_COUNT 个 CPU Worker..." - for ((i=1; i<=CPU_WORKER_COUNT; i++)); do - CUDA_VISIBLE_DEVICES="" ray start \ + for i in "${!GPU_WORKERS[@]}"; do + parse_node_config "${GPU_WORKERS[$i]}" + print_info "启动 GPU Worker $((i+1)) (GPU: $_gpu_devices)..." + CUDA_VISIBLE_DEVICES="$_gpu_devices" ray start \ --address=$RAY_ADDRESS \ - --num-gpus=0 + --num-gpus=$_gpu_count + print_success "GPU Worker $((i+1)) 启动成功!" done - print_success "CPU Worker 启动成功!" -fi -# ============================================ -# 显示集群状态 -# ============================================ -echo "" -print_info "集群状态:" -ray status 2>/dev/null || true + if [ "$CPU_WORKER_COUNT" -gt 0 ]; then + print_info "启动 $CPU_WORKER_COUNT 个 CPU Worker..." + for ((i=1; i<=CPU_WORKER_COUNT; i++)); do + CUDA_VISIBLE_DEVICES="" ray start \ + --address=$RAY_ADDRESS \ + --num-gpus=0 + done + print_success "CPU Worker 启动成功!" + fi -# ============================================ -# 启动 LGTM 观测栈(Grafana + OTel Collector + Prometheus + Tempo + Loki) -# ============================================ -print_header "启动 LGTM 观测栈(可选)" + echo "" + print_info "集群状态:" + ray status 2>/dev/null || true +} -if [ -d "/otel-lgtm" ]; then - if [ -f /otel-lgtm/prometheus.yaml.orig ]; then - cp /otel-lgtm/prometheus.yaml.orig /otel-lgtm/prometheus.yaml - fi +start_lgtm() { + print_header "启动 LGTM 观测栈(可选)" + + if [ -d "/otel-lgtm" ]; then + if [ -f /otel-lgtm/prometheus.yaml.orig ]; then + cp /otel-lgtm/prometheus.yaml.orig /otel-lgtm/prometheus.yaml + fi - print_info "启动 LGTM 观测栈..." - rm -f /tmp/ready - export LGTM_VERSION GRAFANA_VERSION PROMETHEUS_VERSION TEMPO_VERSION LOKI_VERSION - export PYROSCOPE_VERSION OPENTELEMETRY_COLLECTOR_VERSION OBI_VERSION - (cd /otel-lgtm && exec nohup ./run-all.sh > /twinkle/lgtm.log 2>&1) & - LGTM_PID=$! - - if wait_for_lgtm_ready "$LGTM_PID" 120; then - print_success "LGTM 观测栈已启动" - echo " - Grafana: http://localhost:3000 (admin/admin)" - echo " - OTLP gRPC: localhost:4317" - echo " - OTLP HTTP: localhost:4318" - echo " - 日志文件: /twinkle/lgtm.log" + print_info "启动 LGTM 观测栈..." + rm -f /tmp/ready + export LGTM_VERSION GRAFANA_VERSION PROMETHEUS_VERSION TEMPO_VERSION LOKI_VERSION + export PYROSCOPE_VERSION OPENTELEMETRY_COLLECTOR_VERSION OBI_VERSION + (cd /otel-lgtm && exec nohup ./run-all.sh > /twinkle/lgtm.log 2>&1) & + LGTM_PID=$! + + if wait_for_lgtm_ready "$LGTM_PID" 120; then + print_success "LGTM 观测栈已启动" + echo " - Grafana: http://localhost:3000 (admin/admin)" + echo " - OTLP gRPC: localhost:4317" + echo " - OTLP HTTP: localhost:4318" + echo " - 日志文件: /twinkle/lgtm.log" + else + print_warning "LGTM 观测栈未在 120 秒内就绪,Twinkle 将继续启动" + echo " - 日志文件: /twinkle/lgtm.log" + fi else - print_warning "LGTM 观测栈未在 120 秒内就绪,Twinkle 将继续启动" - echo " - 日志文件: /twinkle/lgtm.log" + print_warning "未检测到 LGTM 观测栈 (/otel-lgtm),跳过" fi -else - print_warning "未检测到 LGTM 观测栈 (/otel-lgtm),跳过" -fi +} -# ============================================ -# 启动 Twinkle 服务器 -# ============================================ -print_header "启动 Twinkle 服务器" +start_twinkle_server() { + print_header "启动 Twinkle 服务器" + + print_info "日志输出到: $LOG_FILE" + echo "" + + touch "$LOG_FILE" + nohup python -m twinkle.server launch --config "$SERVER_CONFIG_FILE" > "$LOG_FILE" 2>&1 & + SERVER_PID=$! + print_success "Twinkle Server 已启动 (PID: $SERVER_PID)" -print_info "日志输出到: $LOG_FILE" -echo "" + start_log_tail +} -# 启动服务器并实时显示日志 -touch "$LOG_FILE" # 预创建文件,避免 tail -f 在文件尚未写入时报错 -nohup python -m twinkle.server launch --config "$SERVER_CONFIG_FILE" > "$LOG_FILE" 2>&1 & -SERVER_PID=$! -print_success "Twinkle Server 已启动 (PID: $SERVER_PID)" +wait_runtime() { + print_info "Twinkle runtime 已启动,等待 server 进程..." + while true; do + if consume_restart_request; then + return 0 + fi + + if ! kill -0 "$SERVER_PID" 2>/dev/null; then + print_error "Twinkle Server 进程已退出 (PID: $SERVER_PID)" + return 1 + fi + + if ! kill -0 "$TAIL_PID" 2>/dev/null; then + print_warning "日志 tail 进程已退出,重新启动..." + start_log_tail + fi + + sleep 1 || true + done +} + +run_service_once() { + print_runtime_config + prepare_runtime_dirs + print_header "清理环境" + cleanup_existing_runtime + start_redis + start_ray_cluster + start_lgtm + start_twinkle_server + wait_runtime +} -# 实时显示日志(阻塞进程) -tail -f "$LOG_FILE" +validate_runtime_config +validate_runtime_dependencies +mkdir -p "$TWINKLE_WORK_DIR" +cd "$TWINKLE_WORK_DIR" +trap handle_restart_signal USR1 +register_run_instance +trap cleanup_script_exit EXIT +trap handle_shutdown_signal INT TERM + +run_service_once +exit 0 diff --git a/src/twinkle/processor/base.py b/src/twinkle/processor/base.py index 8709d98a..467355e5 100644 --- a/src/twinkle/processor/base.py +++ b/src/twinkle/processor/base.py @@ -33,6 +33,7 @@ class InputProcessor: 'inputs_embeds': 0.0, 'attention_mask': 0, 'labels': -100, + 'completion_mask': 0, 'loss_scale': 0.0, 'position_ids': -1, 'length': -1, diff --git a/src/twinkle/server/deployment.py b/src/twinkle/server/deployment.py index 8bb3c076..bb4e3c1c 100644 --- a/src/twinkle/server/deployment.py +++ b/src/twinkle/server/deployment.py @@ -18,16 +18,20 @@ The middleware ordering is the load-bearing invariant: FastAPI runs ``http`` middleware in LIFO order, so the LAST registered is the OUTERMOST. The fixed -registration sequence here — optional cleanup → ``verify_token`` → tracing → -metrics — reproduces the per-builder execution order exactly -(``metrics → tracing → verify_token → [cleanup] → handler``), with metrics as -the outermost layer wrapping tracing. +registration sequence here — optional cleanup → exception boundary → +``verify_token`` → tracing → metrics → optional replica-id header — reproduces +the per-builder execution order exactly +(``[replica-id] → metrics → tracing → verify_token → exception boundary → +[cleanup] → handler``), with metrics wrapping tracing and the replica-id header +wrapping the full response path when enabled. """ from __future__ import annotations +import traceback from collections.abc import Awaitable, Callable from contextlib import asynccontextmanager from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse from ray import serve from typing import Any @@ -73,10 +77,14 @@ def build_deployment_app( 2. [if ``attach_cleanup_middleware``] the gateway-only lazy-cleanup middleware (registered first ⇒ innermost), since the Gateway has no per-handler hook; - 3. ``verify_token`` middleware; - 4. ``create_tracing_middleware(component)``; - 5. ``create_metrics_middleware(component)`` (registered last ⇒ outermost); - 6. ``register_routes(app, get_servable)``. + 3. ``catch_unhandled_exceptions`` middleware, inside auth/tracing/metrics + and outside cleanup/routes; + 4. ``verify_token`` middleware; + 5. ``create_tracing_middleware(component)``; + 6. ``create_metrics_middleware(component)``; + 7. [if ``attach_replica_id_header``] replica-id response header middleware + (registered last ⇒ outermost); + 8. ``register_routes(app, get_servable)``. Args: component: ``'Gateway' | 'Model' | 'Sampler' | 'Processor'`` — used as @@ -117,7 +125,8 @@ async def lifespan(app: FastAPI): # Registration order matters: FastAPI runs middleware LIFO, so the LAST # registered wraps the outermost layer. Register cleanup (if any) first so - # it stays innermost, then auth, then tracing, then metrics (outermost). + # it stays innermost, then the exception boundary, auth, tracing, metrics, + # and finally the optional replica-id header as the outermost layer. if attach_cleanup_middleware: @app.middleware('http') @@ -129,6 +138,15 @@ async def ensure_state_cleanup_started(request: Request, call_next): await get_servable()._ensure_state_cleanup_started() return await call_next(request) + @app.middleware('http') + async def catch_unhandled_exceptions(request: Request, call_next): + try: + return await call_next(request) + except Exception: + error = traceback.format_exc() + logger.error(error) + return JSONResponse(status_code=500, content={'detail': error}) + @app.middleware('http') async def verify_token(request: Request, call_next): return await verify_request_token(request=request, call_next=call_next) diff --git a/src/twinkle/server/utils/task_errors.py b/src/twinkle/server/utils/task_errors.py new file mode 100644 index 00000000..478745c0 --- /dev/null +++ b/src/twinkle/server/utils/task_errors.py @@ -0,0 +1,5 @@ +# Copyright (c) ModelScope Contributors. All rights reserved. + + +def task_error_payload(error: str) -> dict[str, str]: + return {'error': error, 'category': 'Server'} diff --git a/src/twinkle/server/utils/task_queue/mixin.py b/src/twinkle/server/utils/task_queue/mixin.py index a74c8456..7b6f895e 100644 --- a/src/twinkle/server/utils/task_queue/mixin.py +++ b/src/twinkle/server/utils/task_queue/mixin.py @@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, Any from twinkle.server.telemetry.middleware import get_task_metrics +from twinkle.server.utils.task_errors import task_error_payload from twinkle.utils.logger import get_logger from .config import TaskQueueConfig from .rate_limiter import RateLimiter @@ -329,7 +330,7 @@ async def _run() -> None: queue_state=QueueState.ACTIVE.value) logger.info(f'[TaskQueue] Background task {request_id} completed, type={task_type or "unknown"}') except Exception: - error_payload = {'error': traceback.format_exc(), 'category': 'Server'} + error_payload = task_error_payload(traceback.format_exc()) await self.state.store_future_status( request_id, TaskStatus.FAILED.value, diff --git a/src/twinkle/server/utils/task_queue/worker.py b/src/twinkle/server/utils/task_queue/worker.py index cb5081a4..a26888cf 100644 --- a/src/twinkle/server/utils/task_queue/worker.py +++ b/src/twinkle/server/utils/task_queue/worker.py @@ -16,6 +16,7 @@ from twinkle.server.telemetry.correlation import MODEL_ID, TOKEN_ID from twinkle.server.telemetry.tracing import traced_operation +from twinkle.server.utils.task_errors import task_error_payload from twinkle.utils.logger import get_logger from .config import TaskQueueConfig from .types import QueuedTask, QueueState, TaskStatus @@ -136,10 +137,7 @@ async def _store_task_failed( task.request_id, TaskStatus.FAILED.value, task.model_id, - result={ - 'error': error, - 'category': 'Server' - }, + result=task_error_payload(error), queue_state=queue_state, queue_state_reason=queue_state_reason, ) diff --git a/tests/processor/test_processor.py b/tests/processor/test_processor.py index 8efca819..d730499d 100644 --- a/tests/processor/test_processor.py +++ b/tests/processor/test_processor.py @@ -37,6 +37,28 @@ def test_padding_side_left(self): out = proc.collate_fn(batch) assert out[0]['input_ids'].shape == (2, 5) + def test_completion_mask_padding(self): + proc = InputProcessor(padding_free=False, padding_side='right') + batch = [ + { + 'input_ids': torch.tensor([1, 2, 3]), + 'position_ids': torch.arange(3), + 'labels': torch.tensor([-100, 10, 11]), + 'completion_mask': torch.tensor([0, 1, 1]), + }, + { + 'input_ids': torch.tensor([4, 5]), + 'position_ids': torch.arange(2), + 'labels': torch.tensor([-100, 12]), + 'completion_mask': torch.tensor([0, 1]), + }, + ] + out = proc.collate_fn(batch) + assert len(out) == 1 + b = out[0] + assert b['completion_mask'].shape == b['input_ids'].shape == (2, 3) + assert b['completion_mask'].tolist() == [[0, 1, 1], [0, 1, 0]] + class TestPaddingFreeMode: """padding_free: concatenate multiple samples into single row.""" @@ -97,7 +119,7 @@ def test_multimodal_collate(self): assert 'pixel_values' in b # 2 images x 3 channels after squeeze, cat along dim=0 -> shape[0]=6 assert b['pixel_values'].shape[0] == 6 - assert b['image_grid_thw'].shape[0] == 6 + assert b['image_grid_thw'].shape == (2, 3) class TestGRPOMode: diff --git a/tests/server/test_app_builders_characterization.py b/tests/server/test_app_builders_characterization.py index b8b17955..31f9e726 100644 --- a/tests/server/test_app_builders_characterization.py +++ b/tests/server/test_app_builders_characterization.py @@ -253,20 +253,19 @@ def _registered_http_middleware_names(app: FastAPI) -> list[str]: def _assert_middleware_lifo_order(app: FastAPI, *, expect_cleanup: bool, expect_replica_id: bool = False) -> None: - """Assert metrics is outermost, wrapping tracing, wrapping auth. + """Assert the scaffold middleware stack preserves the intended LIFO order. ``user_middleware`` is ordered OUTERMOST → INNERMOST (Starlette prepends - each new entry). So the expected sequence is metrics first, then tracing, - then ``verify_token``, with the Gateway-only cleanup middleware last as - the innermost layer (it has no per-handler hook elsewhere). - - Model and Sampler add ``inject_replica_id`` after the scaffold stack, making - it the outermost middleware. + each new entry). Model and Sampler add ``inject_replica_id`` after the + scaffold stack, making it the outermost middleware when enabled. """ names = _registered_http_middleware_names(app) - expected_prefix = ['metrics_middleware', 'tracing_middleware', 'verify_token'] - expected = (expected_prefix + ['ensure_state_cleanup_started']) if expect_cleanup else expected_prefix + expected = [] if expect_replica_id: - expected = ['inject_replica_id'] + expected + expected.append('inject_replica_id') + expected.extend(['metrics_middleware', 'tracing_middleware', 'verify_token']) + expected.append('catch_unhandled_exceptions') + if expect_cleanup: + expected.append('ensure_state_cleanup_started') assert names == expected, (f'middleware ordering mismatch — expected (outermost→innermost) ' f'{expected!r}, got {names!r}') diff --git a/tests/server/test_deployment_exception_boundary.py b/tests/server/test_deployment_exception_boundary.py new file mode 100644 index 00000000..4f61b42e --- /dev/null +++ b/tests/server/test_deployment_exception_boundary.py @@ -0,0 +1,42 @@ +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from twinkle.server.deployment import build_deployment_app + + +def test_deployment_app_catches_unhandled_route_exception_and_keeps_serving(monkeypatch): + + class _ReplicaId: + unique_id = 'replica-test' + + class _Context: + replica_id = _ReplicaId() + + from twinkle.server import deployment + + monkeypatch.setattr(deployment.serve, 'get_replica_context', lambda: _Context()) + calls = {'health': 0} + + def register_routes(app: FastAPI, _get_self): + + @app.get('/healthz') + async def healthz(): + calls['health'] += 1 + return {'ok': True, 'health_calls': calls['health']} + + @app.get('/boom') + async def boom(): + raise RuntimeError('boom with replica header') + + app = build_deployment_app('Test', register_routes, attach_replica_id_header=True) + client = TestClient(app) + + response = client.get('/boom', headers={'x-request-id': 'boundary-test'}) + assert response.status_code == 500 + assert response.headers['X-Twinkle-Replica-Id'] == 'replica-test' + assert 'Traceback' in response.json()['detail'] + assert 'RuntimeError: boom with replica header' in response.json()['detail'] + + response = client.get('/healthz') + assert response.status_code == 200 + assert response.json() == {'ok': True, 'health_calls': 1} diff --git a/tests/server/utils/test_task_errors.py b/tests/server/utils/test_task_errors.py new file mode 100644 index 00000000..0c498664 --- /dev/null +++ b/tests/server/utils/test_task_errors.py @@ -0,0 +1,10 @@ +from twinkle.server.utils.task_errors import task_error_payload + + +def test_task_error_payload_keeps_lora_traceback(): + error = 'Traceback...\nRuntimeError: No lora available for tenant session-default. Max loras: 3\n' + + assert task_error_payload(error) == { + 'error': error, + 'category': 'Server', + }