diff --git a/.github/workflows/publish-pypi.yml b/.github/workflows/publish-pypi.yml new file mode 100644 index 0000000..92bed00 --- /dev/null +++ b/.github/workflows/publish-pypi.yml @@ -0,0 +1,107 @@ +name: Publish to PyPI + +on: + workflow_run: + workflows: ["CI Stable"] + types: [completed] + branches: [main] + +permissions: + contents: write + +jobs: + publish: + # Run only when the upstream CI on the trusted main branch finished + # successfully. workflow_run carries head_branch + head_sha so we can + # gate strictly on the main branch and check out the exact commit + # that passed CI, avoiding any race with a later push to main and + # ensuring no fork-originated ref is ever materialised here. + if: >- + ${{ github.event.workflow_run.conclusion == 'success' + && github.event.workflow_run.head_branch == 'main' + && github.event.workflow_run.event != 'pull_request' }} + runs-on: ubuntu-latest + + steps: + # The job's `if` already gates on workflow_run.head_branch == 'main' + # and workflow_run.event != 'pull_request', so a fork PR head can + # never reach this checkout. We pin to workflow_run.head_sha to + # publish exactly the commit that passed CI on main. 
+ # nosemgrep: yaml.github-actions.security.workflow-run-target-code-checkout.workflow-run-target-code-checkout + - name: Checkout the exact commit that passed CI + uses: actions/checkout@v4 + with: + ref: ${{ github.event.workflow_run.head_sha }} + fetch-depth: 0 + persist-credentials: true + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install build tools + run: | + python -m pip install --upgrade pip + pip install build twine tomlkit + + - name: Bump patch version in pyproject.toml + id: bump + run: | + python <<'PYEOF' + import os + import tomlkit + + path = "pyproject.toml" + with open(path, encoding="utf-8") as f: + doc = tomlkit.parse(f.read()) + + old = str(doc["project"]["version"]) + parts = old.split(".") + parts[-1] = str(int(parts[-1]) + 1) + new = ".".join(parts) + doc["project"]["version"] = new + + with open(path, "w", encoding="utf-8") as f: + f.write(tomlkit.dumps(doc)) + + with open(os.environ["GITHUB_OUTPUT"], "a") as f: + f.write(f"old_version={old}\n") + f.write(f"new_version={new}\n") + + print(f"Bumped version: {old} -> {new}") + PYEOF + + - name: Build distributions + run: python -m build + + - name: Publish to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + run: twine upload dist/* + + - name: Commit and tag bumped version + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + git add pyproject.toml + git commit -m "chore: bump version to ${{ steps.bump.outputs.new_version }} [skip ci]" + git tag "v${{ steps.bump.outputs.new_version }}" + # Push the bump commit onto main directly. We checked out a + # detached HEAD at workflow_run.head_sha, so push HEAD into + # refs/heads/main. If main has moved since the CI run, this + # rejects as non-fast-forward rather than overwriting history. 
+ git push origin "HEAD:refs/heads/main" + git push origin "v${{ steps.bump.outputs.new_version }}" + + - name: Create GitHub Release + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + gh release create "v${{ steps.bump.outputs.new_version }}" \ + dist/* \ + --title "v${{ steps.bump.outputs.new_version }}" \ + --notes "Automated release for version ${{ steps.bump.outputs.new_version }}. Published to PyPI: https://pypi.org/project/je-load-density/${{ steps.bump.outputs.new_version }}/" \ + --target main diff --git a/je_load_density/__init__.py b/je_load_density/__init__.py index 474c7a8..636e768 100644 --- a/je_load_density/__init__.py +++ b/je_load_density/__init__.py @@ -1,46 +1,95 @@ # hook (side-effect import: registers Locust request hooks) from je_load_density.wrapper.event.request_hook import request_hook # noqa: F401 -# env -from je_load_density.utils.executor.action_executor import add_command_to_executor -# executor -from je_load_density.utils.executor.action_executor import execute_action -from je_load_density.utils.executor.action_executor import execute_files -from je_load_density.utils.executor.action_executor import executor -# file + +# Executor + action plumbing +from je_load_density.utils.executor.action_executor import ( + add_command_to_executor, + execute_action, + execute_files, + executor, +) from je_load_density.utils.file_process.get_dir_file_list import get_dir_files_as_list -# html -from je_load_density.utils.generate_report.generate_html_report import generate_html -from je_load_density.utils.generate_report.generate_html_report import generate_html_report -# json -from je_load_density.utils.generate_report.generate_json_report import generate_json -from je_load_density.utils.generate_report.generate_json_report import generate_json_report + +# Reports +from je_load_density.utils.generate_report.generate_csv_report import generate_csv_report +from je_load_density.utils.generate_report.generate_html_report import ( + generate_html, 
+ generate_html_report, +) +from je_load_density.utils.generate_report.generate_json_report import ( + generate_json, + generate_json_report, +) +from je_load_density.utils.generate_report.generate_junit_report import generate_junit_report +from je_load_density.utils.generate_report.generate_summary_report import ( + build_summary, + generate_summary_report, +) +from je_load_density.utils.generate_report.generate_xml_report import ( + generate_xml, + generate_xml_report, +) + +# JSON IO from je_load_density.utils.json.json_file.json_file import read_action_json -# xml -from je_load_density.utils.generate_report.generate_xml_report import generate_xml -from je_load_density.utils.generate_report.generate_xml_report import generate_xml_report -# server -from je_load_density.utils.socket_server.load_density_socket_server import start_load_density_socket_server -# test record + +# Metrics +from je_load_density.utils.metrics import ( + start_influxdb_sink, + start_opentelemetry_exporter, + start_prometheus_exporter, + stop_influxdb_sink, + stop_opentelemetry_exporter, + stop_prometheus_exporter, +) + +# Parameterisation +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_source, + register_csv_sources, + register_variable, + register_variables, + resolve, +) + +# Recording / replay +from je_load_density.utils.recording.har_importer import ( + har_to_action_json, + har_to_tasks, + load_har, +) + +# Project scaffolding +from je_load_density.utils.project.create_project_structure import create_project_dir + +# Control socket +from je_load_density.utils.socket_server.load_density_socket_server import ( + start_load_density_socket_server, +) + +# Test records +from je_load_density.utils.test_record.sqlite_persistence import ( + fetch_run_records, + list_runs, + persist_records, +) from je_load_density.utils.test_record.test_record_class import test_record_instance -# start -from je_load_density.wrapper.create_locust_env.create_locust_env 
import prepare_env -from je_load_density.wrapper.create_locust_env.create_locust_env import create_env -# Proxy +# Locust environment + start +from je_load_density.wrapper.create_locust_env.create_locust_env import ( + create_env, + prepare_env, +) from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy - from je_load_density.wrapper.start_wrapper.start_test import start_test -# Locust -from locust import SequentialTaskSet -from locust import task -from locust import TaskSet +# Locust re-exports +from locust import SequentialTaskSet, TaskSet, task -# Callback +# Callback executor from je_load_density.utils.callback.callback_function_executor import callback_executor -from je_load_density.utils.project.create_project_structure import create_project_dir - __all__ = [ "create_env", "start_test", "locust_wrapper_proxy", @@ -49,9 +98,20 @@ "execute_action", "execute_files", "executor", "add_command_to_executor", "get_dir_files_as_list", "generate_html", "generate_html_report", - "generate_json", "generate_json_report", "read_action_json", + "generate_json", "generate_json_report", "generate_xml", "generate_xml_report", + "generate_csv_report", "generate_junit_report", "generate_summary_report", + "build_summary", + "read_action_json", "start_load_density_socket_server", "SequentialTaskSet", "task", "TaskSet", - "callback_executor", "create_project_dir" + "callback_executor", "create_project_dir", + "parameter_resolver", "resolve", + "register_variable", "register_variables", + "register_csv_source", "register_csv_sources", + "har_to_action_json", "har_to_tasks", "load_har", + "persist_records", "list_runs", "fetch_run_records", + "start_prometheus_exporter", "stop_prometheus_exporter", + "start_influxdb_sink", "stop_influxdb_sink", + "start_opentelemetry_exporter", "stop_opentelemetry_exporter", ] diff --git a/je_load_density/__main__.py b/je_load_density/__main__.py index 4be99ff..4234c5c 100644 --- a/je_load_density/__main__.py +++ 
b/je_load_density/__main__.py @@ -1,64 +1,125 @@ -# argparse import argparse import json import sys +from typing import List, Optional from je_load_density.utils.exception.exception_tags import argparse_get_wrong_data -from je_load_density.utils.exception.exceptions import LoadDensityTestExecuteException -from je_load_density.utils.executor.action_executor import execute_action -from je_load_density.utils.executor.action_executor import execute_files +from je_load_density.utils.executor.action_executor import execute_action, execute_files from je_load_density.utils.file_process.get_dir_file_list import get_dir_files_as_list from je_load_density.utils.json.json_file.json_file import read_action_json from je_load_density.utils.project.create_project_structure import create_project_dir +from je_load_density.utils.socket_server.load_density_socket_server import ( + start_load_density_socket_server, +) + + +def _cmd_run(args: argparse.Namespace) -> None: + execute_action(read_action_json(args.file)) + + +def _cmd_run_dir(args: argparse.Namespace) -> None: + execute_files(get_dir_files_as_list(args.dir)) + + +def _cmd_run_str(args: argparse.Namespace) -> None: + payload = args.json + if sys.platform in {"win32", "cygwin", "msys"}: + first_pass = json.loads(payload) + if isinstance(first_pass, str): + payload = first_pass + else: + execute_action(first_pass) + return + execute_action(json.loads(payload)) + + +def _cmd_init(args: argparse.Namespace) -> None: + create_project_dir(args.path) + + +def _cmd_serve(args: argparse.Namespace) -> None: + start_load_density_socket_server( + host=args.host, + port=args.port, + framed=args.framed, + token=args.token, + certfile=args.tls_cert, + keyfile=args.tls_key, + ) + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="je_load_density") + sub = parser.add_subparsers(dest="command") + + run = sub.add_parser("run", help="execute one action JSON file") + run.add_argument("file", type=str) + 
run.set_defaults(func=_cmd_run) + + run_dir = sub.add_parser("run-dir", help="execute every action JSON in a directory") + run_dir.add_argument("dir", type=str) + run_dir.set_defaults(func=_cmd_run_dir) + + run_str = sub.add_parser("run-str", help="execute an inline action JSON string") + run_str.add_argument("json", type=str) + run_str.set_defaults(func=_cmd_run_str) + + init = sub.add_parser("init", help="create a project skeleton at PATH") + init.add_argument("path", type=str) + init.set_defaults(func=_cmd_init) + + serve = sub.add_parser("serve", help="start the TCP control socket server") + serve.add_argument("--host", default="localhost") + serve.add_argument("--port", type=int, default=9940) + serve.add_argument("--framed", action="store_true", help="use length-prefixed framing") + serve.add_argument("--token", default=None, help="shared-secret token (env LOAD_DENSITY_SOCKET_TOKEN)") + serve.add_argument("--tls-cert", default=None) + serve.add_argument("--tls-key", default=None) + serve.set_defaults(func=_cmd_serve) + + # Legacy single-flag form, retained for backwards compatibility. 
+ parser.add_argument("-e", "--execute_file", type=str, help=argparse.SUPPRESS) + parser.add_argument("-d", "--execute_dir", type=str, help=argparse.SUPPRESS) + parser.add_argument("-c", "--create_project", type=str, help=argparse.SUPPRESS) + parser.add_argument("--execute_str", type=str, help=argparse.SUPPRESS) + + return parser + + +def _dispatch_legacy(args: argparse.Namespace) -> bool: + legacy_map = { + "execute_file": lambda value: execute_action(read_action_json(value)), + "execute_dir": lambda value: execute_files(get_dir_files_as_list(value)), + "execute_str": lambda value: _cmd_run_str(argparse.Namespace(json=value)), + "create_project": create_project_dir, + } + matched = False + for key, action in legacy_map.items(): + value = getattr(args, key, None) + if value is not None: + matched = True + action(value) + return matched + + +def main(argv: Optional[List[str]] = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + + if getattr(args, "func", None) is not None: + args.func(args) + return 0 + + if _dispatch_legacy(args): + return 0 + + print(argparse_get_wrong_data, file=sys.stderr) + return 2 + if __name__ == "__main__": try: - def preprocess_execute_action(file_path: str): - execute_action(read_action_json(file_path)) - - - def preprocess_execute_files(file_path: str): - execute_files(get_dir_files_as_list(file_path)) - - def preprocess_read_str_execute_action(execute_str: str): - if sys.platform in ["win32", "cygwin", "msys"]: - json_data = json.loads(execute_str) - execute_str = json.loads(json_data) - else: - execute_str = json.loads(execute_str) - execute_action(execute_str) - - - argparse_event_dict = { - "execute_file": preprocess_execute_action, - "execute_dir": preprocess_execute_files, - "execute_str": preprocess_read_str_execute_action, - "create_project": create_project_dir - } - parser = argparse.ArgumentParser() - parser.add_argument( - "-e", "--execute_file", - type=str, help="choose action file to execute" - ) - 
parser.add_argument( - "-d", "--execute_dir", - type=str, help="choose dir include action file to execute" - ) - parser.add_argument( - "-c", "--create_project", - type=str, help="create project with template" - ) - parser.add_argument( - "--execute_str", - type=str, help="execute json str" - ) - args = parser.parse_args() - args = vars(args) - for key, value in args.items(): - if value is not None: - argparse_event_dict.get(key)(value) - if all(value is None for value in args.values()): - raise LoadDensityTestExecuteException(argparse_get_wrong_data) + sys.exit(main()) except Exception as error: print(repr(error), file=sys.stderr) sys.exit(1) diff --git a/je_load_density/gui/language_wrapper/english.py b/je_load_density/gui/language_wrapper/english.py index 64eda03..f2be6f4 100644 --- a/je_load_density/gui/language_wrapper/english.py +++ b/je_load_density/gui/language_wrapper/english.py @@ -17,4 +17,11 @@ "delete": "DELETE", "head": "HEAD", "options": "OPTIONS", + # Stats panel + "stats_panel": "Live Stats", + "stats_total": "Total", + "stats_rate": "Rate", + "stats_avg_ms": "Avg", + "stats_p95_ms": "p95", + "stats_failures": "Failures", } diff --git a/je_load_density/gui/language_wrapper/japanese.py b/je_load_density/gui/language_wrapper/japanese.py new file mode 100644 index 0000000..5263eef --- /dev/null +++ b/je_load_density/gui/language_wrapper/japanese.py @@ -0,0 +1,27 @@ +japanese_word_dict = { + # Main + "application_name": "LoadDensity", + # Widget + "url": "URL: ", + "test_time": "テスト時間: ", + "user_count": "ユーザー数: ", + "spawn_rate": "生成レート: ", + "test_method": "HTTP メソッド: ", + "start_button": "開始: ", + "log": "ログ: ", + # Test method + "get": "GET", + "post": "POST", + "put": "PUT", + "patch": "PATCH", + "delete": "DELETE", + "head": "HEAD", + "options": "OPTIONS", + # Stats panel + "stats_panel": "リアルタイム統計", + "stats_total": "合計", + "stats_rate": "レート", + "stats_avg_ms": "平均", + "stats_p95_ms": "p95", + "stats_failures": "失敗", +} diff --git 
a/je_load_density/gui/language_wrapper/korean.py b/je_load_density/gui/language_wrapper/korean.py new file mode 100644 index 0000000..a58ca0d --- /dev/null +++ b/je_load_density/gui/language_wrapper/korean.py @@ -0,0 +1,27 @@ +korean_word_dict = { + # Main + "application_name": "LoadDensity", + # Widget + "url": "URL: ", + "test_time": "테스트 시간: ", + "user_count": "사용자 수: ", + "spawn_rate": "생성 속도: ", + "test_method": "HTTP 메서드: ", + "start_button": "시작: ", + "log": "로그: ", + # Test method + "get": "GET", + "post": "POST", + "put": "PUT", + "patch": "PATCH", + "delete": "DELETE", + "head": "HEAD", + "options": "OPTIONS", + # Stats panel + "stats_panel": "실시간 통계", + "stats_total": "합계", + "stats_rate": "속도", + "stats_avg_ms": "평균", + "stats_p95_ms": "p95", + "stats_failures": "실패", +} diff --git a/je_load_density/gui/language_wrapper/multi_language_wrapper.py b/je_load_density/gui/language_wrapper/multi_language_wrapper.py index d437ca0..ce20d02 100644 --- a/je_load_density/gui/language_wrapper/multi_language_wrapper.py +++ b/je_load_density/gui/language_wrapper/multi_language_wrapper.py @@ -1,28 +1,26 @@ from je_load_density.gui.language_wrapper.english import english_word_dict +from je_load_density.gui.language_wrapper.japanese import japanese_word_dict +from je_load_density.gui.language_wrapper.korean import korean_word_dict from je_load_density.gui.language_wrapper.traditional_chinese import traditional_chinese_word_dict from je_load_density.utils.logging.loggin_instance import load_density_logger - class LanguageWrapper(object): - def __init__( - self - ): + def __init__(self) -> None: load_density_logger.info("Init LanguageWrapper") self.language: str = "English" self.choose_language_dict = { "English": english_word_dict, - "Traditional_Chinese": traditional_chinese_word_dict + "Traditional_Chinese": traditional_chinese_word_dict, + "Japanese": japanese_word_dict, + "Korean": korean_word_dict, } self.language_word_dict: dict = 
self.choose_language_dict.get(self.language) - def reset_language(self, language) -> None: + def reset_language(self, language: str) -> None: load_density_logger.info(f"LanguageWrapper reset_language language: {language}") - if language in [ - "English", - "Traditional_Chinese" - ]: + if language in self.choose_language_dict: self.language = language self.language_word_dict = self.choose_language_dict.get(self.language) diff --git a/je_load_density/gui/language_wrapper/traditional_chinese.py b/je_load_density/gui/language_wrapper/traditional_chinese.py index 2c5561d..2a4d5c1 100644 --- a/je_load_density/gui/language_wrapper/traditional_chinese.py +++ b/je_load_density/gui/language_wrapper/traditional_chinese.py @@ -17,4 +17,11 @@ "delete": "DELETE", "head": "HEAD", "options": "OPTIONS", + # Stats panel + "stats_panel": "即時統計", + "stats_total": "總計", + "stats_rate": "速率", + "stats_avg_ms": "平均", + "stats_p95_ms": "p95", + "stats_failures": "失敗", } diff --git a/je_load_density/gui/main_widget.py b/je_load_density/gui/main_widget.py index 946265f..a3e3e49 100644 --- a/je_load_density/gui/main_widget.py +++ b/je_load_density/gui/main_widget.py @@ -11,6 +11,7 @@ from je_load_density.gui.load_density_gui_thread import LoadDensityGUIThread from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper from je_load_density.gui.log_to_ui_filter import InterceptAllFilter, log_message_queue +from je_load_density.gui.stats_panel import StatsPanel class LoadDensityWidget(QWidget): @@ -72,10 +73,14 @@ def __init__(self, parent: Optional[QWidget] = None): self.log_panel = QTextEdit() self.log_panel.setReadOnly(True) + # === 即時統計面板 (Live stats panel) === + self.stats_panel = StatsPanel() + # === 主版面配置 (Main layout) === main_layout = QVBoxLayout() main_layout.addLayout(form_layout) main_layout.addWidget(self.start_button) + main_layout.addWidget(self.stats_panel) main_layout.addWidget(QLabel(language_wrapper.language_word_dict.get("log"))) 
main_layout.addWidget(self.log_panel) diff --git a/je_load_density/gui/stats_panel.py b/je_load_density/gui/stats_panel.py new file mode 100644 index 0000000..96f0e12 --- /dev/null +++ b/je_load_density/gui/stats_panel.py @@ -0,0 +1,83 @@ +import statistics +from typing import List + +from PySide6.QtCore import QTimer +from PySide6.QtWidgets import QGroupBox, QLabel, QVBoxLayout, QWidget + +from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper +from je_load_density.utils.test_record.test_record_class import test_record_instance + + +def _percentile(values: List[float], pct: float) -> float: + if not values: + return 0.0 + sorted_values = sorted(values) + if len(sorted_values) == 1: + return float(sorted_values[0]) + rank = (pct / 100.0) * (len(sorted_values) - 1) + lower = int(rank) + upper = min(lower + 1, len(sorted_values) - 1) + fraction = rank - lower + return float(sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction) + + +class StatsPanel(QWidget): + """ + 即時統計面板:定時讀取 test_record_instance 並顯示彙整數據。 + Live stats panel that polls test_record_instance and renders + totals plus latency percentiles. 
+ """ + + def __init__(self, parent=None): + super().__init__(parent) + layout = QVBoxLayout() + + words = language_wrapper.language_word_dict + self.totals_label = QLabel() + self.latency_label = QLabel() + self.failures_label = QLabel() + + group = QGroupBox(words.get("stats_panel", "Live Stats")) + group_layout = QVBoxLayout() + group_layout.addWidget(self.totals_label) + group_layout.addWidget(self.latency_label) + group_layout.addWidget(self.failures_label) + group.setLayout(group_layout) + + layout.addWidget(group) + self.setLayout(layout) + + self._last_total = 0 + self._timer = QTimer(self) + self._timer.setInterval(1000) + self._timer.timeout.connect(self.refresh) + self._timer.start() + self.refresh() + + def refresh(self) -> None: + success_records = test_record_instance.test_record_list + failure_records = test_record_instance.error_record_list + total = len(success_records) + len(failure_records) + delta = max(total - self._last_total, 0) + self._last_total = total + + latencies = [ + float(record.get("response_time_ms")) + for record in (*success_records, *failure_records) + if record.get("response_time_ms") is not None + ] + avg_ms = statistics.fmean(latencies) if latencies else 0.0 + p95_ms = _percentile(latencies, 95) + + words = language_wrapper.language_word_dict + self.totals_label.setText( + f"{words.get('stats_total', 'Total')}: {total} " + f"{words.get('stats_rate', 'Rate')}: {delta}/s" + ) + self.latency_label.setText( + f"{words.get('stats_avg_ms', 'Avg')}: {avg_ms:.1f} ms " + f"{words.get('stats_p95_ms', 'p95')}: {p95_ms:.1f} ms" + ) + self.failures_label.setText( + f"{words.get('stats_failures', 'Failures')}: {len(failure_records)}" + ) diff --git a/je_load_density/mcp_server/__init__.py b/je_load_density/mcp_server/__init__.py new file mode 100644 index 0000000..d18414e --- /dev/null +++ b/je_load_density/mcp_server/__init__.py @@ -0,0 +1,3 @@ +from je_load_density.mcp_server.server import build_server, run_stdio + +__all__ = 
["build_server", "run_stdio"] diff --git a/je_load_density/mcp_server/__main__.py b/je_load_density/mcp_server/__main__.py new file mode 100644 index 0000000..368f9a9 --- /dev/null +++ b/je_load_density/mcp_server/__main__.py @@ -0,0 +1,5 @@ +from je_load_density.mcp_server.server import run_stdio + + +if __name__ == "__main__": + run_stdio() diff --git a/je_load_density/mcp_server/server.py b/je_load_density/mcp_server/server.py new file mode 100644 index 0000000..c2f0efb --- /dev/null +++ b/je_load_density/mcp_server/server.py @@ -0,0 +1,295 @@ +""" +LoadDensity MCP server. + +Exposes load test execution, report generation, HAR import, and project +init as MCP tools so Claude can drive LoadDensity. The mcp SDK is +imported lazily so the dependency stays optional. + +Run with: + + python -m je_load_density.mcp_server +""" + +import json +from typing import Any, Dict, List, Optional + +from je_load_density.utils.executor.action_executor import execute_action, executor +from je_load_density.utils.generate_report.generate_csv_report import generate_csv_report +from je_load_density.utils.generate_report.generate_html_report import generate_html_report +from je_load_density.utils.generate_report.generate_json_report import generate_json_report +from je_load_density.utils.generate_report.generate_junit_report import generate_junit_report +from je_load_density.utils.generate_report.generate_summary_report import ( + build_summary, + generate_summary_report, +) +from je_load_density.utils.generate_report.generate_xml_report import generate_xml_report +from je_load_density.utils.project.create_project_structure import create_project_dir +from je_load_density.utils.recording.har_importer import har_to_action_json, load_har +from je_load_density.utils.test_record.sqlite_persistence import ( + fetch_run_records, + list_runs, + persist_records, +) +from je_load_density.utils.test_record.test_record_class import test_record_instance +from 
je_load_density.wrapper.start_wrapper.start_test import start_test + + +def _ensure_mcp(): + try: + from mcp.server import Server + from mcp.server.stdio import stdio_server + from mcp import types as mcp_types + except ImportError as error: + raise RuntimeError( + "mcp package is required. Install with: pip install mcp" + ) from error + return Server, stdio_server, mcp_types + + +def _wrap_text(value: Any, mcp_types) -> List[Any]: + text = value if isinstance(value, str) else json.dumps(value, ensure_ascii=False, default=str) + return [mcp_types.TextContent(type="text", text=text)] + + +def _tool_run_test(payload: Dict[str, Any]) -> Dict[str, Any]: + return start_test(**payload) + + +def _tool_run_action_string(payload: Dict[str, Any]) -> Dict[str, Any]: + actions = payload.get("actions") + if isinstance(actions, str): + actions = json.loads(actions) + return execute_action(actions) + + +def _tool_create_project(payload: Dict[str, Any]) -> Dict[str, str]: + create_project_dir(payload["path"]) + return {"path": payload["path"], "status": "created"} + + +def _tool_list_executor_commands(_: Dict[str, Any]) -> Dict[str, Any]: + return {"commands": sorted(name for name in executor.event_dict.keys() if name.startswith("LD_"))} + + +def _tool_import_har(payload: Dict[str, Any]) -> Dict[str, Any]: + har = load_har(payload["file_path"]) + return har_to_action_json( + har, + user=payload.get("user", "fast_http_user"), + user_count=int(payload.get("user_count", 10)), + spawn_rate=int(payload.get("spawn_rate", 5)), + test_time=int(payload.get("test_time", 60)), + include=payload.get("include"), + exclude=payload.get("exclude"), + ) + + +def _tool_generate_reports(payload: Dict[str, Any]) -> Dict[str, Optional[str]]: + base = payload.get("base_name", "loaddensity") + formats = payload.get("formats") or ["html", "json", "xml", "csv", "junit", "summary"] + result: Dict[str, Optional[str]] = {} + if "html" in formats: + result["html"] = generate_html_report(base) + if "json" in 
formats: + result["json"] = generate_json_report(base) + if "xml" in formats: + result["xml"] = generate_xml_report(base) + if "csv" in formats: + result["csv"] = generate_csv_report(base) + if "junit" in formats: + result["junit"] = generate_junit_report(f"{base}-junit") + if "summary" in formats: + result["summary"] = generate_summary_report(f"{base}-summary") + return result + + +def _tool_summary(_: Dict[str, Any]) -> Dict[str, Any]: + return build_summary() + + +def _tool_persist_records(payload: Dict[str, Any]) -> Dict[str, Any]: + run_id = persist_records( + payload["database_path"], + label=payload.get("label"), + metadata=payload.get("metadata"), + ) + return {"run_id": run_id} + + +def _tool_list_runs(payload: Dict[str, Any]) -> Dict[str, Any]: + return {"runs": list_runs(payload["database_path"], limit=int(payload.get("limit", 20)))} + + +def _tool_fetch_run(payload: Dict[str, Any]) -> Dict[str, Any]: + return {"records": list(fetch_run_records(payload["database_path"], int(payload["run_id"])))} + + +def _tool_clear_records(_: Dict[str, Any]) -> Dict[str, str]: + test_record_instance.clear_records() + return {"status": "cleared"} + + +_TOOLS: Dict[str, Dict[str, Any]] = { + "load_density.run_test": { + "description": "Run a Locust-backed load test via start_test.", + "handler": _tool_run_test, + "input_schema": { + "type": "object", + "properties": { + "user_detail_dict": {"type": "object"}, + "user_count": {"type": "integer", "default": 50}, + "spawn_rate": {"type": "integer", "default": 10}, + "test_time": {"type": "integer", "default": 60}, + "tasks": {}, + "variables": {"type": "object"}, + "csv_sources": {"type": "array"}, + "runner_mode": {"type": "string", "default": "local"}, + "web_ui_dict": {"type": "object"}, + }, + "required": ["user_detail_dict"], + }, + }, + "load_density.run_action_json": { + "description": "Execute an action JSON document (string or list).", + "handler": _tool_run_action_string, + "input_schema": { + "type": "object", + 
"properties": {"actions": {}}, + "required": ["actions"], + }, + }, + "load_density.create_project": { + "description": "Create a LoadDensity project skeleton at PATH.", + "handler": _tool_create_project, + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}}, + "required": ["path"], + }, + }, + "load_density.list_executor_commands": { + "description": "List all LD_* executor commands.", + "handler": _tool_list_executor_commands, + "input_schema": {"type": "object", "properties": {}}, + }, + "load_density.import_har": { + "description": "Convert a HAR file into a runnable action JSON.", + "handler": _tool_import_har, + "input_schema": { + "type": "object", + "properties": { + "file_path": {"type": "string"}, + "user": {"type": "string"}, + "user_count": {"type": "integer"}, + "spawn_rate": {"type": "integer"}, + "test_time": {"type": "integer"}, + "include": {"type": "array", "items": {"type": "string"}}, + "exclude": {"type": "array", "items": {"type": "string"}}, + }, + "required": ["file_path"], + }, + }, + "load_density.generate_reports": { + "description": "Render reports (html/json/xml/csv/junit/summary).", + "handler": _tool_generate_reports, + "input_schema": { + "type": "object", + "properties": { + "base_name": {"type": "string"}, + "formats": {"type": "array", "items": {"type": "string"}}, + }, + }, + }, + "load_density.summary": { + "description": "Return aggregated stats (totals, per-name percentiles).", + "handler": _tool_summary, + "input_schema": {"type": "object", "properties": {}}, + }, + "load_density.persist_records": { + "description": "Persist current records into a SQLite database.", + "handler": _tool_persist_records, + "input_schema": { + "type": "object", + "properties": { + "database_path": {"type": "string"}, + "label": {"type": "string"}, + "metadata": {"type": "object"}, + }, + "required": ["database_path"], + }, + }, + "load_density.list_runs": { + "description": "List recent persisted runs.", + 
"handler": _tool_list_runs, + "input_schema": { + "type": "object", + "properties": { + "database_path": {"type": "string"}, + "limit": {"type": "integer", "default": 20}, + }, + "required": ["database_path"], + }, + }, + "load_density.fetch_run": { + "description": "Fetch records belonging to a saved run.", + "handler": _tool_fetch_run, + "input_schema": { + "type": "object", + "properties": { + "database_path": {"type": "string"}, + "run_id": {"type": "integer"}, + }, + "required": ["database_path", "run_id"], + }, + }, + "load_density.clear_records": { + "description": "Clear in-memory test records before a new run.", + "handler": _tool_clear_records, + "input_schema": {"type": "object", "properties": {}}, + }, +} + + +def build_server(): + """ + Build the MCP Server instance with the LoadDensity tool surface. + """ + server_cls, _, mcp_types = _ensure_mcp() + server = server_cls("loaddensity") + + @server.list_tools() + async def _list_tools(): + return [ + mcp_types.Tool( + name=name, + description=meta["description"], + inputSchema=meta["input_schema"], + ) + for name, meta in _TOOLS.items() + ] + + @server.call_tool() + async def _call_tool(name: str, arguments: Optional[Dict[str, Any]]): + tool = _TOOLS.get(name) + if tool is None: + raise ValueError(f"unknown tool: {name}") + result = tool["handler"](arguments or {}) + return _wrap_text(result, mcp_types) + + return server + + +def run_stdio() -> None: + """ + Run the MCP server over stdio (the standard transport for Claude). 
+ """ + _, stdio_server, _ = _ensure_mcp() + server = build_server() + + import asyncio + + async def _main() -> None: + async with stdio_server() as (read, write): + await server.run(read, write, server.create_initialization_options()) + + asyncio.run(_main()) diff --git a/je_load_density/utils/executor/action_executor.py b/je_load_density/utils/executor/action_executor.py index f09a5da..f6c0f8c 100644 --- a/je_load_density/utils/executor/action_executor.py +++ b/je_load_density/utils/executor/action_executor.py @@ -2,14 +2,15 @@ import sys import types from inspect import getmembers, isbuiltin -from typing import Union, Any +from typing import Any, Union from je_load_density.utils.exception.exception_tags import ( - executor_data_error, add_command_exception_tag, + executor_data_error, executor_list_error, ) from je_load_density.utils.exception.exceptions import LoadDensityTestExecuteException +from je_load_density.utils.generate_report.generate_csv_report import generate_csv_report from je_load_density.utils.generate_report.generate_html_report import ( generate_html, generate_html_report, @@ -18,12 +19,47 @@ generate_json, generate_json_report, ) +from je_load_density.utils.generate_report.generate_junit_report import generate_junit_report +from je_load_density.utils.generate_report.generate_summary_report import ( + build_summary, + generate_summary_report, +) from je_load_density.utils.generate_report.generate_xml_report import ( generate_xml, generate_xml_report, ) from je_load_density.utils.json.json_file.json_file import read_action_json +from je_load_density.utils.metrics.influxdb_sink import ( + start_influxdb_sink, + stop_influxdb_sink, +) +from je_load_density.utils.metrics.opentelemetry_exporter import ( + start_opentelemetry_exporter, + stop_opentelemetry_exporter, +) +from je_load_density.utils.metrics.prometheus_exporter import ( + start_prometheus_exporter, + stop_prometheus_exporter, +) from 
je_load_density.utils.package_manager.package_manager_class import package_manager +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_source, + register_csv_sources, + register_variable, + register_variables, +) +from je_load_density.utils.recording.har_importer import ( + har_to_action_json, + har_to_tasks, + load_har, +) +from je_load_density.utils.test_record.sqlite_persistence import ( + fetch_run_records, + list_runs, + persist_records, +) +from je_load_density.utils.test_record.test_record_class import test_record_instance from je_load_density.wrapper.start_wrapper.start_test import start_test _UNSAFE_BUILTINS = frozenset({ @@ -32,48 +68,85 @@ }) +def _clear_records() -> dict: + test_record_instance.clear_records() + return {"status": "cleared"} + + +def _clear_resolver() -> dict: + parameter_resolver.clear() + return {"status": "cleared"} + + +def _lazy_start_socket_server(*args, **kwargs): + from je_load_density.utils.socket_server.load_density_socket_server import ( + start_load_density_socket_server, + ) + return start_load_density_socket_server(*args, **kwargs) + + class Executor: """ 執行器 (Executor) - Event-driven executor - - 提供事件字典 (event_dict),可根據動作名稱執行對應函式, - 並支援批次執行與檔案驅動。 - Provides an event dictionary to execute functions by name, - supporting batch execution and file-driven execution. + Event-driven executor that runs LD_* actions plus safe builtins. 
""" def __init__(self) -> None: - # 初始化事件字典 (Initialize event dictionary) self.event_dict: dict[str, Any] = { + # Core "LD_start_test": start_test, + "LD_execute_action": self.execute_action, + "LD_execute_files": self.execute_files, + "LD_add_package_to_executor": package_manager.add_package_to_executor, + + # Reports "LD_generate_html": generate_html, "LD_generate_html_report": generate_html_report, "LD_generate_json": generate_json, "LD_generate_json_report": generate_json_report, "LD_generate_xml": generate_xml, "LD_generate_xml_report": generate_xml_report, - # Executor internal methods - "LD_execute_action": self.execute_action, - "LD_execute_files": self.execute_files, - "LD_add_package_to_executor": package_manager.add_package_to_executor, + "LD_generate_csv_report": generate_csv_report, + "LD_generate_junit_report": generate_junit_report, + "LD_generate_summary_report": generate_summary_report, + "LD_summary": build_summary, + + # Test record persistence + "LD_persist_records": persist_records, + "LD_list_runs": list_runs, + "LD_fetch_run_records": fetch_run_records, + "LD_clear_records": _clear_records, + + # Parameter resolver + "LD_register_variable": register_variable, + "LD_register_variables": register_variables, + "LD_register_csv_source": register_csv_source, + "LD_register_csv_sources": register_csv_sources, + "LD_clear_resolver": _clear_resolver, + + # Recording / replay + "LD_load_har": load_har, + "LD_har_to_tasks": har_to_tasks, + "LD_har_to_action_json": har_to_action_json, + + # Metrics exporters + "LD_start_prometheus_exporter": start_prometheus_exporter, + "LD_stop_prometheus_exporter": stop_prometheus_exporter, + "LD_start_influxdb_sink": start_influxdb_sink, + "LD_stop_influxdb_sink": stop_influxdb_sink, + "LD_start_opentelemetry_exporter": start_opentelemetry_exporter, + "LD_stop_opentelemetry_exporter": stop_opentelemetry_exporter, + + # Control socket + "LD_start_socket_server": _lazy_start_socket_server, } - # 將安全的 Python 
內建函式加入事件字典,排除可執行任意程式碼者 - # Add safe Python built-in functions; exclude those allowing arbitrary code execution for name, func in getmembers(builtins, isbuiltin): if name in _UNSAFE_BUILTINS: continue self.event_dict[name] = func def _execute_event(self, action: list) -> Any: - """ - 執行單一事件 - Execute a single event - - :param action: 事件結構,例如 ["function_name", {"param": value}] - :return: 事件回傳值 (return value of executed event) - """ event = self.event_dict.get(action[0]) if event is None: raise LoadDensityTestExecuteException(executor_data_error + " " + str(action)) @@ -81,35 +154,21 @@ def _execute_event(self, action: list) -> Any: if len(action) == 2: if isinstance(action[1], dict): return event(**action[1]) - else: - return event(*action[1]) - elif len(action) == 1: + return event(*action[1]) + if len(action) == 1: return event() - else: - raise LoadDensityTestExecuteException(executor_data_error + " " + str(action)) + raise LoadDensityTestExecuteException(executor_data_error + " " + str(action)) def execute_action(self, action_list: Union[list, dict]) -> dict[str, Any]: - """ - 執行多個事件 - Execute multiple actions - - :param action_list: 事件列表,例如: - [ - ["LD_start_test", {"param": value}], - ["LD_generate_json", {"param": value}] - ] - :return: 執行紀錄字典 (execution record dict) - """ if isinstance(action_list, dict): action_list = action_list.get("load_density", None) if action_list is None: raise LoadDensityTestExecuteException(executor_list_error) - execute_record_dict: dict[str, Any] = {} - if not isinstance(action_list, list) or len(action_list) == 0: raise LoadDensityTestExecuteException(executor_list_error) + execute_record_dict: dict[str, Any] = {} for action in action_list: try: event_response = self._execute_event(action) @@ -121,7 +180,6 @@ def execute_action(self, action_list: Union[list, dict]) -> dict[str, Any]: execute_record = f"execute: {action}" execute_record_dict[execute_record] = repr(error) - # 輸出執行結果 (Print execution results) for key, value in 
execute_record_dict.items(): print(key) print(value) @@ -129,31 +187,14 @@ def execute_action(self, action_list: Union[list, dict]) -> dict[str, Any]: return execute_record_dict def execute_files(self, execute_files_list: list[str]) -> list[dict[str, Any]]: - """ - 執行檔案中的事件 - Execute actions from files + return [self.execute_action(read_action_json(path)) for path in execute_files_list] - :param execute_files_list: 檔案路徑列表 (list of file paths) - :return: 每個檔案的執行結果列表 (list of execution results per file) - """ - execute_detail_list: list[dict[str, Any]] = [] - for file in execute_files_list: - execute_detail_list.append(self.execute_action(read_action_json(file))) - return execute_detail_list - -# 建立全域執行器 (Global executor instance) executor = Executor() package_manager.executor = executor def add_command_to_executor(command_dict: dict[str, Any]) -> None: - """ - 新增自訂命令到執行器 - Add custom commands to executor - - :param command_dict: {command_name: function} - """ for command_name, command in command_dict.items(): if isinstance(command, (types.MethodType, types.FunctionType)): executor.event_dict[command_name] = command @@ -162,10 +203,8 @@ def add_command_to_executor(command_dict: dict[str, Any]) -> None: def execute_action(action_list: list) -> dict[str, Any]: - """全域執行事件 (Global execute action)""" return executor.execute_action(action_list) def execute_files(execute_files_list: list[str]) -> list[dict[str, Any]]: - """全域執行檔案事件 (Global execute files)""" - return executor.execute_files(execute_files_list) \ No newline at end of file + return executor.execute_files(execute_files_list) diff --git a/je_load_density/utils/generate_report/generate_csv_report.py b/je_load_density/utils/generate_report/generate_csv_report.py new file mode 100644 index 0000000..f6ebd24 --- /dev/null +++ b/je_load_density/utils/generate_report/generate_csv_report.py @@ -0,0 +1,46 @@ +import csv +import sys +from typing import Iterable, List, Optional + +from 
# Column order of the emitted CSV; "outcome" is synthesized per record.
_FIELDS: List[str] = [
    "outcome",
    "Method",
    "test_url",
    "name",
    "status_code",
    "response_time_ms",
    "response_length",
    "error",
]


def _rows() -> Iterable[dict]:
    """Yield all success rows, then all failure rows, tagged via 'outcome'."""
    for outcome, records in (
        ("success", test_record_instance.test_record_list),
        ("failure", test_record_instance.error_record_list),
    ):
        for record in records:
            row = {field: record.get(field) for field in _FIELDS}
            row["outcome"] = outcome
            yield row


def generate_csv_report(csv_name: str = "default_name") -> Optional[str]:
    """
    Write both success and failure records to <csv_name>.csv.

    Returns the path written, or None when the file cannot be created
    (the OSError is reported on stderr instead of raised).
    """
    destination = f"{csv_name}.csv"
    try:
        with open(destination, "w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=_FIELDS)
            writer.writeheader()
            writer.writerows(_rows())
        return destination
    except OSError as error:
        print(repr(error), file=sys.stderr)
        return None
+ """ + success = test_record_instance.test_record_list + failures = test_record_instance.error_record_list + total = len(success) + len(failures) + + parts = [] + parts.append("") + parts.append( + f"" + ) + + for record in success: + name = _safe(record.get("name") or record.get("test_url")) + classname = _safe(record.get("Method", "request")) + time_s = float(record.get("response_time_ms") or 0) / 1000.0 + parts.append( + f"" + ) + + for record in failures: + name = _safe(record.get("name") or record.get("test_url")) + classname = _safe(record.get("Method", "request")) + time_s = float(record.get("response_time_ms") or 0) / 1000.0 + message = _safe(record.get("error") or "request failed") + parts.append( + f"" + f"{message}" + ) + + parts.append("") + xml_body = "".join(parts) + + path = f"{report_name}.xml" + try: + with open(path, "w", encoding="utf-8") as fh: + fh.write(xml_body) + return path + except OSError as error: + print(repr(error), file=sys.stderr) + return None diff --git a/je_load_density/utils/generate_report/generate_summary_report.py b/je_load_density/utils/generate_report/generate_summary_report.py new file mode 100644 index 0000000..81ba28f --- /dev/null +++ b/je_load_density/utils/generate_report/generate_summary_report.py @@ -0,0 +1,90 @@ +import json +import statistics +import sys +from typing import Any, Dict, Iterable, List, Optional + +from je_load_density.utils.test_record.test_record_class import test_record_instance + + +def _percentile(values: List[float], pct: float) -> float: + if not values: + return 0.0 + if len(values) == 1: + return float(values[0]) + sorted_values = sorted(values) + rank = (pct / 100.0) * (len(sorted_values) - 1) + lower = int(rank) + upper = min(lower + 1, len(sorted_values) - 1) + fraction = rank - lower + return float(sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction) + + +def _by_name(records: Iterable[Dict[str, Any]]) -> Dict[str, List[float]]: + grouped: Dict[str, 
def generate_summary_report(report_name: str = "loaddensity-summary") -> Optional[str]:
    """
    Serialize build_summary() to <report_name>.json.

    Returns the path written, or None when the file cannot be opened
    (the OSError is reported on stderr instead of raised).
    """
    output_path = f"{report_name}.json"
    stats = build_summary()
    try:
        with open(output_path, "w", encoding="utf-8") as handle:
            handle.write(json.dumps(stats, indent=2))
        return output_path
    except OSError as error:
        print(repr(error), file=sys.stderr)
        return None
diff --git a/je_load_density/utils/metrics/__init__.py b/je_load_density/utils/metrics/__init__.py new file mode 100644 index 0000000..cd27756 --- /dev/null +++ b/je_load_density/utils/metrics/__init__.py @@ -0,0 +1,21 @@ +from je_load_density.utils.metrics.influxdb_sink import ( + start_influxdb_sink, + stop_influxdb_sink, +) +from je_load_density.utils.metrics.opentelemetry_exporter import ( + start_opentelemetry_exporter, + stop_opentelemetry_exporter, +) +from je_load_density.utils.metrics.prometheus_exporter import ( + start_prometheus_exporter, + stop_prometheus_exporter, +) + +__all__ = [ + "start_prometheus_exporter", + "stop_prometheus_exporter", + "start_influxdb_sink", + "stop_influxdb_sink", + "start_opentelemetry_exporter", + "stop_opentelemetry_exporter", +] diff --git a/je_load_density/utils/metrics/influxdb_sink.py b/je_load_density/utils/metrics/influxdb_sink.py new file mode 100644 index 0000000..a6375c2 --- /dev/null +++ b/je_load_density/utils/metrics/influxdb_sink.py @@ -0,0 +1,171 @@ +import socket +import threading +import time +from typing import Any, Dict, Optional +from urllib import error as urllib_error +from urllib import request as urllib_request + +from locust import events + +from je_load_density.utils.logging.loggin_instance import load_density_logger + +_state: Dict[str, Any] = { + "started": False, + "listener": None, + "config": None, +} +_lock = threading.Lock() + + +def _escape_tag(value: str) -> str: + return str(value).replace(" ", "\\ ").replace(",", "\\,").replace("=", "\\=") + + +def _escape_field_string(value: str) -> str: + return str(value).replace("\\", "\\\\").replace('"', '\\"') + + +def _build_line(measurement: str, tags: Dict[str, str], fields: Dict[str, Any], timestamp_ns: int) -> str: + tag_segment = ",".join(f"{_escape_tag(k)}={_escape_tag(v)}" for k, v in tags.items() if v is not None) + field_segments = [] + for key, value in fields.items(): + if isinstance(value, bool): + field_segments.append(f"{key}={'true' 
if value else 'false'}") + elif isinstance(value, (int,)): + field_segments.append(f"{key}={value}i") + elif isinstance(value, float): + field_segments.append(f"{key}={value}") + else: + field_segments.append(f'{key}="{_escape_field_string(value)}"') + field_segment = ",".join(field_segments) + head = f"{measurement},{tag_segment}" if tag_segment else measurement + return f"{head} {field_segment} {timestamp_ns}" + + +def _send_udp(line: str, host: str, port: int) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.sendto(line.encode("utf-8"), (host, port)) + finally: + sock.close() + + +_ALLOWED_URL_SCHEMES = ("http://", "https://") + + +def _post_line_protocol(line: str, url: str, token: Optional[str], timeout: float) -> None: + """ + POST one line-protocol record to a caller-supplied InfluxDB URL. + + HTTPS is recommended for production deployments; plain HTTP is + permitted because operators routinely run InfluxDB on a private + network or through a TLS-terminating sidecar. The scheme is + enforced here so file://, gopher://, and the like cannot reach + urlopen even if a misconfigured caller supplies them. 
+ """ + if not url.lower().startswith(_ALLOWED_URL_SCHEMES): + raise ValueError("InfluxDB URL must use http:// or https://") + headers = {"Content-Type": "text/plain; charset=utf-8"} + if token: + headers["Authorization"] = f"Token {token}" + req = urllib_request.Request(url, data=line.encode("utf-8"), headers=headers, method="POST") + try: + with urllib_request.urlopen(req, timeout=timeout) as response: # nosec B310 - scheme validated above + response.read() + except urllib_error.URLError as error: + load_density_logger.warning(f"InfluxDB write failed: {error}") + + +def _validate_transport(transport: str, url: Optional[str]) -> None: + if transport not in {"udp", "http"}: + raise ValueError(f"unsupported transport: {transport}") + if transport == "http": + if not url: + raise ValueError("url required when transport=http") + if not url.lower().startswith(_ALLOWED_URL_SCHEMES): + raise ValueError("InfluxDB URL must use http:// or https://") + + +def _build_listener(config: Dict[str, Any]): + transport = config["transport"] + host = config["host"] + port = config["port"] + url = config["url"] + token = config["token"] + measurement = config["measurement"] + timeout = config["timeout"] + + def _listener(request_type, name, response_time, response_length, exception=None, **_kwargs): + tags = {"request_type": str(request_type), "name": str(name)} + fields: Dict[str, Any] = { + "latency_ms": float(response_time or 0), + "response_bytes": int(response_length or 0), + "success": exception is None, + } + if exception is not None: + fields["error"] = repr(exception)[:512] + line = _build_line(measurement, tags, fields, time.time_ns()) + try: + if transport == "udp": + _send_udp(line, host, port) + else: + _post_line_protocol(line, url, token, timeout) + except Exception as error: + load_density_logger.debug(f"InfluxDB write failed: {error}") + + return _listener + + +def start_influxdb_sink( + transport: str = "udp", + host: str = "127.0.0.1", + port: int = 8089, + url: 
def stop_influxdb_sink() -> None:
    """
    Detach the Locust request listener and reset sink state.

    Safe to call when the sink was never started; detach failures are
    logged at debug level rather than raised.
    """
    with _lock:
        if not _state["started"]:
            return
        try:
            events.request.remove_listener(_state["listener"])
        except Exception as error:
            load_density_logger.debug(f"influxdb listener detach failed: {error!r}")
        _state.update(started=False, listener=None, config=None)
opentelemetry import metrics + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + + resource = Resource.create({"service.name": service_name}) + exporter_kwargs: Dict[str, Any] = {} + if endpoint: + exporter_kwargs["endpoint"] = endpoint + exporter = OTLPMetricExporter(**exporter_kwargs) + reader = PeriodicExportingMetricReader(exporter, export_interval_millis=export_interval_ms) + provider = MeterProvider(resource=resource, metric_readers=[reader]) + metrics.set_meter_provider(provider) + return provider, metrics.get_meter("loaddensity") + + +def _build_instruments(meter): + return { + "requests": meter.create_counter( + "loaddensity.requests", + unit="1", + description="Locust request count", + ), + "latency": meter.create_histogram( + "loaddensity.request.latency", + unit="ms", + description="Locust request latency", + ), + "size": meter.create_histogram( + "loaddensity.response.size", + unit="By", + description="Locust response size", + ), + } + + +def start_opentelemetry_exporter( + endpoint: Optional[str] = None, + service_name: str = "loaddensity", + export_interval_ms: int = 5000, +) -> bool: + """ + 啟動 OpenTelemetry OTLP 指標輸出。 + Start an OpenTelemetry OTLP metrics exporter and attach a Locust + request listener. 
def stop_opentelemetry_exporter() -> None:
    """
    Detach the Locust listener, shut down the meter provider, and
    reset exporter state.

    Safe to call when the exporter was never started; detach and
    shutdown failures are logged at debug level rather than raised.
    """
    with _lock:
        if not _state["started"]:
            return
        try:
            events.request.remove_listener(_state["listener"])
        except Exception as error:
            load_density_logger.debug(f"otel listener detach failed: {error!r}")
        provider = _state.get("provider")
        if provider is not None:
            try:
                provider.shutdown()
            except Exception as error:
                load_density_logger.debug(f"otel provider shutdown failed: {error!r}")
        _state.update(started=False, listener=None, instruments=None, provider=None)
def _build_metrics():
    """
    Build the Prometheus counter/histogram family used by the listener.

    prometheus_client is imported lazily so it stays an optional
    dependency; callers handle the ImportError.
    """
    from prometheus_client import Counter, Histogram

    request_counter = Counter(
        "loaddensity_requests_total",
        "Total Locust requests",
        ["request_type", "name", "outcome"],
    )
    latency_histogram = Histogram(
        "loaddensity_request_latency_ms",
        "Locust request latency in milliseconds",
        ["request_type", "name"],
        buckets=(5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000),
    )
    size_histogram = Histogram(
        "loaddensity_response_bytes",
        "Locust response size in bytes",
        ["request_type", "name"],
        buckets=(64, 256, 1024, 4096, 16384, 65536, 262144, 1048576),
    )
    return {
        "requests": request_counter,
        "latency": latency_histogram,
        "response_size": size_histogram,
    }
+ """ + with _lock: + if _state["started"]: + return port + try: + from prometheus_client import start_http_server + except ImportError: + load_density_logger.warning("prometheus_client not installed; exporter disabled") + return None + + metrics = _build_metrics() + start_http_server(port, addr=addr) + + def _listener(request_type, name, response_time, response_length, exception=None, **_kwargs): + outcome = "failure" if exception is not None else "success" + metrics["requests"].labels(request_type=str(request_type), name=str(name), outcome=outcome).inc() + metrics["latency"].labels(request_type=str(request_type), name=str(name)).observe(float(response_time or 0)) + metrics["response_size"].labels(request_type=str(request_type), name=str(name)).observe(float(response_length or 0)) + + events.request.add_listener(_listener) + _state["started"] = True + _state["listener"] = _listener + _state["metrics"] = metrics + load_density_logger.info(f"Prometheus exporter started on {addr}:{port}") + return port + + +def stop_prometheus_exporter() -> None: + """ + 從 Locust 事件移除監聽器(注意 prometheus_client 的 server 無法輕易停止)。 + Detach the listener; prometheus_client's server keeps running. 
+ """ + with _lock: + if not _state["started"]: + return + try: + events.request.remove_listener(_state["listener"]) + except Exception as error: + load_density_logger.debug(f"prometheus listener detach failed: {error!r}") + _state["started"] = False + _state["listener"] = None diff --git a/je_load_density/utils/parameterization/__init__.py b/je_load_density/utils/parameterization/__init__.py new file mode 100644 index 0000000..51a954f --- /dev/null +++ b/je_load_density/utils/parameterization/__init__.py @@ -0,0 +1,19 @@ +from je_load_density.utils.parameterization.parameter_resolver import ( + ParameterResolver, + parameter_resolver, + register_csv_source, + register_csv_sources, + register_variable, + register_variables, + resolve, +) + +__all__ = [ + "ParameterResolver", + "parameter_resolver", + "resolve", + "register_csv_source", + "register_csv_sources", + "register_variable", + "register_variables", +] diff --git a/je_load_density/utils/parameterization/parameter_resolver.py b/je_load_density/utils/parameterization/parameter_resolver.py new file mode 100644 index 0000000..0f41970 --- /dev/null +++ b/je_load_density/utils/parameterization/parameter_resolver.py @@ -0,0 +1,173 @@ +import csv +import itertools +import os +import re +import threading +from typing import Any, Dict, Iterable, Iterator, List, Optional + +_PLACEHOLDER_PATTERN = re.compile(r"\$\{([^}]+)\}") +_FUNCTION_PATTERN = re.compile(r"^([a-zA-Z_]\w*)\((.*)\)$") + + +class ParameterResolver: + """ + 參數解析器 + Parameter resolver for ${var} placeholders in load test definitions. + + Supports: + ${env.NAME} -> environment variable + ${var.key} -> registered variable + ${csv.source.column} -> next row from CSV source (cycled) + ${faker.method} -> faker output (if faker installed) + ${func(arg)} -> built-in helpers (uuid, now, randint(min,max)) + + Unknown placeholders are left in place so missing data is visible. 
+ """ + + def __init__(self) -> None: + self._variables: Dict[str, Any] = {} + self._csv_sources: Dict[str, Iterator[Dict[str, str]]] = {} + self._lock = threading.Lock() + self._faker = None + + def register_variable(self, name: str, value: Any) -> None: + with self._lock: + self._variables[name] = value + + def register_csv_source(self, name: str, file_path: str, cycle: bool = True) -> None: + rows = self._read_csv(file_path) + with self._lock: + self._csv_sources[name] = itertools.cycle(rows) if cycle else iter(rows) + + @staticmethod + def _read_csv(file_path: str) -> List[Dict[str, str]]: + with open(file_path, "r", encoding="utf-8", newline="") as fh: + reader = csv.DictReader(fh) + return list(reader) + + def _next_csv_row(self, name: str) -> Optional[Dict[str, str]]: + with self._lock: + source = self._csv_sources.get(name) + if source is None: + return None + try: + return next(source) + except StopIteration: + return None + + def _resolve_token(self, token: str) -> Optional[str]: + token = token.strip() + if not token: + return None + + function_match = _FUNCTION_PATTERN.match(token) + if function_match: + return self._resolve_function(function_match.group(1), function_match.group(2)) + + if "." 
not in token: + value = self._variables.get(token) + return None if value is None else str(value) + + prefix, _, rest = token.partition(".") + prefix = prefix.lower() + + if prefix == "env": + return os.environ.get(rest) + if prefix == "var": + value = self._variables.get(rest) + return None if value is None else str(value) + if prefix == "csv": + source_name, _, column = rest.partition(".") + row = self._next_csv_row(source_name) + if row is None: + return None + return row.get(column) + if prefix == "faker": + return self._resolve_faker(rest) + return None + + def _resolve_function(self, name: str, raw_args: str) -> Optional[str]: + name = name.lower() + args = [a.strip() for a in raw_args.split(",")] if raw_args else [] + if name == "uuid": + import uuid + return str(uuid.uuid4()) + if name == "now": + import datetime + return datetime.datetime.now().isoformat(timespec="seconds") + if name == "randint" and len(args) == 2: + import secrets + low, high = int(args[0]), int(args[1]) + return str(secrets.randbelow(high - low + 1) + low) + return None + + def _resolve_faker(self, method: str) -> Optional[str]: + if self._faker is None: + try: + from faker import Faker + except ImportError: + return None + self._faker = Faker() + provider = getattr(self._faker, method, None) + if provider is None: + return None + try: + return str(provider()) + except Exception: + return None + + def resolve(self, value: Any) -> Any: + """ + Recursively resolve placeholders inside strings, dicts, lists, and tuples. + Non-string scalar types pass through unchanged. 
# Module-level singleton resolver shared by all convenience helpers below.
parameter_resolver = ParameterResolver()


def resolve(value: Any) -> Any:
    """Resolve placeholders in *value* using the shared resolver."""
    return parameter_resolver.resolve(value)


def register_variable(name: str, value: Any) -> None:
    """Register a single variable on the shared resolver."""
    parameter_resolver.register_variable(name, value)


def register_csv_source(name: str, file_path: str, cycle: bool = True) -> None:
    """Register a single CSV source on the shared resolver."""
    parameter_resolver.register_csv_source(name, file_path, cycle)


def register_variables(variables: Dict[str, Any]) -> None:
    """Bulk-register every name/value pair from *variables*."""
    for name, value in variables.items():
        parameter_resolver.register_variable(name, value)


def register_csv_sources(sources: Iterable[Dict[str, Any]]) -> None:
    """Bulk-register CSV sources described by dicts with name/file_path/cycle keys.

    Descriptions missing a name or file path are silently skipped.
    """
    for description in sources:
        source_name = description.get("name")
        source_path = description.get("file_path")
        if source_name and source_path:
            parameter_resolver.register_csv_source(
                source_name, source_path, description.get("cycle", True)
            )
_NON_REQUEST_HEADERS = frozenset({
    "host", "content-length", "connection",
    ":authority", ":method", ":path", ":scheme",
})


def load_har(file_path: str) -> Dict[str, Any]:
    """Read a HAR JSON document from disk (utf-8-sig tolerates a BOM)."""
    with open(file_path, "r", encoding="utf-8-sig") as fh:
        return json.load(fh)


def _extract_request_headers(raw_headers: Any) -> Dict[str, str]:
    """Copy HAR request headers, dropping hop-by-hop and HTTP/2 pseudo headers."""
    headers: Dict[str, str] = {}
    for header in raw_headers or []:
        name = str(header.get("name", "")).strip()
        value = header.get("value", "")
        if name and name.lower() not in _NON_REQUEST_HEADERS:
            headers[name] = value
    return headers


def _attach_post_body(task: Dict[str, Any], post_data: Dict[str, Any]) -> None:
    """Attach the HAR postData to *task* as ``json`` or ``data``.

    JSON bodies that fail to parse fall back to raw text in ``data``;
    form params become a name/value dict.
    """
    mime = str(post_data.get("mimeType", "")).lower()
    text = post_data.get("text")
    params = post_data.get("params")

    if "application/json" in mime and text:
        try:
            task["json"] = json.loads(text)
        except json.JSONDecodeError:
            task["data"] = text
        return
    if params:
        task["data"] = {p.get("name"): p.get("value") for p in params if p.get("name")}
        return
    if text:
        task["data"] = text


def _entry_to_task(entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Convert one HAR entry into a task dict; None when method/url is missing."""
    request = entry.get("request") or {}
    method = str(request.get("method", "")).lower()
    url = request.get("url")
    if not method or not url:
        return None

    task: Dict[str, Any] = {
        "method": method,
        "request_url": url,
        "name": f"{method.upper()} {_path_only(url)}",
    }
    headers = _extract_request_headers(request.get("headers"))
    if headers:
        task["headers"] = headers

    _attach_post_body(task, request.get("postData") or {})

    # The recorded response status becomes a status-code assertion
    # (missing / 0 statuses are skipped).
    expected_status = (entry.get("response") or {}).get("status")
    if isinstance(expected_status, int) and expected_status:
        task["assertions"] = [{"type": "status_code", "value": expected_status}]

    return task


def _path_only(url: str) -> str:
    """Return the path component of *url*, or *url* unchanged when it is not
    an absolute ``scheme://host...`` URL.

    Bug fix: the previous ``match.group(1) or "/" if match else url`` parsed
    as ``match.group(1) or ("/" if match else url)`` — ``match.group(1)``
    was evaluated even when ``match`` was None, raising AttributeError for
    any relative or malformed URL.
    """
    match = re.match(r"^[a-zA-Z]+://[^/]+(/.*)?$", url)
    if match is None:
        return url
    return match.group(1) or "/"


def _filter_entries(
    entries: Iterable[Dict[str, Any]],
    include: Optional[List[str]],
    exclude: Optional[List[str]],
) -> Iterable[Dict[str, Any]]:
    """Yield entries whose URL matches any include pattern (when given)
    and no exclude pattern."""
    include_patterns = [re.compile(p) for p in (include or [])]
    exclude_patterns = [re.compile(p) for p in (exclude or [])]

    for entry in entries:
        url = ((entry.get("request") or {}).get("url")) or ""
        if include_patterns and not any(p.search(url) for p in include_patterns):
            continue
        if any(p.search(url) for p in exclude_patterns):
            continue
        yield entry


def har_to_tasks(
    har: Dict[str, Any],
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Convert a HAR document into a list of LoadDensity tasks.

    :param har: parsed HAR document (see :func:`load_har`)
    :param include: regexes; when given, only matching URLs are kept
    :param exclude: regexes; matching URLs are dropped
    :return: task dicts in recorded order
    """
    entries = ((har.get("log") or {}).get("entries")) or []
    tasks: List[Dict[str, Any]] = []
    for entry in _filter_entries(entries, include, exclude):
        task = _entry_to_task(entry)
        if task is not None:
            tasks.append(task)
    return tasks
# Upper bound for a framed request body; larger frames are rejected so a
# client cannot make the server buffer unbounded data.
_MAX_PAYLOAD_BYTES = 1 << 20  # 1 MiB
# Network byte order, 4-byte unsigned length prefix used in framed mode.
_FRAME_HEADER = struct.Struct("!I")
# Legacy end-of-response marker expected by existing clients.
_RESPONSE_TERMINATOR = b"Return_Data_Over_JE\n"
# Unique sentinel meaning "client was already answered, stop processing";
# identity-compared, never equal to any JSON payload.
_AUTH_FAILED = object()
def __init__(
    self,
    framed: bool = False,
    token: Optional[str] = None,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
) -> None:
    """Create the listening socket and, when cert material is given, a TLS context.

    :param framed: use the 4-byte length-prefixed protocol instead of raw recv
    :param token: shared secret for privileged commands (None disables auth)
    :param certfile: TLS certificate chain file; TLS needs *keyfile* as well
    :param keyfile: TLS private key file
    """
    self.close_flag: bool = False
    self.framed: bool = framed
    self.token: Optional[str] = token
    self.certfile = certfile
    self.keyfile = keyfile
    self.server: socket.socket = socket.socket(AF_INET, SOCK_STREAM)
    self._tls_context: Optional[ssl.SSLContext] = None
    if certfile and keyfile:
        # create_default_context(Purpose.CLIENT_AUTH) is the stdlib helper
        # for a TLS server; it ships hardened defaults (secure ciphers,
        # no compression). The minimum version is pinned explicitly as a
        # guard in case those defaults ever loosen.
        self._tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)  # NOSONAR S4423 - hardened defaults pinned below
        self._tls_context.minimum_version = ssl.TLSVersion.TLSv1_2
        self._tls_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
def _read_frame(self, connection) -> Optional[bytes]:
    """Read one request: a single raw recv in legacy mode, otherwise a
    length-prefixed frame. Returns None on EOF or protocol error."""
    if not self.framed:
        data = connection.recv(8192)
        return data or None
    header = self._read_exact(connection, _FRAME_HEADER.size)
    if header is None:
        return None
    (length,) = _FRAME_HEADER.unpack(header)
    # Zero-length and oversized frames are protocol errors; drop the request.
    if not 0 < length <= _MAX_PAYLOAD_BYTES:
        return None
    return self._read_exact(connection, length)

@staticmethod
def _read_exact(connection, size: int) -> Optional[bytes]:
    """Receive exactly *size* bytes, or None when the peer closes early."""
    received = bytearray()
    remaining = size
    while remaining:
        chunk = connection.recv(remaining)
        if not chunk:
            return None
        received.extend(chunk)
        remaining = size - len(received)
    return bytes(received)

def _send_frame(self, connection, payload: bytes) -> None:
    """Send *payload*, prepending the length prefix when framing is on."""
    data = _FRAME_HEADER.pack(len(payload)) + payload if self.framed else payload
    connection.sendall(data)

def _check_token(self, supplied: Any) -> bool:
    """True when auth is disabled or *supplied* matches the configured token."""
    if self.token is None:
        return True
    # Constant-time compare; only string tokens are acceptable.
    return isinstance(supplied, str) and hmac.compare_digest(self.token, supplied)
def _handle_legacy_quit(self, connection) -> None:
    """Honour the plain-text ``quit_server`` command from legacy clients.

    Once a token is configured the unauthenticated legacy quit is refused,
    so an anonymous peer cannot shut the server down.
    """
    if self.token is not None:
        self._send_frame(connection, b"Error: token required\n")
        return
    self.close_flag = True
    self._send_frame(connection, b"Server shutting down\n")
    print("Now quit server", flush=True)

def _authorise_payload(self, connection, command_string: str) -> Any:
    """
    Decode the JSON envelope, enforce the token, and return the
    actual command to execute. Returns ``_AUTH_FAILED`` when the
    client has already been answered (bad JSON, missing/bad token,
    or a quit op was honoured).
    """
    try:
        payload = json.loads(command_string)
    except json.JSONDecodeError as error:
        # Malformed JSON: answer with the parse error plus the legacy
        # terminator so clients do not hang waiting for more data.
        self._send_frame(connection, f"Error: {error}\n".encode("utf-8"))
        self._send_frame(connection, _RESPONSE_TERMINATOR)
        return _AUTH_FAILED

    # An envelope is any dict carrying "token" or "command"; the token is
    # checked before any field of the envelope is acted on.
    if isinstance(payload, dict) and ("token" in payload or "command" in payload):
        if not self._check_token(payload.get("token")):
            self._send_frame(connection, b"Error: unauthorised\n")
            return _AUTH_FAILED
        if payload.get("op") == "quit":
            # Authenticated quit: flag the accept loop to stop.
            self.close_flag = True
            self._send_frame(connection, b"Server shutting down\n")
            return _AUTH_FAILED
        return payload.get("command")

    # Bare (non-envelope) payloads are only executable while auth is off.
    if self.token is not None:
        self._send_frame(connection, b"Error: token required\n")
        return _AUTH_FAILED

    return payload
def start_load_density_socket_server(
    host: str = "localhost",
    port: int = 9940,
    framed: bool = False,
    token: Optional[str] = None,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
) -> TCPServer:
    """Start the LoadDensity TCP server (blocks until the server stops).

    When *token* is not supplied it falls back to the
    LOAD_DENSITY_SOCKET_TOKEN environment variable so callers need not
    embed secrets in code.
    """
    monkey.patch_all()
    effective_token = os.environ.get("LOAD_DENSITY_SOCKET_TOKEN") if token is None else token
    server = TCPServer(
        framed=framed,
        token=effective_token,
        certfile=certfile,
        keyfile=keyfile,
    )
    server.socket_server(host, port)
    return server
# Serialises whole persistence operations; SQLite connections here are
# per-call, so the lock is the only shared state.
_lock = threading.Lock()


def _connect(database_path: str) -> sqlite3.Connection:
    """Open *database_path* with dict-like row access."""
    connection = sqlite3.connect(database_path)
    connection.row_factory = sqlite3.Row
    return connection


def _ensure_schema(connection: sqlite3.Connection) -> None:
    """Create the runs/records tables and indexes when missing."""
    with connection:
        connection.executescript(_SCHEMA)


def persist_records(
    database_path: str,
    label: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Persist the current test records into SQLite. Returns the run id.

    :param database_path: SQLite file path
    :param label: optional human-readable tag for the run
    :param metadata: optional JSON-serialisable run metadata
    """
    started_at = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
    metadata_json = json.dumps(metadata or {}, ensure_ascii=False)

    with _lock:
        connection = _connect(database_path)
        try:
            _ensure_schema(connection)
            # One transaction: the run row and all its records commit together.
            with connection:
                run_cursor = connection.execute(
                    "INSERT INTO load_density_runs (started_at, label, metadata_json) VALUES (?, ?, ?)",
                    (started_at, label, metadata_json),
                )
                run_id = int(run_cursor.lastrowid)

                rows: List[tuple] = [
                    _to_row(run_id, "success", record)
                    for record in test_record_instance.test_record_list
                ]
                rows.extend(
                    _to_row(run_id, "failure", record)
                    for record in test_record_instance.error_record_list
                )

                if rows:
                    connection.executemany(
                        "INSERT INTO load_density_records "
                        "(run_id, outcome, method, test_url, name, status_code, "
                        " response_time_ms, response_length, error) "
                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        rows,
                    )
            return run_id
        finally:
            connection.close()
def list_runs(database_path: str, limit: int = 20) -> List[Dict[str, Any]]:
    """Return the most recent *limit* runs, newest first, as plain dicts."""
    connection = _connect(database_path)
    try:
        _ensure_schema(connection)
        result = connection.execute(
            "SELECT id, started_at, label, metadata_json FROM load_density_runs "
            "ORDER BY id DESC LIMIT ?",
            (limit,),
        )
        return [dict(row) for row in result]
    finally:
        connection.close()


def fetch_run_records(database_path: str, run_id: int) -> Iterable[Dict[str, Any]]:
    """Return every record of run *run_id* as plain dicts."""
    connection = _connect(database_path)
    try:
        _ensure_schema(connection)
        result = connection.execute(
            "SELECT outcome, method, test_url, name, status_code, "
            "       response_time_ms, response_length, error "
            "FROM load_density_records WHERE run_id = ?",
            (run_id,),
        )
        return [dict(row) for row in result]
    finally:
        connection.close()
runner_mode: str = "local", + master_bind_host: str = "*", + master_bind_port: int = 5557, + master_host: str = "127.0.0.1", + master_port: int = 5557, + expected_workers: int = 0, + **kwargs, +): """ - :param user_class: locust user class - :param user_count: how many user we want to spawn - :param spawn_rate: one time will spawn how many user - :param test_time: total test run time - :param web_ui_dict: web ui dict include host and port like {"host": "127.0.0.1", "port": 8089} - :param kwargs: to catch unknown param - :return: None + 啟動 Locust 環境,支援 local / master / worker 三種模式。 + Prepare a Locust environment in local, master, or worker mode. """ load_density_logger.info( - f"prepare_env, user_class: {user_class}, user_count: {user_count}, spawn_rate: {spawn_rate}, " - f"test_time: {test_time}, web_ui_dict: {web_ui_dict}" + f"prepare_env mode={runner_mode}, user_class={user_class}, user_count={user_count}, " + f"spawn_rate={spawn_rate}, test_time={test_time}, web_ui_dict={web_ui_dict}" ) - env = create_env(user_class, another_event=events) + + env = create_env(user_class, runner_mode=runner_mode, + master_bind_host=master_bind_host, master_bind_port=master_bind_port, + master_host=master_host, master_port=master_port) + + if runner_mode == "worker": + env.runner.greenlet.join() + return env + + if runner_mode == "master" and expected_workers > 0: + _wait_for_workers(env, expected_workers) + env.runner.start(user_count, spawn_rate=spawn_rate) + if web_ui_dict is not None: env.create_web_ui(web_ui_dict.get("host", "127.0.0.1"), web_ui_dict.get("port", 8089)) + if test_time is not None: gevent.spawn_later(test_time, env.runner.quit) + env.runner.greenlet.join() - if web_ui_dict is not None: + + if web_ui_dict is not None and getattr(env, "web_ui", None) is not None: env.web_ui.stop() + return env + -def create_env(user_class: List[User], another_event: events = events): +def create_env( + user_class: List[User], + another_event: events = events, + runner_mode: str = 
def _wait_for_workers(env, expected_workers: int, timeout: float = 60.0) -> None:
    """
    Block until *expected_workers* workers have joined the master runner.

    Polls ``env.runner.clients`` every 0.5 s; after *timeout* seconds it
    logs a warning and returns so the test starts anyway.
    """
    # Stdlib monotonic clock: immune to wall-clock jumps, and it does not
    # depend on the `gevent.time` submodule attribute being importable
    # (the previous `gevent.time.time()` only worked if something else
    # had already imported that submodule).
    import time

    deadline = time.monotonic() + timeout
    connected = 0  # defined even when the loop never runs (timeout <= 0)
    while time.monotonic() < deadline:
        workers = getattr(env.runner, "clients", None)
        connected = len(workers) if workers is not None else 0
        if connected >= expected_workers:
            return
        gevent.sleep(0.5)
    load_density_logger.warning(
        f"only {connected}/{expected_workers} workers joined within {timeout}s; starting anyway"
    )
request_hook( "text": str(response.text), "content": str(response.content), "headers": str(response.headers), - "error": None, # 成功時 error 為 None + "response_time_ms": float(response_time or 0), + "response_length": int(response_length or 0), + "error": None, } ) else: @@ -43,6 +45,8 @@ def request_hook( "name": str(name), "status_code": str(response.status_code) if response else None, "text": str(response.text) if response else None, - "error": str(exception), # 失敗時紀錄 exception + "response_time_ms": float(response_time or 0), + "response_length": int(response_length or 0), + "error": str(exception), } ) \ No newline at end of file diff --git a/je_load_density/wrapper/proxy/proxy_user.py b/je_load_density/wrapper/proxy/proxy_user.py index 6f1e2e3..b3596fe 100644 --- a/je_load_density/wrapper/proxy/proxy_user.py +++ b/je_load_density/wrapper/proxy/proxy_user.py @@ -1,6 +1,10 @@ from typing import Dict, Any from je_load_density.wrapper.proxy.user.fast_http_user_proxy import ProxyFastHTTPUser from je_load_density.wrapper.proxy.user.http_user_proxy import ProxyHTTPUser +from je_load_density.wrapper.proxy.user.grpc_user_proxy import ProxyGrpcUser +from je_load_density.wrapper.proxy.user.mqtt_user_proxy import ProxyMqttUser +from je_load_density.wrapper.proxy.user.socket_user_proxy import ProxySocketUser +from je_load_density.wrapper.proxy.user.websocket_user_proxy import ProxyWebSocketUser class LocustUserProxy: @@ -17,6 +21,10 @@ def __init__(self) -> None: self.user_dict: Dict[str, Any] = { "fast_http_user": ProxyFastHTTPUser(), "http_user": ProxyHTTPUser(), + "websocket_user": ProxyWebSocketUser(), + "grpc_user": ProxyGrpcUser(), + "mqtt_user": ProxyMqttUser(), + "socket_user": ProxySocketUser(), } def get_user(self, user_type: str) -> Any: diff --git a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py b/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py index a8aee62..236280f 100644 --- a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py 
class ProxyGrpcUser:
    """
    Proxy gRPC user class.

    Holds the user configuration handed over by the action executor
    until the matching user template consumes it.
    """

    def __init__(self) -> None:
        # Raw user settings dict supplied by the caller.
        self.user_detail_dict: Optional[Dict[str, Any]] = None
        # Task definition; its shape is decided by the executor.
        self.tasks: Optional[Any] = None
        self.host: Optional[str] = None
        # Extra keyword settings, minus resolver-only keys.
        self.extra: Dict[str, Any] = {}

    def configure(
        self,
        user_detail_dict: Dict[str, Any],
        tasks: Optional[Any] = None,
        host: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Store user details, tasks, host, and the remaining keyword settings."""
        self.user_detail_dict = user_detail_dict
        self.tasks = tasks
        self.host = host
        reserved = {"variables", "csv_sources"}
        self.extra = {key: value for key, value in kwargs.items() if key not in reserved}
class ProxyMqttUser:
    """
    Proxy MQTT user class.

    A passive configuration holder: the action executor fills it in and
    the MQTT user template reads it back when the test starts.
    """

    def __init__(self) -> None:
        # All attributes stay None/empty until configure() is called.
        self.user_detail_dict: Optional[Dict[str, Any]] = None
        self.tasks: Optional[Any] = None
        self.host: Optional[str] = None
        self.extra: Dict[str, Any] = {}

    def configure(
        self,
        user_detail_dict: Dict[str, Any],
        tasks: Optional[Any] = None,
        host: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Record the user configuration for the upcoming test run.

        ``variables`` and ``csv_sources`` are consumed by the parameter
        resolver elsewhere, so they are excluded from ``extra``.
        """
        self.user_detail_dict = user_detail_dict
        self.tasks = tasks
        self.host = host
        self.extra = {
            key: value
            for key, value in kwargs.items()
            if key not in {"variables", "csv_sources"}
        }
class ProxyWebSocketUser:
    """
    Proxy WebSocket user class.

    Stores the configuration passed in by the executor so the WebSocket
    user template can pick it up later.
    """

    def __init__(self) -> None:
        self.user_detail_dict: Optional[Dict[str, Any]] = None
        self.tasks: Optional[Any] = None
        self.host: Optional[str] = None
        self.extra: Dict[str, Any] = {}

    def configure(
        self,
        user_detail_dict: Dict[str, Any],
        tasks: Optional[Any] = None,
        host: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Save details/tasks/host; keep other kwargs except resolver-only keys."""
        self.user_detail_dict = user_detail_dict
        self.tasks = tasks
        self.host = host
        filtered: Dict[str, Any] = {}
        for key, value in kwargs.items():
            # "variables" and "csv_sources" belong to the parameter resolver.
            if key not in {"variables", "csv_sources"}:
                filtered[key] = value
        self.extra = filtered
set_wrapper_grpc_user, +) +from je_load_density.wrapper.user_template.http_user_template import ( + HttpUserWrapper, + set_wrapper_http_user, +) +from je_load_density.wrapper.user_template.mqtt_user_template import ( + MqttUserWrapper, + set_wrapper_mqtt_user, +) +from je_load_density.wrapper.user_template.socket_user_template import ( + SocketUserWrapper, + set_wrapper_socket_user, +) +from je_load_density.wrapper.user_template.websocket_user_template import ( + WebSocketUserWrapper, + set_wrapper_websocket_user, +) + + +_USER_REGISTRY: Dict[str, Dict[str, Any]] = { + "fast_http_user": {"actually_user": FastHttpUserWrapper, "init": set_wrapper_fasthttp_user}, + "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user}, + "websocket_user": {"actually_user": WebSocketUserWrapper, "init": set_wrapper_websocket_user}, + "grpc_user": {"actually_user": GrpcUserWrapper, "init": set_wrapper_grpc_user}, + "mqtt_user": {"actually_user": MqttUserWrapper, "init": set_wrapper_mqtt_user}, + "socket_user": {"actually_user": SocketUserWrapper, "init": set_wrapper_socket_user}, +} def start_test( @@ -11,58 +44,55 @@ def start_test( spawn_rate: int = 10, test_time: Optional[int] = 60, web_ui_dict: Optional[Dict[str, Any]] = None, - **kwargs + runner_mode: str = "local", + master_bind_host: str = "*", + master_bind_port: int = 5557, + master_host: str = "127.0.0.1", + master_port: int = 5557, + expected_workers: int = 0, + **kwargs, ) -> Dict[str, Any]: """ - 啟動壓力測試 - Start load test + 啟動壓力測試。Start load test. 
- :param user_detail_dict: 使用者設定字典 (User detail dictionary) - :param user_count: 使用者數量 (Number of users to spawn) - :param spawn_rate: 每秒生成使用者數量 (Spawn rate per second) - :param test_time: 測試持續時間 (Test duration in seconds) - :param web_ui_dict: Web UI 設定,例如 {"host": "127.0.0.1", "port": 8089} - :param kwargs: 其他參數 (extra parameters) - :return: 測試設定摘要字典 (Summary dictionary of test configuration) + runner_mode: local | master | worker """ load_density_logger.info( f"start_test, user_detail_dict={user_detail_dict}, user_count={user_count}, " - f"spawn_rate={spawn_rate}, test_time={test_time}, web_ui_dict={web_ui_dict}, params={kwargs}" + f"spawn_rate={spawn_rate}, test_time={test_time}, web_ui_dict={web_ui_dict}, " + f"runner_mode={runner_mode}, params={kwargs}" ) - # 使用者類型映射 (User type mapping) - user_dict = { - "fast_http_user": {"actually_user": FastHttpUserWrapper, "init": set_wrapper_fasthttp_user}, - "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user}, - } - user_type = user_detail_dict.get("user", "fast_http_user") - user = user_dict.get(user_type) - + user = _USER_REGISTRY.get(user_type) if user is None: raise ValueError(f"Unsupported user type: {user_type}") actually_user = user["actually_user"] init_function = user["init"] - # 初始化使用者設定 (Initialize user configuration) init_function(user_detail_dict, **kwargs) - # 建立並執行測試環境 (Create and run test environment) prepare_env( user_class=actually_user, user_count=user_count, spawn_rate=spawn_rate, test_time=test_time, web_ui_dict=web_ui_dict, - **kwargs + runner_mode=runner_mode, + master_bind_host=master_bind_host, + master_bind_port=master_bind_port, + master_host=master_host, + master_port=master_port, + expected_workers=expected_workers, + **kwargs, ) - # 回傳結構化結果 (Return structured result) return { "user_detail": user_detail_dict, "user_count": user_count, "spawn_rate": spawn_rate, "test_time": test_time, "web_ui": web_ui_dict, - } \ No newline at end of file + "runner_mode": 
runner_mode, + } diff --git a/je_load_density/wrapper/user_template/fast_http_user_template.py b/je_load_density/wrapper/user_template/fast_http_user_template.py index 9e90614..2dcb28e 100644 --- a/je_load_density/wrapper/user_template/fast_http_user_template.py +++ b/je_load_density/wrapper/user_template/fast_http_user_template.py @@ -1,6 +1,13 @@ -from typing import Dict, Any +from typing import Any, Dict + from locust import FastHttpUser, between, task + +from je_load_density.utils.parameterization import ( + register_csv_sources, + register_variables, +) from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy +from je_load_density.wrapper.user_template.scenario_runner import run_scenario def set_wrapper_fasthttp_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: @@ -8,6 +15,11 @@ def set_wrapper_fasthttp_user(user_detail_dict: Dict[str, Any], **kwargs) -> typ 設定 FastHttpUser 的代理使用者 Configure FastHttpUser proxy user """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + locust_wrapper_proxy.user_dict.get("fast_http_user").configure(user_detail_dict, **kwargs) return FastHttpUserWrapper @@ -23,7 +35,6 @@ class FastHttpUserWrapper(FastHttpUser): def __init__(self, environment): super().__init__(environment) - # HTTP 方法映射 (HTTP method mapping) self.method: Dict[str, Any] = { "get": self.client.get, "post": self.client.post, @@ -36,17 +47,7 @@ def __init__(self, environment): @task def test(self) -> None: - """ - 執行測試任務 - Execute test tasks - """ proxy_user = locust_wrapper_proxy.user_dict.get("fast_http_user") if not proxy_user or not proxy_user.tasks: return - - for test_task_method, test_task_data in proxy_user.tasks.items(): - http_method = self.method.get(str(test_task_method).lower()) - if http_method and isinstance(test_task_data, dict): - request_url = test_task_data.get("request_url") - if 
request_url: - http_method(request_url) \ No newline at end of file + run_scenario(self.method, proxy_user.tasks) diff --git a/je_load_density/wrapper/user_template/grpc_user_template.py b/je_load_density/wrapper/user_template/grpc_user_template.py new file mode 100644 index 0000000..4baebf4 --- /dev/null +++ b/je_load_density/wrapper/user_template/grpc_user_template.py @@ -0,0 +1,165 @@ +import importlib +import re +import time +from typing import Any, Dict, Tuple + +from locust import User, between, task + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_sources, + register_variables, +) +from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy + + +def set_wrapper_grpc_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 gRPC User 的代理使用者 + Configure gRPC User proxy + """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + + locust_wrapper_proxy.user_dict.get("grpc_user").configure(user_detail_dict, **kwargs) + return GrpcUserWrapper + + +_SAFE_DOTTED_PATH = re.compile(r"^[A-Za-z_]\w*(\.[A-Za-z_]\w*)*$") + + +def _import_dotted(path: str) -> Any: + """ + Resolve a dotted ``module.attr`` path supplied in a load-test scenario. + + The gRPC user genuinely needs to load operator-authored stub + modules at runtime, so a static import literal is not viable. We + accept this by validating that ``path`` is a syntactically safe + Python identifier chain (no separators, no relative dots, no + dunders bridged via traversal) before delegating to importlib. 
+ """ + if not isinstance(path, str) or not _SAFE_DOTTED_PATH.match(path): + raise ImportError(f"invalid dotted import path: {path!r}") + module_name, _, attr = path.rpartition(".") + if not module_name: + raise ImportError(f"invalid dotted import path: {path!r}") + # nosemgrep: python.lang.security.audit.non-literal-import.non-literal-import + module = importlib.import_module(module_name) + return getattr(module, attr) + + +class GrpcUserWrapper(User): + """ + Locust gRPC User 包裝類別 + Locust gRPC User wrapper class + + Each task entry should look like:: + + { + "name": "say_hello", + "stub_path": "pkg.greeter_pb2_grpc.GreeterStub", + "request_path": "pkg.greeter_pb2.HelloRequest", + "method": "SayHello", + "payload": {"name": "world"}, + "metadata": [["x-token", "..."]], + "timeout": 10 + } + """ + + host = "localhost:50051" + wait_time = between(0.1, 0.2) + + def __init__(self, environment): + super().__init__(environment) + self._channel = None + self._target = "" + + def _ensure_grpc(self): + try: + import grpc + except ImportError as error: + raise RuntimeError("grpcio is required for GrpcUser") from error + return grpc + + def _ensure_channel(self, target: str): + grpc = self._ensure_grpc() + if self._channel is None or self._target != target: + if self._channel is not None: + try: + self._channel.close() + except Exception as error: + load_density_logger.debug(f"grpc channel close before reconnect failed: {error!r}") + self._channel = grpc.insecure_channel(target) + self._target = target + return self._channel + + def _fire(self, name: str, target: str, start: float, length: int, exception: Exception = None) -> None: + self.environment.events.request.fire( + request_type="GRPC", + name=name, + response_time=(time.monotonic() - start) * 1000, + response_length=length, + exception=exception, + context={}, + url=target, + response=None, + start_time=start, + ) + + def _do_step(self, raw_task: Dict[str, Any]) -> None: + step = parameter_resolver.resolve(raw_task) 
+ target = step.get("target") or step.get("host") or self.host + method_name = step.get("method", "") + stub_path = step.get("stub_path", "") + request_path = step.get("request_path", "") + payload = step.get("payload") or {} + metadata = _coerce_metadata(step.get("metadata")) + timeout = float(step.get("timeout", 10)) + name = step.get("name") or f"{stub_path}.{method_name}" + + start = time.monotonic() + try: + channel = self._ensure_channel(target) + stub_cls = _import_dotted(stub_path) + request_cls = _import_dotted(request_path) + stub = stub_cls(channel) + method = getattr(stub, method_name) + + request = request_cls(**payload) if isinstance(payload, dict) else request_cls() + response = method(request, timeout=timeout, metadata=metadata) + length = response.ByteSize() if hasattr(response, "ByteSize") else 0 + self._fire(name, target, start, length) + except Exception as error: + self._fire(name, target, start, 0, error) + + @task + def run_tasks(self) -> None: + proxy_user = locust_wrapper_proxy.user_dict.get("grpc_user") + if not proxy_user or not proxy_user.tasks: + return + tasks = proxy_user.tasks + if isinstance(tasks, dict) and "tasks" in tasks: + tasks = tasks.get("tasks") or [] + if not isinstance(tasks, list): + load_density_logger.warning("grpc_user.tasks must be a list") + return + for raw_task in tasks: + if isinstance(raw_task, dict): + self._do_step(raw_task) + + +def _coerce_metadata(metadata: Any) -> Tuple[Tuple[str, str], ...]: + if not metadata: + return () + if isinstance(metadata, dict): + return tuple((str(k), str(v)) for k, v in metadata.items()) + if isinstance(metadata, list): + result = [] + for item in metadata: + if isinstance(item, (list, tuple)) and len(item) == 2: + result.append((str(item[0]), str(item[1]))) + return tuple(result) + return () diff --git a/je_load_density/wrapper/user_template/http_user_template.py b/je_load_density/wrapper/user_template/http_user_template.py index 0e64999..9fe6e9c 100644 --- 
a/je_load_density/wrapper/user_template/http_user_template.py +++ b/je_load_density/wrapper/user_template/http_user_template.py @@ -1,8 +1,13 @@ -from typing import Dict, Any +from typing import Any, Dict -from locust import HttpUser, task, between +from locust import HttpUser, between, task +from je_load_density.utils.parameterization import ( + register_csv_sources, + register_variables, +) from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy +from je_load_density.wrapper.user_template.scenario_runner import run_scenario def set_wrapper_http_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: @@ -10,6 +15,11 @@ def set_wrapper_http_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: 設定 HttpUser 的代理使用者 Configure HttpUser proxy user """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + locust_wrapper_proxy.user_dict.get("http_user").configure(user_detail_dict, **kwargs) return HttpUserWrapper @@ -25,7 +35,6 @@ class HttpUserWrapper(HttpUser): def __init__(self, environment): super().__init__(environment) - # HTTP 方法映射 (HTTP method mapping) self.method: Dict[str, Any] = { "get": self.client.get, "post": self.client.post, @@ -38,17 +47,7 @@ def __init__(self, environment): @task def test(self) -> None: - """ - 執行測試任務 - Execute test tasks - """ proxy_user = locust_wrapper_proxy.user_dict.get("http_user") if not proxy_user or not proxy_user.tasks: return - - for test_task_method, test_task_data in proxy_user.tasks.items(): - http_method = self.method.get(str(test_task_method).lower()) - if http_method and isinstance(test_task_data, dict): - request_url = test_task_data.get("request_url") - if request_url: - http_method(request_url) + run_scenario(self.method, proxy_user.tasks) diff --git a/je_load_density/wrapper/user_template/mqtt_user_template.py 
b/je_load_density/wrapper/user_template/mqtt_user_template.py new file mode 100644 index 0000000..ec11362 --- /dev/null +++ b/je_load_density/wrapper/user_template/mqtt_user_template.py @@ -0,0 +1,182 @@ +import secrets +import time +from typing import Any, Dict + +from locust import User, between, task + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_sources, + register_variables, +) +from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy + + +def set_wrapper_mqtt_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 MQTT User 的代理使用者 + Configure MQTT User proxy + """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + + locust_wrapper_proxy.user_dict.get("mqtt_user").configure(user_detail_dict, **kwargs) + return MqttUserWrapper + + +class MqttUserWrapper(User): + """ + Locust MQTT User 包裝類別 + Locust MQTT User wrapper class + + Each task entry should look like:: + + { + "method": "publish", # publish | subscribe | connect | disconnect + "broker": "127.0.0.1:1883", + "topic": "telemetry/x", + "payload": "...", + "qos": 1, + "retain": false, + "username": "...", + "password": "...", + "client_id": "...", + "name": "publish-telemetry" + } + """ + + host = "127.0.0.1:1883" + wait_time = between(0.1, 0.2) + + def __init__(self, environment): + super().__init__(environment) + self._client = None + self._broker_target = "" + + def _ensure_paho(self): + try: + import paho.mqtt.client as paho_client + except ImportError as error: + raise RuntimeError("paho-mqtt is required for MqttUser") from error + return paho_client + + def _ensure_client(self, broker: str, step: Dict[str, Any]): + paho_client = self._ensure_paho() + if self._client is not None and self._broker_target == broker: + 
return self._client + + if self._client is not None: + try: + self._client.disconnect() + except Exception as error: + load_density_logger.debug(f"mqtt disconnect before reconnect failed: {error!r}") + + client_id = step.get("client_id") or f"loaddensity-{secrets.token_hex(4)}" + client = paho_client.Client(client_id=client_id, clean_session=True) + username = step.get("username") + password = step.get("password") + if username: + client.username_pw_set(username, password or None) + + host, _, port = broker.partition(":") + client.connect(host, int(port or 1883), keepalive=int(step.get("keepalive", 60))) + client.loop_start() + self._client = client + self._broker_target = broker + return client + + def _fire(self, name: str, broker: str, start: float, length: int, exception: Exception = None) -> None: + self.environment.events.request.fire( + request_type="MQTT", + name=name, + response_time=(time.monotonic() - start) * 1000, + response_length=length, + exception=exception, + context={}, + url=broker, + response=None, + start_time=start, + ) + + def _do_step(self, raw_task: Dict[str, Any]) -> None: + step = parameter_resolver.resolve(raw_task) + method = str(step.get("method", "publish")).lower() + broker = step.get("broker") or step.get("host") or self.host + name = step.get("name") or f"{method}:{step.get('topic', '')}" + + start = time.monotonic() + try: + length = self._dispatch_step(method, broker, step) + self._fire(name, broker, start, length) + except Exception as error: + self._fire(name, broker, start, 0, error) + + def _dispatch_step(self, method: str, broker: str, step: Dict[str, Any]) -> int: + if method == "disconnect": + self._teardown_client() + return 0 + + client = self._ensure_client(broker, step) + + if method == "connect": + return 0 + if method == "publish": + return self._publish(client, step) + if method == "subscribe": + return self._subscribe(client, step) + raise ValueError(f"unsupported mqtt method: {method}") + + def 
_teardown_client(self) -> None: + if self._client is not None: + self._client.loop_stop() + self._client.disconnect() + self._client = None + + @staticmethod + def _publish(client, step: Dict[str, Any]) -> int: + topic = step.get("topic", "") + qos = int(step.get("qos", 0)) + retain = bool(step.get("retain", False)) + payload = step.get("payload", "") + info = client.publish(topic, payload=payload, qos=qos, retain=retain) + if hasattr(info, "wait_for_publish"): + info.wait_for_publish(timeout=float(step.get("timeout", 5))) + if info.rc != 0: + raise RuntimeError(f"publish failed rc={info.rc}") + return len(payload) if isinstance(payload, (bytes, str)) else 0 + + @staticmethod + def _subscribe(client, step: Dict[str, Any]) -> int: + topic = step.get("topic", "") + qos = int(step.get("qos", 0)) + result, _ = client.subscribe(topic, qos=qos) + if result != 0: + raise RuntimeError(f"subscribe failed rc={result}") + return 0 + + @task + def run_tasks(self) -> None: + proxy_user = locust_wrapper_proxy.user_dict.get("mqtt_user") + if not proxy_user or not proxy_user.tasks: + return + tasks = proxy_user.tasks + if isinstance(tasks, dict) and "tasks" in tasks: + tasks = tasks.get("tasks") or [] + if not isinstance(tasks, list): + load_density_logger.warning("mqtt_user.tasks must be a list") + return + for raw_task in tasks: + if isinstance(raw_task, dict): + self._do_step(raw_task) + + def on_stop(self) -> None: + if self._client is not None: + try: + self._client.loop_stop() + self._client.disconnect() + except Exception as error: + load_density_logger.debug(f"mqtt on_stop cleanup failed: {error!r}") + self._client = None diff --git a/je_load_density/wrapper/user_template/request_executor.py b/je_load_density/wrapper/user_template/request_executor.py new file mode 100644 index 0000000..14b71fc --- /dev/null +++ b/je_load_density/wrapper/user_template/request_executor.py @@ -0,0 +1,207 @@ +import json as json_module +from typing import Any, Dict, Iterable, List, Optional, 
Tuple + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import parameter_resolver + +_REQUEST_KW = ( + "params", "headers", "cookies", "json", "data", + "timeout", "allow_redirects", "verify", "files", +) + + +def _build_kwargs(task: Dict[str, Any]) -> Dict[str, Any]: + kwargs: Dict[str, Any] = {} + for key in _REQUEST_KW: + if key in task and task[key] is not None: + kwargs[key] = task[key] + + auth = task.get("auth") + if isinstance(auth, dict): + auth_type = str(auth.get("type", "")).lower() + if auth_type == "basic": + kwargs["auth"] = (auth.get("username", ""), auth.get("password", "")) + elif auth_type == "bearer": + headers = dict(kwargs.get("headers") or {}) + headers["Authorization"] = f"Bearer {auth.get('token', '')}" + kwargs["headers"] = headers + + name = task.get("name") + if name: + kwargs["name"] = name + + return kwargs + + +def _assert_status_code(response: Any, assertion: Dict[str, Any]) -> Optional[str]: + target = assertion.get("value") + actual = getattr(response, "status_code", None) + if int(actual) != int(target): + return f"status_code expected {target}, got {actual}" + return None + + +def _assert_contains(response: Any, assertion: Dict[str, Any]) -> Optional[str]: + target = assertion.get("value") + text = getattr(response, "text", "") or "" + if str(target) not in text: + return f"body does not contain {target!r}" + return None + + +def _assert_not_contains(response: Any, assertion: Dict[str, Any]) -> Optional[str]: + target = assertion.get("value") + text = getattr(response, "text", "") or "" + if str(target) in text: + return f"body unexpectedly contains {target!r}" + return None + + +def _assert_json_path(response: Any, assertion: Dict[str, Any]) -> Optional[str]: + path = assertion.get("path", "") + expected = assertion.get("value") + actual = _resolve_json_path(response, path) + if actual != expected: + return f"json_path {path} expected {expected!r}, got 
{actual!r}" + return None + + +def _assert_header(response: Any, assertion: Dict[str, Any]) -> Optional[str]: + target = assertion.get("value") + header_name = assertion.get("name", "") + headers = getattr(response, "headers", {}) or {} + if headers.get(header_name) != target: + return f"header {header_name} expected {target!r}, got {headers.get(header_name)!r}" + return None + + +_ASSERTION_HANDLERS = { + "status_code": _assert_status_code, + "contains": _assert_contains, + "not_contains": _assert_not_contains, + "json_path": _assert_json_path, + "header": _assert_header, +} + + +def _check_assertions(response: Any, assertions: Iterable[Dict[str, Any]]) -> Tuple[bool, Optional[str]]: + for assertion in assertions: + kind = str(assertion.get("type", "")).lower() + handler = _ASSERTION_HANDLERS.get(kind) + if handler is None: + continue + reason = handler(response, assertion) + if reason is not None: + return False, reason + return True, None + + +def _resolve_json_path(response: Any, path: str) -> Any: + try: + body = response.json() + except Exception: + return None + cursor: Any = body + for part in path.split("."): + if not part: + continue + if isinstance(cursor, list): + try: + cursor = cursor[int(part)] + continue + except (ValueError, IndexError): + return None + if isinstance(cursor, dict): + cursor = cursor.get(part) + if cursor is None: + return None + else: + return None + return cursor + + +def _apply_extractors(response: Any, extractors: Iterable[Dict[str, Any]]) -> None: + for extractor in extractors: + var_name = extractor.get("var") + if not var_name: + continue + kind = str(extractor.get("from", "json_path")).lower() + if kind == "json_path": + value = _resolve_json_path(response, extractor.get("path", "")) + elif kind == "header": + headers = getattr(response, "headers", {}) or {} + value = headers.get(extractor.get("name", "")) + elif kind == "status_code": + value = getattr(response, "status_code", None) + else: + value = None + if value is not 
None: + parameter_resolver.register_variable(var_name, value) + + +def _normalise_tasks(raw_tasks: Any) -> List[Dict[str, Any]]: + """ + Accepts either: + {"get": {...}, "post": {...}} - legacy single-method-per-task + [{"method": "get", ...}, ...] - new list-of-tasks form + Returns a normalised list with explicit "method". + """ + if isinstance(raw_tasks, list): + result = [] + for item in raw_tasks: + if isinstance(item, dict): + result.append(dict(item)) + return result + if isinstance(raw_tasks, dict): + return [{"method": method, **(payload if isinstance(payload, dict) else {})} + for method, payload in raw_tasks.items()] + return [] + + +def execute_task(method_map: Dict[str, Any], task: Dict[str, Any]) -> None: + """ + Resolve placeholders, execute one request, run assertions, and apply extractors. + """ + resolved = parameter_resolver.resolve(task) + + method = str(resolved.get("method", "")).lower() + request_url = resolved.get("request_url") or resolved.get("url") + if not method or not request_url: + return + + http_method = method_map.get(method) + if http_method is None: + load_density_logger.warning(f"unsupported HTTP method: {method}") + return + + kwargs = _build_kwargs(resolved) + assertions = resolved.get("assertions") or [] + extractors = resolved.get("extract") or [] + + if not assertions and not extractors: + http_method(request_url, **kwargs) + return + + catch_kwargs = dict(kwargs) + catch_kwargs["catch_response"] = True + with http_method(request_url, **catch_kwargs) as response: + ok, reason = _check_assertions(response, assertions) + if not ok: + response.failure(reason) + else: + _apply_extractors(response, extractors) + response.success() + + +def execute_tasks(method_map: Dict[str, Any], raw_tasks: Any) -> None: + for task in _normalise_tasks(raw_tasks): + try: + execute_task(method_map, task) + except Exception as error: + load_density_logger.error(f"task execution failed: {error!r}") + + +def task_body_as_json(body: Any) -> str: + if 
isinstance(body, (dict, list)): + return json_module.dumps(body) + return str(body) diff --git a/je_load_density/wrapper/user_template/scenario_runner.py b/je_load_density/wrapper/user_template/scenario_runner.py new file mode 100644 index 0000000..79337b2 --- /dev/null +++ b/je_load_density/wrapper/user_template/scenario_runner.py @@ -0,0 +1,108 @@ +import secrets +from typing import Any, Dict, List, Optional + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import parameter_resolver +from je_load_density.wrapper.user_template.request_executor import ( + _normalise_tasks, + execute_task, +) + + +def _coerce_tasks_payload(raw_tasks: Any) -> Dict[str, Any]: + """ + Accept the new dict form with mode and tasks, or a bare list/dict. + Returns {"mode": "sequence|weighted|conditional", "tasks": [...]}. + """ + if isinstance(raw_tasks, dict) and "tasks" in raw_tasks and isinstance(raw_tasks.get("tasks"), (list, dict)): + mode = str(raw_tasks.get("mode", "sequence")).lower() + tasks = _normalise_tasks(raw_tasks.get("tasks")) + return {"mode": mode, "tasks": tasks} + return {"mode": "sequence", "tasks": _normalise_tasks(raw_tasks)} + + +def _condition_passes(task: Dict[str, Any]) -> bool: + run_if = task.get("run_if") + skip_if = task.get("skip_if") + + if run_if is not None and not _eval_condition(run_if): + return False + if skip_if is not None and _eval_condition(skip_if): + return False + return True + + +def _is_pair(value: Any) -> bool: + return isinstance(value, list) and len(value) == 2 + + +_CONDITION_OPS = { + "equals": lambda v: v[0] == v[1] if _is_pair(v) else False, + "not_equals": lambda v: v[0] != v[1] if _is_pair(v) else False, + "in": lambda v: (v[0] in (v[1] or [])) if _is_pair(v) else False, + "truthy": bool, +} + + +def _eval_condition(expression: Any) -> bool: + """ + Resolve a condition. 
Supports: + bool / int -> truthy check + "${var.x}" -> truthy after resolve + {"equals": ["${var.x}", "ok"]} -> equality + {"not_equals": [...]} + {"in": ["${var.x}", ["a", "b"]]} + {"truthy": "${var.x}"} + """ + if isinstance(expression, (bool, int)): + return bool(expression) + if isinstance(expression, str): + return bool(parameter_resolver.resolve(expression)) + if not isinstance(expression, dict): + return False + for op, args in expression.items(): + handler = _CONDITION_OPS.get(op.lower()) + if handler is None: + continue + return handler(parameter_resolver.resolve(args)) + return False + + +def _pick_weighted(tasks: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + weights = [max(int(t.get("weight", 1) or 1), 0) for t in tasks] + total = sum(weights) + if total <= 0: + return None + pick = secrets.randbelow(total) + cursor = 0 + for task, weight in zip(tasks, weights): + cursor += weight + if pick < cursor: + return task + return tasks[-1] + + +def run_scenario(method_map: Dict[str, Any], raw_tasks: Any) -> None: + payload = _coerce_tasks_payload(raw_tasks) + mode = payload["mode"] + tasks = payload["tasks"] + if not tasks: + return + + if mode == "weighted": + chosen = _pick_weighted(tasks) + if chosen and _condition_passes(chosen): + _safe_execute(method_map, chosen) + return + + for task in tasks: + if not _condition_passes(task): + continue + _safe_execute(method_map, task) + + +def _safe_execute(method_map: Dict[str, Any], task: Dict[str, Any]) -> None: + try: + execute_task(method_map, task) + except Exception as error: + load_density_logger.error(f"scenario step failed: {error!r}") diff --git a/je_load_density/wrapper/user_template/socket_user_template.py b/je_load_density/wrapper/user_template/socket_user_template.py new file mode 100644 index 0000000..13a6d6a --- /dev/null +++ b/je_load_density/wrapper/user_template/socket_user_template.py @@ -0,0 +1,147 @@ +import socket +import time +from typing import Any, Dict + +from locust import User, 
between, task + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_sources, + register_variables, +) +from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy + + +def set_wrapper_socket_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 Socket User 的代理使用者 (TCP/UDP) + Configure raw socket User proxy + """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + + locust_wrapper_proxy.user_dict.get("socket_user").configure(user_detail_dict, **kwargs) + return SocketUserWrapper + + +class SocketUserWrapper(User): + """ + Locust 原生 Socket User 包裝類別 (TCP/UDP) + Raw socket Locust user (TCP/UDP). + + Each task entry should look like:: + + { + "protocol": "tcp", # tcp | udp + "target": "127.0.0.1:9000", + "payload": "...", # str or hex string with prefix "hex:" + "expect_bytes": 64, # bytes to read; 0 to skip read + "expect_substring": "OK", # optional substring assertion + "timeout": 5, + "name": "ping" + } + """ + + host = "127.0.0.1:9000" + wait_time = between(0.1, 0.2) + + def _fire(self, name: str, target: str, protocol: str, start: float, length: int, exception: Exception = None) -> None: + self.environment.events.request.fire( + request_type=protocol.upper(), + name=name, + response_time=(time.monotonic() - start) * 1000, + response_length=length, + exception=exception, + context={}, + url=target, + response=None, + start_time=start, + ) + + @staticmethod + def _to_bytes(value: Any) -> bytes: + if isinstance(value, bytes): + return value + text = str(value or "") + if text.startswith("hex:"): + return bytes.fromhex(text[4:]) + return text.encode("utf-8") + + def _do_step(self, raw_task: Dict[str, Any]) -> None: + step = parameter_resolver.resolve(raw_task) + protocol = 
str(step.get("protocol", "tcp")).lower() + target = step.get("target") or step.get("host") or self.host + host, _, port = target.partition(":") + port = int(port or 0) + timeout = float(step.get("timeout", 5)) + payload = self._to_bytes(step.get("payload", "")) + expect_bytes = int(step.get("expect_bytes", 0)) + expect_substring = step.get("expect_substring") + name = step.get("name") or f"{protocol}:{target}" + + start = time.monotonic() + try: + if protocol == "tcp": + length = self._do_tcp(host, port, payload, expect_bytes, expect_substring, timeout) + elif protocol == "udp": + length = self._do_udp(host, port, payload, expect_bytes, expect_substring, timeout) + else: + raise ValueError(f"unsupported protocol: {protocol}") + self._fire(name, target, protocol, start, length) + except Exception as error: + self._fire(name, target, protocol, start, 0, error) + + @staticmethod + def _do_tcp(host: str, port: int, payload: bytes, expect_bytes: int, expect_substring: Any, timeout: float) -> int: + with socket.create_connection((host, port), timeout=timeout) as sock: + sock.sendall(payload) + data = b"" + if expect_bytes > 0: + sock.settimeout(timeout) + while len(data) < expect_bytes: + chunk = sock.recv(expect_bytes - len(data)) + if not chunk: + break + data += chunk + _verify_substring(data, expect_substring) + return len(payload) + len(data) + + @staticmethod + def _do_udp(host: str, port: int, payload: bytes, expect_bytes: int, expect_substring: Any, timeout: float) -> int: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + try: + sock.sendto(payload, (host, port)) + data = b"" + if expect_bytes > 0: + data, _ = sock.recvfrom(max(expect_bytes, 1)) + _verify_substring(data, expect_substring) + return len(payload) + len(data) + finally: + sock.close() + + @task + def run_tasks(self) -> None: + proxy_user = locust_wrapper_proxy.user_dict.get("socket_user") + if not proxy_user or not proxy_user.tasks: + return + tasks = 
proxy_user.tasks + if isinstance(tasks, dict) and "tasks" in tasks: + tasks = tasks.get("tasks") or [] + if not isinstance(tasks, list): + load_density_logger.warning("socket_user.tasks must be a list") + return + for raw_task in tasks: + if isinstance(raw_task, dict): + self._do_step(raw_task) + + +def _verify_substring(data: bytes, expect: Any) -> None: + if expect is None: + return + text = data.decode("utf-8", errors="replace") + if str(expect) not in text: + raise AssertionError(f"expected {expect!r} in response") diff --git a/je_load_density/wrapper/user_template/websocket_user_template.py b/je_load_density/wrapper/user_template/websocket_user_template.py new file mode 100644 index 0000000..fde5197 --- /dev/null +++ b/je_load_density/wrapper/user_template/websocket_user_template.py @@ -0,0 +1,171 @@ +import time +from typing import Any, Dict + +from locust import User, between, task + +from je_load_density.utils.logging.loggin_instance import load_density_logger +from je_load_density.utils.parameterization import ( + parameter_resolver, + register_csv_sources, + register_variables, +) +from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy + + +def set_wrapper_websocket_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 WebSocket User 的代理使用者 + Configure WebSocket User proxy + """ + if isinstance(kwargs.get("variables"), dict): + register_variables(kwargs["variables"]) + if isinstance(kwargs.get("csv_sources"), list): + register_csv_sources(kwargs["csv_sources"]) + + locust_wrapper_proxy.user_dict.get("websocket_user").configure(user_detail_dict, **kwargs) + return WebSocketUserWrapper + + +class WebSocketUserWrapper(User): + """ + Locust WebSocket User 包裝類別 + Locust WebSocket User wrapper class + + Each task entry should look like:: + + { + "method": "send", # send | recv | sendrecv | connect | close + "request_url": "ws://...", # required for connect; otherwise reused + "name": "login", # event name; defaults to URL + 
"payload": "...", + "expect": "substring", # optional substring assertion on recv + "timeout": 5 + } + """ + + host = "ws://localhost" + wait_time = between(0.1, 0.2) + + def __init__(self, environment): + super().__init__(environment) + self._ws = None + self._url: str = "" + + def _ensure_websocket(self): + if self._ws is None: + try: + from websocket import create_connection + except ImportError as error: + raise RuntimeError("websocket-client is required for WebSocketUser") from error + self._create_connection = create_connection + return self._create_connection + + def _connect(self, url: str, timeout: float) -> None: + create_connection = self._ensure_websocket() + if self._ws is not None and self._url == url: + return + if self._ws is not None: + try: + self._ws.close() + except Exception as error: + load_density_logger.debug(f"websocket close before reconnect failed: {error!r}") + self._ws = create_connection(url, timeout=timeout) + self._url = url + + def _fire(self, name: str, start: float, length: int, exception: Exception = None) -> None: + self.environment.events.request.fire( + request_type="WS", + name=name, + response_time=(time.monotonic() - start) * 1000, + response_length=length, + exception=exception, + context={}, + url=self._url, + response=None, + start_time=start, + ) + + def _do_step(self, raw_task: Dict[str, Any]) -> None: + step = parameter_resolver.resolve(raw_task) + method = str(step.get("method", "send")).lower() + url = step.get("request_url") or step.get("url") or self._url + name = step.get("name") or url or method + timeout = float(step.get("timeout", 5)) + + start = time.monotonic() + try: + length = self._dispatch_step(method, url, timeout, step) + self._fire(name, start, length) + except Exception as error: + self._fire(name, start, 0, error) + + def _dispatch_step(self, method: str, url: str, timeout: float, step: Dict[str, Any]) -> int: + if method == "connect": + self._connect(url, timeout) + return 0 + if method == "close": 
+ self._close_socket() + return 0 + + if self._ws is None and url: + self._connect(url, timeout) + if self._ws is None: + raise RuntimeError("websocket connection not established") + + if method == "send": + return self._send_only(step) + if method == "recv": + return self._recv_only(timeout, step) + if method == "sendrecv": + return self._send_then_recv(timeout, step) + raise ValueError(f"unsupported websocket method: {method}") + + def _close_socket(self) -> None: + if self._ws is not None: + self._ws.close() + self._ws = None + + def _send_only(self, step: Dict[str, Any]) -> int: + payload = step.get("payload", "") + self._ws.send(payload) + return len(payload) + + def _recv_only(self, timeout: float, step: Dict[str, Any]) -> int: + self._ws.settimeout(timeout) + data = self._ws.recv() + _verify_expect(data, step.get("expect")) + return len(data) if data else 0 + + def _send_then_recv(self, timeout: float, step: Dict[str, Any]) -> int: + payload = step.get("payload", "") + self._ws.send(payload) + self._ws.settimeout(timeout) + data = self._ws.recv() + _verify_expect(data, step.get("expect")) + return len(payload) + (len(data) if data else 0) + + @task + def run_tasks(self) -> None: + proxy_user = locust_wrapper_proxy.user_dict.get("websocket_user") + if not proxy_user or not proxy_user.tasks: + return + tasks = proxy_user.tasks + if isinstance(tasks, dict) and "tasks" in tasks: + tasks = tasks.get("tasks") or [] + if not isinstance(tasks, list): + load_density_logger.warning("websocket_user.tasks must be a list") + return + for raw_task in tasks: + if isinstance(raw_task, dict): + self._do_step(raw_task) + + +def _verify_expect(data: Any, expect: Any) -> None: + if expect is None: + return + if isinstance(data, bytes): + text = data.decode("utf-8", errors="replace") + else: + text = str(data) + if str(expect) not in text: + raise AssertionError(f"expected {expect!r} in response") diff --git a/pyproject.toml b/pyproject.toml index 55e8eb6..8b633dd 100644 --- 
a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,35 @@ find = { namespaces = false }

 [project.optional-dependencies]
 gui = ["PySide6==6.11.0", "qt-material"]
+websocket = ["websocket-client>=1.6"]
+grpc = ["grpcio>=1.60", "protobuf>=4.25"]
+mqtt = ["paho-mqtt>=2.0"]
+prometheus = ["prometheus-client>=0.19"]
+opentelemetry = [
+    "opentelemetry-api>=1.24",
+    "opentelemetry-sdk>=1.24",
+    "opentelemetry-exporter-otlp-proto-grpc>=1.24",
+]
+metrics = [
+    "prometheus-client>=0.19",
+    "opentelemetry-api>=1.24",
+    "opentelemetry-sdk>=1.24",
+    "opentelemetry-exporter-otlp-proto-grpc>=1.24",
+]
+faker = ["faker>=24.0"]
+mcp = ["mcp>=1.0"]
+all = [
+    "PySide6==6.11.0", "qt-material",
+    "websocket-client>=1.6",
+    "grpcio>=1.60", "protobuf>=4.25",
+    "paho-mqtt>=2.0",
+    "prometheus-client>=0.19",
+    "opentelemetry-api>=1.24",
+    "opentelemetry-sdk>=1.24",
+    "opentelemetry-exporter-otlp-proto-grpc>=1.24",
+    "faker>=24.0",
+    "mcp>=1.0",
+]

 [tool.bandit]
 exclude_dirs = ["test", "tests", ".venv", "build", "dist"]