Skip to content

environment_converter

exact_converter(agent)

Method to create a POMDP model based on an olfactory environment object.

This version of the converter converts the environment in an exact manner. This means the number of states is equal to the number of grid points in the olfactory environment object.

It supports an environment in 2D, with or without layers. It supports a variety of different action sets from the agent.

It also defines at least three different observations: Nothing, Something or Goal. However, if multiple thresholds are provided, more observations will be available: |threshold| + 1 (Nothing) + 1 (Goal)

Note: The environment and the threshold parameters are gathered from the agent instance provided.

Parameters:

Name Type Description Default
agent Agent

The agent to use to get the environment and threshold parameters from.

required

Returns:

Name Type Description
model Model

A generated POMDP model from the environment.

Source code in olfactory_navigation/agents/model_based_util/environment_converter.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def exact_converter(agent : Agent) -> Model:
    '''
    Method to create a POMDP model based on an olfactory environment object.

    This version of the converter converts the environment in an exact manner.
    This mean the amount of states is equal to the grid points in the olfactory environment object.

    It supports an environment in 2D, with or without layers. It supports a variety of different action sets from the agent.

    It also defines at least different observations: Nothing, Something or Goal.
    However, if multiple thresholds are provided, the more observations will be available: |threshold| + 1 (Nothing) + 1 (Goal)

    Note: The environment and the threshold parameters are gathered from the agent instance provided.

    Parameters
    ----------
    agent : Agent
        The agent to use to get the environment and threshold parameters from.

    Returns
    -------
    model : Model
        A generate POMDP model from the environment.
    '''
    # Agent's parameters
    environment = agent.environment
    threshold = agent.threshold
    action_set = agent.action_set

    # Assertion
    assert environment.dimensions == 2, "This converter only works for 2D environments..." # TODO: implement for ND

    # Base Model parameters
    state_count = np.prod(environment.shape)

    state_grid = [[f's_{x}_{y}' for x in range(environment.shape[1])] for y in range(environment.shape[0])]
    end_states = np.argwhere(np.fromfunction(lambda x,y: ((x-environment.source_position[0])**2 + (y-environment.source_position[1])**2) <= environment.source_radius**2,
                                                shape=environment.shape).ravel())[:,0].tolist()

    # Compute observation matrix
    if not isinstance(threshold, list):
        threshold = [threshold]

    # Ensure 0.0 and 1.0 begin and end the threshold list
    if threshold[0] != -np.inf:
        threshold = [-np.inf] + threshold

    if threshold[-1] != np.inf:
        threshold = threshold + [np.inf]

    # Counts
    action_count = len(agent.action_set)
    observation_count = len(threshold) # Thresholds minus 1; plus 1 for the goal.

    # Computing odor probabilities
    odor_fields = None
    data_bounds_slices = tuple(slice(low, high) for low, high in environment.data_bounds)
    if environment.has_layers:
        odor_fields = []
        for layer in environment.layers:
            data_grid = environment.data[layer,:,:,:,None]
            threshs = np.array(threshold)
            data_odor_fields = np.average(((data_grid >= threshs[:-1][None,None,None,:]) & (data_grid < threshs[1:][None,None,None,:])), axis=0)

            # Increasing it to the full environment
            field = np.zeros(environment.shape + (observation_count-1,))
            field[*data_bounds_slices, :] = data_odor_fields

            odor_fields.append(field)

    else:
        data_grid = environment.data[:,:,:,None]
        threshs = np.array(threshold)
        data_odor_fields = np.average(((data_grid >= threshs[:-1][None,None,None,:]) & (data_grid < threshs[1:][None,None,None,:])), axis=0)

        # Increasing it to the full environment
        odor_fields = np.zeros(environment.shape + (observation_count-1,))
        odor_fields[*data_bounds_slices, :] = data_odor_fields

    # Building observation matrix
    observations = np.empty((state_count, action_count, observation_count), dtype=float)
    for o in range(observation_count-1): # Skipping the goal observation
        for a, action_vector in enumerate(action_set):
            if environment.has_layers:
                action_layer = action_vector[0]
                observations[:,a,o] = odor_fields[action_layer][:,:,o].ravel()
            else:
                observations[:,a,o] = odor_fields[:,:,o].ravel()

    # Setting 'Nothing' observation in the margins to 1
    data_margins_mask = np.ones(environment.shape, dtype=bool)
    data_margins_mask[data_bounds_slices] = False
    observations[data_margins_mask.ravel(),:,0] = 1.0

    # Goal observation
    observations[:,:,-1] = 0.0
    observations[end_states,:,:] = 0.0
    observations[end_states,:,-1] = 1.0

    # Assert observations sum to 1
    assert np.all(np.sum(observations, axis=2) == 1.0), "Observation table malformed, something is wrong..."

    # Observation labels
    observation_labels = ['nothing']
    if len(threshold) > 3:
        for i,_ in enumerate(threshold[1:-1]):
            observation_labels.append(f'something_l{i}')
    else:
        observation_labels.append('something')
    observation_labels.append('goal')

    # Compute reachable states
    shape = environment.shape

    points = np.array(np.unravel_index(np.arange(np.prod(shape)), shape)).T

    # For each actions compute all new grid points (using the environment.move method)
    action_new_states = []
    movements = action_set if not environment.has_layers else action_set[:,1:]
    for move_vector in movements:
        new_points = environment.move(points, movement=move_vector[None,:])
        new_states = np.ravel_multi_index((new_points[:,0], new_points[:,1]), dims=shape)
        action_new_states.append(new_states)

    # Forming it the reachable states array from the new states for each action
    reachable_states = np.array(action_new_states).T[:,:,None]

    # Instantiate the model object
    model = Model(
        states=state_grid,
        actions=agent.action_labels,
        observations=observation_labels,
        reachable_states=reachable_states,
        observation_table=observations,
        end_states=end_states,
        start_probabilities=environment.start_probabilities.ravel(),
        seed=agent.seed
    )
    return model

