Skip to content

pbvi_agent

PBVI_Agent

Bases: Agent

A generic Point-Based Value Iteration based agent. It relies on Model-Based reinforcement learning as described in: Pineau J. et al, Point-based value iteration: An anytime algorithm for POMDPs The training consist in two steps:

  • Expand: Where belief points are explored based on the some strategy (to be defined by subclasses).

  • Backup: Using the generated belief points, the value function is updated.

The belief points are probability distributions over the state space and are therefore vectors of |S| elements.

Actions are chosen based on a value function. A value function is a set of alpha vectors of dimensionality |S|. Each alpha vector is associated to a single action but multiple alpha vectors can be associated to the same action. To choose an action at a given belief point, a dot product is taken between each alpha vector and the belief point and the action associated with the highest result is chosen.

Parameters:

Name Type Description Default
environment Environment

The olfactory environment to train the agent with.

required
thresholds float or list[float] or dict[str, float] or dict[str, list[float]]

The olfactory thresholds. If an odor cue above this threshold is detected, the agent detects it, else it does not. If a list of thresholds is provided, the agent should be able to detect |thresholds|+1 levels of odor. A dictionary of (list of) thresholds can also be provided when the environment is layered. In such case, the number of layers provided must match the environment's layers and their labels must match. The thresholds provided will be converted to an array where the levels start with -inf and end with +inf.

= 3e-6
space_aware bool

Whether the agent is aware of its own position in space. This is to be used in scenarios where, for example, the agent is an enclosed container and the source is the variable. Note: The observation array will have a different shape when returned to the update_state function!

= False
spacial_subdivisions ndarray

How many spacial compartments the agent has to internally represent the space it lives in. By default, it will be as many as there are grid points in the environment.

None
actions dict or ndarray

The set of action available to the agent. It should match the type of environment (ie: if the environment has layers, it should contain a layer component to the action vector, and similarly for a third dimension). Else, a dict of strings and action vectors where the strings represent the action labels. If none is provided, by default, all unit steps in all cardinal directions are included and such for all layers (if the environment has layers.)

None
name str

A custom name to give the agent. If not provided is will be a combination of the class-name and the threshold.

None
rng int or Generator

A seed for random generation or directly a numpy random generator.

= np.random.default_rng()
model POMDP

A POMDP model to use to represent the olfactory environment. If not provided, the environment_converter parameter will be used.

None
use_reachability bool
Whether or not to use the reachable states as a shortcut to find the posterior state(s).
= False
environment_converter Callable

A function to convert the olfactory environment instance to a POMDP Model instance. By default, we use an exact convertion that keeps the shape of the environment to make the amount of states of the POMDP Model. This parameter will be ignored if the model parameter is provided.

= exact_converter
converter_parameters dict

A set of additional parameters to be passed down to the environment converter.

{}

Attributes:

Name Type Description
environment Environment
thresholds ndarray

An array of the thresholds of detection, starting with -inf and ending with +inf. In the case of a 2D array of thresholds, the rows of thresholds apply to the different layers of the environment.

space_aware bool
spacial_subdivisions ndarray
trained bool

Whether or not the agent needs to be trained. If an agent doesnt need training this parameter is set to True by default.

name str
action_set ndarray

The actions allowed of the agent. Formulated as movement vectors as [(layer,) (dz,) dy, dx].

action_labels list[str]

The labels associated to the action vectors present in the action set.

model POMDP

The environment converted to a POMDP model using the "from_environment" constructor of the POMDP class.

saved_at str

The place on disk where the agent has been saved (None if not saved yet).

on_gpu bool

Whether the agent has been sent to the gpu or not.

class_name str

The name of the class of the agent.

rng Generator

A random number generator.

on_cpu PBVI_Agent

An instance of the agent on the CPU. If it already is, it returns itself.

on_gpu PBVI_Agent

An instance of the agent on the GPU. If it already is, it returns itself.

trained_at str

A string timestamp of when the agent has been trained (None if not trained yet).

value_function ValueFunction

The value function used for the agent to make decisions.

belief BeliefSet

Used only during simulations. Part of the Agent's status. Where the agent believes he is over the state space. It is a list of n belief points based on how many simulations are running at once.

action_played list[int]

Used only during simulations. Part of the Agent's status. Records what action was last played by the agent. A list of n actions played based on how many simulations are running at once.

