Source code for episimmer.vulnerability_detection.agent_vd

import random
from typing import Dict, List, Tuple, Union

import numpy as np

from episimmer.read_file import ReadAgents, ReadLocations, ReadOneTimeEvents
from episimmer.simulate import Simulate
from episimmer.utils.time import Time
from episimmer.world import World

from .base import AgentVulnerability, VulnerableAgent


class SimpleVulnerableAgent(VulnerableAgent):
    r"""
    Simple Vulnerable Agent detection module.

    Each agent is scored by counting the simulation runs after which the agent
    is found in one of the configured infectious or post-infectious states.

    Inherits :class:`~episimmer.vulnerability_detection.base.VulnerableAgent` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}{ 1[Agent\ Infected]}

    where N refers to the total number of simulation runs.

    The Algorithm Parameter File in the vd_config file must contain the
    parameters states and num_runs.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    An example of the vd_config.txt file and the algorithm parameter file
    (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <SimpleVulnerableAgent>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """

    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        # Both keys are mandatory: a missing key surfaces as a KeyError here.
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # init_scores is not defined in this class (presumably inherited from
        # the base class).
        self.init_scores()

    def update_agent_scores(self, agents_obj: ReadAgents) -> None:
        """
        Adds one to the score of every agent currently in a tracked state.

        Args:
            agents_obj: ReadAgents object
        """
        tracked = (candidate for candidate in agents_obj.agents.values()
                   if candidate.state in self.states)
        for candidate in tracked:
            self.agent_scores[candidate.index] += 1

    def one_run(self) -> None:
        """
        Runs one world through ``world_obj.one_world()`` and scores the
        agents object it returns.
        """
        _end_state, final_agents, _locations = self.world_obj.one_world()
        self.update_agent_scores(final_agents)

    def run_detection(self) -> None:
        """
        Executes ``num_runs`` runs, printing a progress line whenever the
        1-based run number is an exact multiple of ``num_runs / 10``.
        """
        progress_interval = self.num_runs / 10
        for run_number in range(1, self.num_runs + 1):
            if run_number % progress_interval == 0:
                print('Iteration running : ', run_number)
            self.one_run()
