import random
from typing import Dict, List, Tuple, Union
import numpy as np
from episimmer.read_file import ReadAgents, ReadLocations, ReadOneTimeEvents
from episimmer.simulate import Simulate
from episimmer.utils.time import Time
from episimmer.world import World
from .base import AgentVulnerability, VulnerableAgent
class SimpleVulnerableAgent(VulnerableAgent):
    r"""
    Class that implements the Simple Vulnerable Agent module. This class scores the agent based on the number of
    times the agent was infected in multiple simulations.

    Inherits :class:`~episimmer.vulnerability_detection.base.VulnerableAgent` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}{ 1[Agent\ Infected]}

    where N refers to the total number of simulation runs.

    The Algorithm Parameter File in the vd_config file must contain the parameters states and num_runs.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    An example of the vd_config.txt file and the algorithm parameter file (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <SimpleVulnerableAgent>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """
    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # init_scores() is not defined in this class; presumably inherited
        # from the base class and responsible for creating self.agent_scores.
        self.init_scores()

    def update_agent_scores(self, agents_obj: ReadAgents) -> None:
        """
        Adds one to the score of every agent whose current state is listed in ``self.states``.

        Args:
            agents_obj: ReadAgents object
        """
        for agent in agents_obj.agents.values():
            if agent.state in self.states:
                self.agent_scores[agent.index] += 1

    def one_run(self) -> None:
        """
        Executes a single run of the detection module: runs one world and scores the resulting agents.
        """
        # one_world() yields a 3-tuple; only the middle element (the agents
        # object) is needed for scoring.
        _, agents_obj, _ = self.world_obj.one_world()
        self.update_agent_scores(agents_obj)

    def run_detection(self) -> None:
        """
        Function to run the complete detection module (``num_runs`` calls to :meth:`one_run`).

        Progress is printed roughly every tenth of the runs.
        """
        # Integer interval: a float modulo (num_runs / 10) is unreliable when
        # num_runs is not a multiple of 10 (prints always, or never).
        progress_interval = max(1, self.num_runs // 10)
        for i in range(self.num_runs):
            if (i + 1) % progress_interval == 0:
                print('Iteration running : ', i + 1)
            self.one_run()
class EarlyVulnerableAgent(VulnerableAgent):
    r"""
    Class that implements the Early Vulnerable Agent module. This class scores the agent based on the number of
    times the agent was infected in multiple simulations, giving a higher score the earlier the infection occurred.

    Inherits :class:`~episimmer.vulnerability_detection.base.VulnerableAgent` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}(Total\ Timesteps - Timestep\ Infected)

    where N refers to the total number of simulation runs. Note that if the agent was not infected, the score for
    that run would be 0.

    The Algorithm Parameter File in the vd_config file must contain the parameters states and num_runs.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    An example of the vd_config.txt file and the algorithm parameter file (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <EarlyVulnerableAgent>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """
    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # Per-run record: agent index -> first time step at which the agent
        # was seen in one of self.states (None while not yet seen).
        self.agent_start_infection_step: Dict[str, Union[int, None]] = {}
        # init_scores() is not defined in this class; presumably inherited
        # from the base class and responsible for creating self.agent_scores.
        self.init_scores()

    def reset_start_infection(self, agents_obj: ReadAgents) -> None:
        """
        Resets each agent's start infection step to None.

        Args:
            agents_obj: ReadAgents object
        """
        for agent_index in agents_obj.agents:
            self.agent_start_infection_step[agent_index] = None

    def update_agent_scores(self, agents_obj: ReadAgents) -> None:
        """
        Adds ``total time steps - first infection step`` to the score of every agent that was infected in this run.

        Args:
            agents_obj: ReadAgents object
        """
        total_time_steps = self.world_obj.config_obj.time_steps
        for agent in agents_obj.agents.values():
            first_step = self.agent_start_infection_step[agent.index]
            # Agents never seen in self.states contribute 0 for this run.
            if first_step is not None:
                self.agent_scores[agent.index] += total_time_steps - first_step

    def update_agent_start_infection(self, agents_obj: ReadAgents,
                                     time_step: int) -> None:
        """
        Records ``time_step`` as the start infection step of every agent that is in one of ``self.states`` and
        has no start step recorded yet.

        Args:
            agents_obj: ReadAgents object
            time_step: Current time step
        """
        for agent in agents_obj.agents.values():
            if (agent.state in self.states
                    and self.agent_start_infection_step[agent.index] is None):
                self.agent_start_infection_step[agent.index] = time_step

    def one_run(
            self) -> Tuple[Dict[str, List[int]], ReadAgents, ReadLocations]:
        """
        Executes a single run of the module.

        Returns:
            The end_state, agents object and locations object
        """
        time_steps = self.world_obj.config_obj.time_steps
        Time.new_world()
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        self.reset_start_infection(agents_obj)
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()

        for current_time_step in range(time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            # States are inspected only after the step has ended, so the
            # earliest recordable infection step is 0.
            self.update_agent_start_infection(agents_obj, current_time_step)
            Time.increment_current_time_step()

        self.update_agent_scores(agents_obj)
        end_state = sim_obj.end_simulation()
        return end_state, agents_obj, locations_obj

    def run_detection(self) -> None:
        """
        Function to run the complete detection module (``num_runs`` calls to :meth:`one_run`).

        Progress is printed roughly every tenth of the runs.
        """
        # Integer interval: a float modulo (num_runs / 10) is unreliable when
        # num_runs is not a multiple of 10 (prints always, or never).
        progress_interval = max(1, self.num_runs // 10)
        for i in range(self.num_runs):
            if (i + 1) % progress_interval == 0:
                print('Iteration running : ', i + 1)
            self.one_run()
class SimpleAgentVulnerability(AgentVulnerability):
    r"""
    Class that implements the Simple Agent Vulnerability module. This class scores the agent based on the severity of
    outbreak after the agent's removal. Multiple simulations are run by randomly selecting and removing an agent
    and running the simulation. The agent score is calculated by finding the running average of scores for each run.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    .. math::
        Agent\ Score = 1 - \frac{Total\ Agents\ Infected}{Total\ Agents}

    The Algorithm Parameter File in the vd_config file must contain the parameters states, num_runs and
    num_agents_to_remove.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'num_agents_to_remove' refers to the number of agents to be removed

    An example of the vd_config.txt file and the algorithm parameter file (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <SimpleAgentVulnerability>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "num_agents_to_remove":1
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """
    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int]]):
        super().__init__(world_obj)
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        self.num_agents_to_remove: int = parameter_dict['num_agents_to_remove']
        # Agents removed in the current run; stays None until remove_agents()
        # is called (it never is when num_agents_to_remove is 0).
        self.agents_to_remove: Union[List[str], None] = None
        # Number of runs in which each agent has been removed so far.
        self.agent_counts: Dict[str, int] = {}
        self.init_scores()

    def init_scores(self) -> None:
        """
        Initialises scores and counts for all agents with value 0.0 and 0 respectively.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        # self.agent_scores is not created in this class; presumably it is
        # set up by the base class constructor.
        for agent_index in agents_obj.agents:
            self.agent_scores[agent_index] = 0.0
            self.agent_counts[agent_index] = 0

    def remove_agents(self, agents_obj: ReadAgents,
                      num_agents_to_remove: int) -> None:
        """
        Randomly samples agents, removes them from ``agents_obj`` and increments their removal counts.

        Args:
            agents_obj: ReadAgents object
            num_agents_to_remove: Number of agents to remove each simulation
        """
        agent_keys = list(agents_obj.agents)
        self.agents_to_remove = random.sample(agent_keys,
                                              num_agents_to_remove)
        for agent in self.agents_to_remove:
            agents_obj.agents.pop(agent)
            self.agent_counts[agent] += 1

    def update_agent_scores(self, end_state: Dict[str, List[int]]) -> None:
        """
        Updates the running-average score of every agent removed in the current run.

        The run score is ``1 - infected / total`` computed from the last entry of each state's series in
        ``end_state``. Nothing is updated when no agents were removed.

        Args:
            end_state: Dictionary mapping states to time step wise population
        """
        infected = 0
        for state in self.states:
            infected += end_state[state][-1]
        total_agents = 0
        for series in end_state.values():
            total_agents += series[-1]
        score = 1.0 - infected / total_agents

        # agents_to_remove is still None when num_agents_to_remove is 0;
        # iterating None would raise TypeError, so fall back to no agents.
        for agent in self.agents_to_remove or []:
            # Incremental mean: new = old + (score - old) / count.
            self.agent_scores[agent] += 1.0 / (self.agent_counts[agent]) * (
                score - self.agent_scores[agent])

    def one_run(self) -> None:
        """
        Executes a single run of the detection module.
        """
        Time.new_world()
        time_steps = self.world_obj.config_obj.time_steps
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        if self.num_agents_to_remove > 0:
            self.remove_agents(agents_obj, self.num_agents_to_remove)
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()

        for _ in range(time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            Time.increment_current_time_step()

        end_state = sim_obj.end_simulation()
        self.update_agent_scores(end_state)

    def run_detection(self) -> None:
        """
        Function to run the complete detection module (``num_runs`` calls to :meth:`one_run`).

        Progress is printed roughly every tenth of the runs.
        """
        # Integer interval: a float modulo (num_runs / 10) is unreliable when
        # num_runs is not a multiple of 10 (prints always, or never).
        progress_interval = max(1, self.num_runs // 10)
        for i in range(self.num_runs):
            if (i + 1) % progress_interval == 0:
                print('Iteration running : ', i + 1)
            self.one_run()
class ChunkAgentVulnerability(AgentVulnerability):
    r"""
    Class that implements the Chunk Agent Vulnerability module. This class scores the agent based on the severity of
    outbreak after the agent's removal. Multiple simulations are run by randomly selecting and removing an agent
    and running the simulation. The agent score is accumulated over the runs in which the agent was removed.
    For higher stability, chunks of the entire simulation are considered instead of the entire simulation.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    .. math::
        Agent\ Score = \sum_{i=0}^{N-1}((Agents\ Infected\ at\ timestep\ \gamma) - (Agents\
        Infected\ at\ timestep\ \beta))

    where :math:`\beta` is a randomly generated integer based on the user defined parameters for chunk length, and
    :math:`\gamma` = :math:`\beta` + :math:`chunk\ length` - 1. N refers to the total number of simulation runs.

    The Algorithm Parameter File in the vd_config file must contain the parameters states, num_runs,
    num_agents_to_remove and chunk_len_range.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'num_agents_to_remove' refers to the number of agents to be removed

    'chunk_len_range' refers to the range of time steps the simulation should be run (default = [1,50])

    An example of the vd_config.txt file and the algorithm parameter file (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <ChunkAgentVulnerability>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "num_agents_to_remove":1,
            "chunk_len_range":[1,30]
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """
    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int, List[int]]]):
        super().__init__(world_obj)
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        self.num_agents_to_remove: int = parameter_dict['num_agents_to_remove']
        # Agents removed in the current run; stays None until remove_agents()
        # is called (it never is when num_agents_to_remove is 0).
        self.agents_to_remove: Union[List[str], None] = None
        self.time_steps: int = self.world_obj.config_obj.time_steps
        # Infected-agent counts captured at the chunk's start / end step.
        self.start_agent_score: Union[float, None] = None
        self.end_agent_score: Union[float, None] = None

        # A missing or empty chunk_len_range falls back to the default.
        chunk_len_range = parameter_dict.get('chunk_len_range') or [1, 50]
        # Candidate chunk lengths, both bounds inclusive.
        self.range_list: List[int] = list(
            range(int(chunk_len_range[0]),
                  int(chunk_len_range[1]) + 1))
        # init_scores() is not defined in this class; presumably inherited
        # from the base class and responsible for creating self.agent_scores.
        self.init_scores()

    def remove_agents(self, agents_obj: ReadAgents,
                      num_agents_to_remove: int) -> None:
        """
        Randomly samples agents and removes them from ``agents_obj``.

        Args:
            agents_obj: ReadAgents object
            num_agents_to_remove: Number of agents to remove each simulation
        """
        agent_keys = list(agents_obj.agents)
        self.agents_to_remove = random.sample(agent_keys,
                                              num_agents_to_remove)
        for agent in self.agents_to_remove:
            agents_obj.agents.pop(agent)

    def reset_world(self) -> None:
        """
        Initialisations to be performed each world: clears the start and end chunk scores.
        """
        self.start_agent_score = None
        self.end_agent_score = None

    def get_scores(self, agents_obj: ReadAgents) -> float:
        """
        Returns the number of agents whose state is one of ``self.states``.

        Args:
            agents_obj: ReadAgents object

        Returns:
            The agent score
        """
        matching = sum(1 for agent in agents_obj.agents.values()
                       if agent.state in self.states)
        return float(matching)

    def update_agent_scores(self, agents_obj: ReadAgents, time_step: int,
                            mode: str) -> None:
        """
        Records the chunk's start score, or records the end score and credits the removed agents.

        In ``'end'`` mode every removed agent's score is increased by ``end score - start score``. Any other
        ``mode`` value is ignored.

        Args:
            agents_obj: ReadAgents object
            time_step: Current time step (not used by the computation)
            mode: Mode decides whether the simulation is at its start or end
        """
        if mode == 'start':
            self.start_agent_score = self.get_scores(agents_obj)
        elif mode == 'end':
            self.end_agent_score = self.get_scores(agents_obj)
            chunk_delta = self.end_agent_score - self.start_agent_score
            # agents_to_remove is still None when num_agents_to_remove is 0;
            # iterating None would raise TypeError, so fall back to no agents.
            for agent in self.agents_to_remove or []:
                self.agent_scores[agent] += chunk_delta

    def one_run(self, start: int, end: int) -> None:
        """
        Executes a single run of the module from start to end time step.

        The simulation always begins at time step 0; it is scored at ``start`` and stopped once ``end`` is scored.

        Args:
            start: Start time step where simulation must be considered
            end: End time step where simulation must be ended
        """
        Time.new_world()
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        self.reset_world()
        if self.num_agents_to_remove > 0:
            self.remove_agents(agents_obj, self.num_agents_to_remove)
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()

        flag = 0  # counts how many of the two chunk bounds were scored
        for current_time_step in range(self.time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            # start may equal end (chunk length 1), so both checks can fire
            # on the same step; 'start' must be handled first.
            if current_time_step == start:
                self.update_agent_scores(agents_obj, current_time_step,
                                         'start')
                flag += 1
            if current_time_step == end:
                self.update_agent_scores(agents_obj, current_time_step, 'end')
                flag += 1
                break
            Time.increment_current_time_step()

        sim_obj.end_simulation()
        # Internal invariant: both bounds must lie inside the simulated range.
        assert flag == 2, 'chunk bounds were not both reached'

    def generate_bounds(self) -> Tuple[int, int]:
        """
        Returns the start and end time steps for a simulation.

        A chunk length is drawn uniformly from ``self.range_list``. If it covers the whole simulation the
        bounds are ``(0, time_steps - 1)``; otherwise a start step is drawn uniformly so the chunk fits.

        Returns:
            Start and End time steps.

        Raises:
            ValueError: If the configured chunk length range is empty or contains a non-positive length.
        """
        # Validate the whole range up front so a bad configuration fails on
        # the first call instead of only when an invalid length is drawn.
        if not self.range_list or min(self.range_list) <= 0:
            raise ValueError(
                'Inappropriate chunk len range provided. Please provide positive ranges.'
            )
        chunk_len = random.choice(self.range_list)
        if chunk_len >= self.time_steps:
            return 0, self.time_steps - 1
        start = random.choice(range(self.time_steps - chunk_len + 1))
        end = start + chunk_len - 1
        return start, end

    def run_detection(self) -> None:
        """
        Function to run the complete detection module (``num_runs`` chunked runs).

        Progress is printed roughly every tenth of the runs.
        """
        # Integer interval: a float modulo (num_runs / 10) is unreliable when
        # num_runs is not a multiple of 10 (prints always, or never).
        progress_interval = max(1, self.num_runs // 10)
        for i in range(self.num_runs):
            if (i + 1) % progress_interval == 0:
                print('Iteration running : ', i + 1)
            start, end = self.generate_bounds()
            self.one_run(start, end)

    def get_maximum_agent_vulnerability(self, n: int) -> Dict[str, float]:
        """
        Function to return the n maximum agent vulnerabilities as a dictionary.

        These are the ``n`` agents with the lowest accumulated scores; ties are broken by agent key.

        Args:
            n: Number of elements to be returned

        Returns:
            A dictionary mapping agent keys to agent scores (rounded to 4 decimal places). Scores are ordered in
            ascending order
        """
        ranked = sorted(self.agent_scores.items(),
                        key=lambda item: (item[1], item[0]))
        return {key: round(value, 4) for key, value in ranked[:n]}

    def get_minimum_agent_vulnerability(self, n: int) -> Dict[str, float]:
        """
        Function to return the n minimum agent vulnerabilities as a dictionary.

        These are the ``n`` agents with the highest accumulated scores; ties are broken by agent key.

        Args:
            n: Number of elements to be returned

        Returns:
            A dictionary mapping agent keys to agent scores (rounded to 4 decimal places). Scores are ordered in
            descending order
        """
        ranked = sorted(self.agent_scores.items(),
                        key=lambda item: (item[1], item[0]),
                        reverse=True)
        return {key: round(value, 4) for key, value in ranked[:n]}
class BanditAlgos(AgentVulnerability):
    r"""
    Class that implements the Agent Vulnerability module with Bandit Algorithms.

    Inherits :class:`~episimmer.vulnerability_detection.base.AgentVulnerability` class.

    We first select an agent for removal based on either epsilon greedy or the UCB1 algorithm, then we run the
    simulation.

    **I) Agent Selection**

    *Epsilon Greedy*

    Random selection of agent done with probability :math:`\epsilon` and greedy (highest scoring agent) selection of
    agent with probability 1 - :math:`\epsilon`.

    *UCB1*

    Selection of agent based on equation -

    .. math::
        \arg \max_{a} \bigg( running\ avg\ score_{a} + \sqrt{\frac{2 \log{t}}{N_t(a)}} \bigg)

    where :math:`a` represents an agent and :math:`N_t(a)` represents the number of times agent :math:`a` has been
    selected for removal.

    **II) Agent Score**

    After removal, the simulation is run to get the score of the removed agent. Score of the agent is based on the
    severity of outbreak after the agent's removal.
    The agent score is calculated by finding the running average of scores for each run. The score for a single run is
    given by -

    .. math::
        Agent\ Score = 1 - \frac{Total\ Agents\ Infected}{Total\ Agents}

    The Algorithm Parameter File in the vd_config file must contain the parameters 'states', 'num_runs',
    and 'mode'.

    'states' refers to the states in the simulation that are infectious or post-infectious

    'num_runs' refers to the total number of simulation runs

    'mode' refers to the mode or algorithm to be used to select agent. Two options are present - 'EPS' or epsilon
    greedy and 'UCB' or Upper Confidence Bound algorithm. (default = 'EPS')

    An example of the vd_config.txt file and the algorithm parameter file (parameter.json) is given below

    .. code-block:: text
        :caption: vd_config.txt
        :linenos:

        VD Target <Agent>
        VD Algorithm <BanditAlgos>
        Algorithm Parameter File <parameter.json>
        Pre Processing <>
        Post Processing <>
        Output Mode <Default>

    .. code-block:: json
        :caption: parameter.json
        :linenos:

        {
            "states":["Infected","Recovered"],
            "num_runs":100,
            "mode":"UCB"
        }

    Args:
        world_obj: World object of simulation
        parameter_dict: Dictionary of parameters relevant to the algorithm
    """
    def __init__(self, world_obj: World,
                 parameter_dict: Dict[str, Union[List[str], int, str]]):
        super().__init__(world_obj)
        self.states: List[str] = parameter_dict['states']
        self.num_runs: int = parameter_dict['num_runs']
        # A missing or empty 'mode' falls back to epsilon greedy.
        self.mode: str = parameter_dict.get('mode') or 'EPS'
        # Number of times each agent (keyed by integer index) was removed.
        self.agent_counts: Dict[int, int] = {}
        # Exploration probability used in 'EPS' mode.
        self.eps: float = 0.01
        # Integer index of the agent removed in the current run.
        self.rm_agent: Union[int, None] = None
        # 1-based run counter, set by run_detection() and read in 'UCB' mode.
        self.t: Union[int, None] = None
        self.init_scores()

    def init_scores(self) -> None:
        """
        Initialises agent scores to the score from one run of the algorithm without the said agent. Agent counts is
        also initialised to 0.

        This runs one full simulation per agent. Agent keys are converted with ``int()`` and must therefore be
        integer-like strings.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        # self.agent_scores is not created in this class; presumably it is
        # set up by the base class constructor.
        for agent_index in agents_obj.agents:
            self.agent_scores[int(agent_index)] = self.get_init_score(
                agent_index)
            self.agent_counts[int(agent_index)] = 0

    def get_init_score(self, agent_index: str) -> float:
        """
        Returns the agent's score by running one simulation without the agent.

        Args:
            agent_index: Current agent's index

        Returns:
            Score of the agent
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        agents_obj.agents.pop(agent_index)
        end_state, _, _ = self.one_run_helper(agents_obj)
        return self.get_score(end_state)

    def remove_agents(self, agents_obj: ReadAgents) -> None:
        """
        Selects one agent according to ``self.mode``, removes it from ``agents_obj`` and increments its count.

        Candidates are the integer indices of the agents actually present in ``agents_obj``, so indices do not
        have to be the contiguous range ``0..n-1``.

        Args:
            agents_obj: ReadAgents object

        Raises:
            ValueError: If ``self.mode`` is neither ``'EPS'`` nor ``'UCB'``.
        """
        # Map the integer index used by the score/count tables back to the
        # original key of the agents dictionary.
        index_to_key = {int(key): key for key in agents_obj.agents}
        # Sorted so that ties in max() resolve to the lowest index.
        candidates = sorted(index_to_key)

        if self.mode == 'EPS':
            if random.random() < self.eps:
                self.rm_agent = random.choice(candidates)
            else:
                self.rm_agent = max(candidates,
                                    key=lambda idx: self.agent_scores[idx])
        elif self.mode == 'UCB':
            # The 1 + count in the denominator avoids dividing by zero for
            # agents that have never been removed.
            self.rm_agent = max(
                candidates,
                key=lambda idx: self.agent_scores[idx] + np.sqrt(
                    2 * np.log(self.t) / (1 + self.agent_counts[idx])))
        else:
            raise ValueError('Enter Valid mode.')

        agents_obj.agents.pop(index_to_key[self.rm_agent])
        self.agent_counts[self.rm_agent] += 1

    def get_score(self, end_state: Dict[str, List[int]]) -> float:
        """
        Returns ``1 - infected / total`` computed from the last entry of each state's series.

        Args:
            end_state: Dictionary mapping states to time step wise population

        Returns:
            Score of the agent
        """
        infected = 0
        for state in self.states:
            infected += end_state[state][-1]
        total_agents = 0
        for series in end_state.values():
            total_agents += series[-1]
        return 1.0 - infected / total_agents

    def update_agent_scores(self, end_state: Dict[str, List[int]]) -> None:
        """
        Updates the running-average score of the agent removed in the current run.

        Args:
            end_state: Dictionary mapping states to time step wise population
        """
        score = self.get_score(end_state)
        # Incremental mean: new = old + (score - old) / count.
        self.agent_scores[self.rm_agent] += 1.0 / (self.agent_counts[
            self.rm_agent]) * (score - self.agent_scores[self.rm_agent])

    def one_run_helper(
        self, agents_obj: ReadAgents
    ) -> Tuple[Dict[str, List[int]], ReadAgents, ReadLocations]:
        """
        Runs one complete simulation with the given agents.

        Args:
            agents_obj: ReadAgents object

        Returns:
            The end_state, agents object and locations object
        """
        Time.new_world()
        time_steps = self.world_obj.config_obj.time_steps
        locations_obj = ReadLocations(self.world_obj.locations_filename,
                                      self.world_obj.config_obj)
        one_time_event_obj = ReadOneTimeEvents(
            self.world_obj.one_time_event_file)
        sim_obj = Simulate(self.world_obj.config_obj, self.world_obj.model,
                           self.world_obj.policy_list, agents_obj,
                           locations_obj)
        sim_obj.on_start_simulation()

        for _ in range(time_steps):
            sim_obj.on_start_time_step(
                self.world_obj.interaction_files_list,
                self.world_obj.event_files_list,
                self.world_obj.probabilistic_interaction_files_list,
                one_time_event_obj)
            sim_obj.handle_time_step_for_all_agents()
            sim_obj.end_time_step()
            Time.increment_current_time_step()

        end_state = sim_obj.end_simulation()
        return end_state, agents_obj, locations_obj

    def one_run(self) -> None:
        """
        Executes a single run of the detection module: select and remove an agent, simulate, update its score.
        """
        agents_obj = ReadAgents(self.world_obj.agents_filename,
                                self.world_obj.config_obj)
        self.remove_agents(agents_obj)
        end_state, _, _ = self.one_run_helper(agents_obj)
        self.update_agent_scores(end_state)

    def run_detection(self) -> None:
        """
        Function to run the complete detection module (``num_runs`` calls to :meth:`one_run`).

        Progress is printed roughly every tenth of the runs.
        """
        # Integer interval: a float modulo (num_runs / 10) is unreliable when
        # num_runs is not a multiple of 10 (prints always, or never).
        progress_interval = max(1, self.num_runs // 10)
        for i in range(self.num_runs):
            if (i + 1) % progress_interval == 0:
                print('Iteration running : ', i + 1)
            self.t = i + 1
            self.one_run()

    def print_default_output(self, n: int) -> None:
        """
        Prints the n maximum agent scores.

        Args:
            n: Number of elements to be printed
        """
        # get_max_score_agents() is not defined in this class; presumably
        # provided by the base class.
        print(self.get_max_score_agents(n))