Source code in olfactory_navigation/agents/pbvi_agent.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
class PBVI_Agent(Agent):
    '''
    A generic Point-Based Value Iteration based agent. It relies on Model-Based reinforcement learning as described in: Pineau J. et al, Point-based value iteration: An anytime algorithm for POMDPs
    The training consist in two steps:

    - Expand: Where belief points are explored based on the some strategy (to be defined by subclasses).

    - Backup: Using the generated belief points, the value function is updated.

    The belief points are probability distributions over the state space and are therefore vectors of |S| elements.

    Actions are chosen based on a value function. A value function is a set of alpha vectors of dimensionality |S|.
    Each alpha vector is associated to a single action but multiple alpha vectors can be associated to the same action.
    To choose an action at a given belief point, a dot product is taken between each alpha vector and the belief point and the action associated with the highest result is chosen.

    Parameters
    ----------
    environment : Environment
        The olfactory environment to train the agent with.
    thresholds : float or list[float] or dict[str, float] or dict[str, list[float]], default = 3e-6
        The olfactory thresholds. If an odor cue above this threshold is detected, the agent detects it, else it does not.
        If a list of thresholds is provided, the agent should be able to detect |thresholds|+1 levels of odor.
        A dictionary of (list of) thresholds can also be provided when the environment is layered.
        In such case, the number of layers provided must match the environment's layers and their labels must match.
        The thresholds provided will be converted to an array where the levels start with -inf and end with +inf.
    space_aware : bool, default = False
        Whether the agent is aware of its own position in space.
        This is to be used in scenarios where, for example, the agent is an enclosed container and the source is the variable.
        Note: The observation array will have a different shape when returned to the update_state function!
    spacial_subdivisions : np.ndarray, optional
        How many spacial compartments the agent has to internally represent the space it lives in.
        By default, it will be as many as there are grid points in the environment.
    actions : dict or np.ndarray, optional
        The set of action available to the agent. It should match the type of environment (ie: if the environment has layers, it should contain a layer component to the action vector, and similarly for a third dimension).
        Else, a dict of strings and action vectors where the strings represent the action labels.
        If none is provided, by default, all unit steps in all cardinal directions are included and such for all layers (if the environment has layers.)
    name : str, optional
        A custom name to give the agent. If not provided is will be a combination of the class-name and the threshold.
    rng : int or np.random.Generator, default = np.random.default_rng()
        A seed for random generation or directly a numpy random generator.
    model : POMDP, optional
        A POMDP model to use to represent the olfactory environment.
        If not provided, the environment_converter parameter will be used.
    use_reachability : bool, default = False
            Whether or not to use the reachable states as a shortcut to find the posterior state(s).
    environment_converter : Callable, default = exact_converter
        A function to convert the olfactory environment instance to a POMDP Model instance.
        By default, we use an exact convertion that keeps the shape of the environment to make the amount of states of the POMDP Model.
        This parameter will be ignored if the model parameter is provided.
    converter_parameters : dict, optional
        A set of additional parameters to be passed down to the environment converter.

    Attributes
    ---------
    environment : Environment
    thresholds : np.ndarray
        An array of the thresholds of detection, starting with -inf and ending with +inf.
        In the case of a 2D array of thresholds, the rows of thresholds apply to the different layers of the environment.
    space_aware : bool
    spacial_subdivisions : np.ndarray
    trained : bool
        Whether or not the agent needs to be trained. If an agent doesnt need training this parameter is set to True by default.
    name : str
    action_set : np.ndarray
        The actions allowed of the agent. Formulated as movement vectors as [(layer,) (dz,) dy, dx].
    action_labels : list[str]
        The labels associated to the action vectors present in the action set.
    model : POMDP
        The environment converted to a POMDP model using the "from_environment" constructor of the POMDP class.
    saved_at : str
        The place on disk where the agent has been saved (None if not saved yet).
    on_gpu : bool
        Whether the agent has been sent to the gpu or not.
    class_name : str
        The name of the class of the agent.
    rng : np.random.Generator
        A random number generator.
    on_cpu : PBVI_Agent
        An instance of the agent on the CPU. If it already is, it returns itself.
    on_gpu : PBVI_Agent
        An instance of the agent on the GPU. If it already is, it returns itself.
    trained_at : str
        A string timestamp of when the agent has been trained (None if not trained yet).
    value_function : ValueFunction
        The value function used for the agent to make decisions.
    belief : BeliefSet
        Used only during simulations.
        Part of the Agent's status. Where the agent believes he is over the state space.
        It is a list of n belief points based on how many simulations are running at once.
    action_played : list[int]
        Used only during simulations.
        Part of the Agent's status. Records what action was last played by the agent.
        A list of n actions played based on how many simulations are running at once.
    '''
    def __init__(self,
                 environment: Environment,
                 thresholds: float | list[float] | dict[str, float] | dict[str, list[float]] = 3e-6,
                 space_aware: bool = False,
                 spacial_subdivisions: np.ndarray = None,
                 actions: dict[str, np.ndarray] | np.ndarray = None,
                 name: str = None,
                 model: POMDP = None,
                 use_reachability: bool = False,
                 environment_converter: Callable = None,
                 **converter_parameters
                 ) -> None:
        super().__init__(
            environment = environment,
            thresholds = thresholds,
            space_aware = space_aware,
            spacial_subdivisions = spacial_subdivisions,
            actions = actions,
            name = name
        )

        # Converting the olfactory environment to a POMDP Model
        if model is not None:
            loaded_model = model
        elif callable(environment_converter):
            loaded_model = environment_converter(agent=self, **converter_parameters)
        else:
            # Using the exact converter
            loaded_model = exact_converter(agent=self)
        self.model:POMDP = loaded_model

        self.use_reachability = use_reachability

        # Trainable variables
        self.trained_at = None
        self.value_function: ValueFunction = None

        # Status variables
        self.belief: BeliefSet = None
        self.action_played = None
        self.succeeded_update = None


    @property
    def on_gpu(self) -> Self:
        '''
        A version of the Agent on the GPU.
        If the agent is already on the GPU it returns itself, otherwise a new one is generated.
        '''
        # Check whether the agent is already on the gpu or not
        if self.is_on_gpu:
            return self

        assert gpu_support, "GPU support is not enabled, Cupy might need to be installed..."

        # Check if an alternate version doesnt exists create a new one
        if self._alternate_version is None:
            # Generating a new instance
            cls = self.__class__
            gpu_agent = cls.__new__(cls)

            # Copying arguments to gpu
            for arg, val in self.__dict__.items():
                if isinstance(val, np.ndarray):
                    setattr(gpu_agent, arg, cp.array(val))
                elif isinstance(val, (POMDP, ValueFunction, BeliefSet, Belief)):
                    setattr(gpu_agent, arg, val.on_gpu)
                else:
                    setattr(gpu_agent, arg, val)

            # Self reference instances
            self._alternate_version = gpu_agent
            gpu_agent._alternate_version = self
            gpu_agent.is_on_gpu = True

        return self._alternate_version


    @property
    def on_cpu(self) -> Self:
        '''
        A version of the Agent on the CPU.
        If the agent is already on the CPU it returns itself, otherwise a new one is generated.
        '''
        # Check whether the agent is already on the cpu or not
        if not self.is_on_gpu:
            return self

        # Check if an alternate version doesnt exists create a new one
        if self._alternate_version is None:
            # Generating a new instance
            cls = self.__class__
            cpu_agent = cls.__new__(cls)

            # Copying arguments to gpu
            for arg, val in self.__dict__.items():
                if isinstance(val, cp.ndarray):
                    setattr(cpu_agent, arg, cp.asnumpy(val))
                elif isinstance(val, (POMDP, ValueFunction, BeliefSet, Belief)):
                    setattr(cpu_agent, arg, val.on_cpu)
                else:
                    setattr(cpu_agent, arg, val)

            # Self reference instances
            self._alternate_version = cpu_agent
            cpu_agent._alternate_version = self
            cpu_agent.is_on_gpu = False

        return self._alternate_version


    def save(self,
             folder: str = None,
             force: bool = False,
             save_environment: bool = False
             ) -> None:
        '''
        The save function for PBVI Agents consists in recording the value function after the training.
        It saves the agent in a folder with the name of the agent (class name + training timestamp).
        In this folder, there will be the metadata of the agent (all the attributes) in a json format and the value function.

        Optionally, the environment can be saved too to be able to load it alongside the agent for future reuse.
        If the agent has already been saved, the saving will not happen unless the force parameter is toggled.

        Parameters
        ----------
        folder : str, optional
            The folder under which to save the agent (a subfolder will be created under this folder).
            The agent will therefore be saved at <folder>/Agent-<agent_name> .
            By default the current folder is used.
        force : bool, default = False
            Whether to overwrite an already saved agent with the same name at the same path.
        save_environment : bool, default = False
            Whether to save the environment data along with the agent.
        '''
        assert self.trained_at is not None, "The agent is not trained, there is nothing to save."

        # GPU support
        if self.is_on_gpu:
            self.on_cpu.save(folder=folder, force=force, save_environment=save_environment)
            return

        # Adding env name to folder path
        if folder is None:
            folder = f'./Agent-{self.name}'
        else:
            folder += '/Agent-' + self.name

        # Checking the folder exists or creates it
        if not os.path.exists(folder):
            os.mkdir(folder)
        elif len(os.listdir(folder)):
            if force:
                shutil.rmtree(folder)
                os.mkdir(folder)
            else:
                raise Exception(f'{folder} is not empty. If you want to overwrite the saved model, enable "force".')

        # If requested save environment
        if save_environment:
            self.environment.save(folder=folder)

        # TODO: Add MODEL to save function
        # Generating the metadata arguments dictionary
        arguments = {}
        arguments['name'] = self.name
        arguments['class'] = self.class_name
        if len(self.thresholds.shape) == 2:
            arguments['thresholds'] = {layer_lab: layer_thresholds for layer_lab, layer_thresholds in zip(self.environment.layer_labels, self.thresholds.tolist())}
        else:
            arguments['thresholds'] = self.thresholds.tolist()
        arguments['environment_name'] = self.environment.name
        arguments['environment_saved_at'] = self.environment.saved_at
        arguments['space_aware'] = self.space_aware
        arguments['spacial_subdivisions'] = self.spacial_subdivisions.tolist()
        arguments['action_labels'] = self.action_labels
        arguments['action_set'] = self.action_set.tolist()
        arguments['trained_at'] = self.trained_at

        # Output the arguments to a METADATA file
        with open(folder + '/METADATA.json', 'w') as json_file:
            json.dump(arguments, json_file, indent=4)

        # Save value function
        self.value_function.save(folder=folder, file_name='Value_Function.npy')

        # Finalization
        self.saved_at = os.path.abspath(folder).replace('\\', '/')
        print(f'Agent saved to: {folder}')


    @classmethod
    def load(cls,
             folder: str
             ) -> 'PBVI_Agent':
        '''
        Function to load a PBVI agent from a given folder it has been saved to.
        It will load the environment the agent has been trained on along with it.

        If it is a subclass of the PBVI_Agent, an instance of that specific subclass will be returned.

        Parameters
        ----------
        folder : str
            The agent folder.

        Returns
        -------
        instance : PBVI_Agent
            The loaded instance of the PBVI Agent.
        '''
        # Load arguments
        arguments = None
        with open(folder + '/METADATA.json', 'r') as json_file:
            arguments = json.load(json_file)

        # Load environment
        environment = Environment.load(arguments['environment_saved_at'])

        # Load specific class
        if arguments['class'] != 'PBVI_Agent':
            from olfactory_navigation import agents
            cls = {name:obj for name, obj in inspect.getmembers(agents)}[arguments['class']]

        # Build instance
        instance = cls(
            environment = environment,
            thresholds = arguments['thresholds'],
            space_aware = arguments['space_aware'],
            spacial_subdivisions = np.array(arguments['spacial_subdivisions']),
            actions = {a_label: a_vector for a_label, a_vector in zip(arguments['action_labels'], arguments['action_set'])},
            name = arguments['name']
        )

        # Load and set the value function on the instance
        instance.value_function = ValueFunction.load(
            file=folder + '/Value_Function.npy',
            model=instance.model
        )
        instance.trained_at = arguments['trained_at']
        instance.saved_at = folder

        return instance


    def train(self,
              expansions: int = 10,
              full_backup: bool = True,
              update_passes: int = 1,
              max_belief_growth: int = 10,
              initial_belief: BeliefSet | Belief = None,
              initial_value_function: ValueFunction = None,
              prune_level: int = 1,
              prune_interval: int = 10,
              limit_value_function_size: int = -1,
              gamma: float = 0.99,
              eps: float = 1e-6,
              convergence_stop: bool = False,
              use_gpu: bool = False,
              history_tracking_level: int = 1,
              overwrite_training: bool = False,
              print_progress: bool = True,
              print_stats: bool = True,
              **expand_arguments
              ) -> TrainingHistory:
        '''
        Main loop of the Point-Based Value Iteration algorithm.
        It consists in 2 steps, Backup and Expand.
        1. Expand: Expands the belief set base with a expansion strategy given by the parameter expand_function
        2. Backup: Updates the alpha vectors based on the current belief set

        Parameters
        ----------
        expansions : int, default = 10
            How many times the algorithm has to expand the belief set. (the size will be doubled every time, eg: for 5, the belief set will be of size 32)
        full_backup : bool, default = True
            Whether to force the backup function has to be run on the full set beliefs uncovered since the beginning or only on the new points.
        update_passes : int, default = 1
            How many times the backup function has to be run every time the belief set is expanded.
        max_belief_growth : int, default = 10
            How many beliefs can be added at every expansion step to the belief set.
        initial_belief : BeliefSet or Belief, optional
            An initial list of beliefs to start with.
        initial_value_function : ValueFunction, optional
            An initial value function to start the solving process with.
        prune_level : int, default = 1
            Parameter to prune the value function further before the expand function.
        prune_interval : int, default = 10
            How often to prune the value function. It is counted in number of backup iterations.
        limit_value_function_size : int, default = -1
            When the value function size crosses this threshold, a random selection of 'max_belief_growth' alpha vectors will be removed from the value function
            If set to -1, the value function can grow without bounds.
        use_gpu : bool, default = False
            Whether to use the GPU with cupy array to accelerate solving.
        gamma : float, default = 0.99
            The discount factor to value immediate rewards more than long term rewards.
            The learning rate is 1/gamma.
        eps : float, default = 1e-6
            The smallest allowed changed for the value function.
            Below the amount of change, the value function is considered converged and the value iteration process will end early.
            convergence_stop : bool, default = False
        convergence_stop : bool, default = False
            Whether to compute to compute the change in the value function and stop early if this change is smaller than eps.
        history_tracking_level : int, default = 1
            How thorough the tracking of the solving process should be. (0: Nothing; 1: Times and sizes of belief sets and value function; 2: The actual value functions and beliefs sets)
        overwrite_training : bool, default = False
            Whether to force the overwriting of the training if a value function already exists for this agent.
        print_progress : bool, default = True
            Whether or not to print out the progress of the value iteration process.
        print_stats : bool, default = True
            Whether or not to print out statistics at the end of the training run.
        expand_arguments : kwargs
            An arbitrary amount of parameters that will be passed on to the expand function.

        Returns
        -------
        solver_history : SolverHistory
            The history of the solving process with some plotting options.
        '''
        raise NotImplementedError('The train function is not implemented, make a PBVI agent subclass to implement the method')


    def modify_environment(self,
                           new_environment: Environment
                           ) -> 'Agent':
        '''
        Function to modify the environment of the agent.
        If the agent is already trained, the trained element should also be adapted to fit this new environment.

        Parameters
        ----------
        new_environment : Environment
            A modified environment.

        Returns
        -------
        modified_agent : PBVI_Agent
            A new pbvi agent with a modified environment
        '''
        # TODO: Fix this to account for other init parameters
        # GPU support
        if self.is_on_gpu:
            return self.on_cpu.modify_environment(new_environment=new_environment)

        # Creating a new agent instance
        modified_agent = self.__class__(environment = new_environment,
                                        thresholds = self.thresholds,
                                        name = self.name)

        # Modifying the value function
        if self.value_function is not None:
            reshaped_vf_array = np.array([cv2.resize(av, np.array(modified_agent.model.state_grid.shape)[::-1]).ravel()
                                          for av in self.value_function.alpha_vector_array.reshape(len(self.value_function), *self.model.state_grid.shape)])
            modified_vf = ValueFunction(modified_agent.model, alpha_vectors=reshaped_vf_array, action_list=self.value_function.actions)
            modified_agent.value_function = modified_vf

        return modified_agent


    def initialize_state(self,
                         n: int = 1,
                         belief: BeliefSet = None
                         ) -> None:
        '''
        To use an agent within a simulation, the agent's state needs to be initialized.
        The initialization consists in setting the agent's initial belief.
        Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

        Parameters
        ----------
        n : int, default = 1
            How many agents are to be used during the simulation.
        belief : BeliefSet, optional
            An optional set of beliefs to initialize the simulations with.
        '''
        assert self.value_function is not None, "Agent was not trained, run the training function first..."

        if belief is None:
            self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])
        else:
            assert len(belief) == n, f"The amount of beliefs provided ({len(belief)}) to initialize the state need to match the amount of stimulations to initialize (n={n})."

            if self.is_on_gpu and not belief.is_on_gpu:
                self.belief = belief.on_gpu
            elif not self.is_on_gpu and belief.is_on_gpu:
                self.belief = belief.on_cpu
            else:
                self.belief = belief


    def choose_action(self) -> np.ndarray:
        '''
        Function to let the agent or set of agents choose an action based on their current belief.

        Returns
        -------
        movement_vector : np.ndarray
            A single or a list of actions chosen by the agent(s) based on their belief.
        '''
        assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"

        # Evaluated value function
        _, action = self.value_function.evaluate_at(self.belief)

        # Recording the action played
        self.action_played = action

        # Converting action indexes to movement vectors
        movemement_vector = self.action_set[action,:]

        return movemement_vector


    def update_state(self,
                     action: np.ndarray,
                     observation: np.ndarray,
                     source_reached: np.ndarray
                     ) -> None | np.ndarray:
        '''
        Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

        Parameters
        ----------
        action : np.ndarray
            A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.
        observation : np.ndarray
            The observation(s) the agent(s) made.
        source_reached : np.ndarray
            A boolean array of whether the agent(s) have reached the source or not.

        Returns
        -------
        update_successfull : np.ndarray, optional
            If nothing is returned, it means all the agent's state updates have been successfull.
            Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
        '''
        assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"
        # GPU support
        xp = np if not self.is_on_gpu else cp

        # Discretizing observations
        observation_ids = self.discretize_observations(observation=observation, action=action, source_reached=source_reached)

        # Update the set of beliefs
        self.belief, provenance = self.belief.update(actions = self.action_played,
                                                     observations = observation_ids,
                                                     raise_on_impossible_belief = False,
                                                     use_reachability = True,
                                                     return_provenance = True)

        # Check for failed updates
        update_successful = xp.isin(xp.arange(len(self.belief)), provenance[:,0])
        self.succeeded_update = update_successful

        return update_successful


    def kill(self,
             simulations_to_kill: np.ndarray
             ) -> None:
        '''
        Function to kill any simulations that have not reached the source but can't continue further.

        Parameters
        ----------
        simulations_to_kill : np.ndarray
            A boolean array of the simulations to kill.
        '''
        if all(simulations_to_kill):
            self.belief = None
        else:
            filtered_simulations_to_kill = simulations_to_kill[self.succeeded_update]
            self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~filtered_simulations_to_kill])


    def generate_beliefs_from_trajectory(self,
                                         history: SimulationHistory,
                                         trajectory_i: int = 0,
                                         initial_belief: Belief = None
                                         ) -> BeliefSet:
        '''
        Function to generate a sequence of belief points from the trajectory from SimulationHistory instance.

        Parameters
        ----------
        history : SimulationHistory
            The simulation history from which the agent's trajectory is extracted.
        trajectory_i : int, default = 0
            The id of the trajectory from which to build the belief sequence.
        initial_belief : Belief, optional
            The initial belief point from which to start the sequence.

        Returns
        -------
        belief_sequence : BeliefSet
            The sequence of beliefs the agent going through in the the trajectory of the simulation.
        '''
        # If the initial belief is not provided, generate one
        if initial_belief is None:
            initial_belief = Belief(self.model)

        # Retrieve the trjactory's simulation dataframe
        df = history.simulation_dfs[trajectory_i]

        # Set the belief that will be iterate on
        belief = initial_belief

        # Belief sequence to be returned at the end
        belief_sequence = [initial_belief]

        for row_id, row in enumerate(df.iterrows()):
            row = row[1]

            # Skip initial position
            if row_id == 0:
                continue

            # Check the ID of the action
            a = np.argwhere(np.all((self.action_set == [row['dy'],row['dx']]), axis=1))[0,0]

            # Retrieve observations
            o = [row['o']]
            if self.space_aware:
                o += [row['y'],row['x']]

            # Discretize observations
            discrete_o = self.discretize_observations(observation=np.array([o]), action=np.array([a]), source_reached=np.array([False]))[0]

            try:
                # Update belief
                belief = belief.update(a=a, o=discrete_o, use_reachability=self.use_reachability)
                belief_sequence.append(belief)
            except:
                print(f'[Warning] Update of belief failed at step {row_id}...')

        return BeliefSet(self.model, belief_sequence)