class EarlyVulnerableAgent(VulnerableAgent):
    r"""
    Early Vulnerable Agent detection module.

    Agents are scored over multiple simulations; within a run, an agent that
    first enters a tracked state earlier receives a larger contribution.

    Inherits :class:`~episimmer.vulnerability_detection.base.VulnerableAgent` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}(Total\ Timesteps - Timestep\ Infected)

    where N refers to the total number of simulation runs. Note that if the
    agent was not infected, his score for that run would be 0.

    The Algorithm Parameter File in the vd_config file must contain the
    parameters states and num_runs.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    An example of the vd_config.txt file and the algorithm parameter file
    (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <EarlyVulnerableAgent>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """

    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        # Both keys are mandatory: a missing key surfaces as a KeyError here.
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # Agent index -> first time step at which the agent was seen in a
        # tracked state during the current run (None while not yet seen).
        self.agent_start_infection_step: Dict[str, Union[int, None]] = {}
        self.init_scores()

    def reset_start_infection(self, agents_obj: ReadAgents) -> None:
        """
        Marks every agent of the given agents object as not yet infected.

        Args:
            agents_obj: ReadAgents object
        """
        cleared = dict.fromkeys(agents_obj.agents.keys())
        self.agent_start_infection_step.update(cleared)

    def update_agent_scores(self, agents_obj: ReadAgents) -> None:
        """
        Adds ``total time steps - first infection step`` to the score of each
        agent that has a recorded first infection step.

        Args:
            agents_obj: ReadAgents object
        """
        horizon = self.world_obj.config_obj.time_steps
        for agent in agents_obj.agents.values():
            first_step = self.agent_start_infection_step[agent.index]
            if first_step is None:
                # No recorded infection in this run: contributes nothing.
                continue
            self.agent_scores[agent.index] += horizon - first_step

    def update_agent_start_infection(self, agents_obj: ReadAgents,
                                     time_step: int) -> None:
        """
        Records ``time_step`` as the first infection step of every agent that
        is in a tracked state and has no step recorded yet.

        Args:
            agents_obj: ReadAgents object
            time_step: Current time step
        """
        for agent in agents_obj.agents.values():
            if agent.state not in self.states:
                continue
            if self.agent_start_infection_step[agent.index] is not None:
                # Only the first occurrence is kept.
                continue
            self.agent_start_infection_step[agent.index] = time_step

    def one_run(
            self) -> Tuple[Dict[str, List[int]], ReadAgents, ReadLocations]:
        """
        Runs one complete simulation, tracking first infection steps after
        every time step and updating the agent scores before the simulation
        is ended.

        Returns:
            The end_state, agents object and locations object
        """
        world = self.world_obj
        config = world.config_obj
        total_steps = config.time_steps

        Time.new_world()
        agents_obj = ReadAgents(world.agents_filename, config)
        self.reset_start_infection(agents_obj)
        locations_obj = ReadLocations(world.locations_filename, config)
        one_time_event_obj = ReadOneTimeEvents(world.one_time_event_file)
        sim_obj = Simulate(config, world.model, world.policy_list, agents_obj,
                           locations_obj)

        sim_obj.on_start_simulation()
        for step in range(total_steps):
            sim_obj.on_start_time_step(
                world.interaction_files_list, world.event_files_list,
                world.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            # States are inspected once the step has been completed.
            self.update_agent_start_infection(agents_obj, step)
            Time.increment_current_time_step()

        self.update_agent_scores(agents_obj)
        final_state = sim_obj.end_simulation()
        return final_state, agents_obj, locations_obj

    def run_detection(self) -> None:
        """
        Executes ``num_runs`` runs, printing a progress line whenever the
        1-based run number is an exact multiple of ``num_runs / 10``.
        """
        progress_interval = self.num_runs / 10
        for run_number in range(1, self.num_runs + 1):
            if run_number % progress_interval == 0:
                print('Iteration running : ', run_number)
            self.one_run()
class SimpleAgentVulnerability(AgentVulnerability):
    r"""
    Class that implements the Simple Agent Vulnerability module. This class
    scores the agent based on the severity of outbreak after the agent's
    removal. Multiple simulations are run by randomly selecting and removing
    agents and running the simulation. The agent score is the running average
    of the per-run scores over the runs in which the agent was removed.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    .. math::
        Agent\ Score = 1 - \frac{Total\ Agents\ Infected}{Total\ Agents}

    The Algorithm Parameter File in the vd_config file must contain the
    parameters states, num_runs and num_agents_to_remove.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'num_agents_to_remove' refers to the number of agents to be removed

    An example of the vd_config.txt file and the algorithm parameter file
    (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <SimpleAgentVulnerability>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "num_agents_to_remove":1
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """

    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        # All three keys are mandatory: a missing key raises KeyError.
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        self.num_agents_to_remove: int = parameter_dict['num_agents_to_remove']
        # Agents removed in the current run; stays None until remove_agents
        # has been called at least once.
        self.agents_to_remove: Union[List[str], None] = None
        # Agent index -> number of runs in which the agent was removed.
        self.agent_counts: Dict[str, int] = {}
        self.init_scores()

    def init_scores(self) -> None:
        """
        Initialises scores and counts for all agents with value 0.0 and 0
        respectively. The agents file is read afresh to obtain the indices.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        for agent_index in agents_obj.agents.keys():
            self.agent_scores[agent_index] = 0.0
            self.agent_counts[agent_index] = 0

    def remove_agents(self, agents_obj: ReadAgents,
                      num_agents_to_remove: int) -> None:
        """
        Randomly picks agents, removes them from ``agents_obj`` and increments
        their removal counts.

        Args:
            agents_obj: ReadAgents object
            num_agents_to_remove: Number of agents to remove each simulation

        Raises:
            ValueError: Raised by ``random.sample`` if more agents are
                requested than are available.
        """
        agents = list(agents_obj.agents)
        self.agents_to_remove = random.sample(agents, num_agents_to_remove)
        for agent in self.agents_to_remove:
            agents_obj.agents.pop(agent)
            self.agent_counts[agent] += 1

    def update_agent_scores(self, end_state: Dict[str, List[int]]) -> None:
        """
        Folds the score of the finished run into the running average of every
        agent removed in that run. Does nothing beyond computing the run score
        when no agent was removed.

        Args:
            end_state: Dictionary mapping states to time step wise population

        Raises:
            ZeroDivisionError: If the final populations in ``end_state`` sum
                to zero.
        """
        infected_total = sum(end_state[state][-1] for state in self.states)
        total_agents = sum(population[-1] for population in end_state.values())
        score = 1.0 - infected_total / total_agents

        # agents_to_remove is still None when num_agents_to_remove is 0 and
        # remove_agents was therefore never called; treat that as "nothing to
        # update" instead of failing on iteration over None.
        for agent in self.agents_to_remove or ():
            # Incremental mean: avg += (x - avg) / count. The count was
            # already incremented in remove_agents, so it is at least 1.
            self.agent_scores[agent] += 1.0 / (self.agent_counts[agent]) * (
                score - self.agent_scores[agent])

    def one_run(self) -> None:
        """
        Executes a single run of the detection module: loads the world,
        removes the randomly chosen agents (if any are requested), simulates
        all time steps and updates the scores from the end state.
        """
        Time.new_world()
        time_steps = self.world_obj.config_obj.time_steps
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        if self.num_agents_to_remove > 0:
            self.remove_agents(agents_obj, self.num_agents_to_remove)
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()
        for _ in range(time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            Time.increment_current_time_step()
        end_state = sim_obj.end_simulation()
        self.update_agent_scores(end_state)

    def run_detection(self) -> None:
        """
        Function to run the complete detection module. A progress line is
        printed whenever the 1-based run number is an exact multiple of
        ``num_runs / 10``.
        """
        for i in range(self.num_runs):
            if (i + 1) % (self.num_runs / 10) == 0:
                print('Iteration running : ', i + 1)
            self.one_run()
class ChunkAgentVulnerability(AgentVulnerability):
    r"""
    Class that implements the Chunk Agent Vulnerability module. This class
    scores the agent based on the severity of outbreak after the agent's
    removal. Multiple simulations are run by randomly selecting and removing
    agents and running the simulation. For higher stability, a chunk of the
    simulation is considered instead of the entire simulation: the score of a
    removed agent accumulates, over the runs in which it was removed, the
    change in the number of agents in the tracked states between the start
    and the end of the chunk.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}((Agents\ Infected\ at\ timestep\ \gamma) -
        (Agents\ Infected\ at\ timestep\ \beta))

    where :math:`\beta` is a randomly generated integer based on the user
    defined parameters for chunk length, and
    :math:`\gamma` = :math:`\beta` + :math:`chunk\ length` - 1.
    N refers to the total number of simulation runs.

    The Algorithm Parameter File in the vd_config file must contain the
    parameters states, num_runs, num_agents_to_remove and chunk_len_range.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'num_agents_to_remove' refers to the number of agents to be removed

    'chunk_len_range' refers to the range of time steps the simulation should
    be run (default = [1,50])

    An example of the vd_config.txt file and the algorithm parameter file
    (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <ChunkAgentVulnerability>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "num_agents_to_remove":1,
            "chunk_len_range":[1,30]
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """

    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int, List[int]]]):
        super().__init__(world_obj)
        # states, num_runs and num_agents_to_remove are mandatory (KeyError
        # when missing); chunk_len_range is optional.
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        self.num_agents_to_remove: int = parameter_dict['num_agents_to_remove']
        # Agents removed in the current run; stays None until remove_agents
        # has been called at least once.
        self.agents_to_remove: Union[List[str], None] = None
        self.time_steps: int = self.world_obj.config_obj.time_steps
        self.start_agent_score: Union[float, None] = None
        self.end_agent_score: Union[float, None] = None

        # A missing or empty chunk_len_range falls back to the default.
        chunk_len_range = parameter_dict.get('chunk_len_range') or [1, 50]
        # Candidate chunk lengths, both bounds inclusive.
        self.range_list: List[int] = list(
            range(int(chunk_len_range[0]),
                  int(chunk_len_range[1]) + 1))
        self.init_scores()

    def remove_agents(self, agents_obj: ReadAgents,
                      num_agents_to_remove: int) -> None:
        """
        Randomly picks agents and removes them from ``agents_obj``.

        Args:
            agents_obj: ReadAgents object
            num_agents_to_remove: Number of agents to remove each simulation

        Raises:
            ValueError: Raised by ``random.sample`` if more agents are
                requested than are available.
        """
        agents = list(agents_obj.agents)
        self.agents_to_remove = random.sample(agents, num_agents_to_remove)
        for agent in self.agents_to_remove:
            agents_obj.agents.pop(agent)

    def reset_world(self) -> None:
        """
        Initialisations to be performed each world: clears the chunk start
        and end measurements.
        """
        self.start_agent_score = None
        self.end_agent_score = None

    def get_scores(self, agents_obj: ReadAgents) -> float:
        """
        Counts the agents that are currently in one of the tracked states.

        Args:
            agents_obj: ReadAgents object

        Returns:
            The number of such agents, as a float
        """
        return float(
            sum(1 for agent in agents_obj.agents.values()
                if agent.state in self.states))

    def update_agent_scores(self, agents_obj: ReadAgents, time_step: int,
                            mode: str) -> None:
        """
        Records the chunk start measurement (mode ``'start'``) or records the
        chunk end measurement and credits the difference to every removed
        agent (mode ``'end'``). Any other mode is ignored.

        Args:
            agents_obj: ReadAgents object
            time_step: Current time step (not used by this implementation)
            mode: Mode decides whether the simulation is at its start or end
        """
        if mode == 'start':
            self.start_agent_score = self.get_scores(agents_obj)
        elif mode == 'end':
            self.end_agent_score = self.get_scores(agents_obj)
            # agents_to_remove is still None when num_agents_to_remove is 0
            # and remove_agents was therefore never called; treat that as
            # "nothing to update" instead of failing on iteration over None.
            for agent in self.agents_to_remove or ():
                self.agent_scores[
                    agent] += self.end_agent_score - self.start_agent_score

    def one_run(self, start: int, end: int) -> None:
        """
        Executes a single run of the module, measuring at the ``start`` and
        ``end`` time steps. Simulation stops right after the ``end`` step.

        Args:
            start: Start time step where simulation must be considered
            end: End time step where simulation must be ended

        Raises:
            AssertionError: If the start or the end measurement was never
                taken (for example when ``end`` lies outside the simulated
                time steps).
        """
        Time.new_world()
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        self.reset_world()
        if self.num_agents_to_remove > 0:
            self.remove_agents(agents_obj, self.num_agents_to_remove)
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()

        start_recorded = False
        end_recorded = False
        for current_time_step in range(self.time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()

            # Measurements are taken after the step has been completed. Both
            # checks may fire on the same step when the chunk length is 1.
            if current_time_step == start:
                self.update_agent_scores(agents_obj, current_time_step,
                                         'start')
                start_recorded = True
            if current_time_step == end:
                self.update_agent_scores(agents_obj, current_time_step, 'end')
                end_recorded = True
                break
            Time.increment_current_time_step()
        sim_obj.end_simulation()

        # Raised explicitly (rather than via an assert statement) so the
        # check is not stripped when Python runs with -O; the exception type
        # is unchanged.
        if not (start_recorded and end_recorded):
            raise AssertionError(
                'Chunk bounds were not both reached during the simulation.')

    def generate_bounds(self) -> Tuple[int, int]:
        """
        Draws a chunk length from the configured range and a random start
        such that the chunk fits inside the simulation. A chunk at least as
        long as the simulation covers all time steps.

        Returns:
            Start and End time steps (both inclusive).

        Raises:
            ValueError: If the drawn chunk length is not positive.
            IndexError: Raised by ``random.choice`` if the configured range
                is empty.
        """
        chunk_len = random.choice(self.range_list)
        if chunk_len <= 0:
            raise ValueError(
                'Inappropriate chunk len range provided. Please provide positive ranges.'
            )
        if chunk_len >= self.time_steps:
            return 0, self.time_steps - 1
        start = random.choice(range(self.time_steps - chunk_len + 1))
        end = start + chunk_len - 1
        return start, end

    def run_detection(self) -> None:
        """
        Function to run the complete detection module. A progress line is
        printed whenever the 1-based run number is an exact multiple of
        ``num_runs / 10``.
        """
        for i in range(self.num_runs):
            if (i + 1) % (self.num_runs / 10) == 0:
                print('Iteration running : ', i + 1)
            start, end = self.generate_bounds()
            # generate_bounds as defined in this class never returns None;
            # the guard is kept for overrides that signal "skip this run".
            if start is None:
                continue
            self.one_run(start, end)

    def get_maximum_agent_vulnerability(self, n: int) -> Dict[str, float]:
        """
        Function to return the n maximum agent vulnerabilities as a
        dictionary. These are the n lowest-scoring agents (ties broken by
        agent key).

        Args:
            n: Number of elements to be returned

        Returns:
            A dictionary mapping agent keys to agent scores rounded to 4
            decimals. Scores are ordered in ascending order
        """
        ranked = sorted(self.agent_scores.items(),
                        key=lambda x: (x[1], x[0]))
        return {key: round(value, 4) for key, value in ranked[:n]}

    def get_minimum_agent_vulnerability(self, n: int) -> Dict[str, float]:
        """
        Function to return the n minimum agent vulnerabilities as a
        dictionary. These are the n highest-scoring agents (ties broken by
        agent key, descending).

        Args:
            n: Number of elements to be returned

        Returns:
            A dictionary mapping agent keys to agent scores rounded to 4
            decimals. Scores are ordered in descending order
        """
        ranked = sorted(self.agent_scores.items(),
                        key=lambda x: (x[1], x[0]),
                        reverse=True)
        return {key: round(value, 4) for key, value in ranked[:n]}
class BanditAlgos(AgentVulnerability):
    r"""
    Class that implements the Agent Vulnerability module with Bandit Algorithms.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    We first select an agent for removal based on either epsilon greedy or
    the UCB1 algorithm, then we run the simulation.

    **I) Agent Selection**

    *Epsilon Greedy*

    Random selection of agent done with probability :math:`\epsilon` and
    greedy (highest scoring agent) selection of agent with probability
    1 - :math:`\epsilon`.

    *UCB1*

    Selection of agent based on equation -

    .. math::
        \arg \max_{a} \bigg( running\ avg\ score_{a} + \sqrt{\frac{2 \log{t}}{N_t(a)}} \bigg)

    where :math:`a` represents an agent and :math:`N_t(a)` represents the
    number of times agent :math:`a` has been selected for removal. The
    implementation uses :math:`1 + N_t(a)` in the denominator so that agents
    never selected so far do not cause a division by zero.

    **II) Agent Score**

    After removal, the simulation is run to get the score of the removed
    agent. Score of the agent is based on the severity of outbreak after the
    agent's removal. The agent score is calculated by finding the running
    average of scores for each run. The score for a single run is given by -

    .. math::
        Agent\ Score = 1 - \frac{Total\ Agents\ Infected}{Total\ Agents}

    The Algorithm Parameter File in the vd_config file must contain the
    parameters 'states', 'num_runs', and 'mode'.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'mode' refers to the mode or algorithm to be used to select agent. Two
    options are present - 'EPS' or epsilon greedy and 'UCB' or Upper
    Confidence Bound algorithm. (default = 'EPS')

    An example of the vd_config.txt file and the algorithm parameter file
    (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <BanditAlgos>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "mode":"UCB"
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """

    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int, str]]):
        super().__init__(world_obj)
        # states and num_runs are mandatory (KeyError when missing).
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # A missing or empty mode falls back to epsilon greedy.
        self.mode: str = parameter_dict.get('mode') or 'EPS'
        # Integer agent id -> number of times the agent has been removed.
        self.agent_counts: Dict[int, int] = {}
        # Exploration probability used by the epsilon greedy mode.
        self.eps: float = 0.01
        # Integer id of the agent removed in the current run.
        self.rm_agent: Union[int, None] = None
        # 1-based number of the current run; set by run_detection.
        self.t: Union[int, None] = None
        # NOTE: this runs one full simulation per agent (see init_scores).
        self.init_scores()

    def init_scores(self) -> None:
        """
        Initialises every agent's score to the score of one simulation run
        without that agent, and every agent's count to 0. Scores and counts
        are keyed by the agent index converted to ``int``.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        for agent_index in agents_obj.agents.keys():
            agent_id = int(agent_index)
            self.agent_scores[agent_id] = self.get_init_score(agent_index)
            self.agent_counts[agent_id] = 0

    def get_init_score(self, agent_index: str) -> float:
        """
        Returns the agent's score by running one simulation without the agent

        Args:
            agent_index: Current agent's index

        Returns:
            Score of the agent
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        agents_obj.agents.pop(agent_index)
        end_state, _, _ = self.one_run_helper(agents_obj)
        return self.get_score(end_state)

    def remove_agents(self, agents_obj: ReadAgents) -> None:
        """
        Selects one agent according to the configured mode, removes it from
        ``agents_obj`` and increments its removal count.

        The selection is made among the integer ids of the agents actually
        present in ``agents_obj``, so agent ids need not be exactly
        ``0..n-1``. Ids are considered in ascending order, which keeps the
        result identical to a ``0..n-1`` scan when the ids are contiguous.

        Args:
            agents_obj: ReadAgents object

        Raises:
            ValueError: If there is no agent to remove or the mode is neither
                'EPS' nor 'UCB'.
        """
        candidate_ids = sorted(
            int(agent_index) for agent_index in agents_obj.agents)
        if not candidate_ids:
            raise ValueError('No agents available to remove.')

        if self.mode == 'EPS':
            if random.random() < self.eps:
                # Explore: uniformly random agent.
                self.rm_agent = random.choice(candidate_ids)
            else:
                # Exploit: agent with the highest running average score.
                self.rm_agent = max(candidate_ids,
                                    key=lambda x: self.agent_scores[x])
        elif self.mode == 'UCB':
            # self.t must have been set (run_detection does so) before this
            # branch is used.
            log_term = 2 * np.log(self.t)
            self.rm_agent = max(
                candidate_ids,
                key=lambda x: self.agent_scores[x] + np.sqrt(
                    log_term / (1 + self.agent_counts[x])))
        else:
            raise ValueError('Enter Valid mode.')

        agents_obj.agents.pop(str(self.rm_agent))
        self.agent_counts[self.rm_agent] += 1

    def get_score(self, end_state: Dict[str, List[int]]) -> float:
        """
        Computes ``1 - infected / total`` from the final populations.

        Args:
            end_state: Dictionary mapping states to time step wise population

        Returns:
            Score of the agent

        Raises:
            ZeroDivisionError: If the final populations in ``end_state`` sum
                to zero.
        """
        infected_total = sum(end_state[state][-1] for state in self.states)
        total_agents = sum(population[-1] for population in end_state.values())
        return 1.0 - infected_total / total_agents

    def update_agent_scores(self, end_state: Dict[str, List[int]]) -> None:
        """
        Folds the score of the finished run into the running average of the
        removed agent.

        Args:
            end_state: Dictionary mapping states to time step wise population
        """
        score = self.get_score(end_state)
        # Incremental mean: avg += (x - avg) / count. The count was already
        # incremented in remove_agents, so it is at least 1 here.
        self.agent_scores[self.rm_agent] += 1.0 / (self.agent_counts[
            self.rm_agent]) * (score - self.agent_scores[self.rm_agent])

    def one_run_helper(
        self, agents_obj: ReadAgents
    ) -> Tuple[Dict[str, List[int]], ReadAgents, ReadLocations]:
        """
        Runs one complete simulation with the given (already reduced) agents
        object.

        Args:
            agents_obj: ReadAgents object

        Returns:
            The end_state, agents object and locations object
        """
        Time.new_world()
        time_steps = self.world_obj.config_obj.time_steps
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()
        for _ in range(time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            Time.increment_current_time_step()
        end_state = sim_obj.end_simulation()
        return end_state, agents_obj, locations_obj

    def one_run(self) -> None:
        """
        Executes a single run of the detection module: selects and removes an
        agent, simulates, and updates that agent's score.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        self.remove_agents(agents_obj)
        end_state, _, _ = self.one_run_helper(agents_obj)
        self.update_agent_scores(end_state)

    def run_detection(self) -> None:
        """
        Function to run the complete detection module. A progress line is
        printed whenever the 1-based run number is an exact multiple of
        ``num_runs / 10``; ``self.t`` is set to the run number before each
        run.
        """
        for i in range(self.num_runs):
            if (i + 1) % (self.num_runs / 10) == 0:
                print('Iteration running : ', i + 1)
            self.t = i + 1
            self.one_run()

    def print_default_output(self, n: int) -> None:
        """
        Prints the n maximum agent scores as returned by
        ``get_max_score_agents`` (not defined in this class).

        Args:
            n: Number of elements to be printed
        """
        print(self.get_max_score_agents(n))