Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions tests/unit/common/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def workflow(self) -> Workflow:
return Workflow(
name="Workflow Test",
makespan=100.0,
executed_at=str(datetime.now().astimezone().isoformat())
executed_at=str(datetime.now().astimezone().isoformat()),
runtime_system_name="WfCommons",
)

@pytest.mark.unit
Expand Down Expand Up @@ -62,7 +63,7 @@ def test_workflow_creation(self, workflow: Workflow) -> None:
},
"runtimeSystem": {
"name": "WfCommons",
"version": f"{__version__}",
"version": f"unknown",
"url": f"https://docs.wfcommons.org/en/v{__version__}/"
}
}
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self,
self.created_at: str = str(datetime.now().astimezone().isoformat())
self.schema_version: str = f"{__schema_version__}"
self.runtime_system_name: Optional[str] = "WfCommons" if not runtime_system_name else runtime_system_name
self.runtime_system_version: Optional[str] = str(__version__) if not runtime_system_version else runtime_system_version
self.runtime_system_version: Optional[str] = "unknown" if not runtime_system_version else runtime_system_version
self.runtime_system_url: Optional[str] = f"https://docs.wfcommons.org/en/v{__version__}/" if not runtime_system_url else runtime_system_url
self.executed_at: Optional[str] = str(datetime.now().astimezone().isoformat()) if not executed_at else executed_at
self.makespan: Optional[float] = makespan
Expand Down
22 changes: 14 additions & 8 deletions wfcommons/wfinstances/logs/snakemake.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class SnakemakeLogsParser(LogsParser):
:param path_prefix_rewrite: A tuple that specifies that a file path prefix(for the workflow data files)
should be replaced by another prefix (this is useful when the workflow execution was on a
different machine than the log parsing)
:param snakemake_version: The Snakemake version (e.g., "9.20.0")
:type snakemake_version: str
"""

def __init__(self,
Expand All @@ -55,7 +57,8 @@ def __init__(self,
description: Optional[str] = None,
logger: Optional[Logger] = None,
rules_to_ignore: Optional[list[str]] = None,
path_prefix_rewrite: Optional[tuple[str, str]] = None
path_prefix_rewrite: Optional[tuple[str, str]] = None,
snakemake_version: Optional[str] = "unknown"
) -> None:
"""Create an object of the Snakemake parser."""

Expand All @@ -69,6 +72,7 @@ def __init__(self,

self.execution_dir : pathlib.Path = execution_dir
self.snkmt_db: pathlib.Path = snkmt_db
self.snakemake_version: str = snakemake_version

self.file_map = {}
self.file_objects = {}
Expand Down Expand Up @@ -99,6 +103,7 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
self.workflow = Workflow(name=self.workflow_name,
description=self.description,
runtime_system_name=self.wms_name,
runtime_system_version=self.snakemake_version,
runtime_system_url=self.wms_url)

# Parse the sqlite db for to identify rules
Expand All @@ -111,13 +116,14 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
self._create_tasks()

# Set the workflow's makespan
workflow_start_time = math.inf
workflow_end_time = 0
for task in self.workflow.tasks.values():
task_start_time = datetime.fromisoformat(task.start_time).timestamp()
task_end_time = task_start_time + task.runtime
workflow_start_time = min(task_start_time, workflow_start_time)
workflow_end_time = max(task_end_time, workflow_end_time)
conn = sqlite3.connect(self.snkmt_db)
cursor = conn.cursor()
cursor.execute("SELECT * FROM workflows")
rows = cursor.fetchall()
if len(rows) != 1:
raise SystemError("The SQLite database has more than one entry in the workflows table")
workflow_start_time = datetime.fromisoformat(rows[0][2]).timestamp()
workflow_end_time = datetime.fromisoformat(rows[0][4]).timestamp()
self.workflow.makespan = workflow_end_time - workflow_start_time

return self.workflow
Expand Down
Loading