Bases: TaskAcceptabilitySensor
Guidance sensor for the resource management task.
This sensor tracks a number of sub-tasks:
- "system_monitoring.tank-a"
- "system_monitoring.tank-b"
The acceptability of these sub-tasks can be checked by calling the method: ResourceManagementTaskAcceptabilitySensor.is_tank_acceptable
.
Otherwise follow the TaskAcceptabilitySensor
API.
Source code in matbii\guidance\sensor_resource_management.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 | class ResourceManagementTaskAcceptabilitySensor(TaskAcceptabilitySensor):
"""Guidance sensor for the resource management task.
This sensor tracks a number of sub-tasks:
- "system_monitoring.tank-a"
- "system_monitoring.tank-b"
The acceptability of these sub-tasks can be checked by calling the method: `ResourceManagementTaskAcceptabilitySensor.is_tank_acceptable`.
Otherwise follow the `TaskAcceptabilitySensor` API.
"""
def __init__(self, *args: list[Any], **kwargs: dict[str, Any]):
"""Constructor.
Args:
args (list[Any], optional): additional optional arguments.
kwargs (dict[Any], optional): additional optional keyword arguments.
"""
super().__init__(TASK_ID_RESOURCE_MANAGEMENT, *args, **kwargs)
self._is_subtask_acceptable_map = {
f"{self.task_name}.tank-a": partial(self.is_tank_acceptable, "a"),
f"{self.task_name}.tank-b": partial(self.is_tank_acceptable, "b"),
}
def is_acceptable(self, task: str = None, **kwargs: dict[str, Any]) -> bool: # noqa
if task is None or task == self.task_name:
return all([x() for x in self._is_subtask_acceptable_map.values()])
else:
is_acceptable = self._is_subtask_acceptable_map.get(task, None)
if is_acceptable is None:
raise KeyError(
f"Invalid subtask: {task}, doesn't exist for task {self.task_name}"
)
else:
return is_acceptable()
def is_tank_acceptable(self, _id: str) -> bool:
"""Whether the given tank is in an acceptable state.
Acceptable: the fuel level lies in the required range.
Unacceptable: otherwise.
Args:
_id (int): the id of the tank ("a" or "b")
Returns:
bool: True if the tank is in an acceptable state, False otherwise.
"""
tank = self.beliefs[tank_id(_id)]
tank_level = self.beliefs[tank_level_id(_id)]
fuel_capacity = tank["data-capacity"]
fuel_level = tank["data-level"]
acceptable_level = tank_level["data-level"] * fuel_capacity
acceptable_range2 = (tank_level["data-range"] * fuel_capacity) / 2
return (
fuel_level >= acceptable_level - acceptable_range2
and fuel_level <= acceptable_level + acceptable_range2
)
def sense(self, tank_ids: tuple[str, ...] = ("a", "b")) -> list[Select]:
"""Generates the sense actions that are required for checking whether the resource management task is in an acceptable state.
The actions will request the following data:
- the current fuel level and capacity of each main tank ("a" and "b")
- the acceptable range of fuel for each main tank ("a" and "b")
Args:
tank_ids (tuple[str, str], optional): the ids of the tanks to check, defaults to ("a", "b").
Returns:
list[Select]: list of sense actions to take.
"""
# interested in the fuel levels of the main tanks in the Resource Management Task
tanks = [tank_id(i) for i in tank_ids]
tank_levels = [tank_level_id(i) for i in tank_ids]
tank_selects = [
select(
xpath=f"//*[@id='{id}']", attrs=["id", "data-capacity", "data-level"]
)
for id in tanks
]
tank_level_selects = [
select(xpath=f"//*[@id='{id}']", attrs=["id", "data-level", "data-range"])
for id in tank_levels
]
return [*tank_selects, *tank_level_selects]
|
__init__(*args, **kwargs)
Constructor.
Parameters:
Name |
Type |
Description |
Default |
args |
list[Any]
|
additional optional arguments.
|
()
|
kwargs |
dict[Any]
|
additional optional keyword arguments.
|
{}
|
Source code in matbii\guidance\sensor_resource_management.py
23
24
25
26
27
28
29
30
31
32
33
34 | def __init__(self, *args: list[Any], **kwargs: dict[str, Any]):
"""Constructor.
Args:
args (list[Any], optional): additional optional arguments.
kwargs (dict[Any], optional): additional optional keyword arguments.
"""
super().__init__(TASK_ID_RESOURCE_MANAGEMENT, *args, **kwargs)
self._is_subtask_acceptable_map = {
f"{self.task_name}.tank-a": partial(self.is_tank_acceptable, "a"),
f"{self.task_name}.tank-b": partial(self.is_tank_acceptable, "b"),
}
|
is_tank_acceptable(_id)
Whether the given tank is in an acceptable state.
Acceptable: the fuel level lies in the required range.
Unacceptable: otherwise.
Parameters:
Name |
Type |
Description |
Default |
_id |
int
|
the id of the tank ("a" or "b")
|
required
|
Returns:
Name | Type |
Description |
bool |
bool
|
True if the tank is in an acceptable state, False otherwise.
|
Source code in matbii\guidance\sensor_resource_management.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 | def is_tank_acceptable(self, _id: str) -> bool:
"""Whether the given tank is in an acceptable state.
Acceptable: the fuel level lies in the required range.
Unacceptable: otherwise.
Args:
_id (int): the id of the tank ("a" or "b")
Returns:
bool: True if the tank is in an acceptable state, False otherwise.
"""
tank = self.beliefs[tank_id(_id)]
tank_level = self.beliefs[tank_level_id(_id)]
fuel_capacity = tank["data-capacity"]
fuel_level = tank["data-level"]
acceptable_level = tank_level["data-level"] * fuel_capacity
acceptable_range2 = (tank_level["data-range"] * fuel_capacity) / 2
return (
fuel_level >= acceptable_level - acceptable_range2
and fuel_level <= acceptable_level + acceptable_range2
)
|
sense(tank_ids=('a', 'b'))
Generates the sense actions that are required for checking whether the resource management task is in an acceptable state.
The actions will request the following data:
- the current fuel level and capacity of each main tank ("a" and "b")
- the acceptable range of fuel for each main tank ("a" and "b")
Parameters:
Name |
Type |
Description |
Default |
tank_ids |
tuple[str, str]
|
the ids of the tanks to check, defaults to ("a", "b").
|
('a', 'b')
|
Returns:
Type |
Description |
list[Select]
|
list[Select]: list of sense actions to take.
|
Source code in matbii\guidance\sensor_resource_management.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 | def sense(self, tank_ids: tuple[str, ...] = ("a", "b")) -> list[Select]:
"""Generates the sense actions that are required for checking whether the resource management task is in an acceptable state.
The actions will request the following data:
- the current fuel level and capacity of each main tank ("a" and "b")
- the acceptable range of fuel for each main tank ("a" and "b")
Args:
tank_ids (tuple[str, str], optional): the ids of the tanks to check, defaults to ("a", "b").
Returns:
list[Select]: list of sense actions to take.
"""
# interested in the fuel levels of the main tanks in the Resource Management Task
tanks = [tank_id(i) for i in tank_ids]
tank_levels = [tank_level_id(i) for i in tank_ids]
tank_selects = [
select(
xpath=f"//*[@id='{id}']", attrs=["id", "data-capacity", "data-level"]
)
for id in tanks
]
tank_level_selects = [
select(xpath=f"//*[@id='{id}']", attrs=["id", "data-level", "data-range"])
for id in tank_levels
]
return [*tank_selects, *tank_level_selects]
|