
infotaxis_agent

Infotaxis_Agent

Bases: Agent

An agent following the Infotaxis principle. It is a model-based approach that takes steps towards where the agent is most likely to minimize the entropy of its belief. The belief is (as for the PBVI agent) a probability distribution over the state space expressing how confident the agent is to be in each state. The technique was developed and described in the following article: Vergassola, M., Villermaux, E., & Shraiman, B. I. (2007). 'Infotaxis' as a strategy for searching without gradients.

It does not need to be trained, so the train(), save() and load() functions are not implemented.

Parameters:

environment : Environment, required
    The olfactory environment to train the agent with.
threshold : float or list[float], default=3e-6
    The olfactory threshold. If an odor cue above this threshold is detected, the agent detects it, else it does not. If a list of thresholds is provided, the agent will be able to detect |thresholds|+1 levels of odor.
actions : dict or ndarray, default=None
    The set of actions available to the agent. It should match the type of environment (i.e., if the environment has layers, the action vectors should contain a layer component, and similarly for a third dimension). Otherwise, a dict of strings and action vectors where the strings represent the action labels. If none is provided, all unit movement vectors are included by default, and likewise for all layers (if the environment has layers).
name : str, default=None
    A custom name to give the agent. If not provided, it will be a combination of the class name and the threshold.
seed : int, default=12131415
    For reproducible randomness.
model : Model, default=None
    A POMDP model to use to represent the olfactory environment. If not provided, the environment_converter parameter will be used.
environment_converter : Callable, default=exact_converter
    A function to convert the olfactory environment instance to a POMDP Model instance. By default, an exact conversion is used that keeps the shape of the environment to determine the number of states of the POMDP Model. This parameter is ignored if the model parameter is provided.
converter_parameters : dict, default={}
    A set of additional parameters to be passed down to the environment converter.

Attributes:

environment : Environment
threshold : float or list[float]
name : str
action_set : ndarray
    The actions available to the agent, formulated as movement vectors [(layer,) (dz,) dy, dx].
action_labels : list[str]
    The labels associated with the action vectors present in the action set.
model : Model
    The environment converted to a POMDP model using the "from_environment" constructor of the pomdp.Model class.
saved_at : str
    The place on disk where the agent has been saved (None if not saved yet).
on_gpu : bool
    Whether the agent has been sent to the GPU or not.
class_name : str
    The name of the class of the agent.
seed : int
    The seed used for the random operations (to allow for reproducibility).
rnd_state : RandomState
    The random state variable used to generate random values.
belief : BeliefSet
    Used only during simulations. Part of the agent's status. Where the agent believes it is over the state space. It is a list of n belief points based on how many simulations are running at once.
action_played : list[int]
    Used only during simulations. Part of the agent's status. Records which action was last played by the agent. A list of n actions played based on how many simulations are running at once.

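For orientation, here is a minimal usage sketch of the constructor and the methods documented below. It is hypothetical: the import path is assumed from the module path shown on this page, and the Environment instance, the step count and the per-step observation and source_reached arrays are placeholders to be supplied by an actual simulation setup.

from olfactory_navigation.agents.infotaxis_agent import Infotaxis_Agent  # assumed import path

# 'env' is a placeholder for an already-built olfactory Environment instance.
agent = Infotaxis_Agent(environment=env, threshold=3e-6)

agent.initialize_state(n=10)                        # track 10 simulations at once
for _ in range(max_steps):                          # 'max_steps' chosen by the caller
    movement = agent.choose_action()                # one movement vector per simulation
    # 'observation' and 'source_reached' come from stepping the simulation.
    ok = agent.update_state(observation, source_reached)
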
Source code in olfactory_navigation/agents/infotaxis_agent.py
class Infotaxis_Agent(Agent):
    '''
    An agent following the Infotaxis principle.
    It is a model-based approach that takes steps towards where the agent is most likely to minimize the entropy of its belief.
    The belief is (as for the PBVI agent) a probability distribution over the state space expressing how confident the agent is to be in each state.
    The technique was developed and described in the following article: Vergassola, M., Villermaux, E., & Shraiman, B. I. (2007). 'Infotaxis' as a strategy for searching without gradients.

    It does not need to be trained, so the train(), save() and load() functions are not implemented.


    Parameters
    ----------
    environment : Environment
        The olfactory environment to train the agent with.
    threshold : float or list[float], default=3e-6
        The olfactory threshold. If an odor cue above this threshold is detected, the agent detects it, else it does not.
        If a list of thresholds is provided, the agent will be able to detect |thresholds|+1 levels of odor.
    actions : dict or np.ndarray, optional
        The set of actions available to the agent. It should match the type of environment (i.e., if the environment has layers, the action vectors should contain a layer component, and similarly for a third dimension).
        Otherwise, a dict of strings and action vectors where the strings represent the action labels.
        If none is provided, all unit movement vectors are included by default, and likewise for all layers (if the environment has layers).
    name : str, optional
        A custom name to give the agent. If not provided, it will be a combination of the class name and the threshold.
    seed : int, default=12131415
        For reproducible randomness.
    model : Model, optional
        A POMDP model to use to represent the olfactory environment.
        If not provided, the environment_converter parameter will be used.
    environment_converter : Callable, default=exact_converter
        A function to convert the olfactory environment instance to a POMDP Model instance.
        By default, an exact conversion is used that keeps the shape of the environment to determine the number of states of the POMDP Model.
        This parameter will be ignored if the model parameter is provided.
    converter_parameters : dict, optional
        A set of additional parameters to be passed down to the environment converter.

    Attributes
    ---------
    environment : Environment
    threshold : float or list[float]
    name : str
    action_set : np.ndarray
        The actions available to the agent, formulated as movement vectors [(layer,) (dz,) dy, dx].
    action_labels : list[str]
        The labels associated with the action vectors present in the action set.
    model : pomdp.Model
        The environment converted to a POMDP model using the "from_environment" constructor of the pomdp.Model class.
    saved_at : str
        The place on disk where the agent has been saved (None if not saved yet).
    on_gpu : bool
        Whether the agent has been sent to the gpu or not.
    class_name : str
        The name of the class of the agent.
    seed : int
        The seed used for the random operations (to allow for reproducibility).
    rnd_state : np.random.RandomState
        The random state variable used to generate random values.
    belief : BeliefSet
        Used only during simulations.
        Part of the Agent's status. Where the agent believes it is over the state space.
        It is a list of n belief points based on how many simulations are running at once.
    action_played : list[int]
        Used only during simulations.
        Part of the Agent's status. Records which action was last played by the agent.
        A list of n actions played based on how many simulations are running at once.
    '''
    def __init__(self,
                 environment: Environment,
                 threshold: float | None = 3e-6,
                 actions: dict[str, np.ndarray] | np.ndarray | None = None,
                 name: str | None=None,
                 seed: int = 12131415,
                 model: Model | None = None,
                 environment_converter: Callable | None = None,
                 **converter_parameters
                 ) -> None:
        super().__init__(
            environment = environment,
            threshold = threshold,
            actions = actions,
            name = name,
            seed = seed
        )

        # Converting the olfactory environment to a POMDP Model
        if model is not None:
            loaded_model = model
        elif callable(environment_converter):
            loaded_model = environment_converter(agent=self, **converter_parameters)
        else:
            # Using the exact converter
            loaded_model = exact_converter(agent=self)
        self.model:Model = loaded_model

        # Status variables
        self.belief = None
        self.action_played = None


    def to_gpu(self) -> Agent:
        '''
        Function to send the numpy arrays of the agent to the gpu.
        It returns a new instance of the Agent class with the arrays on the gpu

        Returns
        -------
        gpu_agent
        '''
        # Generating a new instance
        cls = self.__class__
        gpu_agent = cls.__new__(cls)

        # Copying arguments to gpu
        for arg, val in self.__dict__.items():
            if isinstance(val, np.ndarray):
                setattr(gpu_agent, arg, cp.array(val))
            elif arg == 'rnd_state':
                setattr(gpu_agent, arg, cp.random.RandomState(self.seed))
            elif isinstance(val, Model):
                setattr(gpu_agent, arg, val.gpu_model)
            elif isinstance(val, BeliefSet) or isinstance(val, Belief):
                setattr(gpu_agent, arg, val.to_gpu())
            else:
                setattr(gpu_agent, arg, val)

        # Self reference instances
        self._alternate_version = gpu_agent
        gpu_agent._alternate_version = self

        gpu_agent.on_gpu = True
        return gpu_agent


    def initialize_state(self,
                         n: int = 1
                         ) -> None:
        '''
        To use an agent within a simulation, the agent's state needs to be initialized.
        The initialization consists of setting the agent's initial belief.
        Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

        Parameters
        ----------
        n : int, default=1
            How many agents are to be used during the simulation.
        '''
        self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])


    def choose_action(self) -> np.ndarray:
        '''
        Function to let the agent or set of agents choose an action based on their current belief.
        Following the Infotaxis principle, it will choose an action that will minimize the sum of next entropies.

        Returns
        -------
        movement_vector : np.ndarray
            A single or a list of actions chosen by the agent(s) based on their belief.
        '''
        xp = np if not self.on_gpu else cp

        n = len(self.belief)

        best_entropy = xp.ones(n) * -1
        best_action = xp.ones(n, dtype=int) * -1

        current_entropy = self.belief.entropies

        for a in self.model.actions:
            total_entropy = xp.zeros(n)

            for o in self.model.observations:
                b_ao = self.belief.update(actions=xp.ones(n, dtype=int)*a,
                                           observations=xp.ones(n, dtype=int)*o,
                                           throw_error=False)

                # Computing entropy
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    b_ao_entropy = b_ao.entropies

                b_prob = xp.dot(self.belief.belief_array, xp.sum(self.model.reachable_transitional_observation_table[:,a,o,:], axis=1))

                total_entropy += (b_prob * (current_entropy - b_ao_entropy))

            # Checking if action is superior to previous best
            superiority_mask = best_entropy < total_entropy
            best_action[superiority_mask] = a
            best_entropy[superiority_mask] = total_entropy[superiority_mask]

        # Recording the action played
        self.action_played = best_action

        # Converting action indexes to movement vectors
        movement_vector = self.action_set[best_action,:]

        return movement_vector


    def update_state(self,
                     observation: np.ndarray,
                     source_reached: np.ndarray
                     ) -> None | np.ndarray:
        '''
        Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

        Parameters
        ----------
        observation : np.ndarray
            The observation(s) the agent(s) made.
        source_reached : np.ndarray
            A boolean array of whether the agent(s) have reached the source or not.

        Returns
        -------
        update_successful : np.ndarray, optional
            If nothing is returned, it means all the agent's state updates have been successful.
            Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
        '''
        assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"

        # GPU support
        xp = np if not self.on_gpu else cp

        # TODO: Make dedicated observation discretization function
        # Set the thresholds as a vector
        threshold = self.threshold
        if not isinstance(threshold, list):
            threshold = [threshold]

        # Ensure -inf and inf begin and end the threshold list
        if threshold[0] != -xp.inf:
            threshold = [-xp.inf] + threshold

        if threshold[-1] != xp.inf:
            threshold = threshold + [xp.inf]
        threshold = xp.array(threshold)

        # Setting observation ids
        observation_ids = xp.argwhere((observation[:,None] >= threshold[:-1][None,:]) & (observation[:,None] < threshold[1:][None,:]))[:,1]
        observation_ids[source_reached] = len(threshold) # Observe source, goal is always last observation with len(threshold)-1 being the amount of observation buckets.

        # Update the set of belief
        self.belief = self.belief.update(actions=self.action_played, observations=observation_ids)

        # Check for failed updates
        update_successful = (self.belief.belief_array.sum(axis=1) != 0.0)

        return update_successful


    def kill(self,
             simulations_to_kill: np.ndarray
             ) -> None:
        '''
        Function to kill any simulations that have not reached the source but can't continue further

        Parameters
        ----------
        simulations_to_kill : np.ndarray
            A boolean array of the simulations to kill.
        '''
        if all(simulations_to_kill):
            self.belief = None
        else:
            self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~simulations_to_kill])

choose_action()

Function to let the agent or set of agents choose an action based on their current belief. Following the Infotaxis principle, it will choose an action that will minimize the sum of next entropies.

Returns:

movement_vector : ndarray
    A single or a list of actions chosen by the agent(s) based on their belief.

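The selection criterion implemented by this method can also be written down compactly. Below is a self-contained numpy sketch of the same idea on a toy POMDP (the transition array T and observation array O are random placeholders invented for illustration; none of the library's classes are used): for every action, the expected entropy reduction, summed over observations o as P(o | b, a) * (H(b) - H(b_ao)), is accumulated, and the action with the largest value is picked.

import numpy as np

def entropy(b):
    # Shannon entropy of a belief vector, ignoring zero entries.
    nz = b[b > 0]
    return -np.sum(nz * np.log(nz))

# Toy POMDP pieces (hypothetical): T[s, a, s'] transitions, O[a, s', o] observations.
rng = np.random.default_rng(0)
n_states, n_actions, n_obs = 5, 3, 2
T = rng.random((n_states, n_actions, n_states)); T /= T.sum(axis=2, keepdims=True)
O = rng.random((n_actions, n_states, n_obs));    O /= O.sum(axis=2, keepdims=True)

belief = np.full(n_states, 1 / n_states)
current_H = entropy(belief)

scores = np.zeros(n_actions)
for a in range(n_actions):
    for o in range(n_obs):
        # Predicted belief after playing a and observing o (Bayes update).
        b_ao = (belief @ T[:, a, :]) * O[a, :, o]
        p_o = b_ao.sum()                      # probability of observing o after a
        if p_o > 0:
            scores[a] += p_o * (current_H - entropy(b_ao / p_o))

best_action = int(np.argmax(scores))          # largest expected entropy reduction
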
Source code in olfactory_navigation/agents/infotaxis_agent.py
def choose_action(self) -> np.ndarray:
    '''
    Function to let the agent or set of agents choose an action based on their current belief.
    Following the Infotaxis principle, it will choose an action that will minimize the sum of next entropies.

    Returns
    -------
    movement_vector : np.ndarray
        A single or a list of actions chosen by the agent(s) based on their belief.
    '''
    xp = np if not self.on_gpu else cp

    n = len(self.belief)

    best_entropy = xp.ones(n) * -1
    best_action = xp.ones(n, dtype=int) * -1

    current_entropy = self.belief.entropies

    for a in self.model.actions:
        total_entropy = xp.zeros(n)

        for o in self.model.observations:
            b_ao = self.belief.update(actions=xp.ones(n, dtype=int)*a,
                                       observations=xp.ones(n, dtype=int)*o,
                                       throw_error=False)

            # Computing entropy
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                b_ao_entropy = b_ao.entropies

            b_prob = xp.dot(self.belief.belief_array, xp.sum(self.model.reachable_transitional_observation_table[:,a,o,:], axis=1))

            total_entropy += (b_prob * (current_entropy - b_ao_entropy))

        # Checking if action is superior to previous best
        superiority_mask = best_entropy < total_entropy
        best_action[superiority_mask] = a
        best_entropy[superiority_mask] = total_entropy[superiority_mask]

    # Recording the action played
    self.action_played = best_action

    # Converting action indexes to movement vectors
    movement_vector = self.action_set[best_action,:]

    return movement_vector

initialize_state(n=1)

To use an agent within a simulation, the agent's state needs to be initialized. The initialization consists of setting the agent's initial belief. Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

Parameters:

n : int, default=1
    How many agents are to be used during the simulation.
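
A hypothetical call, assuming an agent built as in the usage sketch above:

# Prepare the agent to run 100 simulations in parallel.
agent.initialize_state(n=100)
# agent.belief is now a BeliefSet holding 100 initial belief points.
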
Source code in olfactory_navigation/agents/infotaxis_agent.py
def initialize_state(self,
                     n: int = 1
                     ) -> None:
    '''
    To use an agent within a simulation, the agent's state needs to be initialized.
    The initialization consists of setting the agent's initial belief.
    Multiple agents can be used at once for simulations, for this reason, the belief parameter is a BeliefSet by default.

    Parameters
    ----------
    n : int, default=1
        How many agents are to be used during the simulation.
    '''
    self.belief = BeliefSet(self.model, [Belief(self.model) for _ in range(n)])

kill(simulations_to_kill)

Function to kill any simulations that have not reached the source but can't continue further

Parameters:

simulations_to_kill : ndarray, required
    A boolean array of the simulations to kill.
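
A small hypothetical example of how this might be called from a simulation loop (the boolean mask is invented for illustration):

import numpy as np

# 'agent' is an initialized Infotaxis_Agent; 4 simulations are running
# and the second one can no longer progress.
stuck = np.array([False, True, False, False])
agent.kill(simulations_to_kill=stuck)
# The agent's belief set now only tracks the 3 surviving simulations.
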
Source code in olfactory_navigation/agents/infotaxis_agent.py
def kill(self,
         simulations_to_kill: np.ndarray
         ) -> None:
    '''
    Function to kill any simulations that have not reached the source but can't continue further

    Parameters
    ----------
    simulations_to_kill : np.ndarray
        A boolean array of the simulations to kill.
    '''
    if all(simulations_to_kill):
        self.belief = None
    else:
        self.belief = BeliefSet(self.belief.model, self.belief.belief_array[~simulations_to_kill])

to_gpu()

Function to send the numpy arrays of the agent to the gpu. It returns a new instance of the Agent class with the arrays on the gpu

Returns:

gpu_agent
    A new instance of the agent with its arrays on the GPU.
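
A hypothetical call, assuming an already-constructed agent and a working CuPy/GPU setup:

# 'agent' is an existing Infotaxis_Agent; this requires CuPy and a GPU.
gpu_agent = agent.to_gpu()
assert gpu_agent.on_gpu
# The original CPU-side agent is left untouched; the two instances reference
# each other through the internal _alternate_version attribute.
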
Source code in olfactory_navigation/agents/infotaxis_agent.py
def to_gpu(self) -> Agent:
    '''
    Function to send the numpy arrays of the agent to the gpu.
    It returns a new instance of the Agent class with the arrays on the gpu

    Returns
    -------
    gpu_agent
    '''
    # Generating a new instance
    cls = self.__class__
    gpu_agent = cls.__new__(cls)

    # Copying arguments to gpu
    for arg, val in self.__dict__.items():
        if isinstance(val, np.ndarray):
            setattr(gpu_agent, arg, cp.array(val))
        elif arg == 'rnd_state':
            setattr(gpu_agent, arg, cp.random.RandomState(self.seed))
        elif isinstance(val, Model):
            setattr(gpu_agent, arg, val.gpu_model)
        elif isinstance(val, BeliefSet) or isinstance(val, Belief):
            setattr(gpu_agent, arg, val.to_gpu())
        else:
            setattr(gpu_agent, arg, val)

    # Self reference instances
    self._alternate_version = gpu_agent
    gpu_agent._alternate_version = self

    gpu_agent.on_gpu = True
    return gpu_agent

update_state(observation, source_reached)

Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

Parameters:

observation : ndarray, required
    The observation(s) the agent(s) made.
source_reached : ndarray, required
    A boolean array of whether the agent(s) have reached the source or not.

Returns:

update_successful : ndarray, optional
    If nothing is returned, it means all the agent's state updates have been successful. Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.

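To make the discretization step concrete, here is a small self-contained numpy sketch of the same binning logic on made-up readings (the thresholds and values are invented for illustration; the goal id follows the same len(threshold) convention as the method below):

import numpy as np

thresholds = [3e-6, 1e-4]                        # two thresholds -> three odor levels
edges = np.array([-np.inf] + thresholds + [np.inf])

observation = np.array([0.0, 5e-6, 2e-4, 1e-7])  # one raw odor reading per simulation
source_reached = np.array([False, False, False, True])

# Bucket index of each reading: the interval [edges[i], edges[i+1]) it falls into.
obs_ids = np.argwhere((observation[:, None] >= edges[:-1][None, :])
                      & (observation[:, None] < edges[1:][None, :]))[:, 1]

# Simulations that reached the source get the dedicated 'goal' observation id.
obs_ids[source_reached] = len(edges)
print(obs_ids)                                   # [0 1 2 4]
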
Source code in olfactory_navigation/agents/infotaxis_agent.py
def update_state(self,
                 observation: np.ndarray,
                 source_reached: np.ndarray
                 ) -> None | np.ndarray:
    '''
    Function to update the internal state(s) of the agent(s) based on the previous action(s) taken and the observation(s) received.

    Parameters
    ----------
    observation : np.ndarray
        The observation(s) the agent(s) made.
    source_reached : np.ndarray
        A boolean array of whether the agent(s) have reached the source or not.

    Returns
    -------
    update_successful : np.ndarray, optional
        If nothing is returned, it means all the agent's state updates have been successful.
        Else, a boolean np.ndarray of size n can be returned confirming for each agent whether the update has been successful or not.
    '''
    assert self.belief is not None, "Agent was not initialized yet, run the initialize_state function first"

    # GPU support
    xp = np if not self.on_gpu else cp

    # TODO: Make dedicated observation discretization function
    # Set the thresholds as a vector
    threshold = self.threshold
    if not isinstance(threshold, list):
        threshold = [threshold]

    # Ensure -inf and inf begin and end the threshold list
    if threshold[0] != -xp.inf:
        threshold = [-xp.inf] + threshold

    if threshold[-1] != xp.inf:
        threshold = threshold + [xp.inf]
    threshold = xp.array(threshold)

    # Setting observation ids
    observation_ids = xp.argwhere((observation[:,None] >= threshold[:-1][None,:]) & (observation[:,None] < threshold[1:][None,:]))[:,1]
    observation_ids[source_reached] = len(threshold) # Observe source, goal is always last observation with len(threshold)-1 being the amount of observation buckets.

    # Update the set of belief
    self.belief = self.belief.update(actions=self.action_played, observations=observation_ids)

    # Check for failed updates
    update_successful = (self.belief.belief_array.sum(axis=1) != 0.0)

    return update_successful