minimal_converter(agent, partitions=[3, 6])

Method to create a POMDP Model based on an olfactory environment object.

This version of the converter attempts to build a minimal version of the environment with just a few partitions in the x and y direction. This means the model will have a total of n states with n = ((|x-partitions| + 2) * (|y-partitions| + 2)). The +2 corresponds to two margin cells in the x and y axes.

It supports an environment in 2D and therefore defines 4 available actions for the agent. (north, east, south, west) But, since the model contains so few spaces, the transitions between states are not deterministic: This means, if an agent takes a step in a direction, there is a chance the agent stays in the same state along with a lower chance the agent moves to a state in the actual direction it was meaning to go.

It also defines at least three different observations: Nothing, Something or Goal. However, if multiple thresholds are provided, more observations will be available: |threshold| + 1 (Nothing) + 1 (Goal)

Note: The environment and the threshold parameters are gathered from the agent instance provided.

Parameters:

Name Type Description Default
agent Agent

The agent to use to get the environment and threshold parameters from.

required
partitions list or ndarray

How many partitions to use in respectively the y and x directions.

[3,6]

Returns:

Name Type Description
model Model

A generated POMDP model from the environment.

Source code in olfactory_navigation/agents/model_based_util/environment_converter.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def minimal_converter(agent : Agent,
                      partitions: list | np.ndarray = [3,6],
                      ) -> Model:
    '''
    Method to create a POMDP Model based on an olfactory environment  object.

    This version of the converted, attempts to build a minimal version of the environment with just a few partitions in the x and y direction.
    This means the model will the a total of n states with n = ((|x-partitions| + 2) * (|y-partitions| + 2)).
    The +2 corresponds to two margin cells in the x and y axes.

    It supports an environment in 2D and therefore defines 4 available actions for the agent. (north, east, south, west)
    But, since the model contains so few spaces, the transitions between states are not deterministic:
    This means, if an agent takes a step in a direction, there is a chance the agent stays in the same state along with a lower chance the agent moves to a state in the actual direction it was meaning to go.

    It also defines at least different observations: Nothing, Something or Goal.
    However, if multiple thresholds are provided, the more observations will be available: |threshold| + 1 (Nothing) + 1 (Goal)

    Note: The environment and the threshold parameters are gathered from the agent instance provided.

    Parameters
    ----------
    agent : Agent
        The agent to use to get the environment and threshold parameters from.
    partitions : list or np.ndarray, default=[3,6]
        How many partitions to use in respectively the y and x directions.

    Returns
    -------
    model : Model
        A generated POMDP model from the environment.
    '''
    # Agent's parameters
    environment = agent.environment
    threshold = agent.threshold
    action_set = agent.action_set

    shape = environment.shape

    # Getting probabilities of odor in the requested partitions and mapping grid to cells
    partitions = np.array(partitions)

    cell_shape = (environment.data_shape / partitions).astype(int)

    # Building cell bounds
    grid_cells = np.ones(shape) * -1
    cell_bounds = [np.array([0, *((np.arange(ax_part+1) * cell_shape[ax_i]) + environment.margins[ax_i, 0]), shape[ax_i]]) for ax_i, ax_part in enumerate(partitions)]

    lower_bounds = np.array([ax_arr.ravel() for ax_arr in np.meshgrid(*[bounds_arr[:-1] for bounds_arr in cell_bounds], indexing='xy')]).T
    upper_bounds = np.array([ax_arr.ravel() for ax_arr in np.meshgrid(*[bounds_arr[1 :] for bounds_arr in cell_bounds], indexing='xy')]).T

    for i, (lower_b, upper_b) in enumerate(zip(lower_bounds, upper_bounds)):
        slices = [slice(ax_lower, ax_upper) for ax_lower, ax_upper in zip(lower_b, upper_b)]

        # Grid to cell mapping
        grid_cells[*slices] = i

    # Building transition probabilities
    cell_counts = int(np.prod(partitions+2))
    points = np.array(np.unravel_index(np.arange(np.prod(shape)), shape)).T
    transition_probabilities = np.full((cell_counts+1, len(action_set), cell_counts+1), -1, dtype=float)

    movements = (action_set if not environment.has_layers else action_set[:,1:])
    for a, move_vector in enumerate(movements):
        new_points = environment.move(points, movement=move_vector[None,:])
        for state_cell in np.arange(cell_counts):
            points_in_cell = (grid_cells[*points.T] == state_cell)[:,None]
            count_in_cell = np.sum(points_in_cell)

            next_cells = np.arange(cell_counts)
            points_in_next_cell = (grid_cells[*new_points.T,None] == next_cells[None,:])

            at_source = environment.source_reached(new_points)[:,None]

            transition_probabilities[state_cell, a, next_cells] = np.sum(((points_in_cell & (~at_source)) & points_in_next_cell), axis=0) / count_in_cell
            transition_probabilities[state_cell, a, -1] = np.sum(points_in_cell & at_source) / count_in_cell

    transition_probabilities[-1,:,:] = 0.0
    transition_probabilities[-1,:,-1] = 1.0

    # Compute observation matrix
    if not isinstance(threshold, list):
        threshold = [threshold]

    # Ensure 0.0 and 1.0 begin and end the threshold list
    if threshold[0] != -np.inf:
        threshold = [-np.inf] + threshold

    if threshold[-1] != np.inf:
        threshold = threshold + [np.inf]

    #  Observation labels
    observation_labels = ['nothing']
    if len(threshold) > 3:
        for i,_ in enumerate(threshold[1:-1]):
            observation_labels.append(f'something_l{i}')
    else:
        observation_labels.append('something')
    observation_labels.append('goal')

    # Observation probabilities
    observations = np.zeros((cell_counts+1, len(action_set), len(observation_labels)))

    # Recomputing bounds for data zone only
    data_cell_bounds = [np.array([*(np.arange(ax_part+1) * cell_shape[ax_i])]) for ax_i, ax_part in enumerate(partitions)]

    data_lower_bounds = np.array([ax_arr.ravel() for ax_arr in np.meshgrid(*[bounds_arr[:-1] for bounds_arr in data_cell_bounds], indexing='xy')]).T
    data_upper_bounds = np.array([ax_arr.ravel() for ax_arr in np.meshgrid(*[bounds_arr[1 :] for bounds_arr in data_cell_bounds], indexing='xy')]).T

    cell_observations = []
    for lower_b, upper_b in zip(data_lower_bounds, data_upper_bounds):
        slices = [slice(ax_lower, ax_upper) for ax_lower, ax_upper in zip(lower_b, upper_b)]

        observations_levels = []
        for min_thresh, max_thresh in zip(threshold[:-1], threshold[1:]):
            if environment.has_layers:
                odor_within_thresh = (environment.data[:,:,*slices] > min_thresh) & (environment.data[:,:,*slices] < max_thresh)
                observations_levels.append(np.average(odor_within_thresh, axis=tuple([a+1 for a in range(environment.dimensions + 1)])))
            else:
                odor_within_thresh = (environment.data[:,*slices] > min_thresh) & (environment.data[:,*slices] < max_thresh)
                observations_levels.append(np.average(odor_within_thresh))

        cell_observations.append(observations_levels)

    # Placing observation probabilities in observation matrix
    data_cell_ids = np.arange(cell_counts).reshape(partitions+2)[1:-1,1:-1].ravel()
    observations[:-1,:,0] = 1.0 # Nothing at 1 everywhere
    if environment.has_layers:
        action_layers = action_set[:,0]
        actions = np.arange(len(action_layers))
        for i, cell_id in enumerate(data_cell_ids):
            for o in range(len(observation_labels) - 1):
                observations[cell_id,actions,o] = cell_observations[i][o][action_layers]
    else:
        for i, cell_id in enumerate(data_cell_ids):
            observations[cell_id,:,:-1] = cell_observations[i]

    observations[-1,:,-1] = 1.0 # Goal

    # Start probabilities # TODO Match data zone
    start_probabilities = np.ones(cell_counts+1, dtype=float)
    # start_probabilities = (odor_probabilities > 0).astype(float).flatten()
    start_probabilities /= np.sum(start_probabilities)

    # Creation of the Model
    model = Model(
        states = [f'cell_{cell}' for cell in range(cell_counts)] + ['goal'],
        actions = agent.action_labels,
        observations = observation_labels,
        transitions = transition_probabilities,
        observation_table = observations,
        end_states = [cell_counts], # The very last state is the goal state
        start_probabilities = start_probabilities,
        seed=agent.seed
    )

    return model