From 13abeb4ebdcf35c528d32908bc2b31f97ba1ed99 Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 12:05:17 +0200 Subject: [PATCH 1/7] correctly pass the configuration yaml --- AIDojoCoordinator/utils/utils.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/AIDojoCoordinator/utils/utils.py b/AIDojoCoordinator/utils/utils.py index 4bf042f4..83ae312a 100644 --- a/AIDojoCoordinator/utils/utils.py +++ b/AIDojoCoordinator/utils/utils.py @@ -219,15 +219,21 @@ def read_agents_known_services(self, type_agent: str, type_data: str) -> dict: # Check the host is a good ip _ = netaddr.IPAddress(ip) known_services_host = IP(ip) - if data.lower() == "random": - known_services[known_services_host] = "random" - name = data[0] - type = data[1] - version = data[2] - is_local = data[3] - - known_services[known_services_host] = Service(name, type, version, is_local) - + known_services[known_services_host] = [] + for service in data: # process each item in the list + if isinstance(service, list): # Service defined as list + name = service[0] + type = service[1] + version = service[2] + is_local = service[3] + known_services[known_services_host].append(Service(name, type, version, is_local)) + elif isinstance(service, str): # keyword + if service.lower() == "random": + known_services[known_services_host].append("random") + else: + logging.warning(f"Unsupported values in agent known_services{ip}:{service}") + else: + logging.warning(f"Unsupported values in agent known_services{ip}:{service}") except (ValueError, netaddr.AddrFormatError): known_services = {} return known_services From b28642195059a4ad369b6c97bb5416568ddee11d Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 12:05:37 +0200 Subject: [PATCH 2/7] correctly select the random services if the keyword is used --- AIDojoCoordinator/worlds/NSEGameCoordinator.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index 3410b116..27156025 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -114,10 +114,21 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga #return value back to the original net_obj.value += 256 known_services ={} - for ip, service_list in view["known_services"]: - known_services[self._ip_mapping[ip]] = service_list + for ip, service_list in view["known_services"].items(): + if self._ip_mapping[ip] not in known_services: + known_services[self._ip_mapping[ip]] = set() + for s in service_list: + if isinstance(s, Service): + known_services[self._ip_mapping[ip]].add(s) + elif isinstance(s, str): + if s == "random": # randomly select the service + self.logger.info(f"\tSelecting service randomly in {self._ip_mapping[ip]}") + # select candidates that are not explicitly listed + service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[self._ip_mapping[ip]]] + # randomly select from candidates + known_services[self._ip_mapping[ip]].add(random.choice(service_candidates)) known_data = {} - for ip, data_list in view["known_data"]: + for ip, data_list in view["known_data"].items(): known_data[self._ip_mapping[ip]] = data_list game_state = GameState(controlled_hosts, known_hosts, known_services, known_data, known_networks) self.logger.info(f"Generated GameState:{game_state}") From f9a045a63e98965c7f96c15bd51388be0f1239c2 Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 12:17:47 +0200 Subject: [PATCH 3/7] Add keyword to set --- AIDojoCoordinator/utils/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AIDojoCoordinator/utils/utils.py b/AIDojoCoordinator/utils/utils.py index 83ae312a..a1e5510e 100644 --- a/AIDojoCoordinator/utils/utils.py +++ b/AIDojoCoordinator/utils/utils.py @@ -178,7 +178,7 @@ def read_agents_known_data(self, type_agent: str, type_data: str) -> dict: known_data[known_data_host] = set() for datum in data: if not isinstance(datum, list) and datum.lower() == "random": - known_data[known_data_host] = "random" + known_data[known_data_host].add("random") else: known_data_content_str_user = datum[0] known_data_content_str_data = datum[1] From 8db7bcdbbafa35d187cdb94c03deb43b8b3d1cf1 Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 12:26:08 +0200 Subject: [PATCH 4/7] split view parsing into methods --- .../worlds/NSEGameCoordinator.py | 135 ++++++++++++++---- 1 file changed, 108 insertions(+), 27 deletions(-) diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index 27156025..9495cfeb 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -7,6 +7,7 @@ import copy from faker import Faker from pathlib import Path +from typing import Iterable import netaddr, re import json @@ -55,20 +56,14 @@ def _initialize(self)->None: self._firewall_original = copy.deepcopy(self._firewall) self.logger.info("Environment initialization finished") - def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState: + + def _get_controlled_hosts_from_view(self, view_controlled_hosts:Iterable)->set: """ - Builds a GameState from given view. - If there is a keyword 'random' used, it is replaced by a valid option at random. - - Currently, we artificially extend the knonw_networks with +- 1 in the third octet. + Parses view and translates all keywords. Produces set of controlled host (IP) """ - self.logger.info(f'Generating state from view:{view}') - # re-map all networks based on current mapping in self._network_mapping - known_networks = set([self._network_mapping[net] for net in view["known_networks"]]) - controlled_hosts = set() # controlled_hosts - for host in view['controlled_hosts']: + for host in view_controlled_hosts: if isinstance(host, IP): controlled_hosts.add(self._ip_mapping[host]) self.logger.debug(f'\tThe attacker has control of host {self._ip_mapping[host]}.') @@ -85,7 +80,77 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga controlled_hosts = controlled_hosts.union(self._get_all_local_ips()) else: self.logger.error(f"Unsupported value encountered in start_position['controlled_hosts']: {host}") + return controlled_hosts + + def _get_services_from_view(self, view_known_services:dict)->dict: + known_services ={} + for ip, service_list in view_known_services.items(): + if self._ip_mapping[ip] not in known_services: + known_services[self._ip_mapping[ip]] = set() + for s in service_list: + if isinstance(s, Service): + known_services[self._ip_mapping[ip]].add(s) + elif isinstance(s, str): + if s == "random": # randomly select the service + self.logger.info(f"\tSelecting service randomly in {self._ip_mapping[ip]}") + # select candidates that are not explicitly listed + service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[self._ip_mapping[ip]]] + # randomly select from candidates + known_services[self._ip_mapping[ip]].add(random.choice(service_candidates)) + return known_services + + def _get_data_from_view(self, view_known_data:dict)->dict: + known_data = {} + for ip, data_list in view_known_data.items(): + if self._ip_mapping[ip] not in known_data: + known_data[self._ip_mapping[ip]] = set() + for datum in data_list: + if isinstance(datum, Data): + known_data[self._ip_mapping[ip]].add(datum) + elif isinstance(datum, str): + if datum == "random": # randomly select the data + self.logger.info(f"\tSelecting data randomly in {self._ip_mapping[ip]}") + # select candidates that are not explicitly listed + data_candidates = [d for d in self._data[self._ip_to_hostname[ip]] if d not in known_data[self._ip_mapping[ip]]] + if len(data_candidates) > 0: + # randomly select from candidates + known_data[self._ip_mapping[ip]].add(random.choice(data_candidates)) + else: + self.logger.warning("\tNo available data. Skipping") + return known_data + def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState: + """ + Builds a GameState from given view. + If there is a keyword 'random' used, it is replaced by a valid option at random. + + Currently, we artificially extend the knonw_networks with +- 1 in the third octet. + """ + self.logger.info(f'Generating state from view:{view}') + # re-map all networks based on current mapping in self._network_mapping + known_networks = set([self._network_mapping[net] for net in view["known_networks"]]) + + controlled_hosts = self._get_controlled_hosts_from_view(view["controlled_hosts"]) + # controlled_hosts = set() + # # controlled_hosts + # for host in view['controlled_hosts']: + # if isinstance(host, IP): + # controlled_hosts.add(self._ip_mapping[host]) + # self.logger.debug(f'\tThe attacker has control of host {self._ip_mapping[host]}.') + # elif host == 'random': + # # Random start + # self.logger.debug('\tAdding random starting position of agent') + # self.logger.debug(f'\t\tChoosing from {self.hosts_to_start}') + # selected = random.choice(self.hosts_to_start) + # controlled_hosts.add(selected) + # self.logger.debug(f'\t\tMaking agent start in {selected}') + # elif host == "all_local": + # # all local ips + # self.logger.debug('\t\tAdding all local hosts to agent') + # controlled_hosts = controlled_hosts.union(self._get_all_local_ips()) + # else: + # self.logger.error(f"Unsupported value encountered in start_position['controlled_hosts']: {host}") # re-map all known based on current mapping in self._ip_mapping + known_hosts = set([self._ip_mapping[ip] for ip in view["known_hosts"]]) # Add all controlled hosts to known_hosts known_hosts = known_hosts.union(controlled_hosts) @@ -113,23 +178,39 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga known_networks.add(ip) #return value back to the original net_obj.value += 256 - known_services ={} - for ip, service_list in view["known_services"].items(): - if self._ip_mapping[ip] not in known_services: - known_services[self._ip_mapping[ip]] = set() - for s in service_list: - if isinstance(s, Service): - known_services[self._ip_mapping[ip]].add(s) - elif isinstance(s, str): - if s == "random": # randomly select the service - self.logger.info(f"\tSelecting service randomly in {self._ip_mapping[ip]}") - # select candidates that are not explicitly listed - service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[self._ip_mapping[ip]]] - # randomly select from candidates - known_services[self._ip_mapping[ip]].add(random.choice(service_candidates)) - known_data = {} - for ip, data_list in view["known_data"].items(): - known_data[self._ip_mapping[ip]] = data_list + # known_services ={} + # for ip, service_list in view["known_services"].items(): + # if self._ip_mapping[ip] not in known_services: + # known_services[self._ip_mapping[ip]] = set() + # for s in service_list: + # if isinstance(s, Service): + # known_services[self._ip_mapping[ip]].add(s) + # elif isinstance(s, str): + # if s == "random": # randomly select the service + # self.logger.info(f"\tSelecting service randomly in {self._ip_mapping[ip]}") + # # select candidates that are not explicitly listed + # service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[self._ip_mapping[ip]]] + # # randomly select from candidates + # known_services[self._ip_mapping[ip]].add(random.choice(service_candidates)) + known_services = self._get_services_from_view(view["known_services"]) + # known_data = {} + # for ip, data_list in view["known_data"].items(): + # if self._ip_mapping[ip] not in known_data: + # known_data[self._ip_mapping[ip]] = set() + # for datum in data_list: + # if isinstance(datum, Data): + # known_data[self._ip_mapping[ip]].add(datum) + # elif isinstance(datum, str): + # if datum == "random": # randomly select the data + # self.logger.info(f"\tSelecting data randomly in {self._ip_mapping[ip]}") + # # select candidates that are not explicitly listed + # data_candidates = [d for d in self._data[self._ip_to_hostname[ip]] if d not in known_data[self._ip_mapping[ip]]] + # if len(data_candidates) > 0: + # # randomly select from candidates + # known_data[self._ip_mapping[ip]].add(random.choice(data_candidates)) + # else: + # self.logger.warning("\tNo available data. Skipping") + known_data = self._get_data_from_view(view["known_data"]) game_state = GameState(controlled_hosts, known_hosts, known_services, known_data, known_networks) self.logger.info(f"Generated GameState:{game_state}") return game_state From 4cb3ea12d4884edea226637e351e2c862ca10e7d Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 12:27:41 +0200 Subject: [PATCH 5/7] remove old code --- .../worlds/NSEGameCoordinator.py | 60 ++----------------- 1 file changed, 4 insertions(+), 56 deletions(-) diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index 9495cfeb..b77dabcd 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -55,7 +55,6 @@ def _initialize(self)->None: self._data_original = copy.deepcopy(self._data) self._firewall_original = copy.deepcopy(self._firewall) self.logger.info("Environment initialization finished") - def _get_controlled_hosts_from_view(self, view_controlled_hosts:Iterable)->set: """ @@ -128,33 +127,11 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga self.logger.info(f'Generating state from view:{view}') # re-map all networks based on current mapping in self._network_mapping known_networks = set([self._network_mapping[net] for net in view["known_networks"]]) - - controlled_hosts = self._get_controlled_hosts_from_view(view["controlled_hosts"]) - # controlled_hosts = set() - # # controlled_hosts - # for host in view['controlled_hosts']: - # if isinstance(host, IP): - # controlled_hosts.add(self._ip_mapping[host]) - # self.logger.debug(f'\tThe attacker has control of host {self._ip_mapping[host]}.') - # elif host == 'random': - # # Random start - # self.logger.debug('\tAdding random starting position of agent') - # self.logger.debug(f'\t\tChoosing from {self.hosts_to_start}') - # selected = random.choice(self.hosts_to_start) - # controlled_hosts.add(selected) - # self.logger.debug(f'\t\tMaking agent start in {selected}') - # elif host == "all_local": - # # all local ips - # self.logger.debug('\t\tAdding all local hosts to agent') - # controlled_hosts = controlled_hosts.union(self._get_all_local_ips()) - # else: - # self.logger.error(f"Unsupported value encountered in start_position['controlled_hosts']: {host}") - # re-map all known based on current mapping in self._ip_mapping - + # parse controlled hosts + controlled_hosts = self._get_controlled_hosts_from_view(view["controlled_hosts"]) known_hosts = set([self._ip_mapping[ip] for ip in view["known_hosts"]]) # Add all controlled hosts to known_hosts known_hosts = known_hosts.union(controlled_hosts) - if add_neighboring_nets: # Extend the known networks with the neighbouring networks # This is to solve in the env (and not in the agent) the problem @@ -178,38 +155,9 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga known_networks.add(ip) #return value back to the original net_obj.value += 256 - # known_services ={} - # for ip, service_list in view["known_services"].items(): - # if self._ip_mapping[ip] not in known_services: - # known_services[self._ip_mapping[ip]] = set() - # for s in service_list: - # if isinstance(s, Service): - # known_services[self._ip_mapping[ip]].add(s) - # elif isinstance(s, str): - # if s == "random": # randomly select the service - # self.logger.info(f"\tSelecting service randomly in {self._ip_mapping[ip]}") - # # select candidates that are not explicitly listed - # service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[self._ip_mapping[ip]]] - # # randomly select from candidates - # known_services[self._ip_mapping[ip]].add(random.choice(service_candidates)) + # parse known services known_services = self._get_services_from_view(view["known_services"]) - # known_data = {} - # for ip, data_list in view["known_data"].items(): - # if self._ip_mapping[ip] not in known_data: - # known_data[self._ip_mapping[ip]] = set() - # for datum in data_list: - # if isinstance(datum, Data): - # known_data[self._ip_mapping[ip]].add(datum) - # elif isinstance(datum, str): - # if datum == "random": # randomly select the data - # self.logger.info(f"\tSelecting data randomly in {self._ip_mapping[ip]}") - # # select candidates that are not explicitly listed - # data_candidates = [d for d in self._data[self._ip_to_hostname[ip]] if d not in known_data[self._ip_mapping[ip]]] - # if len(data_candidates) > 0: - # # randomly select from candidates - # known_data[self._ip_mapping[ip]].add(random.choice(data_candidates)) - # else: - # self.logger.warning("\tNo available data. Skipping") + # parse known data known_data = self._get_data_from_view(view["known_data"]) game_state = GameState(controlled_hosts, known_hosts, known_services, known_data, known_networks) self.logger.info(f"Generated GameState:{game_state}") From fc7bfb643e4da0423b08d3191deaf8b21542fa9f Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 14:36:51 +0200 Subject: [PATCH 6/7] fix typo --- AIDojoCoordinator/worlds/NSEGameCoordinator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index b77dabcd..717fba79 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -117,6 +117,7 @@ def _get_data_from_view(self, view_known_data:dict)->dict: else: self.logger.warning("\tNo available data. Skipping") return known_data + def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState: """ Builds a GameState from given view. @@ -126,7 +127,7 @@ def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->Ga """ self.logger.info(f'Generating state from view:{view}') # re-map all networks based on current mapping in self._network_mapping - known_networks = set([self._network_mapping[net] for net in view["known_networks"]]) + known_networks = set([self._network_mapping[net] for net in view["known_networks"]]) # parse controlled hosts controlled_hosts = self._get_controlled_hosts_from_view(view["controlled_hosts"]) known_hosts = set([self._ip_mapping[ip] for ip in view["known_hosts"]]) From 4771e0047652410560d97715ad9a4a7d790b6be0 Mon Sep 17 00:00:00 2001 From: Ondrej Lukas Date: Thu, 3 Jul 2025 14:37:40 +0200 Subject: [PATCH 7/7] reorganize imports --- AIDojoCoordinator/worlds/NSEGameCoordinator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AIDojoCoordinator/worlds/NSEGameCoordinator.py b/AIDojoCoordinator/worlds/NSEGameCoordinator.py index 717fba79..d95aa40f 100644 --- a/AIDojoCoordinator/worlds/NSEGameCoordinator.py +++ b/AIDojoCoordinator/worlds/NSEGameCoordinator.py @@ -5,11 +5,11 @@ import random import numpy as np import copy +import netaddr, re +import json from faker import Faker from pathlib import Path from typing import Iterable -import netaddr, re -import json from AIDojoCoordinator.game_components import GameState, Action, ActionType, IP, Network, Data, Service from AIDojoCoordinator.coordinator import GameCoordinator