on_cpu property

A version of the Agent on the CPU. If the agent is already on the CPU it returns itself, otherwise a new one is generated.

on_gpu property

A version of the Agent on the GPU. If the agent is already on the GPU it returns itself, otherwise a new one is generated.

choose_action()

Function to let the agent or set of agents choose an action based on their current belief.

Returns:

Name Type Description
movement_vector ndarray

A single or a list of actions chosen by the agent(s) based on their belief.

Source code in olfactory_navigation/agents/pbvi_agent.py
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
def choose_action(self) -> np.ndarray:
    '''
    Function to let the agent or set of agents choose an action based on their current belief.

    Returns
    -------
    movement_vector : np.ndarray
        A single or a list of actions chosen by the agent(s) based on their belief.
    '''
    assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"

    # Evaluated value function
    _, action = self.value_function.evaluate_at(self.belief)

    # Recording the action played
    self.action_played = action

    # Converting action indexes to movement vectors
    movemement_vector = self.action_set[action,:]

    return movemement_vector

generate_beliefs_from_trajectory(history, trajectory_i=0, initial_belief=None)

Function to generate a sequence of belief points from the trajectory from SimulationHistory instance.

Parameters:

Name Type Description Default
history SimulationHistory

The simulation history from which the agent's trajectory is extracted.

