test_setups

analyse_scale_robustness(all_histories, multipliers)

Function to generate an analysis of a set of simulation tests run with different scale multipliers applied to the environment. It returns a pandas dataframe summarizing the results for each multiplier. The following results are analyzed:

  • convergence
  • steps taken
  • discounted rewards
  • extra steps taken (compared to a minimum path)
  • t min over t (a ratio of how optimal the path taken was)

For each result, the mean and standard deviation over all trajectories are recorded, along with the mean and standard deviation over the successful trajectories only.

Parameters:

Name Type Description Default
all_histories list[SimulationHistory]

A list of all the simulation histories to summarize

required
multipliers ndarray

An array of the scale multipliers used, in the same order as all_histories

required

Returns:

Name Type Description
df DataFrame

The analysis dataframe.
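
As a rough illustration of the flattening described above, the sketch below collapses one per-history statistics table into a single row whose keys follow the `<column>_<metric>` pattern used by the function. The table is made-up stand-in data rather than the output of a real SimulationHistory, and only two of the six result columns are shown.

```python
import pandas as pd

# Stand-in for hist.analysis_df: statistics as rows, results as columns.
# All numbers are invented purely for illustration.
analysis_df = pd.DataFrame(
    {'converged': [0.9, 0.3, 1.0, 0.0],
     'steps_taken': [250.0, 120.0, 180.0, 60.0]},
    index=['mean', 'standard_deviation', 'success_mean', 'success_standard_deviation'])

# Collapse the table into one flat row, tagged with the multiplier it belongs to
row = {'multiplier': 80}
for col in analysis_df.columns:
    for metric in analysis_df.index:
        row[f'{col}_{metric}'] = analysis_df.loc[metric, col]

# One such row per history gives the summary dataframe
summary = pd.DataFrame([row])
print(list(summary.columns))
```

Stacking one such row per history yields a table in which each line corresponds to one multiplier, which makes it straightforward to plot any metric against the scale.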

Source code in olfactory_navigation/test_setups.py
def analyse_scale_robustness(all_histories: list[SimulationHistory],
                             multipliers: np.ndarray
                             ) -> pd.DataFrame:
    '''
    Function to generate an analysis of a set of simulation tests run with different scale multipliers applied to the environment.
    It returns a pandas dataframe summarizing the results for each multiplier.
    The following results are analyzed:

    - convergence
    - steps taken
    - discounted rewards
    - extra steps taken (compared to a minimum path)
    - t min over t (a ratio of how optimal the path taken was)

    For each result, the mean and standard deviation over all trajectories are recorded, along with the mean and standard deviation over the successful trajectories only.

    Parameters
    ----------
    all_histories : list[SimulationHistory]
        A list of all the simulation histories to summarize
    multipliers : np.ndarray
        An array of the scale multipliers used, in the same order as all_histories

    Returns
    -------
    df : pd.DataFrame
        The analysis dataframe.
    '''
    rows = []
    # For each simulation history and multiplier, the analysis dataframe is extracted
    for hist, multiplier in zip(all_histories, multipliers):
        df = hist.analysis_df

        # Then the summarized metrics are collapsed on a single row
        col_metric_dict = {'multiplier': int(multiplier)}
        for col in ['converged', 'reached_horizon', 'steps_taken', 'discounted_rewards', 'extra_steps', 't_min_over_t']:
            for metric in ['mean', 'standard_deviation', 'success_mean', 'success_standard_deviation']:
                col_metric_dict[f'{col}_{metric}'] = df.loc[metric, col]

        rows.append(col_metric_dict)

    # Creating the dataframe from all the rows
    df = pd.DataFrame(rows)

    # Removal of 4 unnecessary columns
    df = df.drop(columns=['converged_success_mean',
                          'converged_success_standard_deviation',
                          'reached_horizon_success_mean',
                          'reached_horizon_success_standard_deviation'])

    return df

analyse_shape_robustness(all_histories, multipliers)

Function to generate an analysis of a set of simulation tests run with different multipliers applied to the environment. It returns a pandas dataframe summarizing the results for each multiplier pair. The following results are analyzed:

  • convergence
  • steps taken
  • discounted rewards
  • extra steps taken (compared to a minimum path)
  • t min over t (a ratio of how optimal the path taken was)

For each result, the mean and standard deviation over all trajectories are recorded, along with the mean and standard deviation over the successful trajectories only.

Parameters:

Name Type Description Default
all_histories list[SimulationHistory]

A list of all the simulation histories to summarize

required
multipliers ndarray

An array of the multiplier pairs used (for the y multiplier then the x multiplier)

required

Returns:

Name Type Description
df DataFrame

The analysis dataframe.
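
One way to read the resulting dataframe is to pivot a single metric into a y-by-x grid. The sketch below assumes a dataframe with the `y_multiplier`, `x_multiplier` and `converged_mean` columns produced by this function; the values themselves are invented for illustration.

```python
import pandas as pd

# Hypothetical output of analyse_shape_robustness for a 2x2 grid of multiplier pairs.
# Only the columns needed here are included, and the values are invented.
df = pd.DataFrame({
    'y_multiplier': [80, 80, 120, 120],
    'x_multiplier': [80, 120, 80, 120],
    'converged_mean': [0.95, 0.90, 0.85, 0.70],
})

# Rows are y multipliers, columns are x multipliers, cells are convergence rates
grid = df.pivot(index='y_multiplier', columns='x_multiplier', values='converged_mean')
print(grid)
```

The same pivot works for any of the other summarized columns, for instance `t_min_over_t_mean`.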

Source code in olfactory_navigation/test_setups.py
def analyse_shape_robustness(all_histories: list[SimulationHistory],
                             multipliers: np.ndarray
                             ) -> pd.DataFrame:
    '''
    Function to generate an analysis of a set of simulation tests run with different multipliers applied to the environment.
    It returns a pandas dataframe summarizing the results for each multiplier pair.
    The following results are analyzed:

    - convergence
    - steps taken
    - discounted rewards
    - extra steps taken (compared to a minimum path)
    - t min over t (a ratio of how optimal the path taken was)

    For each result, the mean and standard deviation over all trajectories are recorded, along with the mean and standard deviation over the successful trajectories only.

    Parameters
    ----------
    all_histories : list[SimulationHistory]
        A list of all the simulation histories to summarize
    multipliers : np.ndarray
        An array of the multiplier pairs used (for the y multiplier then the x multiplier)

    Returns
    -------
    df : pd.DataFrame
        The analysis dataframe.
    '''
    rows = []
    # For each simulation history and multiplier, the analysis dataframe is extracted
    for hist, multiplier_pair in zip(all_histories, multipliers):
        df = hist.analysis_df

        # Then the summarized metrics are collapsed on a single row
        col_metric_dict = {'y_multiplier': multiplier_pair[0].astype(int), 'x_multiplier': multiplier_pair[1].astype(int)}
        for col in ['converged', 'reached_horizon', 'steps_taken', 'discounted_rewards', 'extra_steps', 't_min_over_t']:
            for metric in ['mean', 'standard_deviation', 'success_mean', 'success_standard_deviation']:
                col_metric_dict[f'{col}_{metric}'] = df.loc[metric, col]

        rows.append(col_metric_dict)

    # Creating the dataframe from all the rows
    df = pd.DataFrame(rows)

    # Removal of 4 unnecessary columns
    df = df.drop(columns=['converged_success_mean',
                          'converged_success_standard_deviation',
                          'reached_horizon_success_mean',
                          'reached_horizon_success_standard_deviation'])

    return df

run_all_starts_test(agent, environment=None, time_shift=0, time_loop=True, horizon=1000, skip_initialization=False, reward_discount=0.99, print_progress=True, print_stats=True, use_gpu=False)

Function to run a test from all the available starting positions of the environment provided (or of the agent's environment if none is provided).

Parameters:

Name Type Description Default
agent Agent

The agent to be tested

required
environment Environment

The environment to run the simulations in. By default, the environment linked to the agent will be used. This parameter is intended for cases where the environment needs to differ from the one the agent was trained on.

None
time_shift int or ndarray

The time at which to start the olfactory simulation array. It can be either a single value, or n values.

0
time_loop bool

Whether to loop the time if reaching the end. (starts back at 0)

True
horizon int

The amount of steps to run the simulation for before killing the remaining simulations.

1000
skip_initialization bool

Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.

False
reward_discount float

How much a given reward is discounted based on how long it took to get it. It is purely used to compute the Average Discount Reward (ADR) after the simulation.

0.99
print_progress bool

Whether to show a progress bar of what step the simulations are at.

True
print_stats bool

Whether to print the stats at the end of the run.

True
use_gpu bool

Whether to run the simulations on the GPU or not.

False

Returns:

Name Type Description
hist SimulationHistory

A SimulationHistory object that tracked all the positions, actions and observations.
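
The starting positions are every cell with a non-zero start probability. A minimal sketch of that selection, using an invented toy grid in place of a real environment's `start_probabilities`:

```python
import numpy as np

# Toy 3x4 start-probability grid (invented values); zero cells are not valid starts.
start_probabilities = np.array([
    [0.0, 0.0, 0.2, 0.0],
    [0.0, 0.3, 0.0, 0.0],
    [0.5, 0.0, 0.0, 0.0],
])

# One (row, column) pair per cell with a non-zero probability
start_points = np.argwhere(start_probabilities > 0)
n = len(start_points)  # one simulation is run per start point

print(n)
print(start_points.tolist())
```

Because one simulation is launched per valid cell, the number of simulations grows with the size of the starting zone rather than being chosen by the caller.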

Source code in olfactory_navigation/test_setups.py
def run_all_starts_test(
             agent: Agent,
             environment: Environment | None = None,
             time_shift: int | np.ndarray = 0,
             time_loop: bool = True,
             horizon: int = 1000,
             skip_initialization: bool = False,
             reward_discount: float = 0.99,
             print_progress: bool = True,
             print_stats: bool = True,
             use_gpu: bool = False
             ) -> SimulationHistory:
    '''
    Function to run a test from all the available starting positions of the environment provided (or of the agent's environment if none is provided).

    Parameters
    ----------
    agent : Agent
        The agent to be tested
    environment : Environment, optional
        The environment to run the simulations in.
        By default, the environment linked to the agent will be used.
        This parameter is intended for cases where the environment needs to differ from the one the agent was trained on.
    time_shift : int or np.ndarray, default=0
        The time at which to start the olfactory simulation array.
        It can be either a single value, or n values.
    time_loop : bool, default=True
        Whether to loop the time if reaching the end. (starts back at 0)
    horizon : int, default=1000
        The amount of steps to run the simulation for before killing the remaining simulations.
    skip_initialization : bool, default=False
        Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.
    reward_discount : float, default=0.99
        How much a given reward is discounted based on how long it took to get it.
        It is purely used to compute the Average Discount Reward (ADR) after the simulation.
    print_progress : bool, default=True
        Whether to show a progress bar of what step the simulations are at.
    print_stats : bool, default=True
        Whether to print the stats at the end of the run.
    use_gpu : bool, default=False
        Whether to run the simulations on the GPU or not.

    Returns
    -------
    hist : SimulationHistory
        A SimulationHistory object that tracked all the positions, actions and observations.
    '''
    # Handle the case where a specific environment is given
    environment_provided = environment is not None
    if environment_provided:
        assert environment.shape == agent.environment.shape, "The provided environment's shape doesn't match the shape of the environment the agent has been trained on..."
    else:
        environment = agent.environment

    # Gathering starting points
    start_points = np.argwhere(environment.start_probabilities > 0)
    n = len(start_points)

    return run_test(
        agent=agent,
        n=n,
        start_points=start_points,
        environment=environment if environment_provided else None,
        time_shift=time_shift,
        time_loop=time_loop,
        horizon=horizon,
        skip_initialization=skip_initialization,
        reward_discount=reward_discount,
        print_progress=print_progress,
        print_stats=print_stats,
        use_gpu=use_gpu
    )

run_n_by_cell_test(agent, cell_width=10, n_by_cell=10, environment=None, time_shift=0, time_loop=True, horizon=1000, skip_initialization=False, reward_discount=0.99, print_progress=True, print_stats=True, use_gpu=False)

Function to run a test with simulations starting in different cells across the available starting zones. A number n_by_cell determines how many simulations should start within each cell (the same position can be chosen multiple times).

Parameters:

Name Type Description Default
agent Agent

The agent to be tested

required
cell_width int

The side length of each cell to be considered.

10
n_by_cell int

How many simulations should start within each cell.

10
environment Environment

The environment to run the simulations in. By default, the environment linked to the agent will be used. This parameter is intended for cases where the environment needs to differ from the one the agent was trained on.

None
time_shift int or ndarray

The time at which to start the olfactory simulation array. It can be either a single value, or n values.

0
time_loop bool

Whether to loop the time if reaching the end. (starts back at 0)

True
horizon int

The amount of steps to run the simulation for before killing the remaining simulations.

1000
skip_initialization bool

Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.

False
reward_discount float

How much a given reward is discounted based on how long it took to get it. It is purely used to compute the Average Discount Reward (ADR) after the simulation.

0.99
print_progress bool

Whether to show a progress bar of what step the simulations are at.

True
print_stats bool

Whether to print the stats at the end of the run.

True
use_gpu bool

Whether to run the simulations on the GPU or not.

False

Returns:

Name Type Description
hist SimulationHistory

A SimulationHistory object that tracked all the positions, actions and observations.
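
The per-cell sampling can be sketched on a toy grid as follows. The grid values and the seeded `numpy.random.default_rng` generator are assumptions made for reproducibility; the function itself draws with `np.random.choice`.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded generator, used here only for reproducibility

shape = (4, 4)
cell_width = 2
n_by_cell = 3

# Toy start probabilities: only the top-left and bottom-right cells contain valid starts
start_probabilities = np.zeros(shape)
start_probabilities[0, 0] = 0.25
start_probabilities[1, 1] = 0.25
start_probabilities[3, 2] = 0.5

# Flat index of every grid position, laid out with the same shape as the grid
indices_grid = np.arange(np.prod(shape)).reshape(shape)
chosen = []
for i in range(shape[0] // cell_width):
    for j in range(shape[1] // cell_width):
        rows = slice(i * cell_width, (i + 1) * cell_width)
        cols = slice(j * cell_width, (j + 1) * cell_width)
        cell_probs = start_probabilities[rows, cols]
        if np.any(cell_probs > 0):
            # Normalize on a copy so the original grid is left untouched
            p = (cell_probs / cell_probs.sum()).ravel()
            chosen += rng.choice(indices_grid[rows, cols].ravel(),
                                 size=n_by_cell, replace=True, p=p).tolist()

# Convert the flat indices back to (row, column) coordinates
start_points = np.array(np.unravel_index(chosen, shape)).T
print(start_points.shape)
```

Cells with no valid start position are skipped, so the total number of simulations is `n_by_cell` times the number of cells that overlap the starting zone.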

Source code in olfactory_navigation/test_setups.py
def run_n_by_cell_test(
             agent: Agent,
             cell_width: int = 10,
             n_by_cell: int = 10,
             environment: Environment | None = None,
             time_shift: int | np.ndarray = 0,
             time_loop: bool = True,
             horizon: int = 1000,
             skip_initialization: bool = False,
             reward_discount: float = 0.99,
             print_progress: bool = True,
             print_stats: bool = True,
             use_gpu: bool = False
             ) -> SimulationHistory:
    '''
    Function to run a test with simulations starting in different cells across the available starting zones.
    A number n_by_cell determines how many simulations should start within each cell (the same position can be chosen multiple times).

    Parameters
    ----------
    agent : Agent
        The agent to be tested
    cell_width : int, default=10
        The side length of each cell to be considered.
    n_by_cell : int, default=10
        How many simulations should start within each cell.
    environment : Environment, optional
        The environment to run the simulations in.
        By default, the environment linked to the agent will be used.
        This parameter is intended for cases where the environment needs to differ from the one the agent was trained on.
    time_shift : int or np.ndarray, default=0
        The time at which to start the olfactory simulation array.
        It can be either a single value, or n values.
    time_loop : bool, default=True
        Whether to loop the time if reaching the end. (starts back at 0)
    horizon : int, default=1000
        The amount of steps to run the simulation for before killing the remaining simulations.
    skip_initialization : bool, default=False
        Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.
    reward_discount : float, default=0.99
        How much a given reward is discounted based on how long it took to get it.
        It is purely used to compute the Average Discount Reward (ADR) after the simulation.
    print_progress : bool, default=True
        Whether to show a progress bar of what step the simulations are at.
    print_stats : bool, default=True
        Whether to print the stats at the end of the run.
    use_gpu : bool, default=False
        Whether to run the simulations on the GPU or not.

    Returns
    -------
    hist : SimulationHistory
        A SimulationHistory object that tracked all the positions, actions and observations.
    '''
    # Handle the case where a specific environment is given
    environment_provided = environment is not None
    if environment_provided:
        assert environment.shape == agent.environment.shape, "The provided environment's shape doesn't match the shape of the environment the agent has been trained on..."
    else:
        environment = agent.environment

    # Gathering starting points
    cells_x = int(environment.shape[0] / cell_width)
    cells_y = int(environment.shape[1] / cell_width)

    indices = np.arange(np.prod(environment.shape), dtype=int)
    indices_grid = indices.reshape(environment.shape)
    all_chosen_indices = []

    for i in range(cells_x):
        for j in range(cells_y):
            cell_probs = environment.start_probabilities[(i*cell_width):(i*cell_width)+cell_width, (j*cell_width):(j*cell_width)+cell_width]
            if np.any(cell_probs > 0):
                cell_indices = indices_grid[(i*cell_width):(i*cell_width)+cell_width, (j*cell_width):(j*cell_width)+cell_width]
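                # Normalize into a new array: cell_probs is a view, so an in-place division would alter environment.start_probabilities
                cell_probs = cell_probs / np.sum(cell_probs)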

                chosen_indices = np.random.choice(cell_indices.ravel(), size=n_by_cell, replace=True, p=cell_probs.ravel()).tolist()
                all_chosen_indices += chosen_indices

    n = len(all_chosen_indices)
    start_points = np.array(np.unravel_index(all_chosen_indices, environment.shape)).T

    return run_test(
        agent=agent,
        n=n,
        start_points=start_points,
        environment=environment if environment_provided else None,
        time_shift=time_shift,
        time_loop=time_loop,
        horizon=horizon,
        skip_initialization=skip_initialization,
        reward_discount=reward_discount,
        print_progress=print_progress,
        print_stats=print_stats,
        use_gpu=use_gpu
    )

test_scale_robustness(agent, skip_initialization=False, step_percentage=20, min_percentage=20, max_percentage=200, multipliers=None, use_gpu=False, print_progress=True, print_stats=True, save=True, save_folder=None, save_analysis=True)

Function to test the robustness of an agent in an environment where the scale of the environment's shape is altered by some percentage.

A list of percentage multipliers is built from min_percentage up to max_percentage in increments of step_percentage, passing through 100%. These multipliers are applied in both the x and y directions, but are capped at the largest multiplier allowed along each axis.

For each multiplier, a complete test is run. This complete test consists of running simulations from all possible start positions of the original environment.
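
The default list construction can be sketched in plain Python. The helper name `build_multipliers` and its `max_allowed_mult` argument are hypothetical stand-ins: in the function, the cap comes from the environment's margins rather than from an argument.

```python
def build_multipliers(min_percentage=20, max_percentage=200,
                      step_percentage=20, max_allowed_mult=2.5):
    '''Hypothetical helper mirroring the default list construction.'''
    # Percentages below 100%, stepping down from 100 towards min_percentage
    below = [100 - p for p in range(0, (100 - min_percentage) + step_percentage, step_percentage)[1:]]

    # Percentages from 100% upwards; note that range() excludes its upper bound
    upper_bound = min(max_percentage, int(max_allowed_mult * 100))
    above = list(range(100, upper_bound, step_percentage))

    return sorted(below + above)

print(build_multipliers())                      # [20, 40, 60, 80, 100, 120, 140, 160, 180]
print(build_multipliers(max_allowed_mult=1.5))  # [20, 40, 60, 80, 100, 120, 140]
```

As the first call shows, the upper bound is exclusive in this construction, so with the default arguments the largest multiplier tested is 180% rather than 200%.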

Parameters:

Name Type Description Default
agent Agent

The agent to run the scale robustness test on.

required
skip_initialization bool

Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.

False
step_percentage int

Starting at 100%, how much of a percentage step to do to reach the min and max percentages.

20
min_percentage int

The minimum percentage of deformation to apply on the environment's odor plume.

20
max_percentage int

The maximum percentage of deformation to apply on the environment's odor plume. If this value is larger than the maximum shape allowed by the margins, the largest allowed percentage will be used.

200
multipliers list[int]

If provided, the step_percentage, min_percentage and max_percentage parameters will be ignored. A list of deformation percentages to use to deform the environment's odor plume.

None
use_gpu bool

Whether to use the GPU to speed up the tests.

False
print_progress bool

Whether to display a progress bar of how many tests have been performed so far.

True
print_stats bool

Whether to display statistics at the end of each test.

True
save bool

Whether to save the results of each test to a save_folder. Each test's result will be under the name 'test_env_mult-<multiplier>.csv'

True
save_folder str

The path to which the test results are saved. If not provided, it will automatically create a new folder './results/<timestamp>_scale_robustness_test_<environment_name>/'

None
save_analysis bool

Whether to save the analysis of the histories. It will be saved under a file named '_analysis.csv' in the save_folder.

True

Returns:

Name Type Description
all_histories list[SimulationHistory]

A list of SimulationHistory instances.
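
The cap on the largest usable multiplier follows from how far the odor data can stretch around the source before it exceeds the margins. The sketch below reproduces that computation on invented geometry; the interpretation of `margins` as per-axis (low, high) pairs is an assumption read off the source code, not a documented contract.

```python
import numpy as np

# Invented geometry: per-axis (low, high) margins around the odor data,
# the shape of the odor data, and the source position inside that data.
margins = np.array([[10, 30],
                    [20, 20]])
data_shape = np.array([100, 200])
data_source_position = np.array([50, 40])

# Stretch allowed before the low side of the data exceeds its margin
low_max_mult = 1 + margins[:, 0] / data_source_position
# Stretch allowed before the high side of the data exceeds its margin
high_max_mult = 1 + margins[:, 1] / (data_shape - data_source_position)

# The tighter of the two constraints applies on each axis
max_mult = np.min(np.vstack([low_max_mult, high_max_mult]), axis=0)
print(max_mult)
```

Here the first axis is limited by its low margin (1.2) and the second by its high margin (1.125).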

Source code in olfactory_navigation/test_setups.py
def test_scale_robustness(agent: Agent,
                          skip_initialization: bool = False,
                          step_percentage: int = 20,
                          min_percentage: int = 20,
                          max_percentage: int = 200,
                          multipliers: list[int] | None = None,
                          use_gpu: bool = False,
                          print_progress: bool = True,
                          print_stats: bool = True,
                          save: bool = True,
                          save_folder: str | None = None,
                          save_analysis: bool = True
                          ) -> list[SimulationHistory]:
    '''
    Function to test the robustness of an agent in an environment where the scale of the environment's shape is altered by some percentage.

    A list of percentage multipliers is built from min_percentage up to max_percentage in increments of step_percentage, passing through 100%.
    These multipliers are applied in both the x and y directions, but are capped at the largest multiplier allowed along each axis.

    For each multiplier, a complete test is run. This complete test consists of running simulations from all possible start positions of the original environment.

    Parameters
    ----------
    agent : Agent
        The agent to run the scale robustness test on.
    skip_initialization : bool, default=False
        Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.
    step_percentage : int, default=20
        Starting at 100%, how much of a percentage step to do to reach the min and max percentages.
    min_percentage : int, default=20
        The minimum percentage of deformation to apply on the environment's odor plume.
    max_percentage : int, default=200
        The maximum percentage of deformation to apply on the environment's odor plume.
        If this value is larger than the maximum shape allowed by the margins, the largest allowed percentage will be used.
    multipliers : list[int], optional
        If provided, the step_percentage, min_percentage and max_percentage parameters will be ignored.
        A list of deformation percentages to use to deform the environment's odor plume.
    use_gpu : bool, default=False
        Whether to use the GPU to speed up the tests.
    print_progress : bool, default=True
        Whether to display a progress bar of how many tests have been performed so far.
    print_stats : bool, default=True
        Whether to display statistics at the end of each test.
    save : bool, default=True
        Whether to save the results of each test to a save_folder.
        Each test's result will be under the name 'test_env_mult-<multiplier>.csv'
    save_folder : str, optional
        The path to which the test results are saved.
        If not provided, it will automatically create a new folder './results/<timestamp>_scale_robustness_test_<environment_name>/'
    save_analysis : bool, default=True
        Whether to save the analysis of the histories.
        It will be saved under a file named '_analysis.csv' in the save_folder.

    Returns
    -------
    all_histories : list[SimulationHistory]
        A list of SimulationHistory instances.
    '''
    # Gather environment
    environment = agent.environment

    # Gathering starting points
    start_points = np.argwhere(environment.start_probabilities > 0)
    n = len(start_points)

    # Generating multipliers
    if multipliers is None:
        with np.errstate(divide='ignore'):
            low_max_mult = ((environment.margins[:,0] / environment.data_source_position) + 1)
            high_max_mult = (1 + (environment.margins[:,1] / (environment.data_shape - environment.data_source_position)))
            max_mult = np.min(np.vstack([low_max_mult, high_max_mult]), axis=0)

        multipliers = [(100 - perc_mult) for perc_mult in range(0, (100-min_percentage)+step_percentage, step_percentage)[1:]] + [perc_mult for perc_mult in range(100, min(max_percentage, int(max(max_mult)*100)), step_percentage)]
    # Sort into a new list so that a caller-provided list is not reordered in place
    multipliers = sorted(multipliers)

    # Save Folder name and creation
    if save or save_analysis:
        if save_folder is None:
            save_folder = f'./results/{datetime.now().strftime("%Y%m%d_%H%M%S")}_scale_robustness_test_' + environment.name

        # Create the folder along with any missing parent (such as './results')
        os.makedirs(save_folder, exist_ok=True)

        print(f'The results will be saved to: {save_folder}\n')

    all_histories = []
    for mult in (tqdm(multipliers) if print_progress else multipliers):
        print(f'Testing on environment with scale modifier {mult}%')

        # Modifying environment and agent
        modified_environment = environment.modify_scale(scale_factor=mult/100)
        modified_agent = agent.modify_environment(modified_environment)

        # Running test
        hist = run_all_starts_test(
            agent=modified_agent,
            skip_initialization=skip_initialization,  # forwarded so the documented parameter takes effect
            print_progress=False,
            print_stats=print_stats,
            use_gpu=use_gpu)

        all_histories.append(hist)

        # Saving history
        if save:
            file_name = f'test_env_mult-{mult}'
            hist.save(file=file_name,
                      folder=save_folder,
                      save_analysis=False)

        print()

    # Analysis saving
    if save and save_analysis:
        analysis_df = analyse_scale_robustness(all_histories=all_histories, multipliers=multipliers)
        analysis_file_name = '_analysis.csv'
        analysis_df.to_csv(save_folder + '/' + analysis_file_name, index=False)
        print(f'Scale robustness analysis saved to: {save_folder}/{analysis_file_name}')

    return all_histories

test_shape_robustness(agent, skip_initialization=False, step_percentage=20, min_percentage=20, max_percentage=200, multipliers=None, use_gpu=False, print_progress=True, print_stats=True, save=True, save_folder=None, save_analysis=True)

Function to test the robustness of an agent in an environment where the odor plume's shape is altered by some percentage.

A list of percentage multipliers is built from min_percentage up to max_percentage in increments of step_percentage, passing through 100%. These multipliers are applied in both the x and y directions, but are capped at the largest multiplier allowed along each axis.

For each multiplier pair, a complete test is run. This complete test consists of running simulations from all possible start positions of the original environment.

Parameters:

Name Type Description Default
agent Agent

The agent to run the shape robustness test on.

required
skip_initialization bool

Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.

False
step_percentage int

Starting at 100%, how much of a percentage step to do to reach the min and max percentages.

20
min_percentage int

The minimum percentage of deformation to apply on the environment's odor plume.

20
max_percentage int

The maximum percentage of deformation to apply on the environment's odor plume. If this value is larger than the maximum shape allowed by the margins, the largest allowed percentage will be used.

200
multipliers list[int]

If provided, the step_percentage, min_percentage and max_percentage parameters will be ignored. A list of deformation percentages to use to deform the environment's odor plume.

None
use_gpu bool

Whether to use the GPU to speed up the tests.

False
print_progress bool

Whether to display a progress bar of how many tests have been performed so far.

True
print_stats bool

Whether to display statistics at the end of each test.

True
save bool

Whether to save the results of each test to a save_folder. Each test's result will be under the name 'test_env_y-<y_multiplier>_x-<x_multiplier>.csv'

True
save_folder str

The path to which the test results are saved. If not provided, it will automatically create a new folder './results/<timestamp>_shape_robustness_test_<environment_name>/'

None
save_analysis bool

Whether to save the analysis of the histories. It will be saved under a file named '_analysis.csv' in the save_folder.

True

Returns:

Name Type Description
all_histories list[SimulationHistory]

A list of SimulationHistory instances.

Source code in olfactory_navigation/test_setups.py
def test_shape_robustness(agent: Agent,
                          skip_initialization: bool = False,
                          step_percentage: int = 20,
                          min_percentage: int = 20,
                          max_percentage: int = 200,
                          multipliers: list[int] | None = None,
                          use_gpu: bool = False,
                          print_progress: bool = True,
                          print_stats: bool = True,
                          save: bool = True,
                          save_folder: str | None = None,
                          save_analysis: bool = True
                          ) -> list[SimulationHistory]:
    '''
    Function to test the robustness of an agent in an environment where the odor plume's shape is altered by some percentage.

    A list of percentage multipliers is built from min_percentage up to max_percentage in increments of step_percentage, passing through 100%.
    These multipliers are applied in both the x and y directions, but are capped at the largest multiplier allowed along each axis.

    For each multiplier pair, a complete test is run. This complete test consists of running simulations from all possible start positions of the original environment.

    Parameters
    ----------
    agent : Agent
        The agent to run the shape robustness test on.
    skip_initialization : bool, default=False
        Whether to skip the initialization of the agent. This is to be used in case the agent is initialized in some custom manner beforehand.
    step_percentage : int, default=20
        The step size, in percentage points, used when moving away from 100% towards the min and max percentages.
    min_percentage : int, default=20
        The minimum percentage of deformation to apply on the environment's odor plume.
    max_percentage : int, default=200
        The maximum percentage of deformation to apply on the environment's odor plume.
        If this value is larger than the maximum shape allowed by the margins, the largest allowed percentage will be used.
    multipliers : list[int], optional
        A list of deformation percentages to apply to the environment's odor plume.
        If provided, the step_percentage, min_percentage and max_percentage parameters will be ignored.
    use_gpu : bool, default=False
        Whether to use the GPU to speed up the tests.
    print_progress : bool, default=True
        Whether to display a progress bar of how many tests have been performed so far.
    print_stats : bool, default=True
        Whether to display statistics at the end of each test.
    save : bool, default=True
        Whether to save the results of each test to the save_folder.
        Each test's result will be saved under the name 'test_env_y-<y_multiplier>_x-<x_multiplier>.csv'
    save_folder : str, optional
        The path to which the test results are saved.
        If not provided, it will automatically create a new folder './results/<timestamp>_shape_robustness_test_<environment_name>/'
    save_analysis : bool, default=True
        Whether to save the analysis of the histories.
        It will be saved in a file named '_analysis.csv' in the save_folder.

    Returns
    -------
    all_histories : list[SimulationHistory]
        A list of SimulationHistory instances.
    '''
    # Gather environment
    environment = agent.environment

    # Gathering starting points
    start_points = np.argwhere(environment.start_probabilities > 0)
    n = len(start_points)

    # Largest multiplier allowed along each axis by the environment margins.
    # Computed unconditionally: it is also needed to filter user-provided multipliers.
    with np.errstate(divide='ignore'):
        low_max_mult = ((environment.margins[:,0] / environment.data_source_position) + 1)
        high_max_mult = (1 + (environment.margins[:,1] / (environment.data_shape - environment.data_source_position)))
        max_mult = np.min(np.vstack([low_max_mult, high_max_mult]), axis=0)

    # Generating multipliers
    if multipliers is None:
        multipliers = [(100 - perc_mult) for perc_mult in range(0, (100-min_percentage)+step_percentage, step_percentage)[1:]] + [perc_mult for perc_mult in range(100, min(max_percentage, int(max(max_mult)*100)), step_percentage)]

    # Sorted copy, so that a list passed by the caller is not modified in place
    multipliers = sorted(multipliers)

    # Generating all combinations of multipliers
    mult_combinations = np.array(np.meshgrid(multipliers, multipliers, indexing='xy')).T.reshape(-1,2).astype(float)
    mult_combinations /= 100
    mult_combinations = mult_combinations[np.all(mult_combinations < max_mult, axis=1), :]

    # Save Folder name and creation
    if save or save_analysis:
        if save_folder is None:
            save_folder = f'./results/{datetime.now().strftime("%Y%m%d_%H%M%S")}_shape_robustness_test_' + environment.name

        # Also creates missing parent folders (such as './results')
        os.makedirs(save_folder, exist_ok=True)

        print(f'The results will be saved to: {save_folder}\n')

    all_histories = []
    for mults in (tqdm(mult_combinations) if print_progress else mult_combinations):
        # round() before int() avoids truncation errors such as 0.57 * 100 -> 56
        print(f'Testing on environment with height {int(round(mults[0]*100))}% and width {int(round(mults[1]*100))}%')

        # Modifying environment
        modified_environment = environment.modify(multiplier=mults)

        # Running test
        hist = run_test(
            agent=agent,
            n=n,
            start_points=start_points,
            environment=modified_environment,
            skip_initialization=skip_initialization,
            print_progress=False,
            print_stats=print_stats,
            use_gpu=use_gpu)

        all_histories.append(hist)

        # Saving history
        if save:
            file_name = f'test_env_y-{int(round(mults[0]*100))}_x-{int(round(mults[1]*100))}'
            hist.save(file=file_name,
                      folder=save_folder,
                      save_analysis=False)

        print()

    # Analysis saving (independent of 'save': the folder is created whenever either flag is set)
    if save_analysis:
        analysis_df = analyse_shape_robustness(all_histories=all_histories, multipliers=(mult_combinations*100))
        analysis_file_name = '_analysis.csv'
        analysis_df.to_csv(save_folder + '/' + analysis_file_name, index=False)
        print(f'Shape robustness analysis saved to: {save_folder}/{analysis_file_name}')

    return all_histories
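
The construction and filtering of the multiplier pairs can be tried on its own with NumPy. This is a minimal sketch that reuses the `meshgrid` expression from the listing above; the helper name `multiplier_pairs` and the example values are hypothetical, and `max_mult` is assumed to be given in (y, x) order, as the listing's progress message suggests.

```python
import numpy as np


def multiplier_pairs(percentages, max_mult):
    # All (y, x) combinations of the percentages, one pair per row
    grid = np.array(np.meshgrid(percentages, percentages, indexing='xy')).T.reshape(-1, 2).astype(float)

    # Percentages to plain multipliers (100% -> 1.0)
    grid /= 100

    # Keep only the pairs strictly below the per-axis limit
    return grid[np.all(grid < max_mult, axis=1), :]


# Hypothetical limits: at most 1.2x along y and 2.0x along x
pairs = multiplier_pairs([50, 100, 150], max_mult=np.array([1.2, 2.0]))
print(pairs)
```

With these example limits, the three pairs whose y multiplier is 1.5 are dropped and six pairs remain.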