
value_function

AlphaVector

A class to represent an Alpha Vector, a vector representing a plane in |S| dimensions for POMDP models.

Parameters:

values : ndarray, required
    The actual vector with the value for each state.
action : int, required
    The action associated with the vector.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
class AlphaVector:
    '''
    A class to represent an Alpha Vector, a vector representing a plane in |S| dimensions for POMDP models.


    Parameters
    ----------
    values : np.ndarray
        The actual vector with the value for each state.
    action : int
        The action associated with the vector.
    '''
    def __init__(self,
                 values: np.ndarray,
                 action: int
                 ) -> None:
        self.values = values
        self.action = int(action)
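
As a quick, self-contained sketch (the state values below are made up for a hypothetical 3-state model; the import path is taken from the source location shown above), an AlphaVector simply pairs a value-per-state array with an action index:

import numpy as np
from olfactory_navigation.agents.model_based_util.value_function import AlphaVector

# One value per state, tied to action 0
alpha = AlphaVector(values=np.array([0.5, 1.2, 0.3]), action=0)
print(alpha.values.shape, alpha.action)  # (3,) 0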

ValueFunction

Class representing a set of AlphaVectors. One such set approximates the value function of the MDP model.

Parameters:

model : Model, required
    The model the value function is associated with.
alpha_vectors : list[AlphaVector] or ndarray, default=[]
    The alpha vectors composing the value function. If none are provided, it will start empty and AlphaVectors can be appended.
action_list : list[int] or ndarray, default=[]
    The actions associated with the alpha vectors, in case the alpha vectors are provided as a numpy array.

Attributes:

model : Model
    The model the value function is associated with.
alpha_vector_list : list[AlphaVector]
alpha_vector_array : ndarray
actions : ndarray
Source code in olfactory_navigation/agents/model_based_util/value_function.py
class ValueFunction:
    '''
    Class representing a set of AlphaVectors. One such set approximates the value function of the MDP model.


    Parameters
    ----------
    model : mdp.Model
        The model the value function is associated with.
    alpha_vectors : list[AlphaVector] or np.ndarray, optional
        The alpha vectors composing the value function. If none are provided, it will start empty and AlphaVectors can be appended.
    action_list : list[int] or np.ndarray, optional
        The actions associated with the alpha vectors, in case the alpha vectors are provided as a numpy array.

    Attributes
    ----------
    model : mdp.Model
        The model the value function is associated with.
    alpha_vector_list : list[AlphaVector]
    alpha_vector_array : np.ndarray
    actions : np.ndarray
    '''
    def __init__(self,
                 model: Model,
                 alpha_vectors: list[AlphaVector] | np.ndarray = [],
                 action_list: list[int] | np.ndarray = []
                 ) -> None:
        self.model = model

        self._vector_list = None
        self._vector_array = None
        self._actions = None

        self.is_on_gpu = False

        # List of alpha vectors
        if isinstance(alpha_vectors, list):
            assert all(v.values.shape[0] == model.state_count for v in alpha_vectors), f"Some or all alpha vectors in the list provided don't have the right size, they should be of shape: {model.state_count}"
            self._vector_list = alpha_vectors

            # Check if on gpu and make sure all vectors are also on the gpu
            if (len(alpha_vectors) > 0) and gpu_support and cp.get_array_module(alpha_vectors[0].values) == cp:
                assert all(cp.get_array_module(v.values) == cp for v in alpha_vectors), "Either all or none of the alpha vectors should be on the GPU, not just some."
                self.is_on_gpu = True

        # As numpy array
        else:
            av_shape = alpha_vectors.shape
            exp_shape = (len(action_list), model.state_count)
            assert av_shape == exp_shape, f"Alpha vector array does not have the right shape (received: {av_shape}; expected: {exp_shape})"

            self._vector_list = []
            for alpha_vect, action in zip(alpha_vectors, action_list):
                self._vector_list.append(AlphaVector(alpha_vect, action))

            # Check if array is on gpu
            if gpu_support and cp.get_array_module(alpha_vectors) == cp:
                self.is_on_gpu = True

        # Deduplication
        self._uniqueness_dict = {alpha_vector.values.tobytes(): alpha_vector for alpha_vector in self._vector_list}
        self._vector_list = list(self._uniqueness_dict.values())

        self._pruning_level = 1


    @property
    def alpha_vector_list(self) -> list[AlphaVector]:
        '''
        A list of AlphaVector objects. If the value function is defined as a matrix of vectors and a list of actions, the list of AlphaVectors will be generated from them.
        '''
        if self._vector_list is None:
            self._vector_list = []
            for alpha_vect, action in zip(self._vector_array, self._actions):
                self._vector_list.append(AlphaVector(alpha_vect, action))
        return self._vector_list


    @property
    def alpha_vector_array(self) -> np.ndarray:
        '''
        A matrix of size N x S, containing all the alpha vectors making up the value function. (N is the number of alpha vectors and S the amount of states in the model)
        If the value function is defined as a list of AlphaVector objects, the matrix will be generated from them.
        '''
        xp = cp if (gpu_support and self.is_on_gpu) else np

        if self._vector_array is None:
            self._vector_array = xp.array([v.values for v in self._vector_list])
            self._actions = xp.array([v.action for v in self._vector_list])
        return self._vector_array


    @property
    def actions(self) -> np.ndarray:
        '''
        A list of N actions corresponding to the N alpha vectors making up the value function.
        If the value function is defined as a list of AlphaVector objects, the list will be generated from the actions of those alpha vector objects.
        '''
        xp = cp if (gpu_support and self.is_on_gpu) else np

        if self._actions is None:
            self._vector_array = xp.array([v.values for v in self._vector_list])
            self._actions = xp.array([v.action for v in self._vector_list])
        return self._actions


    def __len__(self) -> int:
        return len(self._vector_list) if self._vector_list is not None else self._vector_array.shape[0]


    def __add__(self, other_value_function: 'ValueFunction') -> 'ValueFunction':
        # combined_dict = {**self._uniqueness_dict, **other_value_function._uniqueness_dict}
        combined_dict = {}
        combined_dict.update(self._uniqueness_dict)
        combined_dict.update(other_value_function._uniqueness_dict)

        # Instantiation of the new value function
        new_value_function = super().__new__(self.__class__)
        new_value_function.model = self.model
        new_value_function.is_on_gpu = self.is_on_gpu

        new_value_function._vector_list = list(combined_dict.values())
        new_value_function._uniqueness_dict = combined_dict
        new_value_function._pruning_level = 1

        new_value_function._vector_array = None
        new_value_function._actions = None

        return new_value_function


    def append(self,
               alpha_vector: AlphaVector
               ) -> None:
        '''
        Function to add an alpha vector to the value function.

        Parameters
        ----------
        alpha_vector : AlphaVector
            The alpha vector to be added to the value function.
        '''
        # Make sure size is correct
        assert alpha_vector.values.shape[0] == self.model.state_count, f"Vector to add to value function doesn't have the right size (received: {alpha_vector.values.shape[0]}, expected: {self.model.state_count})"

        # GPU support check
        xp = cp if (gpu_support and self.is_on_gpu) else np
        assert (not gpu_support) or cp.get_array_module(alpha_vector.values) == xp, f"Vector is{' not' if self.is_on_gpu else ''} on GPU while value function is{'' if self.is_on_gpu else ' not'}."

        if self._vector_array is not None:
            self._vector_array = xp.append(self._vector_array, alpha_vector.values[None,:], axis=0)
            self._actions = xp.append(self._actions, alpha_vector.action)

        if self._vector_list is not None:
            self._vector_list.append(alpha_vector)


    def extend(self,
               other_value_function: 'ValueFunction'
               ) -> None:
        '''
        Function to add another value function in place.
        Effectively, it performs the union of the two sets of alpha vectors.

        Parameters
        ----------
        other_value_function : ValueFunction
            The other side of the union.
        '''
        self._uniqueness_dict.update(other_value_function._uniqueness_dict)
        self._vector_list = list(self._uniqueness_dict.values())

        self._vector_array = None
        self._actions = None

        self._pruning_level = 1


    def to_gpu(self) -> 'ValueFunction':
        '''
        Function returning an equivalent value function object with the arrays stored on GPU instead of CPU.

        Returns
        -------
        gpu_value_function : ValueFunction
            A new value function with arrays on GPU.
        '''
        assert gpu_support, "GPU support is not enabled, unable to execute this function"

        gpu_model = self.model.gpu_model

        gpu_value_function = None
        if self._vector_list is not None:
            gpu_alpha_vectors = [AlphaVector(cp.array(av.values), av.action) for av in self._vector_list]
            gpu_value_function = ValueFunction(gpu_model, gpu_alpha_vectors)

        else:
            gpu_vector_array = cp.array(self._vector_array)
            gpu_actions = self._actions if isinstance(self._actions, list) else cp.array(self._actions)
            gpu_value_function = ValueFunction(gpu_model, gpu_vector_array, gpu_actions)

        return gpu_value_function


    def to_cpu(self) -> 'ValueFunction':
        '''
        Function returning an equivalent value function object with the arrays stored on CPU instead of GPU.

        Returns
        -------
        cpu_value_function : ValueFunction
            A new value function with arrays on CPU.
        '''
        assert gpu_support, "GPU support is not enabled, unable to execute this function"

        cpu_model = self.model.cpu_model

        cpu_value_function = None
        if self._vector_list is not None:
            cpu_alpha_vectors = [AlphaVector(cp.asnumpy(av.values), av.action) for av in self._vector_list]
            cpu_value_function = ValueFunction(cpu_model, cpu_alpha_vectors)

        else:
            cpu_vector_array = cp.asnumpy(self._vector_array)
            cpu_actions = self._actions if isinstance(self._actions, list) else cp.asnumpy(self._actions)
            cpu_value_function = ValueFunction(cpu_model, cpu_vector_array, cpu_actions)

        return cpu_value_function


    def prune(self,
              level: int = 1
              ) -> None:
        '''
        Function pruning the set of alpha vectors composing the value function.
        The pruning is as thorough as the level:
            - 2: 1+ Check of absolute domination (check if dominated at each state).
            - 3: 2+ Solves Linear Programming problem for each alpha vector to see if it is dominated by combinations of other vectors.

        Note that the higher the level, the heavier the time impact will be.

        Parameters
        ----------
        level : int, default=1
            Between 0 and 3, how thorough the alpha vector pruning should be.
        '''
        # GPU support check
        xp = cp if (gpu_support and self.is_on_gpu) else np

        # Pruning level already reached or invalid
        if level < self._pruning_level or level > 3:
            log('Attempting to prune a value function to a level already reached or invalid. Skipping.')
            return

        # Level 2 pruning: Check for absolute domination
        if level >= 2 and self._pruning_level < 2:
            non_dominated_vector_indices = []

            for i, v in enumerate(self.alpha_vector_array):
                is_dom_by = xp.all(self.alpha_vector_array >= v, axis=1)
                if len(xp.where(is_dom_by)[0]) == 1:
                    non_dominated_vector_indices.append(i)

            self._vector_array = self._vector_array[non_dominated_vector_indices]
            self._actions = self._actions[non_dominated_vector_indices]

        # Level 3 pruning: LP to check for more complex domination
        if level >= 3:
            assert ilp_support, "ILP support not enabled..."

            pruned_alpha_set = self.to_cpu()

            alpha_set = pruned_alpha_set.alpha_vector_array
            non_dominated_vector_indices = []

            for i, alpha_vect in enumerate(alpha_set):
                other_alphas = np.concatenate([alpha_set[:i], alpha_set[(i+1):]])

                # Objective function
                c = np.concatenate([np.array([1]), -1*alpha_vect])

                # Alpha vector constraints
                other_count = len(other_alphas)
                A = np.c_[np.ones(other_count), np.multiply(np.array(other_alphas), -1)]
                alpha_constraints = LinearConstraint(A, 0, np.inf)

                # Constraints that sum of beliefs is 1
                belief_constraint = LinearConstraint(np.array([0] + ([1]*self.model.state_count)), 1, 1)

                # Solve problem
                res = milp(c=c, constraints=[alpha_constraints, belief_constraint])

                # Check if dominated
                is_dominated = (res.x[0] - np.dot(res.x[1:], alpha_vect)) >= 0
                if is_dominated:
                    print(alpha_vect)
                    print(' -> Dominated\n')
                else:
                    non_dominated_vector_indices.append(i)

            self._vector_array = self._vector_array[non_dominated_vector_indices]
            self._actions = self._actions[non_dominated_vector_indices]

        # Update the tracked pruned level so far
        self._pruning_level = level


    def evaluate_at(self,
                    belief: Belief | BeliefSet
                    ) -> tuple[float | np.ndarray, int | np.ndarray]:
        '''
        Function to evaluate the value function at a belief point or at a set of belief points.
        It returns a value and the associated action.

        Parameters
        ----------
        belief : Belief or BeliefSet
            The belief point or set of belief points at which to evaluate the value function.

        Returns
        -------
        value : float or np.ndarray
            The largest value associated with the belief point(s)
        action : int or np.ndarray
            The action(s) associated with the vector having the highest values at the belief point(s).
        '''
        # GPU support check
        xp = cp if (gpu_support and self.is_on_gpu) else np

        best_value = None
        best_action = None

        if isinstance(belief, Belief):
            # Computing values
            belief_values = xp.dot(self.alpha_vector_array, belief.values)

            # Selecting best vectors
            best_vector = xp.argmax(belief_values)

            # From best vector, compute the best value and action
            best_value = float(belief_values[best_vector])
            best_action = int(self.actions[best_vector])
        else:
            # Computing values
            belief_values = xp.matmul(belief.values if isinstance(belief, Belief) else belief.belief_array, self.alpha_vector_array.T)

            # Retrieving the top vectors according to the value function
            best_vectors = xp.argmax(belief_values, axis=1)

            # Retrieving the values and actions associated with the vectors chosen
            best_value = belief_values[xp.arange(belief_values.shape[0]), best_vectors]
            best_action = self.actions[best_vectors]

        return (best_value, best_action)


    def save(self,
             folder: str = './ValueFunctions',
             file_name: str | None = None
             ) -> None:
        '''
        Function to save the value function in a file at a given path. If no path is provided, it will be saved in a subfolder (ValueFunctions) inside the current working directory.
        If no file_name is provided, it will be saved as '<current_timestamp>_value_function.npy'.

        Parameters
        ----------
        folder : str, default='./ValueFunctions'
            The path at which the npy file will be saved.
        file_name : str, default='<current_timestamp>_value_function.npy'
            The file name used to save in.
        '''
        if self.is_on_gpu:
            self.to_cpu().save(folder=folder, file_name=file_name)
            return

        # Handle file_name
        if file_name is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            file_name = timestamp + '_value_function.npy'

        # Make sure that .npy is in the file name
        if '.npy' not in file_name:
            file_name += '.npy'

        # Getting array
        av_array = np.hstack([self.actions[:,None], self.alpha_vector_array])

        np.save(folder + '/' + file_name, av_array)


    @classmethod
    def load(cls,
             file: str,
             model: Model
             ) -> 'ValueFunction':
        '''
        Function to load the value function from a .npy file.

        Parameters
        ----------
        file : str
            The path and file_name of the value function to be loaded.
        model : mdp.Model
            The model the value function is linked to.

        Returns
        -------
        loaded_value_function : ValueFunction
            The loaded value function.
        '''
        av_array = np.load(file)

        loaded_value_function = ValueFunction(model=model,
                                              alpha_vectors=av_array[:,1:],
                                              action_list=av_array[:,0].astype(int))

        return loaded_value_function


    def plot(self,
             as_grid: bool = False,
             size: int = 5,
             belief_set: np.ndarray = None
             ) -> None:
        '''
        Function to plot the value function in 2 or 3 dimensions when possible and the as_grid parameter is kept to False. Otherwise, the value function is plotted as a grid.
        If a belief set array is provided and the model has 2 or 3 states, it will be plotted alongside the value function.

        Parameters
        ----------
        as_grid : bool, default=False
            Forces the value function to be plotted as a grid.
        size : int, default=5
            The actual plot scale.
        belief_set : np.ndarray, optional
            A set of beliefs used to plot the belief points that were explored.
        '''
        assert len(self) > 0, "Value function is empty, plotting is impossible..."

        # If on GPU, convert to CPU and plot that one
        if self.is_on_gpu:
            print('[Warning] Value function on GPU, converting to numpy before plotting...')
            cpu_value_function = self.to_cpu()
            cpu_value_function.plot(as_grid, size, belief_set)
            return

        func = None
        if as_grid:
            func = self._plot_grid
        elif self.model.state_count == 2:
            func = self._plot_2D
        elif self.model.state_count == 3:
            func = self._plot_3D
        else:
            print('[Warning] \'as_grid\' parameter set to False but state count is >3 so it will be plotted as a grid')
            func = self._plot_grid

        func(size, belief_set)


    def _plot_2D(self, size, belief_set=None):
        x = np.linspace(0, 1, 100)

        plt.figure(figsize=(int(size*1.5),size))
        grid_spec = {'height_ratios': ([1] if belief_set is None else [19,1])}
        _, ax = plt.subplots((2 if belief_set is not None else 1),1,sharex=True,gridspec_kw=grid_spec)

        # Vector plotting
        alpha_vects = self.alpha_vector_array

        m = alpha_vects[:,1] - alpha_vects[:,0] # type: ignore
        m = m.reshape(m.shape[0],1)

        x = x.reshape((1,x.shape[0])).repeat(m.shape[0],axis=0)
        y = (m*x) + alpha_vects[:,0].reshape(m.shape[0],1)

        ax0 = ax[0] if belief_set is not None else ax
        for i, alpha in enumerate(self.alpha_vector_list):
            ax0.plot(x[i,:], y[i,:], color=COLOR_LIST[alpha.action]['id']) # type: ignore

        # Set title
        title = 'Value function' + ('' if belief_set is None else ' and explored belief points')
        ax0.set_title(title)

        # X-axis setting
        ticks = [0,0.25,0.5,0.75,1]
        x_ticks = [str(t) for t in ticks]
        x_ticks[0] = self.model.state_labels[0]
        x_ticks[-1] = self.model.state_labels[1]

        ax0.set_xticks(ticks, x_ticks) # type: ignore

        # Action legend
        proxy = [patches.Rectangle((0,0),1,1,fc = COLOR_LIST[a]['id']) for a in self.model.actions]
        ax0.legend(proxy, self.model.action_labels, title='Actions') # type: ignore

        # Belief plotting
        if belief_set is not None:
            beliefs_x = belief_set.belief_array[:,1]
            ax[1].scatter(beliefs_x, np.zeros(beliefs_x.shape[0]), c='red')
            ax[1].get_yaxis().set_visible(False)
            ax[1].axhline(0, color='black')
            ax[1].set_xlabel('Belief space')
        else:
            ax0.set_xlabel('Belief space')

        # Axis labels
        ax0.set_ylabel('V(b)')


    def _plot_3D(self, size, belief_set=None):

        def get_alpha_vect_z(xx, yy, alpha_vect):
            x0, y0, z0 = [0, 0, alpha_vect[0]]
            x1, y1, z1 = [1, 0, alpha_vect[1]]
            x2, y2, z2 = [0, 1, alpha_vect[2]]

            ux, uy, uz = u = [x1-x0, y1-y0, z1-z0]
            vx, vy, vz = v = [x2-x0, y2-y0, z2-z0]

            u_cross_v = [uy*vz-uz*vy, uz*vx-ux*vz, ux*vy-uy*vx]

            point  = np.array([0, 0, alpha_vect[0]])
            normal = np.array(u_cross_v)

            d = -point.dot(normal)

            z = (-normal[0] * xx - normal[1] * yy - d) * 1. / normal[2]

            return z

        def get_plane_gradient(alpha_vect):

            x0, y0, z0 = [0, 0, alpha_vect[0]]
            x1, y1, z1 = [1, 0, alpha_vect[1]]
            x2, y2, z2 = [0, 1, alpha_vect[2]]

            ux, uy, uz = u = [x1-x0, y1-y0, z1-z0]
            vx, vy, vz = v = [x2-x0, y2-y0, z2-z0]

            u_cross_v = [uy*vz-uz*vy, uz*vx-ux*vz, ux*vy-uy*vx]

            normal_vector = np.array(u_cross_v)
            normal_vector_norm = float(np.linalg.norm(normal_vector))
            normal_vector = np.divide(normal_vector, normal_vector_norm)
            normal_vector[2] = 0

            return np.linalg.norm(normal_vector)

        # Actual plotting
        x = np.linspace(0, 1, 1000)
        y = np.linspace(0, 1, 1000)

        xx, yy = np.meshgrid(x, y)

        max_z = np.zeros((xx.shape[0], yy.shape[0]))
        best_a = (np.zeros((xx.shape[0], yy.shape[0])))
        plane = (np.zeros((xx.shape[0], yy.shape[0])))
        gradients = (np.zeros((xx.shape[0], yy.shape[0])))

        for alpha in self.alpha_vector_list:

            z = get_alpha_vect_z(xx, yy, alpha.values)

            # Action array update
            new_a_mask = np.argmax(np.array([max_z, z]), axis=0)

            best_a[new_a_mask == 1] = alpha.action

            plane[new_a_mask == 1] = random.randrange(100)

            alpha_gradient = get_plane_gradient(alpha.values)
            gradients[new_a_mask == 1] = alpha_gradient

            # Max z update
            max_z = np.max(np.array([max_z, z]), axis=0)

        for x_i, x_val in enumerate(x):
            for y_i, y_val in enumerate(y):
                if (x_val+y_val) > 1:
                    max_z[x_i, y_i] = np.nan
                    plane[x_i, y_i] = np.nan
                    gradients[x_i, y_i] = np.nan
                    best_a[x_i, y_i] = np.nan

        belief_points = None
        if belief_set is not None:
            belief_points = belief_set.belief_array[:,1:]

        fig, ((ax1, ax2),(ax3,ax4)) = plt.subplots(2, 2, figsize=(size*4,size*3.5), sharex=True, sharey=True)

        # Set ticks
        ticks = [0,0.25,0.5,0.75,1]
        x_ticks = [str(t) for t in ticks]
        x_ticks[0] = self.model.state_labels[0]
        x_ticks[-1] = self.model.state_labels[1]

        y_ticks = [str(t) for t in ticks]
        y_ticks[0] = self.model.state_labels[0]
        y_ticks[-1] = self.model.state_labels[2]

        plt.setp([ax1,ax2,ax3,ax4], xticks=ticks, xticklabels=x_ticks, yticks=ticks, yticklabels=y_ticks)

        # Value function ax
        ax1.set_title("Value function")
        ax1_plot = ax1.contourf(x, y, max_z, 100, cmap="viridis")
        plt.colorbar(ax1_plot, ax=ax1)

        # Alpha planes ax
        ax2.set_title("Alpha planes")
        ax2_plot = ax2.contourf(x, y, plane, 100, cmap="viridis")
        plt.colorbar(ax2_plot, ax=ax2)

        # Gradient of planes ax
        ax3.set_title("Gradients of planes")
        ax3_plot = ax3.contourf(x, y, gradients, 100, cmap="Blues")
        plt.colorbar(ax3_plot, ax=ax3)

        # Action policy ax
        ax4.set_title("Action policy")
        ax4.contourf(x, y, best_a, 1, colors=[c['id'] for c in COLOR_LIST])
        proxy = [patches.Rectangle((0,0),1,1,fc = COLOR_LIST[int(a)]['id']) for a in self.model.actions]
        ax4.legend(proxy, self.model.action_labels, title='Actions')

        if belief_points is not None:
            for ax in [ax1,ax2,ax3,ax4]:
                ax.scatter(belief_points[:,0], belief_points[:,1], s=1, c='black')


    def _plot_grid(self, size=5, belief_set=None):
        value_table = np.max(self.alpha_vector_array, axis=0)[self.model.state_grid]
        best_action_table = np.array(self.actions)[np.argmax(self.alpha_vector_array, axis=0)][self.model.state_grid]
        best_action_colors = COLOR_ARRAY[best_action_table]

        dimensions = self.model.state_grid.shape

        fig, (ax1,ax2) = plt.subplots(1,2, figsize=(size*2, size), width_ratios=(0.55,0.45))

        # Ticks
        x_ticks = np.arange(0, dimensions[1], (1 if dimensions[1] < 10 else int(dimensions[1] / 10)))
        y_ticks = np.arange(0, dimensions[0], (1 if dimensions[0] < 5 else int(dimensions[0] / 5)))

        ax1.set_title('Value function')
        ax1_plot = ax1.imshow(value_table)

        if dimensions[0] >= dimensions[1]: # If higher than wide 
            plt.colorbar(ax1_plot, ax=ax1)
        else:
            plt.colorbar(ax1_plot, ax=ax1, location='bottom', orientation='horizontal')

        ax1.set_xticks(x_ticks)
        ax1.set_yticks(y_ticks)

        ax2.set_title('Action policy')
        ax2.imshow(best_action_colors)
        p = [ patches.Patch(color=COLOR_LIST[int(i)]['id'], label=str(self.model.action_labels[int(i)])) for i in self.model.actions]
        ax2.legend(handles=p, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0., title='Actions')
        ax2.set_xticks(x_ticks)
        ax2.set_yticks(y_ticks)
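
As a minimal usage sketch, a ValueFunction can be built either from a matrix of alpha vectors plus an action list, or from AlphaVector objects. Here model stands in for an mdp.Model instance built elsewhere (assumed to have model.state_count == 3); the values themselves are made up:

import numpy as np
from olfactory_navigation.agents.model_based_util.value_function import AlphaVector, ValueFunction

# From a (N x S) matrix of alpha vectors and the N associated actions
vectors = np.array([[0.5, 1.2, 0.3],
                    [0.9, 0.1, 0.8]])
value_function = ValueFunction(model=model, alpha_vectors=vectors, action_list=[0, 1])

# Equivalently, from a list of AlphaVector objects
value_function = ValueFunction(model, [AlphaVector(v, a) for v, a in zip(vectors, [0, 1])])
print(len(value_function))  # 2, since the two vectors are distinct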

actions: np.ndarray property

A list of N actions corresponding to the N alpha vectors making up the value function. If the value function is defined as a list of AlphaVector objects, the list will be generated from the actions of those alpha vector objects.

alpha_vector_array: np.ndarray property

A matrix of size N x S, containing all the alpha vectors making up the value function. (N is the number of alpha vectors and S the amount of states in the model) If the value function is defined as a list of AlphaVector objects, the matrix will be generated from them.

alpha_vector_list: list[AlphaVector] property

A list of AlphaVector objects. If the value function is defined as a matrix of vectors and a list of actions, the list of AlphaVectors will be generated from them.

append(alpha_vector)

Function to add an alpha vector to the value function.

Parameters:

alpha_vector : AlphaVector, required
    The alpha vector to be added to the value function.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
def append(self,
           alpha_vector: AlphaVector
           ) -> None:
    '''
    Function to add an alpha vector to the value function.

    Parameters
    ----------
    alpha_vector : AlphaVector
        The alpha vector to be added to the value function.
    '''
    # Make sure size is correct
    assert alpha_vector.values.shape[0] == self.model.state_count, f"Vector to add to value function doesn't have the right size (received: {alpha_vector.values.shape[0]}, expected: {self.model.state_count})"

    # GPU support check
    xp = cp if (gpu_support and self.is_on_gpu) else np
    assert (not gpu_support) or cp.get_array_module(alpha_vector.values) == xp, f"Vector is{' not' if self.is_on_gpu else ''} on GPU while value function is{'' if self.is_on_gpu else ' not'}."

    if self._vector_array is not None:
        self._vector_array = xp.append(self._vector_array, alpha_vector.values[None,:], axis=0)
        self._actions = xp.append(self._actions, alpha_vector.action)

    if self._vector_list is not None:
        self._vector_list.append(alpha_vector)
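
A minimal sketch of appending a new vector, assuming value_function and model are the (hypothetical) objects from the construction example above:

import numpy as np

# Append a zero vector associated with action 1
value_function.append(AlphaVector(np.zeros(model.state_count), action=1))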

evaluate_at(belief)

Function to evaluate the value function at a belief point or at a set of belief points. It returns a value and the associated action.

Parameters:

belief : Belief or BeliefSet, required
    The belief point or set of belief points at which to evaluate the value function.

Returns:

value : float or ndarray
    The largest value associated with the belief point(s).
action : int or ndarray
    The action(s) associated with the vector having the highest values at the belief point(s).

Source code in olfactory_navigation/agents/model_based_util/value_function.py
def evaluate_at(self,
                belief: Belief | BeliefSet
                ) -> tuple[float | np.ndarray, int | np.ndarray]:
    '''
    Function to evaluate the value function at a belief point or at a set of belief points.
    It returns a value and the associated action.

    Parameters
    ----------
    belief : Belief or BeliefSet
        The belief point or set of belief points at which to evaluate the value function.

    Returns
    -------
    value : float or np.ndarray
        The largest value associated with the belief point(s)
    action : int or np.ndarray
        The action(s) associated with the vector having the highest values at the belief point(s).
    '''
    # GPU support check
    xp = cp if (gpu_support and self.is_on_gpu) else np

    best_value = None
    best_action = None

    if isinstance(belief, Belief):
        # Computing values
        belief_values = xp.dot(self.alpha_vector_array, belief.values)

        # Selecting best vectors
        best_vector = xp.argmax(belief_values)

        # From best vector, compute the best value and action
        best_value = float(belief_values[best_vector])
        best_action = int(self.actions[best_vector])
    else:
        # Computing values
        belief_values = xp.matmul(belief.values if isinstance(belief, Belief) else belief.belief_array, self.alpha_vector_array.T)

        # Retrieving the top vectors according to the value function
        best_vectors = xp.argmax(belief_values, axis=1)

        # Retrieving the values and actions associated with the vectors chosen
        best_value = belief_values[xp.arange(belief_values.shape[0]), best_vectors]
        best_action = self.actions[best_vectors]

    return (best_value, best_action)
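
A hedged usage sketch, assuming belief is a Belief over the model's states and belief_set is a BeliefSet, both built elsewhere:

# Single belief point: returns a scalar value and an action index
value, action = value_function.evaluate_at(belief)

# Set of belief points: returns one value and one action per point
values, actions = value_function.evaluate_at(belief_set)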

extend(other_value_function)

Function to add another value function in place. Effectively, it performs the union of the two sets of alpha vectors.

Parameters:

other_value_function : ValueFunction, required
    The other side of the union.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
def extend(self,
           other_value_function: 'ValueFunction'
           ) -> None:
    '''
    Function to add another value function in place.
    Effectively, it performs the union of the two sets of alpha vectors.

    Parameters
    ----------
    other_value_function : ValueFunction
        The other side of the union.
    '''
    self._uniqueness_dict.update(other_value_function._uniqueness_dict)
    self._vector_list = list(self._uniqueness_dict.values())

    self._vector_array = None
    self._actions = None

    self._pruning_level = 1
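
A short sketch of combining two value functions defined over the same model, assuming vf_a and vf_b were built as shown in the earlier examples. extend modifies vf_a in place, while the + operator returns a new ValueFunction:

vf_a.extend(vf_b)       # vf_a now holds the union of both alpha vector sets
vf_union = vf_a + vf_b  # non-destructive union via __add__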

load(file, model) classmethod

Function to load the value function from a .npy file.

Parameters:

file : str, required
    The path and file_name of the value function to be loaded.
model : Model, required
    The model the value function is linked to.

Returns:

loaded_value_function : ValueFunction
    The loaded value function.

Source code in olfactory_navigation/agents/model_based_util/value_function.py
@classmethod
def load(cls,
         file: str,
         model: Model
         ) -> 'ValueFunction':
    '''
    Function to load the value function from a .npy file.

    Parameters
    ----------
    file : str
        The path and file_name of the value function to be loaded.
    model : mdp.Model
        The model the value function is linked to.

    Returns
    -------
    loaded_value_function : ValueFunction
        The loaded value function.
    '''
    av_array = np.load(file)

    loaded_value_function = ValueFunction(model=model,
                                          alpha_vectors=av_array[:,1:],
                                          action_list=av_array[:,0].astype(int))

    return loaded_value_function
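
A minimal loading sketch; the file name below is purely illustrative and model is an mdp.Model built elsewhere:

value_function = ValueFunction.load('./ValueFunctions/20240101_120000_value_function.npy', model)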

plot(as_grid=False, size=5, belief_set=None)

Function to plot the value function in 2 or 3 dimensions when possible and the as_grid parameter is kept to False. Otherwise, the value function is plotted as a grid. If a belief set array is provided and the model has 2 or 3 states, it will be plotted alongside the value function.

Parameters:

as_grid : bool, default=False
    Forces the value function to be plotted as a grid.
size : int, default=5
    The actual plot scale.
belief_set : ndarray, default=None
    A set of beliefs used to plot the belief points that were explored.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
def plot(self,
         as_grid: bool = False,
         size: int = 5,
         belief_set: np.ndarray = None
         ) -> None:
    '''
    Function to plot the value function in 2 or 3 dimensions when possible and the as_grid parameter is kept to False. Otherwise, the value function is plotted as a grid.
    If a belief set array is provided and the model has 2 or 3 states, it will be plotted alongside the value function.

    Parameters
    ----------
    as_grid : bool, default=False
        Forces the value function to be plotted as a grid.
    size : int, default=5
        The actual plot scale.
    belief_set : np.ndarray, optional
        A set of beliefs used to plot the belief points that were explored.
    '''
    assert len(self) > 0, "Value function is empty, plotting is impossible..."

    # If on GPU, convert to CPU and plot that one
    if self.is_on_gpu:
        print('[Warning] Value function on GPU, converting to numpy before plotting...')
        cpu_value_function = self.to_cpu()
        cpu_value_function.plot(as_grid, size, belief_set)
        return

    func = None
    if as_grid:
        func = self._plot_grid
    elif self.model.state_count == 2:
        func = self._plot_2D
    elif self.model.state_count == 3:
        func = self._plot_3D
    else:
        print('[Warning] \'as_grid\' parameter set to False but state count is >3 so it will be plotted as a grid')
        func = self._plot_grid

    func(size, belief_set)
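
A short plotting sketch, assuming value_function was built as in the earlier examples; for models with more than 3 states the grid view is used regardless of as_grid:

value_function.plot(as_grid=True, size=5)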

prune(level=1)

Function pruning the set of alpha vectors composing the value function. The pruning is as thorough as the level:

- 2: 1+ Check of absolute domination (check if dominated at each state).
- 3: 2+ Solves Linear Programming problem for each alpha vector to see if it is dominated by combinations of other vectors.

Note that the higher the level, the heavier the time impact will be.

Parameters:

level : int, default=1
    Between 0 and 3, how thorough the alpha vector pruning should be.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
def prune(self,
          level: int = 1
          ) -> None:
    '''
    Function pruning the set of alpha vectors composing the value function.
    The pruning is as thorough as the level:
        - 2: 1+ Check of absolute domination (check if dominated at each state).
        - 3: 2+ Solves Linear Programming problem for each alpha vector to see if it is dominated by combinations of other vectors.

    Note that the higher the level, the heavier the time impact will be.

    Parameters
    ----------
    level : int, default=1
        Between 0 and 3, how thorough the alpha vector pruning should be.
    '''
    # GPU support check
    xp = cp if (gpu_support and self.is_on_gpu) else np

    # Pruning level already reached or invalid
    if level < self._pruning_level or level > 3:
        log('Attempting to prune a value function to a level already reached or invalid. Skipping.')
        return

    # Level 2 pruning: Check for absolute domination
    if level >= 2 and self._pruning_level < 2:
        non_dominated_vector_indices = []

        for i, v in enumerate(self.alpha_vector_array):
            is_dom_by = xp.all(self.alpha_vector_array >= v, axis=1)
            if len(xp.where(is_dom_by)[0]) == 1:
                non_dominated_vector_indices.append(i)

        self._vector_array = self._vector_array[non_dominated_vector_indices]
        self._actions = self._actions[non_dominated_vector_indices]

    # Level 3 pruning: LP to check for more complex domination
    if level >= 3:
        assert ilp_support, "ILP support not enabled..."

        pruned_alpha_set = self.to_cpu()

        alpha_set = pruned_alpha_set.alpha_vector_array
        non_dominated_vector_indices = []

        for i, alpha_vect in enumerate(alpha_set):
            other_alphas = np.concatenate([alpha_set[:i], alpha_set[(i+1):]])

            # Objective function
            c = np.concatenate([np.array([1]), -1*alpha_vect])

            # Alpha vector constraints
            other_count = len(other_alphas)
            A = np.c_[np.ones(other_count), np.multiply(np.array(other_alphas), -1)]
            alpha_constraints = LinearConstraint(A, 0, np.inf)

            # Constraints that sum of beliefs is 1
            belief_constraint = LinearConstraint(np.array([0] + ([1]*self.model.state_count)), 1, 1)

            # Solve problem
            res = milp(c=c, constraints=[alpha_constraints, belief_constraint])

            # Check if dominated
            is_dominated = (res.x[0] - np.dot(res.x[1:], alpha_vect)) >= 0
            if is_dominated:
                print(alpha_vect)
                print(' -> Dominated\n')
            else:
                non_dominated_vector_indices.append(i)

        self._vector_array = self._vector_array[non_dominated_vector_indices]
        self._actions = self._actions[non_dominated_vector_indices]

    # Update the tracked pruned level so far
    self._pruning_level = level
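
A minimal pruning sketch, assuming value_function was built as in the earlier examples; level 2 keeps only vectors that are not dominated state-wise by any single other vector:

value_function.prune(level=2)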

save(folder='./ValueFunctions', file_name=None)

Function to save the value function in a file at a given path. If no path is provided, it will be saved in a subfolder (ValueFunctions) inside the current working directory. If no file_name is provided, it will be saved as '<current_timestamp>_value_function.npy'.

Parameters:

folder : str, default='./ValueFunctions'
    The path at which the npy file will be saved.
file_name : str, default='<current_timestamp>_value_function.npy'
    The file name used to save in.
Source code in olfactory_navigation/agents/model_based_util/value_function.py
def save(self,
         folder: str = './ValueFunctions',
         file_name: str | None = None
         ) -> None:
    '''
    Function to save the value function in a file at a given path. If no path is provided, it will be saved in a subfolder (ValueFunctions) inside the current working directory.
    If no file_name is provided, it will be saved as '<current_timestamp>_value_function.npy'.

    Parameters
    ----------
    folder : str, default='./ValueFunctions'
        The path at which the npy file will be saved.
    file_name : str, default='<current_timestamp>_value_function.npy'
        The file name used to save in.
    '''
    if self.is_on_gpu:
        self.to_cpu().save(folder=folder, file_name=file_name)
        return

    # Handle file_name
    if file_name is None:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        file_name = timestamp + '_value_function.npy'

    # Make sure that .npy is in the file name
    if '.npy' not in file_name:
        file_name += '.npy'

    # Getting array
    av_array = np.hstack([self.actions[:,None], self.alpha_vector_array])

    np.save(folder + '/' + file_name, av_array)
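
A saving sketch; the folder and file name below are purely illustrative:

value_function.save(folder='./ValueFunctions', file_name='my_value_function.npy')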

to_cpu()

Function returning an equivalent value function object with the arrays stored on CPU instead of GPU.

Returns:

cpu_value_function : ValueFunction
    A new value function with arrays on CPU.

Source code in olfactory_navigation/agents/model_based_util/value_function.py
def to_cpu(self) -> 'ValueFunction':
    '''
    Function returning an equivalent value function object with the arrays stored on CPU instead of GPU.

    Returns
    -------
    cpu_value_function : ValueFunction
        A new value function with arrays on CPU.
    '''
    assert gpu_support, "GPU support is not enabled, unable to execute this function"

    cpu_model = self.model.cpu_model

    cpu_value_function = None
    if self._vector_list is not None:
        cpu_alpha_vectors = [AlphaVector(cp.asnumpy(av.values), av.action) for av in self._vector_list]
        cpu_value_function = ValueFunction(cpu_model, cpu_alpha_vectors)

    else:
        cpu_vector_array = cp.asnumpy(self._vector_array)
        cpu_actions = self._actions if isinstance(self._actions, list) else cp.asnumpy(self._actions)
        cpu_value_function = ValueFunction(cpu_model, cpu_vector_array, cpu_actions)

    return cpu_value_function

to_gpu()

Function returning an equivalent value function object with the arrays stored on GPU instead of CPU.

Returns:

gpu_value_function : ValueFunction
    A new value function with arrays on GPU.

Source code in olfactory_navigation/agents/model_based_util/value_function.py
def to_gpu(self) -> 'ValueFunction':
    '''
    Function returning an equivalent value function object with the arrays stored on GPU instead of CPU.

    Returns
    -------
    gpu_value_function : ValueFunction
        A new value function with arrays on GPU.
    '''
    assert gpu_support, "GPU support is not enabled, unable to execute this function"

    gpu_model = self.model.gpu_model

    gpu_value_function = None
    if self._vector_list is not None:
        gpu_alpha_vectors = [AlphaVector(cp.array(av.values), av.action) for av in self._vector_list]
        gpu_value_function = ValueFunction(gpu_model, gpu_alpha_vectors)

    else:
        gpu_vector_array = cp.array(self._vector_array)
        gpu_actions = self._actions if isinstance(self._actions, list) else cp.array(self._actions)
        gpu_value_function = ValueFunction(gpu_model, gpu_vector_array, gpu_actions)

    return gpu_value_function
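
A short round-trip sketch, assuming GPU support (cupy) is available and value_function was built as in the earlier examples:

gpu_vf = value_function.to_gpu()  # arrays moved to the GPU
cpu_vf = gpu_vf.to_cpu()          # arrays copied back to numpy on the CPU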