required
trajectory_i int

The id of the trajectory from which to build the belief sequence.

= 0
initial_belief Belief

The initial belief point from which to start the sequence.

None

Returns:

Name Type Description
belief_sequence BeliefSet

The sequence of beliefs the agent going through in the the trajectory of the simulation.

Source code in olfactory_navigation/agents/pbvi_agent.py
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
def generate_beliefs_from_trajectory(self,
                                     history: SimulationHistory,
                                     trajectory_i: int = 0,
                                     initial_belief: Belief = None
                                     ) -> BeliefSet:
    '''
    Function to generate a sequence of belief points from the trajectory from SimulationHistory instance.

    Parameters
    ----------
    history : SimulationHistory
        The simulation history from which the agent's trajectory is extracted.
    trajectory_i : int, default = 0
        The id of the trajectory from which to build the belief sequence.
    initial_belief : Belief, optional
        The initial belief point from which to start the sequence.

    Returns
    -------
    belief_sequence : BeliefSet
        The sequence of beliefs the agent going through in the the trajectory of the simulation.
    '''
    # If the initial belief is not provided, generate one
    if initial_belief is None:
        initial_belief = Belief(self.model)

    # Retrieve the trjactory's simulation dataframe
    df = history.simulation_dfs[trajectory_i]

    # Set the belief that will be iterate on
    belief = initial_belief

    # Belief sequence to be returned at the end
    belief_sequence = [initial_belief]

    for row_id, row in enumerate(df.iterrows()):
        row = row[1]

        # Skip initial position
        if row_id == 0:
            continue

        # Check the ID of the action
        a = np.argwhere(np.all((self.action_set == [row['dy'],row['dx']]), axis=1))[0,0]

        # Retrieve observations
        o = [row['o']]
        if self.space_aware:
            o += [row['y'],row['x']]

        # Discretize observations
        discrete_o = self.discretize_observations(observation=np.array([o]), action=np.array([a]), source_reached=np.array([False]))[0]

        try:
            # Update belief
            belief = belief.update(a=a, o=discrete_o, use_reachability=self.use_reachability)
            belief_sequence.append(belief)
        except:
            print(f'[Warning] Update of belief failed at step {row_id}...')

    return BeliefSet(self.model, belief_sequence)

