
infotaxis_agent

Infotaxis_Agent

Bases: Agent

An agent following the Infotaxis principle. It is a Model-Based approach that aims to take the steps most likely to reduce the entropy of the belief. The belief is (as for the PBVI agent) a probability distribution over the state space representing how confident the agent is about being in each state. The technique was developed and described in the following article: Vergassola, M., Villermaux, E., & Shraiman, B. I. (2007). 'Infotaxis' as a strategy for searching without gradients.

It does not need to be trained, so the train(), save() and load() functions are not implemented.

Parameters:

Name Type Description Default
environment Environment

The olfactory environment to train the agent with.

required
thresholds float or list[float] or dict[str, float] or dict[str, list[float]]

The olfactory thresholds. If an odor cue above this threshold is detected, the agent detects it; otherwise it does not. If a list of thresholds is provided, the agent will be able to detect |thresholds|+1 levels of odor. A dictionary of (lists of) thresholds can also be provided when the environment is layered. In that case, the number of layers provided must match the environment's layers and their labels must match. The thresholds provided will be converted to an array where the levels start with -inf and end with +inf.

3e-6
space_aware bool

Whether the agent is aware of its own position in space. This is to be used in scenarios where, for example, the agent is in an enclosed container and the source is the variable element. Note: The observation array will have a different shape when returned to the update_state function!

False
spacial_subdivisions ndarray

How many spatial compartments the agent uses to internally represent the space it lives in. By default, there will be as many as there are grid points in the environment.

None
actions dict or ndarray

The set of actions available to the agent. It should match the type of environment (i.e., if the environment has layers, the action vectors should contain a layer component, and similarly for a third dimension). Alternatively, a dict of strings to action vectors can be provided, where the strings are the action labels. If none is provided, all unit movement vectors are included by default, and this for all layers (if the environment has layers).

None
name str

A custom name to give the agent. If not provided, it will be a combination of the class name and the threshold.

None
seed int

For reproducible randomness.

12131415
model Model

A POMDP model to use to represent the olfactory environment. If not provided, the environment_converter parameter will be used.

None
environment_converter Callable

A function to convert the olfactory environment instance to a POMDP Model instance. By default, an exact conversion is used that keeps the shape of the environment to determine the states of the POMDP Model. This parameter will be ignored if the model parameter is provided.

exact_converter
converter_parameters dict

A set of additional parameters to be passed down to the environment converter.

{}

Attributes:

Name Type Description
environment Environment
thresholds ndarray

An array of the thresholds of detection, starting with -inf and ending with +inf. In the case of a 2D array of thresholds, the rows of thresholds apply to the different layers of the environment.

space_aware bool
spacial_subdivisions ndarray
name str
action_set ndarray

The actions allowed to the agent, formulated as movement vectors [(layer,) (dz,) dy, dx].

action_labels list[str]

The labels associated to the action vectors present in the action set.

model Model

The environment converted to a POMDP model using the "from_environment" constructor of the pomdp.Model class.

saved_at str

The place on disk where the agent has been saved (None if not saved yet).

on_gpu bool

Whether the agent has been sent to the gpu or not.

class_name str

The name of the class of the agent.

seed int

The seed used for the random operations (to allow for reproducibility).

rnd_state RandomState

The random state variable used to generate random values.

cpu_version Agent

An instance of the agent on the CPU. If it already is, it returns itself.

gpu_version Agent

An instance of the agent on the GPU. If it already is, it returns itself.

belief BeliefSet

Used only during simulations. Part of the Agent's status. Where the agent believes it is over the state space. It is a list of n belief points based on how many simulations are running at once.

action_played list[int]

Used only during simulations. Part of the Agent's status. Records what action was last played by the agent. A list of n actions played based on how many simulations are running at once.
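
Example:

A minimal usage sketch (not part of the class documentation above). It assumes an Environment instance named env has already been built, and uses a hypothetical env_step helper to stand in for whatever produces the observation and source_reached arrays during a simulation:

import numpy as np
from olfactory_navigation.agents.infotaxis_agent import Infotaxis_Agent

agent = Infotaxis_Agent(environment=env, thresholds=3e-6)  # env: an Environment built elsewhere
agent.initialize_state(n=1)                                # one simulation with the default initial belief

for _ in range(100):
    action = agent.choose_action()                          # movement vector(s) chosen by infotaxis
    observation, source_reached = env_step(action)          # hypothetical environment interaction
    agent.update_state(action=action, observation=observation, source_reached=source_reached)
    if source_reached.all():
        break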

Source code in olfactory_navigation/agents/infotaxis_agent.py
class Infotaxis_Agent(Agent):
    '''
    An agent following the Infotaxis principle.
    It is a Model-Based approach that aims to take the steps most likely to reduce the entropy of the belief.
    The belief is (as for the PBVI agent) a probability distribution over the state space representing how confident the agent is about being in each state.
    The technique was developed and described in the following article: Vergassola, M., Villermaux, E., & Shraiman, B. I. (2007). 'Infotaxis' as a strategy for searching without gradients.

    It does not need to be trained, so the train(), save() and load() functions are not implemented.


    Parameters
    ----------
    environment : Environment
        The olfactory environment to train the agent with.
    thresholds : float or list[float] or dict[str, float] or dict[str, list[float]], default=3e-6
        The olfactory thresholds. If an odor cue above this threshold is detected, the agent detects it; otherwise it does not.
        If a list of thresholds is provided, the agent will be able to detect |thresholds|+1 levels of odor.
        A dictionary of (lists of) thresholds can also be provided when the environment is layered.
        In that case, the number of layers provided must match the environment's layers and their labels must match.
        The thresholds provided will be converted to an array where the levels start with -inf and end with +inf.
    space_aware : bool, default=False
        Whether the agent is aware of its own position in space.
        This is to be used in scenarios where, for example, the agent is in an enclosed container and the source is the variable element.
        Note: The observation array will have a different shape when returned to the update_state function!
    spacial_subdivisions : np.ndarray, optional
        How many spatial compartments the agent uses to internally represent the space it lives in.
        By default, there will be as many as there are grid points in the environment.
    actions : dict or np.ndarray, optional
        The set of actions available to the agent. It should match the type of environment (i.e., if the environment has layers, the action vectors should contain a layer component, and similarly for a third dimension).
        Alternatively, a dict of strings to action vectors can be provided, where the strings are the action labels.
        If none is provided, all unit movement vectors are included by default, and this for all layers (if the environment has layers).
    name : str, optional
        A custom name to give the agent. If not provided, it will be a combination of the class name and the threshold.
    seed : int, default=12131415
        For reproducible randomness.
    model : Model, optional
        A POMDP model to use to represent the olfactory environment.
        If not provided, the environment_converter parameter will be used.
    environment_converter : Callable, default=exact_converter
        A function to convert the olfactory environment instance to a POMDP Model instance.
        By default, an exact conversion is used that keeps the shape of the environment to determine the states of the POMDP Model.
        This parameter will be ignored if the model parameter is provided.
    converter_parameters : dict, optional
        A set of additional parameters to be passed down to the environment converter.

    Attributes
    ----------
    environment : Environment
    thresholds : np.ndarray
        An array of the thresholds of detection, starting with -inf and ending with +inf.
        In the case of a 2D array of thresholds, the rows of thresholds apply to the different layers of the environment.
    space_aware : bool
    spacial_subdivisions : np.ndarray
    name : str
    action_set : np.ndarray
        The actions allowed to the agent, formulated as movement vectors [(layer,) (dz,) dy, dx].
    action_labels : list[str]
        The labels associated to the action vectors present in the action set.
    model : pomdp.Model
        The environment converted to a POMDP model using the "from_environment" constructor of the pomdp.Model class.
    saved_at : str
        The place on disk where the agent has been saved (None if not saved yet).
    on_gpu : bool
        Whether the agent has been sent to the gpu or not.
    class_name : str
        The name of the class of the agent.
    seed : int
        The seed used for the random operations (to allow for reproducibility).
    rnd_state : np.random.RandomState
        The random state variable used to generate random values.
    cpu_version : Agent
        An instance of the agent on the CPU. If it already is, it returns itself.
    gpu_version : Agent
        An instance of the agent on the GPU. If it already is, it returns itself.
    belief : BeliefSet
        Used only during simulations.
        Part of the Agent's status. Where the agent believes it is over the state space.
        It is a list of n belief points based on how many simulations are running at once.
    action_played : list[int]
        Used only during simulations.
        Part of the Agent's status. Records what action was last played by the agent.
        A list of n actions played based on how many simulations are running at once.
    '''
    def __init__(self,
                 environment: Environment,
                 thresholds: float | list[float] | dict[str, float] | dict[str, list[float]] = 3e-6,
                 space_aware: bool = False,
                 spacial_subdivisions: np.ndarray | None = None,
                 actions: dict[str, np.ndarray] | np.ndarray | None = None,
                 name: str | None=None,
                 seed: int = 12131415,
                 model: Model | None = None,
                 environment_converter: Callable | None = None,
                 **converter_parameters
                 ) -> None:
        super().__init__(
            environment = environment,
            thresholds = thresholds,
            space_aware = space_aware,
            spacial_subdivisions = spacial_subdivisions,
            actions = actions,
            name = name,
            seed = seed
        )

        # Converting the olfactory environment to a POMDP Model
        if model is not None:
            loaded_model = model
        elif callable(environment_converter):
            loaded_model = environment_converter(agent=self, **converter_parameters)
        else:
            # Using the exact converter
            loaded_model = exact_converter(agent=self)
        self.model:Model = loaded_model

        # Status variables
        self.belief = None
        self.action_played = None


    def to_gpu(self) -> Agent:
        '''
        Function to send the numpy arrays of the agent to the gpu.
        It returns a new instance of the Agent class with the arrays on the gpu.

        Returns
        -------
        gpu_agent
        '''
        # Check whether the agent is already on the gpu or not
        if self.on_gpu:
            return self

        # Warn and overwrite alternate_version in case it already exists
        if self._alternate_version is not None:
            print('[warning] A GPU instance already existed and is being recreated.')
            self._alternate_version = None

        assert gpu_support, "GPU support is not enabled, Cupy might need to be installed..."

        # Generating a new instance
        cls = self.__class__
        gpu_agent = cls.__new__(cls)

        # Copying arguments to gpu
        for arg, val in self.__dict__.items():
            if isinstance(val, np.ndarray):
                setattr(gpu_agent, arg, cp.array(val))
            elif arg == 'rnd_state':
                setattr(gpu_agent, arg, cp.random.RandomState(self.seed))
            elif isinstance(val, Model):
                setattr(gpu_agent, arg, val.gpu_model)
            elif isinstance(val, BeliefSet) or isinstance(val, Belief):
                setattr(gpu_agent, arg, val.to_gpu())
            else:
                setattr(gpu_agent, arg, val)

        # Self reference instances
        self._alternate_version = gpu_agent
        gpu_agent._alternate_version = self

        gpu_agent.on_gpu = True
        return gpu_agent


    def initialize_state(self,
                         n: int = 1,
                         belief: BeliefSet | None = None
                         ) -> None:
        '''
        To use an agent within a simulation, the agent's state needs to be initialized.
        The initialization consists of setting the agent's initial belief.
        Multiple agents can be used at once for simulations; for this reason, the belief parameter is a BeliefSet by default.

        Parameters
        ----------
        n : int, default=1
            How many agents are to be used during the simulation.
        belief : BeliefSet, optional
            An optional set of beliefs to initialize the simulations with.
        '''
        if belief is None:
            self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])
        else:
            assert len(belief) == n, f"The amount of beliefs provided ({len(belief)}) to initialize the state needs to match the amount of simulations to initialize (n={n})."

            if self.on_gpu and not belief.is_on_gpu:
                self.belief = belief.to_gpu()
            elif not self.on_gpu and belief.is_on_gpu:
                self.belief = belief.to_cpu()
            else:
                self.belief = belief


    def choose_action(self) -> np.ndarray:
        '''
        Function to let the agent or set of agents choose an action based on their current belief.
        Following the Infotaxis principle, it will choose the action that minimizes the expected entropy of the next belief.

        Returns
        -------
        movement_vector : np.ndarray
            A single or a list of actions chosen by the agent(s) based on their belief.
        '''
        xp = np if not self.on_gpu else cp

        n = len(self.belief)

        best_entropy = xp.ones(n) * -1
        best_action = xp.ones(n, dtype=int) * -1

        current_entropy = self.belief.entropies

        for a in self.model.actions:
            total_entropy = xp.zeros(n)

            for o in self.model.observations:
                b_ao = self.belief.update(actions=xp.ones(n, dtype=int)*a,
                                           observations=xp.ones(n, dtype=int)*o,
                                           throw_error=False)

                # Computing entropy
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    b_ao_entropy = b_ao.entropies

                b_prob = xp.dot(self.belief.belief_array, xp.sum(self.model.reachable_transitional_observation_table[:,a,o,:], axis=1))

                total_entropy += (b_prob * (current_entropy - b_ao_entropy))

            # Checking if action is superior to previous best
            superiority_mask = best_entropy < total_entropy
            best_action[superiority_mask] = a
            best_entropy[superiority_mask] = total_entropy[superiority_mask]

        # Recording the action played
        self.action_played = best_action

        # Converting action indexes to movement vectors
        movement_vector = self.action_set[best_action,:]

        return movement_vector


    def update_state(self,
                     action: np.ndarray,
                     observation: np.ndarray,
                     source_reached: np.ndarray
                     ) -> None | np.ndarray:
        '''
        Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

        Parameters
        ----------
        action : np.ndarray
            A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.
        observation : np.ndarray
            The observation(s) the agent(s) made.
        source_reached : np.ndarray
            A boolean array of whether the agent(s) have reached the source or not.

        Returns
        -------
        update_successful : np.ndarray, optional
            If nothing is returned, it means all the agent's state updates have been successful.
            Otherwise, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
        '''
        assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"


        # Discretizing observations
        observation_ids = self.discretize_observations(observation=observation, action=action, source_reached=source_reached)

        # Update the set of belief
        self.belief = self.belief.update(actions=self.action_played, observations=observation_ids)

        # Check for failed updates
        update_successful = (self.belief.belief_array.sum(axis=1) != 0.0)

        return update_successful


    def kill(self,
             simulations_to_kill: np.ndarray
             ) -> None:
        '''
        Function to kill any simulations that have not reached the source but can't continue further

        Parameters
        ----------
        simulations_to_kill : np.ndarray
            A boolean array of the simulations to kill.
        '''
        if all(simulations_to_kill):
            self.belief = None
        else:
            self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~simulations_to_kill])

choose_action()

Function to let the agent or set of agents choose an action based on their current belief. Following the Infotaxis principle, it will choose the action that minimizes the expected entropy of the next belief.

Returns:

Name Type Description
movement_vector ndarray

A single or a list of actions chosen by the agent(s) based on their belief.
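
Concretely, for each candidate action a, the agent evaluates the expected entropy reduction: the sum over observations o of P(o | b, a) * (H(b) - H(b_ao)), where b_ao is the belief updated with action a and observation o, and plays the action with the largest value. The toy snippet below (an illustration, not the library code) reproduces that computation for a single belief with a made-up 3-state, 2-observation model; it leaves out the transition step that the real implementation handles through the model's reachable_transitional_observation_table:

import numpy as np

def entropy(p):
    p = p[p > 0]
    return -np.sum(p * np.log(p))

belief = np.array([0.5, 0.3, 0.2])      # toy belief b(s) over 3 states
obs_prob = np.array([[0.9, 0.1],        # toy P(o | s) for 2 observation levels
                     [0.4, 0.6],
                     [0.2, 0.8]])

expected_gain = 0.0
for o in range(obs_prob.shape[1]):
    p_o = belief @ obs_prob[:, o]        # P(o | b): probability of observing o
    b_o = belief * obs_prob[:, o] / p_o  # Bayes-updated belief b_ao
    expected_gain += p_o * (entropy(belief) - entropy(b_o))

print(expected_gain)  # choose_action keeps the action with the largest such gain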

Source code in olfactory_navigation/agents/infotaxis_agent.py
def choose_action(self) -> np.ndarray:
    '''
    Function to let the agent or set of agents choose an action based on their current belief.
    Following the Infotaxis principle, it will choose the action that minimizes the expected entropy of the next belief.

    Returns
    -------
    movement_vector : np.ndarray
        A single or a list of actions chosen by the agent(s) based on their belief.
    '''
    xp = np if not self.on_gpu else cp

    n = len(self.belief)

    best_entropy = xp.ones(n) * -1
    best_action = xp.ones(n, dtype=int) * -1

    current_entropy = self.belief.entropies

    for a in self.model.actions:
        total_entropy = xp.zeros(n)

        for o in self.model.observations:
            b_ao = self.belief.update(actions=xp.ones(n, dtype=int)*a,
                                       observations=xp.ones(n, dtype=int)*o,
                                       throw_error=False)

            # Computing entropy
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                b_ao_entropy = b_ao.entropies

            b_prob = xp.dot(self.belief.belief_array, xp.sum(self.model.reachable_transitional_observation_table[:,a,o,:], axis=1))

            total_entropy += (b_prob * (current_entropy - b_ao_entropy))

        # Checking if action is superior to previous best
        superiority_mask = best_entropy < total_entropy
        best_action[superiority_mask] = a
        best_entropy[superiority_mask] = total_entropy[superiority_mask]

    # Recording the action played
    self.action_played = best_action

    # Converting action indexes to movement vectors
    movement_vector = self.action_set[best_action,:]

    return movement_vector

initialize_state(n=1, belief=None)

To use an agent within a simulation, the agent's state needs to be initialized. The initialization consists of setting the agent's initial belief. Multiple agents can be used at once for simulations; for this reason, the belief parameter is a BeliefSet by default.

Parameters:

Name Type Description Default
n int

How many agents are to be used during the simulation.

1
belief BeliefSet

An optional set of beliefs to initialize the simulations with.

None
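
A short usage sketch (the agent and previous_beliefs objects are assumed to exist already; previous_beliefs would be a BeliefSet of matching size):

# 100 parallel simulations, each starting from the default initial belief
agent.initialize_state(n=100)

# or reuse an existing BeliefSet
agent.initialize_state(n=len(previous_beliefs), belief=previous_beliefs)
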
Source code in olfactory_navigation/agents/infotaxis_agent.py
def initialize_state(self,
                     n: int = 1,
                     belief: BeliefSet | None = None
                     ) -> None:
    '''
    To use an agent within a simulation, the agent's state needs to be initialized.
    The initialization consists of setting the agent's initial belief.
    Multiple agents can be used at once for simulations; for this reason, the belief parameter is a BeliefSet by default.

    Parameters
    ----------
    n : int, default=1
        How many agents are to be used during the simulation.
    belief : BeliefSet, optional
        An optional set of beliefs to initialize the simulations with.
    '''
    if belief is None:
        self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])
    else:
        assert len(belief) == n, f"The amount of beliefs provided ({len(belief)}) to initialize the state needs to match the amount of simulations to initialize (n={n})."

        if self.on_gpu and not belief.is_on_gpu:
            self.belief = belief.to_gpu()
        elif not self.on_gpu and belief.is_on_gpu:
            self.belief = belief.to_cpu()
        else:
            self.belief = belief

kill(simulations_to_kill)

Function to kill any simulations that have not reached the source but can't continue further

Parameters:

Name Type Description Default
simulations_to_kill ndarray

A boolean array of the simulations to kill.

required
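
A short illustration (the boolean mask is built by the simulation loop; the values here are made up):

import numpy as np

stuck = np.array([False, True, False])   # e.g. the 2nd of 3 running simulations cannot continue
agent.kill(simulations_to_kill=stuck)    # the remaining belief set then only holds the other 2 beliefs
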
Source code in olfactory_navigation/agents/infotaxis_agent.py
def kill(self,
         simulations_to_kill: np.ndarray
         ) -> None:
    '''
    Function to kill any simulations that have not reached the source but can't continue further

    Parameters
    ----------
    simulations_to_kill : np.ndarray
        A boolean array of the simulations to kill.
    '''
    if all(simulations_to_kill):
        self.belief = None
    else:
        self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~simulations_to_kill])

to_gpu()

Function to send the numpy arrays of the agent to the gpu. It returns a new instance of the Agent class with the arrays on the gpu.

Returns:

Type Description
gpu_agent
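
A minimal sketch, assuming CuPy is installed so that gpu_support is enabled:

gpu_agent = agent.to_gpu()         # arrays, model and beliefs copied to the GPU
assert gpu_agent.on_gpu
cpu_agent = gpu_agent.cpu_version  # the original CPU instance remains available as the alternate version
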
Source code in olfactory_navigation/agents/infotaxis_agent.py
def to_gpu(self) -> Agent:
    '''
    Function to send the numpy arrays of the agent to the gpu.
    It returns a new instance of the Agent class with the arrays on the gpu.

    Returns
    -------
    gpu_agent
    '''
    # Check whether the agent is already on the gpu or not
    if self.on_gpu:
        return self

    # Warn and overwrite alternate_version in case it already exists
    if self._alternate_version is not None:
        print('[warning] A GPU instance already existed and is being recreated.')
        self._alternate_version = None

    assert gpu_support, "GPU support is not enabled, Cupy might need to be installed..."

    # Generating a new instance
    cls = self.__class__
    gpu_agent = cls.__new__(cls)

    # Copying arguments to gpu
    for arg, val in self.__dict__.items():
        if isinstance(val, np.ndarray):
            setattr(gpu_agent, arg, cp.array(val))
        elif arg == 'rnd_state':
            setattr(gpu_agent, arg, cp.random.RandomState(self.seed))
        elif isinstance(val, Model):
            setattr(gpu_agent, arg, val.gpu_model)
        elif isinstance(val, BeliefSet) or isinstance(val, Belief):
            setattr(gpu_agent, arg, val.to_gpu())
        else:
            setattr(gpu_agent, arg, val)

    # Self reference instances
    self._alternate_version = gpu_agent
    gpu_agent._alternate_version = self

    gpu_agent.on_gpu = True
    return gpu_agent

update_state(action, observation, source_reached)

Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

Parameters:

Name Type Description Default
action ndarray

A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.

required
observation ndarray

The observation(s) the agent(s) made.

required
source_reached ndarray

A boolean array of whether the agent(s) have reached the source or not.

required

Returns:

Name Type Description
update_successful (ndarray, optional)

If nothing is returned, it means all the agent's state updates have been successful. Otherwise, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
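
A hedged single-step example (how observation and source_reached are produced is outside this page; the handling of failed updates shown here is only one possible choice):

action = agent.choose_action()   # also records agent.action_played, used internally by update_state
# ... run the environment with `action` to obtain `observation` and `source_reached` ...
ok = agent.update_state(action=action,
                        observation=observation,
                        source_reached=source_reached)
agent.kill(simulations_to_kill=(~ok & ~source_reached))  # one way to drop simulations whose update failed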

Source code in olfactory_navigation/agents/infotaxis_agent.py
def update_state(self,
                 action: np.ndarray,
                 observation: np.ndarray,
                 source_reached: np.ndarray
                 ) -> None | np.ndarray:
    '''
    Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

    Parameters
    ----------
    action : np.ndarray
        A 2D array of n movement vectors. If the environment is layered, the 1st component should be the layer.
    observation : np.ndarray
        The observation(s) the agent(s) made.
    source_reached : np.ndarray
        A boolean array of whether the agent(s) have reached the source or not.

    Returns
    -------
    update_successful : np.ndarray, optional
        If nothing is returned, it means all the agent's state updates have been successful.
        Otherwise, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
    '''
    assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"


    # Discretizing observations
    observation_ids = self.discretize_observations(observation=observation, action=action, source_reached=source_reached)

    # Update the set of belief
    self.belief = self.belief.update(actions=self.action_played, observations=observation_ids)

    # Check for failed updates
    update_successful = (self.belief.belief_array.sum(axis=1) != 0.0)

    return update_successful