initialize_state(n=1, belief=None)

To use an agent within a simulation, the agent's state needs to be initialized. The initialization consists in setting the agent's initial belief. Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

Parameters:

Name Type Description Default
n int

How many agents are to be used during the simulation.

= 1
belief BeliefSet

An optional set of beliefs to initialize the simulations with.

None
Source code in olfactory_navigation/agents/pbvi_agent.py
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
def initialize_state(self,
                     n: int = 1,
                     belief: BeliefSet = None
                     ) -> None:
    '''
    To use an agent within a simulation, the agent's state needs to be initialized.
    The initialization consists in setting the agent's initial belief.
    Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

    Parameters
    ----------
    n : int, default = 1
        How many agents are to be used during the simulation.
    belief : BeliefSet, optional
        An optional set of beliefs to initialize the simulations with.
    '''
    assert self.value_function is not None, "Agent was not trained, run the training function first..."

    if belief is None:
        self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])
    else:
        assert len(belief) == n, f"The amount of beliefs provided ({len(belief)}) to initialize the state need to match the amount of stimulations to initialize (n={n})."

        if self.is_on_gpu and not belief.is_on_gpu:
            self.belief = belief.on_gpu
        elif not self.is_on_gpu and belief.is_on_gpu:
            self.belief = belief.on_cpu
        else:
            self.belief = belief

kill(simulations_to_kill)

Function to kill any simulations that have not reached the source but can't continue further.

Parameters:

Name Type Description Default
simulations_to_kill ndarray

A boolean array of the simulations to kill.

required
Source code in olfactory_navigation/agents/pbvi_agent.py
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
def kill(self,
         simulations_to_kill: np.ndarray
         ) -> None:
    '''
    Function to kill any simulations that have not reached the source but can't continue further.

    Parameters
    ----------
    simulations_to_kill : np.ndarray
        A boolean array of the simulations to kill.
    '''
    if all(simulations_to_kill):
        self.belief = None
    else:
        filtered_simulations_to_kill = simulations_to_kill[self.succeeded_update]
        self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~filtered_simulations_to_kill])

load(folder) classmethod

Function to load a PBVI agent from a given folder it has been saved to. It will load the environment the agent has been trained on along with it.

If it is a subclass of the PBVI_Agent, an instance of that specific subclass will be returned.

Parameters:

Name Type Description Default
folder str

The agent folder.

required

Returns:

Name Type Description
instance PBVI_Agent

The loaded instance of the PBVI Agent.

Source code in olfactory_navigation/agents/pbvi_agent.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
@classmethod
def load(cls,
         folder: str
         ) -> 'PBVI_Agent':
    '''
    Function to load a PBVI agent from a given folder it has been saved to.
    It will load the environment the agent has been trained on along with it.

    If it is a subclass of the PBVI_Agent, an instance of that specific subclass will be returned.

    Parameters
    ----------
    folder : str
        The agent folder.

    Returns
    -------
    instance : PBVI_Agent
        The loaded instance of the PBVI Agent.
    '''
    # Load arguments
    arguments = None
    with open(folder + '/METADATA.json', 'r') as json_file:
        arguments = json.load(json_file)

    # Load environment
    environment = Environment.load(arguments['environment_saved_at'])

    # Load specific class
    if arguments['class'] != 'PBVI_Agent':
        from olfactory_navigation import agents
        cls = {name:obj for name, obj in inspect.getmembers(agents)}[arguments['class']]

    # Build instance
    instance = cls(
        environment = environment,
        thresholds = arguments['thresholds'],
        space_aware = arguments['space_aware'],
        spacial_subdivisions = np.array(arguments['spacial_subdivisions']),
        actions = {a_label: a_vector for a_label, a_vector in zip(arguments['action_labels'], arguments['action_set'])},
        name = arguments['name']
    )

    # Load and set the value function on the instance
    instance.value_function = ValueFunction.load(
        file=folder + '/Value_Function.npy',
        model=instance.model
    )
    instance.trained_at = arguments['trained_at']
    instance.saved_at = folder

    return instance

modify_environment(new_environment)

Function to modify the environment of the agent. If the agent is already trained, the trained element should also be adapted to fit this new environment.

Parameters:

Name Type Description Default
new_environment Environment

A modified environment.

required

Returns:

Name Type Description
modified_agent PBVI_Agent

A new pbvi agent with a modified environment

Source code in olfactory_navigation/agents/pbvi_agent.py
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
def modify_environment(self,
                       new_environment: Environment
                       ) -> 'Agent':
    '''
    Function to modify the environment of the agent.
    If the agent is already trained, the trained element should also be adapted to fit this new environment.

    Parameters
    ----------
    new_environment : Environment
        A modified environment.

    Returns
    -------
    modified_agent : PBVI_Agent
        A new pbvi agent with a modified environment
    '''
    # TODO: Fix this to account for other init parameters
    # GPU support
    if self.is_on_gpu:
        return self.on_cpu.modify_environment(new_environment=new_environment)

    # Creating a new agent instance
    modified_agent = self.__class__(environment = new_environment,
                                    thresholds = self.thresholds,
                                    name = self.name)

    # Modifying the value function
    if self.value_function is not None:
        reshaped_vf_array = np.array([cv2.resize(av, np.array(modified_agent.model.state_grid.shape)[::-1]).ravel()
                                      for av in self.value_function.alpha_vector_array.reshape(len(self.value_function), *self.model.state_grid.shape)])
        modified_vf = ValueFunction(modified_agent.model, alpha_vectors=reshaped_vf_array, action_list=self.value_function.actions)
        modified_agent.value_function = modified_vf

    return modified_agent

save(folder=None, force=False, save_environment=False)

The save function for PBVI Agents consists in recording the value function after the training. It saves the agent in a folder with the name of the agent (class name + training timestamp). In this folder, there will be the metadata of the agent (all the attributes) in a json format and the value function.

Optionally, the environment can be saved too to be able to load it alongside the agent for future reuse. If the agent has already been saved, the saving will not happen unless the force parameter is toggled.

Parameters:

Name Type Description Default
folder str

The folder under which to save the agent (a subfolder will be created under this folder). The agent will therefore be saved at /Agent- . By default the current folder is used.

None
force bool

Whether to overwrite an already saved agent with the same name at the same path.

= False
save_environment bool

Whether to save the environment data along with the agent.

= False
Source code in olfactory_navigation/agents/pbvi_agent.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
def save(self,
         folder: str = None,
         force: bool = False,
         save_environment: bool = False
         ) -> None:
    '''
    The save function for PBVI Agents consists in recording the value function after the training.
    It saves the agent in a folder with the name of the agent (class name + training timestamp).
    In this folder, there will be the metadata of the agent (all the attributes) in a json format and the value function.

    Optionally, the environment can be saved too to be able to load it alongside the agent for future reuse.
    If the agent has already been saved, the saving will not happen unless the force parameter is toggled.

    Parameters
    ----------
    folder : str, optional
        The folder under which to save the agent (a subfolder will be created under this folder).
        The agent will therefore be saved at <folder>/Agent-<agent_name> .
        By default the current folder is used.
    force : bool, default = False
        Whether to overwrite an already saved agent with the same name at the same path.
    save_environment : bool, default = False
        Whether to save the environment data along with the agent.
    '''
    assert self.trained_at is not None, "The agent is not trained, there is nothing to save."

    # GPU support
    if self.is_on_gpu:
        self.on_cpu.save(folder=folder, force=force, save_environment=save_environment)
        return

    # Adding env name to folder path
    if folder is None:
        folder = f'./Agent-{self.name}'
    else:
        folder += '/Agent-' + self.name

    # Checking the folder exists or creates it
    if not os.path.exists(folder):
        os.mkdir(folder)
    elif len(os.listdir(folder)):
        if force:
            shutil.rmtree(folder)
            os.mkdir(folder)
        else:
            raise Exception(f'{folder} is not empty. If you want to overwrite the saved model, enable "force".')

    # If requested save environment
    if save_environment:
        self.environment.save(folder=folder)

    # TODO: Add MODEL to save function
    # Generating the metadata arguments dictionary
    arguments = {}
    arguments['name'] = self.name
    arguments['class'] = self.class_name
    if len(self.thresholds.shape) == 2:
        arguments['thresholds'] = {layer_lab: layer_thresholds for layer_lab, layer_thresholds in zip(self.environment.layer_labels, self.thresholds.tolist())}
    else:
        arguments['thresholds'] = self.thresholds.tolist()
    arguments['environment_name'] = self.environment.name
    arguments['environment_saved_at'] = self.environment.saved_at
    arguments['space_aware'] = self.space_aware
    arguments['spacial_subdivisions'] = self.spacial_subdivisions.tolist()
    arguments['action_labels'] = self.action_labels
    arguments['action_set'] = self.action_set.tolist()
    arguments['trained_at'] = self.trained_at

    # Output the arguments to a METADATA file
    with open(folder + '/METADATA.json', 'w') as json_file:
        json.dump(arguments, json_file, indent=4)

    # Save value function
    self.value_function.save(folder=folder, file_name='Value_Function.npy')

    # Finalization
    self.saved_at = os.path.abspath(folder).replace('\\', '/')
    print(f'Agent saved to: {folder}')

train(expansions=10, full_backup=True, update_passes=1, max_belief_growth=10, initial_belief=None, initial_value_function=None, prune_level=1, prune_interval=10, limit_value_function_size=-1, gamma=0.99, eps=1e-06, convergence_stop=False, use_gpu=False, history_tracking_level=1, overwrite_training=False, print_progress=True, print_stats=True, **expand_arguments)

Main loop of the Point-Based Value Iteration algorithm. It consists in 2 steps, Backup and Expand. 1. Expand: Expands the belief set base with a expansion strategy given by the parameter expand_function 2. Backup: Updates the alpha vectors based on the current belief set

Parameters:

Name Type Description Default
expansions int

How many times the algorithm has to expand the belief set. (the size will be doubled every time, eg: for 5, the belief set will be of size 32)

= 10
full_backup bool

Whether to force the backup function has to be run on the full set beliefs uncovered since the beginning or only on the new points.

= True
update_passes int

How many times the backup function has to be run every time the belief set is expanded.

= 1
max_belief_growth int

How many beliefs can be added at every expansion step to the belief set.

= 10
initial_belief BeliefSet or Belief

An initial list of beliefs to start with.

None
initial_value_function ValueFunction

An initial value function to start the solving process with.

None
prune_level int

Parameter to prune the value function further before the expand function.

= 1
prune_interval int

How often to prune the value function. It is counted in number of backup iterations.

= 10
limit_value_function_size int

When the value function size crosses this threshold, a random selection of 'max_belief_growth' alpha vectors will be removed from the value function If set to -1, the value function can grow without bounds.

= -1
use_gpu bool

Whether to use the GPU with cupy array to accelerate solving.

= False
gamma float

The discount factor to value immediate rewards more than long term rewards. The learning rate is 1/gamma.

= 0.99
eps float

The smallest allowed changed for the value function. Below the amount of change, the value function is considered converged and the value iteration process will end early. convergence_stop : bool, default = False

= 1e-6
convergence_stop bool

Whether to compute to compute the change in the value function and stop early if this change is smaller than eps.

= False
history_tracking_level int

How thorough the tracking of the solving process should be. (0: Nothing; 1: Times and sizes of belief sets and value function; 2: The actual value functions and beliefs sets)

= 1
overwrite_training bool

Whether to force the overwriting of the training if a value function already exists for this agent.

= False
print_progress bool

Whether or not to print out the progress of the value iteration process.

= True
print_stats bool

Whether or not to print out statistics at the end of the training run.

= True
expand_arguments kwargs

An arbitrary amount of parameters that will be passed on to the expand function.

{}

Returns:

Name Type Description
solver_history SolverHistory

The history of the solving process with some plotting options.

Source code in olfactory_navigation/agents/pbvi_agent.py
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
def train(self,
          expansions: int = 10,
          full_backup: bool = True,
          update_passes: int = 1,
          max_belief_growth: int = 10,
          initial_belief: BeliefSet | Belief = None,
          initial_value_function: ValueFunction = None,
          prune_level: int = 1,
          prune_interval: int = 10,
          limit_value_function_size: int = -1,
          gamma: float = 0.99,
          eps: float = 1e-6,
          convergence_stop: bool = False,
          use_gpu: bool = False,
          history_tracking_level: int = 1,
          overwrite_training: bool = False,
          print_progress: bool = True,
          print_stats: bool = True,
          **expand_arguments
          ) -> TrainingHistory:
    '''
    Main loop of the Point-Based Value Iteration algorithm.
    It consists in 2 steps, Backup and Expand.
    1. Expand: Expands the belief set base with a expansion strategy given by the parameter expand_function
    2. Backup: Updates the alpha vectors based on the current belief set

    Parameters
    ----------
    expansions : int, default = 10
        How many times the algorithm has to expand the belief set. (the size will be doubled every time, eg: for 5, the belief set will be of size 32)
    full_backup : bool, default = True
        Whether to force the backup function has to be run on the full set beliefs uncovered since the beginning or only on the new points.
    update_passes : int, default = 1
        How many times the backup function has to be run every time the belief set is expanded.
    max_belief_growth : int, default = 10
        How many beliefs can be added at every expansion step to the belief set.
    initial_belief : BeliefSet or Belief, optional
        An initial list of beliefs to start with.
    initial_value_function : ValueFunction, optional
        An initial value function to start the solving process with.
    prune_level : int, default = 1
        Parameter to prune the value function further before the expand function.
    prune_interval : int, default = 10
        How often to prune the value function. It is counted in number of backup iterations.
    limit_value_function_size : int, default = -1
        When the value function size crosses this threshold, a random selection of 'max_belief_growth' alpha vectors will be removed from the value function
        If set to -1, the value function can grow without bounds.
    use_gpu : bool, default = False
        Whether to use the GPU with cupy array to accelerate solving.
    gamma : float, default = 0.99
        The discount factor to value immediate rewards more than long term rewards.
        The learning rate is 1/gamma.
    eps : float, default = 1e-6
        The smallest allowed changed for the value function.
        Below the amount of change, the value function is considered converged and the value iteration process will end early.
        convergence_stop : bool, default = False
    convergence_stop : bool, default = False
        Whether to compute to compute the change in the value function and stop early if this change is smaller than eps.
    history_tracking_level : int, default = 1
        How thorough the tracking of the solving process should be. (0: Nothing; 1: Times and sizes of belief sets and value function; 2: The actual value functions and beliefs sets)
    overwrite_training : bool, default = False
        Whether to force the overwriting of the training if a value function already exists for this agent.
    print_progress : bool, default = True
        Whether or not to print out the progress of the value iteration process.
    print_stats : bool, default = True
        Whether or not to print out statistics at the end of the training run.
    expand_arguments : kwargs
        An arbitrary amount of parameters that will be passed on to the expand function.

    Returns
    -------
    solver_history : SolverHistory
        The history of the solving process with some plotting options.
    '''
    raise NotImplementedError('The train function is not implemented, make a PBVI agent subclass to implement the method')

update_state(action, observation, source_reached)

Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

Parameters:

Name Type Description Default
action ndarray

A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.

required
observation ndarray

The observation(s) the agent(s) made.

required
source_reached ndarray

A boolean array of whether the agent(s) have reached the source or not.

required

Returns:

Name Type Description
update_successfull (ndarray, optional)

If nothing is returned, it means all the agent's state updates have been successfull. Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.

Source code in olfactory_navigation/agents/pbvi_agent.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
def update_state(self,
                 action: np.ndarray,
                 observation: np.ndarray,
                 source_reached: np.ndarray
                 ) -> None | np.ndarray:
    '''
    Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

    Parameters
    ----------
    action : np.ndarray
        A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.
    observation : np.ndarray
        The observation(s) the agent(s) made.
    source_reached : np.ndarray
        A boolean array of whether the agent(s) have reached the source or not.

    Returns
    -------
    update_successfull : np.ndarray, optional
        If nothing is returned, it means all the agent's state updates have been successfull.
        Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
    '''
    assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"
    # GPU support
    xp = np if not self.is_on_gpu else cp

    # Discretizing observations
    observation_ids = self.discretize_observations(observation=observation, action=action, source_reached=source_reached)

    # Update the set of beliefs
    self.belief, provenance = self.belief.update(actions = self.action_played,
                                                 observations = observation_ids,
                                                 raise_on_impossible_belief = False,
                                                 use_reachability = True,
                                                 return_provenance = True)

    # Check for failed updates
    update_successful = xp.isin(xp.arange(len(self.belief)), provenance[:,0])
    self.succeeded_update = update_successful

    return update_successful

TrainingHistory

Class to represent the history of a solver for a POMDP solver. It has mainly the purpose to have visualizations for the solution, belief set and the whole solving history. The visualizations available are: - Belief set plot - Solution plot - Video of value function and belief set evolution over training.

Parameters:

Name Type Description Default
tracking_level int

The tracking level of the solver.

required
model Model

The model the solver has solved.

required
gamma float

The gamma parameter used by the solver (learning rate).

required
eps float

The epsilon parameter used by the solver (covergence bound).

required
expand_append bool

Whether the expand function appends new belief points to the belief set of reloads it all.

required
initial_value_function ValueFunction

The initial value function the solver will use to start the solving process.

required
initial_belief_set BeliefSet

The initial belief set the solver will use to start the solving process.

required

Attributes:

Name Type Description
tracking_level int
model Model
gamma float
eps float
expand_append bool
run_ts datetime

The time at which the SolverHistory object was instantiated which is assumed to be the start of the solving run.

expansion_times list[float]

A list of recorded times of the expand function.

backup_times list[float]

A list of recorded times of the backup function.

alpha_vector_counts list[int]

A list of recorded alpha vector count making up the value function over the solving process.

beliefs_counts list[int]

A list of recorded belief count making up the belief set over the solving process.

value_function_changes list[float]

A list of recorded value function changes (the maximum changed value between 2 value functions).

value_functions list[ValueFunction]

A list of recorded value functions.

belief_sets list[BeliefSet]

A list of recorded belief sets.

solution ValueFunction
explored_beliefs BeliefSet
Source code in olfactory_navigation/agents/pbvi_agent.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class TrainingHistory:
    '''
    Class to represent the history of a solver for a POMDP solver.
    It has mainly the purpose to have visualizations for the solution, belief set and the whole solving history.
    The visualizations available are:
        - Belief set plot
        - Solution plot
        - Video of value function and belief set evolution over training.


    Parameters
    ----------
    tracking_level : int
        The tracking level of the solver.
    model : Model
        The model the solver has solved.
    gamma : float
        The gamma parameter used by the solver (learning rate).
    eps : float
        The epsilon parameter used by the solver (covergence bound).
    expand_append : bool
        Whether the expand function appends new belief points to the belief set of reloads it all.
    initial_value_function : ValueFunction
        The initial value function the solver will use to start the solving process.
    initial_belief_set : BeliefSet
        The initial belief set the solver will use to start the solving process.

    Attributes
    ----------
    tracking_level : int
    model : Model
    gamma : float
    eps : float
    expand_append : bool
    run_ts : datetime
        The time at which the SolverHistory object was instantiated which is assumed to be the start of the solving run.
    expansion_times : list[float]
        A list of recorded times of the expand function.
    backup_times : list[float]
        A list of recorded times of the backup function.
    alpha_vector_counts : list[int]
        A list of recorded alpha vector count making up the value function over the solving process.
    beliefs_counts : list[int]
        A list of recorded belief count making up the belief set over the solving process.
    value_function_changes : list[float]
        A list of recorded value function changes (the maximum changed value between 2 value functions).
    value_functions : list[ValueFunction]
        A list of recorded value functions.
    belief_sets : list[BeliefSet]
        A list of recorded belief sets.
    solution : ValueFunction
    explored_beliefs : BeliefSet
    '''
    def __init__(self,
                 tracking_level: int,
                 model: POMDP,
                 gamma: float,
                 eps: float,
                 expand_append: bool,
                 initial_value_function: ValueFunction,
                 initial_belief_set: BeliefSet
                 ):

        self.tracking_level = tracking_level
        self.model = model
        self.gamma = gamma
        self.eps = eps
        self.run_ts = datetime.now()

        self.expand_append = expand_append

        # Time tracking
        self.expansion_times = []
        self.backup_times = []
        self.pruning_times = []

        # Value function and belief set sizes tracking
        self.alpha_vector_counts = []
        self.beliefs_counts = []
        self.prune_counts = []

        if self.tracking_level >= 1:
            self.alpha_vector_counts.append(len(initial_value_function))
            self.beliefs_counts.append(len(initial_belief_set))

        # Value function and belief set tracking
        self.belief_sets = []
        self.value_functions = []
        self.value_function_changes = []

        if self.tracking_level >= 2:
            self.belief_sets.append(initial_belief_set)
            self.value_functions.append(initial_value_function)


    @property
    def solution(self) -> ValueFunction:
        '''
        The last value function of the solving process.
        '''
        assert self.tracking_level >= 2, "Tracking level is set too low, increase it to 2 if you want to have value function tracking as well."
        return self.value_functions[-1]


    @property
    def explored_beliefs(self) -> BeliefSet:
        '''
        The final set of beliefs explored during the solving.
        '''
        assert self.tracking_level >= 2, "Tracking level is set too low, increase it to 2 if you want to have belief sets tracking as well."
        return self.belief_sets[-1]


    def add_expand_step(self,
                        expansion_time: float,
                        belief_set: BeliefSet
                        ) -> None:
        '''
        Function to add an expansion step in the simulation history by the explored belief set the expand function generated.

        Parameters
        ----------
        expansion_time : float
            The time it took to run a step of expansion of the belief set. (Also known as the exploration step.)
        belief_set : BeliefSet
            The belief set used for the Update step of the solving process.
        '''
        if self.tracking_level >= 1:
            self.expansion_times.append(float(expansion_time))
            self.beliefs_counts.append(len(belief_set))

        if self.tracking_level >= 2:
            self.belief_sets.append(belief_set.on_cpu)


    def add_backup_step(self,
                        backup_time: float,
                        value_function_change: float,
                        value_function: ValueFunction
                        ) -> None:
        '''
        Function to add a backup step in the simulation history by recording the value function the backup function generated.

        Parameters
        ----------
        backup_time : float
            The time it took to run a step of backup of the value function. (Also known as the value function update.)
        value_function_change : float
            The change between the value function of this iteration and of the previous iteration.
        value_function : ValueFunction
            The value function resulting after a step of the solving process.
        '''
        if self.tracking_level >= 1:
            self.backup_times.append(float(backup_time))
            self.alpha_vector_counts.append(len(value_function))
            self.value_function_changes.append(float(value_function_change))

        if self.tracking_level >= 2:
            self.value_functions.append(value_function.on_cpu)


    def add_prune_step(self,
                       prune_time: float,
                       alpha_vectors_pruned: int
                       ) -> None:
        '''
        Function to add a prune step in the simulation history by recording the amount of alpha vectors that were pruned by the pruning function and how long it took.

        Parameters
        ----------
        prune_time : float
            The time it took to run the pruning step.
        alpha_vectors_pruned : int
            How many alpha vectors were pruned.
        '''
        if self.tracking_level >= 1:
            self.pruning_times.append(prune_time)
            self.prune_counts.append(alpha_vectors_pruned)


    @property
    def summary(self) -> str:
        '''
        A summary as a string of the information recorded.
        '''
        summary_str =  f'Summary of Point Based Value Iteration run'
        summary_str += f'\n  - Model: {self.model.state_count} state, {self.model.action_count} action, {self.model.observation_count} observations'
        summary_str += f'\n  - Converged or stopped after {len(self.expansion_times)} expansion steps and {len(self.backup_times)} backup steps.'

        if self.tracking_level >= 1:
            summary_str += f'\n  - Resulting value function has {self.alpha_vector_counts[-1]} alpha vectors.'
            summary_str += f'\n  - Converged in {(sum(self.expansion_times) + sum(self.backup_times)):.4f}s'
            summary_str += f'\n'

            summary_str += f'\n  - Expand function took on average {sum(self.expansion_times) / len(self.expansion_times):.4f}s '
            if self.expand_append:
                summary_str += f'and yielded on average {sum(np.diff(self.beliefs_counts)) / len(self.beliefs_counts[1:]):.2f} beliefs per iteration.'
            else:
                summary_str += f'and yielded on average {sum(self.beliefs_counts[1:]) / len(self.beliefs_counts[1:]):.2f} beliefs per iteration.'
            summary_str += f' ({np.sum(np.divide(self.expansion_times, self.beliefs_counts[1:])) / len(self.expansion_times):.4f}s/it/belief)'

            summary_str += f'\n  - Backup function took on average {sum(self.backup_times) /len(self.backup_times):.4f}s '
            summary_str += f'and yielded on average {np.average(np.diff(self.alpha_vector_counts)):.2f} alpha vectors per iteration.'
            summary_str += f' ({np.sum(np.divide(self.backup_times, self.alpha_vector_counts[1:])) / len(self.backup_times):.4f}s/it/alpha)'

            summary_str += f'\n  - Pruning function took on average {sum(self.pruning_times) /len(self.pruning_times):.4f}s '
            summary_str += f'and yielded on average prunings of {sum(self.prune_counts) / len(self.prune_counts):.2f} alpha vectors per iteration.'

        return summary_str

explored_beliefs property

The final set of beliefs explored during the solving.

solution property

The last value function of the solving process.

summary property

A summary as a string of the information recorded.

add_backup_step(backup_time, value_function_change, value_function)

Function to add a backup step in the simulation history by recording the value function the backup function generated.

Parameters:

Name Type Description Default
backup_time float

The time it took to run a step of backup of the value function. (Also known as the value function update.)

required
value_function_change float

The change between the value function of this iteration and of the previous iteration.

required
value_function ValueFunction

The value function resulting after a step of the solving process.

required
Source code in olfactory_navigation/agents/pbvi_agent.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def add_backup_step(self,
                    backup_time: float,
                    value_function_change: float,
                    value_function: ValueFunction
                    ) -> None:
    '''
    Function to add a backup step in the simulation history by recording the value function the backup function generated.

    Parameters
    ----------
    backup_time : float
        The time it took to run a step of backup of the value function. (Also known as the value function update.)
    value_function_change : float
        The change between the value function of this iteration and of the previous iteration.
    value_function : ValueFunction
        The value function resulting after a step of the solving process.
    '''
    if self.tracking_level >= 1:
        self.backup_times.append(float(backup_time))
        self.alpha_vector_counts.append(len(value_function))
        self.value_function_changes.append(float(value_function_change))

    if self.tracking_level >= 2:
        self.value_functions.append(value_function.on_cpu)

add_expand_step(expansion_time, belief_set)

Function to add an expansion step in the simulation history by the explored belief set the expand function generated.

Parameters:

Name Type Description Default
expansion_time float

The time it took to run a step of expansion of the belief set. (Also known as the exploration step.)

required
belief_set BeliefSet

The belief set used for the Update step of the solving process.

required
Source code in olfactory_navigation/agents/pbvi_agent.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def add_expand_step(self,
                    expansion_time: float,
                    belief_set: BeliefSet
                    ) -> None:
    '''
    Function to add an expansion step in the simulation history by the explored belief set the expand function generated.

    Parameters
    ----------
    expansion_time : float
        The time it took to run a step of expansion of the belief set. (Also known as the exploration step.)
    belief_set : BeliefSet
        The belief set used for the Update step of the solving process.
    '''
    if self.tracking_level >= 1:
        self.expansion_times.append(float(expansion_time))
        self.beliefs_counts.append(len(belief_set))

    if self.tracking_level >= 2:
        self.belief_sets.append(belief_set.on_cpu)

add_prune_step(prune_time, alpha_vectors_pruned)

Function to add a prune step in the simulation history by recording the amount of alpha vectors that were pruned by the pruning function and how long it took.

Parameters:

Name Type Description Default
prune_time float

The time it took to run the pruning step.

required
alpha_vectors_pruned int

How many alpha vectors were pruned.

required
Source code in olfactory_navigation/agents/pbvi_agent.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def add_prune_step(self,
                   prune_time: float,
                   alpha_vectors_pruned: int
                   ) -> None:
    '''
    Function to add a prune step in the simulation history by recording the amount of alpha vectors that were pruned by the pruning function and how long it took.

    Parameters
    ----------
    prune_time : float
        The time it took to run the pruning step.
    alpha_vectors_pruned : int
        How many alpha vectors were pruned.
    '''
    if self.tracking_level >= 1:
        self.pruning_times.append(prune_time)
        self.prune_counts.append(alpha_vectors_pruned)