openlch.hal
import grpc
from typing import List, Tuple, Dict, Union, Iterator
from . import hal_pb_pb2
from . import hal_pb_pb2_grpc
import time

__all__ = ['HAL']

# Hide the generated protobuf modules from the pdoc output.
__pdoc__ = {}
__pdoc__["hal_pb_pb2"] = None
__pdoc__["hal_pb_pb2_grpc"] = None


class HAL:
    """
    Hardware Abstraction Layer for interacting with the MilkV board.

    Instances can be used as context managers; the gRPC channel is closed
    when the ``with`` block exits.

    Args:
        host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
        port (int): The port number for gRPC communication. Defaults to 50051.
    """

    def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
        self.__channel = grpc.insecure_channel(f'{host}:{port}')
        self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)
        # Every sub-interface shares the single stub created above.
        self.servo = self.Servo(self.__stub)
        self.system = self.System(self.__stub)
        self.imu = self.IMU(self.__stub)
        self.audio = self.Audio(self.__stub)

    def close(self) -> None:
        """Close the gRPC channel."""
        self.__channel.close()

    def __enter__(self) -> "HAL":
        """Return this instance so it can be bound by ``with HAL() as hal``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the channel on exit; any in-flight exception still propagates."""
        self.close()

    class Servo:
        """Class for servo-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def get_positions(self) -> List[Tuple[int, float, float]]:
            """
            Get current positions and speeds of all servos.

            Returns:
                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
            """
            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
            return [(pos.id, pos.position, pos.speed) for pos in response.positions]

        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
            """
            Set positions for multiple servos.

            Every target is sent with ``speed=0``.

            Args:
                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
            """
            # The loop variable is servo_id (not id) so the builtin is not shadowed.
            joint_positions = [
                hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=0)
                for servo_id, position in positions
            ]
            request = hal_pb_pb2.JointPositions(positions=joint_positions)
            self.__stub.SetPositions(request)

        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
            """
            Get detailed information about a specific servo.

            Args:
                servo_id (int): The ID of the servo to query.

            Returns:
                Dict[str, Union[int, float]]: A dictionary containing servo information:

                    id: The ID of the servo

                    temperature: Current temperature of the servo (in degrees Celsius)

                    current: Current draw of the servo (in mAmps)

                    voltage: Voltage supplied to the servo (in volts), rounded to 2 decimals

                    speed: Current speed of the servo (in degrees per second)

                    current_position: Current position of the servo (in degrees)

                    min_position: Minimum allowed position of the servo (in degrees)

                    max_position: Maximum allowed position of the servo (in degrees)

            Raises:
                Exception: If the response carries no 'info' field (the error message and code are reported).
            """
            request = hal_pb_pb2.ServoId(id=servo_id)
            response = self.__stub.GetServoInfo(request)
            if not response.HasField('info'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            info = response.info
            return {
                'id': info.id,
                'temperature': info.temperature,
                'current': info.current,
                'voltage': round(info.voltage, 2),
                'speed': info.speed,
                'current_position': info.current_position,
                'min_position': info.min_position,
                'max_position': info.max_position,
            }

        def scan(self) -> List[int]:
            """
            Scan for connected servos.

            Returns:
                List[int]: A list of IDs of the connected servos.
            """
            response = self.__stub.Scan(hal_pb_pb2.Empty())
            return list(response.ids)

        def change_id(self, old_id: int, new_id: int) -> bool:
            """
            Change the ID of a servo.

            Args:
                old_id (int): The current ID of the servo.
                new_id (int): The new ID to assign to the servo.

            Returns:
                bool: True if the ID change was successful, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
            response = self.__stub.ChangeId(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
            """
            Start calibration for a specific servo.

            Args:
                servo_id (int): The ID of the servo to calibrate.
                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.

            Returns:
                bool: True if calibration started successfully, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.CalibrationRequest(
                servo_id=servo_id,
                calibration_speed=calibration_speed,
                current_threshold=current_threshold,
            )
            response = self.__stub.StartCalibration(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def cancel_calibration(self, servo_id: int) -> bool:
            """
            Cancel ongoing calibration for a specific servo.

            Args:
                servo_id (int): The ID of the servo to cancel calibration for.

            Returns:
                bool: True if calibration was successfully cancelled, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.ServoId(id=servo_id)
            response = self.__stub.CancelCalibration(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
            """
            Get the current calibration status.

            Returns:
                Dict[str, Union[bool, int]]: A dictionary containing calibration status information:

                    is_calibrating: The 'is_calibrating' field of the response

                    calibrating_servo_id: The 'calibrating_servo_id' field of the response
            """
            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
            return {
                'is_calibrating': response.is_calibrating,
                'calibrating_servo_id': response.calibrating_servo_id,
            }

        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
            """
            Set torque for multiple servos.

            Args:
                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
            """
            settings = [
                hal_pb_pb2.TorqueSetting(id=servo_id, torque=torque)
                for servo_id, torque in torque_settings
            ]
            request = hal_pb_pb2.TorqueSettings(settings=settings)
            self.__stub.SetTorque(request)

        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
            """
            Enable or disable torque for multiple servos.

            Args:
                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
            """
            settings = [
                hal_pb_pb2.TorqueEnableSetting(id=servo_id, enable=enable)
                for servo_id, enable in enable_settings
            ]
            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
            self.__stub.SetTorqueEnable(request)

        def enable_movement(self) -> None:
            """Enable continuous movement writes for all servos."""
            self.__stub.EnableMovement(hal_pb_pb2.Empty())

        def disable_movement(self) -> None:
            """Disable continuous movement writes for all servos."""
            self.__stub.DisableMovement(hal_pb_pb2.Empty())

        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
            """
            Set position for a single servo.

            Args:
                servo_id (int): The ID of the servo to control
                position (float): Target position in degrees
                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
            """
            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
            self.__stub.SetPosition(request)

    class System:
        """Class for system-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def set_wifi_info(self, ssid: str, password: str) -> None:
            """
            Set WiFi credentials for the MilkV board.

            Args:
                ssid (str): The SSID of the WiFi network.
                password (str): The password for the WiFi network.
            """
            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
            self.__stub.SetWifiInfo(request)

        def start_video_stream(self) -> None:
            """Start the video stream."""
            self.__stub.StartVideoStream(hal_pb_pb2.Empty())

        def stop_video_stream(self) -> None:
            """Stop the video stream."""
            self.__stub.StopVideoStream(hal_pb_pb2.Empty())

        def get_video_stream_urls(self) -> Dict[str, List[str]]:
            """
            Get the URLs for various video stream formats.

            Returns:
                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:

                    webrtc: List of WebRTC stream URLs

                    hls: List of HTTP Live Streaming (HLS) URLs

                    hls_ll: List of Low-Latency HLS URLs

                    mse: List of Media Source Extension (MSE) URLs

                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs

                Each list may contain one or more URLs depending on the available streams.
            """
            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
            return {
                'webrtc': list(response.webrtc),
                'hls': list(response.hls),
                'hls_ll': list(response.hls_ll),
                'mse': list(response.mse),
                'rtsp': list(response.rtsp),
            }

    class IMU:
        """Class for IMU-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def get_data(self) -> Dict[str, Dict[str, float]]:
            """
            Get current IMU sensor data including gyroscope and accelerometer readings.

            Returns:
                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:

                    {
                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
                    }
            """
            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
            gyro, accel = response.gyro, response.accel
            return {
                'gyro': {'x': gyro.x, 'y': gyro.y, 'z': gyro.z},
                'accel': {'x': accel.x, 'y': accel.y, 'z': accel.z},
            }

    class Audio:
        """Class for audio-related operations."""

        def __init__(self, stub):
            self.__stub = stub
            self.CHUNK_SIZE = 32768  # 32KB chunks

        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
            """
            Upload audio data.

            The data is streamed in pieces of at most ``CHUNK_SIZE`` bytes.

            Args:
                audio_data (bytes): The audio data
                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.

            Returns:
                Dict[str, Union[str, bool]]: A dictionary containing:
                    - 'audio_id': Identifier for the uploaded audio
                    - 'success': Boolean indicating upload success

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            def chunk_generator():
                # The base time is taken when streaming starts, in milliseconds.
                timestamp = int(time.time() * 1000)
                for offset in range(0, len(audio_data), self.CHUNK_SIZE):
                    yield hal_pb_pb2.AudioChunk(
                        data=audio_data[offset:offset + self.CHUNK_SIZE],
                        format=format,
                        # The byte offset is added so each chunk gets a strictly
                        # increasing value usable for ordering.
                        timestamp=timestamp + offset,
                    )

            response = self.__stub.UploadAudio(chunk_generator())
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return {
                'audio_id': response.audio_id,
                'success': response.success,
            }

        def get_recording(self) -> Tuple[bytes, Union[str, None], Union[int, None]]:
            """
            Get recorded audio data as a bytes object.

            Returns:
                Tuple[bytes, Union[str, None], Union[int, None]]: Tuple containing:

                    - bytes: The complete audio data
                    - str: Audio format taken from the first chunk (None if no chunk was received)
                    - int: Timestamp of the first chunk in milliseconds (None if no chunk was received)
            """
            audio_data = bytearray()
            format_type = None
            timestamp = None

            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
                audio_data.extend(chunk.data)
                # Only the first chunk's metadata is kept.
                if format_type is None:
                    format_type = chunk.format
                if timestamp is None:
                    timestamp = chunk.timestamp

            return bytes(audio_data), format_type, timestamp

        def play(self, audio_id: str, volume: float = 1.0) -> None:
            """
            Play uploaded audio.

            Args:
                audio_id (str): ID of the audio to play
                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.

            Raises:
                ValueError: If volume is not between 0.0 and 1.0
            """
            # Validate before building the request so bad input never reaches the board.
            if not 0.0 <= volume <= 1.0:
                raise ValueError("Volume must be between 0.0 and 1.0")

            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
            self.__stub.PlayAudio(request)

        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
            """
            Start audio recording.

            Args:
                sample_rate (int): Sample rate in Hz. Defaults to 44100.
                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
            """
            config = hal_pb_pb2.RecordingConfig(
                sample_rate=sample_rate,
                format=format,
                channels=channels,
            )
            self.__stub.StartRecording(config)

        def stop_recording(self) -> None:
            """Stop audio recording."""
            self.__stub.StopRecording(hal_pb_pb2.Empty())
class HAL:
    """
    Hardware Abstraction Layer for interacting with the MilkV board.

    Instances can be used as context managers; the gRPC channel is closed
    when the ``with`` block exits.

    Args:
        host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
        port (int): The port number for gRPC communication. Defaults to 50051.
    """

    def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
        self.__channel = grpc.insecure_channel(f'{host}:{port}')
        self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)
        # Every sub-interface shares the single stub created above.
        self.servo = self.Servo(self.__stub)
        self.system = self.System(self.__stub)
        self.imu = self.IMU(self.__stub)
        self.audio = self.Audio(self.__stub)

    def close(self) -> None:
        """Close the gRPC channel."""
        self.__channel.close()

    def __enter__(self) -> "HAL":
        """Return this instance so it can be bound by ``with HAL() as hal``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the channel on exit; any in-flight exception still propagates."""
        self.close()

    class Servo:
        """Class for servo-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def get_positions(self) -> List[Tuple[int, float, float]]:
            """
            Get current positions and speeds of all servos.

            Returns:
                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
            """
            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
            return [(pos.id, pos.position, pos.speed) for pos in response.positions]

        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
            """
            Set positions for multiple servos.

            Every target is sent with ``speed=0``.

            Args:
                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
            """
            # The loop variable is servo_id (not id) so the builtin is not shadowed.
            joint_positions = [
                hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=0)
                for servo_id, position in positions
            ]
            request = hal_pb_pb2.JointPositions(positions=joint_positions)
            self.__stub.SetPositions(request)

        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
            """
            Get detailed information about a specific servo.

            Args:
                servo_id (int): The ID of the servo to query.

            Returns:
                Dict[str, Union[int, float]]: A dictionary containing servo information:

                    id: The ID of the servo

                    temperature: Current temperature of the servo (in degrees Celsius)

                    current: Current draw of the servo (in mAmps)

                    voltage: Voltage supplied to the servo (in volts), rounded to 2 decimals

                    speed: Current speed of the servo (in degrees per second)

                    current_position: Current position of the servo (in degrees)

                    min_position: Minimum allowed position of the servo (in degrees)

                    max_position: Maximum allowed position of the servo (in degrees)

            Raises:
                Exception: If the response carries no 'info' field (the error message and code are reported).
            """
            request = hal_pb_pb2.ServoId(id=servo_id)
            response = self.__stub.GetServoInfo(request)
            if not response.HasField('info'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            info = response.info
            return {
                'id': info.id,
                'temperature': info.temperature,
                'current': info.current,
                'voltage': round(info.voltage, 2),
                'speed': info.speed,
                'current_position': info.current_position,
                'min_position': info.min_position,
                'max_position': info.max_position,
            }

        def scan(self) -> List[int]:
            """
            Scan for connected servos.

            Returns:
                List[int]: A list of IDs of the connected servos.
            """
            response = self.__stub.Scan(hal_pb_pb2.Empty())
            return list(response.ids)

        def change_id(self, old_id: int, new_id: int) -> bool:
            """
            Change the ID of a servo.

            Args:
                old_id (int): The current ID of the servo.
                new_id (int): The new ID to assign to the servo.

            Returns:
                bool: True if the ID change was successful, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
            response = self.__stub.ChangeId(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
            """
            Start calibration for a specific servo.

            Args:
                servo_id (int): The ID of the servo to calibrate.
                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.

            Returns:
                bool: True if calibration started successfully, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.CalibrationRequest(
                servo_id=servo_id,
                calibration_speed=calibration_speed,
                current_threshold=current_threshold,
            )
            response = self.__stub.StartCalibration(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def cancel_calibration(self, servo_id: int) -> bool:
            """
            Cancel ongoing calibration for a specific servo.

            Args:
                servo_id (int): The ID of the servo to cancel calibration for.

            Returns:
                bool: True if calibration was successfully cancelled, False otherwise.

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            request = hal_pb_pb2.ServoId(id=servo_id)
            response = self.__stub.CancelCalibration(request)
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return response.success

        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
            """
            Get the current calibration status.

            Returns:
                Dict[str, Union[bool, int]]: A dictionary containing calibration status information:

                    is_calibrating: The 'is_calibrating' field of the response

                    calibrating_servo_id: The 'calibrating_servo_id' field of the response
            """
            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
            return {
                'is_calibrating': response.is_calibrating,
                'calibrating_servo_id': response.calibrating_servo_id,
            }

        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
            """
            Set torque for multiple servos.

            Args:
                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
            """
            settings = [
                hal_pb_pb2.TorqueSetting(id=servo_id, torque=torque)
                for servo_id, torque in torque_settings
            ]
            request = hal_pb_pb2.TorqueSettings(settings=settings)
            self.__stub.SetTorque(request)

        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
            """
            Enable or disable torque for multiple servos.

            Args:
                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
            """
            settings = [
                hal_pb_pb2.TorqueEnableSetting(id=servo_id, enable=enable)
                for servo_id, enable in enable_settings
            ]
            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
            self.__stub.SetTorqueEnable(request)

        def enable_movement(self) -> None:
            """Enable continuous movement writes for all servos."""
            self.__stub.EnableMovement(hal_pb_pb2.Empty())

        def disable_movement(self) -> None:
            """Disable continuous movement writes for all servos."""
            self.__stub.DisableMovement(hal_pb_pb2.Empty())

        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
            """
            Set position for a single servo.

            Args:
                servo_id (int): The ID of the servo to control
                position (float): Target position in degrees
                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
            """
            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
            self.__stub.SetPosition(request)

    class System:
        """Class for system-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def set_wifi_info(self, ssid: str, password: str) -> None:
            """
            Set WiFi credentials for the MilkV board.

            Args:
                ssid (str): The SSID of the WiFi network.
                password (str): The password for the WiFi network.
            """
            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
            self.__stub.SetWifiInfo(request)

        def start_video_stream(self) -> None:
            """Start the video stream."""
            self.__stub.StartVideoStream(hal_pb_pb2.Empty())

        def stop_video_stream(self) -> None:
            """Stop the video stream."""
            self.__stub.StopVideoStream(hal_pb_pb2.Empty())

        def get_video_stream_urls(self) -> Dict[str, List[str]]:
            """
            Get the URLs for various video stream formats.

            Returns:
                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:

                    webrtc: List of WebRTC stream URLs

                    hls: List of HTTP Live Streaming (HLS) URLs

                    hls_ll: List of Low-Latency HLS URLs

                    mse: List of Media Source Extension (MSE) URLs

                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs

                Each list may contain one or more URLs depending on the available streams.
            """
            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
            return {
                'webrtc': list(response.webrtc),
                'hls': list(response.hls),
                'hls_ll': list(response.hls_ll),
                'mse': list(response.mse),
                'rtsp': list(response.rtsp),
            }

    class IMU:
        """Class for IMU-related operations."""

        def __init__(self, stub):
            self.__stub = stub

        def get_data(self) -> Dict[str, Dict[str, float]]:
            """
            Get current IMU sensor data including gyroscope and accelerometer readings.

            Returns:
                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:

                    {
                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
                    }
            """
            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
            gyro, accel = response.gyro, response.accel
            return {
                'gyro': {'x': gyro.x, 'y': gyro.y, 'z': gyro.z},
                'accel': {'x': accel.x, 'y': accel.y, 'z': accel.z},
            }

    class Audio:
        """Class for audio-related operations."""

        def __init__(self, stub):
            self.__stub = stub
            self.CHUNK_SIZE = 32768  # 32KB chunks

        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
            """
            Upload audio data.

            The data is streamed in pieces of at most ``CHUNK_SIZE`` bytes.

            Args:
                audio_data (bytes): The audio data
                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.

            Returns:
                Dict[str, Union[str, bool]]: A dictionary containing:
                    - 'audio_id': Identifier for the uploaded audio
                    - 'success': Boolean indicating upload success

            Raises:
                Exception: If the response carries no 'success' field (the error message and code are reported).
            """
            def chunk_generator():
                # The base time is taken when streaming starts, in milliseconds.
                timestamp = int(time.time() * 1000)
                for offset in range(0, len(audio_data), self.CHUNK_SIZE):
                    yield hal_pb_pb2.AudioChunk(
                        data=audio_data[offset:offset + self.CHUNK_SIZE],
                        format=format,
                        # The byte offset is added so each chunk gets a strictly
                        # increasing value usable for ordering.
                        timestamp=timestamp + offset,
                    )

            response = self.__stub.UploadAudio(chunk_generator())
            if not response.HasField('success'):
                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
            return {
                'audio_id': response.audio_id,
                'success': response.success,
            }

        def get_recording(self) -> Tuple[bytes, Union[str, None], Union[int, None]]:
            """
            Get recorded audio data as a bytes object.

            Returns:
                Tuple[bytes, Union[str, None], Union[int, None]]: Tuple containing:

                    - bytes: The complete audio data
                    - str: Audio format taken from the first chunk (None if no chunk was received)
                    - int: Timestamp of the first chunk in milliseconds (None if no chunk was received)
            """
            audio_data = bytearray()
            format_type = None
            timestamp = None

            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
                audio_data.extend(chunk.data)
                # Only the first chunk's metadata is kept.
                if format_type is None:
                    format_type = chunk.format
                if timestamp is None:
                    timestamp = chunk.timestamp

            return bytes(audio_data), format_type, timestamp

        def play(self, audio_id: str, volume: float = 1.0) -> None:
            """
            Play uploaded audio.

            Args:
                audio_id (str): ID of the audio to play
                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.

            Raises:
                ValueError: If volume is not between 0.0 and 1.0
            """
            # Validate before building the request so bad input never reaches the board.
            if not 0.0 <= volume <= 1.0:
                raise ValueError("Volume must be between 0.0 and 1.0")

            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
            self.__stub.PlayAudio(request)

        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
            """
            Start audio recording.

            Args:
                sample_rate (int): Sample rate in Hz. Defaults to 44100.
                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
            """
            config = hal_pb_pb2.RecordingConfig(
                sample_rate=sample_rate,
                format=format,
                channels=channels,
            )
            self.__stub.StartRecording(config)

        def stop_recording(self) -> None:
            """Stop audio recording."""
            self.__stub.StopRecording(hal_pb_pb2.Empty())
Hardware Abstraction Layer for interacting with the MilkV board.
Arguments:
- host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
- port (int): The port number for gRPC communication. Defaults to 50051.
def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
    """
    Open an insecure gRPC channel to the board and build the sub-interfaces.

    Args:
        host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
        port (int): The port number for gRPC communication. Defaults to 50051.
    """
    target = f'{host}:{port}'
    self.__channel = grpc.insecure_channel(target)
    self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)

    # One shared stub backs every sub-interface.
    shared_stub = self.__stub
    self.servo = self.Servo(shared_stub)
    self.system = self.System(shared_stub)
    self.imu = self.IMU(shared_stub)
    self.audio = self.Audio(shared_stub)
class Servo:
    """Class for servo-related operations."""

    def __init__(self, stub):
        self.__stub = stub

    def get_positions(self) -> List[Tuple[int, float, float]]:
        """
        Get current positions and speeds of all servos.

        Returns:
            List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
        """
        response = self.__stub.GetPositions(hal_pb_pb2.Empty())
        return [(pos.id, pos.position, pos.speed) for pos in response.positions]

    def set_positions(self, positions: List[Tuple[int, float]]) -> None:
        """
        Set positions for multiple servos.

        Every target is sent with ``speed=0``.

        Args:
            positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
        """
        # The loop variable is servo_id (not id) so the builtin is not shadowed.
        joint_positions = [
            hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=0)
            for servo_id, position in positions
        ]
        request = hal_pb_pb2.JointPositions(positions=joint_positions)
        self.__stub.SetPositions(request)

    def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
        """
        Get detailed information about a specific servo.

        Args:
            servo_id (int): The ID of the servo to query.

        Returns:
            Dict[str, Union[int, float]]: A dictionary containing servo information:

                id: The ID of the servo

                temperature: Current temperature of the servo (in degrees Celsius)

                current: Current draw of the servo (in mAmps)

                voltage: Voltage supplied to the servo (in volts), rounded to 2 decimals

                speed: Current speed of the servo (in degrees per second)

                current_position: Current position of the servo (in degrees)

                min_position: Minimum allowed position of the servo (in degrees)

                max_position: Maximum allowed position of the servo (in degrees)

        Raises:
            Exception: If the response carries no 'info' field (the error message and code are reported).
        """
        request = hal_pb_pb2.ServoId(id=servo_id)
        response = self.__stub.GetServoInfo(request)
        if not response.HasField('info'):
            raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
        info = response.info
        return {
            'id': info.id,
            'temperature': info.temperature,
            'current': info.current,
            'voltage': round(info.voltage, 2),
            'speed': info.speed,
            'current_position': info.current_position,
            'min_position': info.min_position,
            'max_position': info.max_position,
        }

    def scan(self) -> List[int]:
        """
        Scan for connected servos.

        Returns:
            List[int]: A list of IDs of the connected servos.
        """
        response = self.__stub.Scan(hal_pb_pb2.Empty())
        return list(response.ids)

    def change_id(self, old_id: int, new_id: int) -> bool:
        """
        Change the ID of a servo.

        Args:
            old_id (int): The current ID of the servo.
            new_id (int): The new ID to assign to the servo.

        Returns:
            bool: True if the ID change was successful, False otherwise.

        Raises:
            Exception: If the response carries no 'success' field (the error message and code are reported).
        """
        request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
        response = self.__stub.ChangeId(request)
        if not response.HasField('success'):
            raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
        return response.success

    def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
        """
        Start calibration for a specific servo.

        Args:
            servo_id (int): The ID of the servo to calibrate.
            calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
            current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.

        Returns:
            bool: True if calibration started successfully, False otherwise.

        Raises:
            Exception: If the response carries no 'success' field (the error message and code are reported).
        """
        request = hal_pb_pb2.CalibrationRequest(
            servo_id=servo_id,
            calibration_speed=calibration_speed,
            current_threshold=current_threshold,
        )
        response = self.__stub.StartCalibration(request)
        if not response.HasField('success'):
            raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
        return response.success

    def cancel_calibration(self, servo_id: int) -> bool:
        """
        Cancel ongoing calibration for a specific servo.

        Args:
            servo_id (int): The ID of the servo to cancel calibration for.

        Returns:
            bool: True if calibration was successfully cancelled, False otherwise.

        Raises:
            Exception: If the response carries no 'success' field (the error message and code are reported).
        """
        request = hal_pb_pb2.ServoId(id=servo_id)
        response = self.__stub.CancelCalibration(request)
        if not response.HasField('success'):
            raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
        return response.success

    def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
        """
        Get the current calibration status.

        Returns:
            Dict[str, Union[bool, int]]: A dictionary containing calibration status information:

                is_calibrating: The 'is_calibrating' field of the response

                calibrating_servo_id: The 'calibrating_servo_id' field of the response
        """
        response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
        return {
            'is_calibrating': response.is_calibrating,
            'calibrating_servo_id': response.calibrating_servo_id,
        }

    def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
        """
        Set torque for multiple servos.

        Args:
            torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
        """
        settings = [
            hal_pb_pb2.TorqueSetting(id=servo_id, torque=torque)
            for servo_id, torque in torque_settings
        ]
        request = hal_pb_pb2.TorqueSettings(settings=settings)
        self.__stub.SetTorque(request)

    def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
        """
        Enable or disable torque for multiple servos.

        Args:
            enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
        """
        settings = [
            hal_pb_pb2.TorqueEnableSetting(id=servo_id, enable=enable)
            for servo_id, enable in enable_settings
        ]
        request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
        self.__stub.SetTorqueEnable(request)

    def enable_movement(self) -> None:
        """Enable continuous movement writes for all servos."""
        self.__stub.EnableMovement(hal_pb_pb2.Empty())

    def disable_movement(self) -> None:
        """Disable continuous movement writes for all servos."""
        self.__stub.DisableMovement(hal_pb_pb2.Empty())

    def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
        """
        Set position for a single servo.

        Args:
            servo_id (int): The ID of the servo to control
            position (float): Target position in degrees
            speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
        """
        request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
        self.__stub.SetPosition(request)
Class for servo-related operations.
def get_positions(self) -> List[Tuple[int, float, float]]:
    """
    Query the current position and speed of every servo.

    Returns:
        List[Tuple[int, float, float]]: One ``(id, position, speed)`` tuple for
        each entry in the response's ``positions`` field.
    """
    reply = self.__stub.GetPositions(hal_pb_pb2.Empty())
    readings = []
    for joint in reply.positions:
        readings.append((joint.id, joint.position, joint.speed))
    return readings
Get current positions and speeds of all servos.
Returns:
List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
def set_positions(self, positions: List[Tuple[int, float]]) -> None:
    """
    Send target positions for several servos in one request.

    Every joint is sent with ``speed=0``.

    Args:
        positions (List[Tuple[int, float]]): ``(servo_id, target_position)`` pairs.
    """
    targets = []
    for servo_id, target in positions:
        targets.append(
            hal_pb_pb2.JointPosition(id=servo_id, position=target, speed=0)
        )
    self.__stub.SetPositions(hal_pb_pb2.JointPositions(positions=targets))
Set positions for multiple servos.
Arguments:
- positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
    """
    Fetch detailed information about one servo.

    Args:
        servo_id (int): The ID of the servo to query.

    Returns:
        Dict[str, Union[int, float]]: A dictionary with the keys:

            id: The ID of the servo

            temperature: Current temperature of the servo (in degrees Celsius)

            current: Current draw of the servo (in mA)

            voltage: Voltage supplied to the servo (in volts), rounded to 2 decimals

            speed: Current speed of the servo (in degrees per second)

            current_position: Current position of the servo (in degrees)

            min_position: Minimum allowed position of the servo (in degrees)

            max_position: Maximum allowed position of the servo (in degrees)

    Raises:
        Exception: If the response does not carry an ``info`` field; the
            message includes the error message and code from the response.
    """
    reply = self.__stub.GetServoInfo(hal_pb_pb2.ServoId(id=servo_id))

    # Guard clause: a response without 'info' is reported as an error.
    if not reply.HasField('info'):
        raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")

    details = reply.info
    return {
        'id': details.id,
        'temperature': details.temperature,
        'current': details.current,
        'voltage': round(details.voltage, 2),
        'speed': details.speed,
        'current_position': details.current_position,
        'min_position': details.min_position,
        'max_position': details.max_position,
    }
Get detailed information about a specific servo.
Arguments:
- servo_id (int): The ID of the servo to query.
Returns:
Dict[str, Union[int, float]]: A dictionary containing servo information:
- id: The ID of the servo
- temperature: Current temperature of the servo (in degrees Celsius)
- current: Current draw of the servo (in mA)
- voltage: Voltage supplied to the servo (in volts)
- speed: Current speed of the servo (in degrees per second)
- current_position: Current position of the servo (in degrees)
- min_position: Minimum allowed position of the servo (in degrees)
- max_position: Maximum allowed position of the servo (in degrees)
Raises:
- Exception: If there's an error retrieving the servo information.
def scan(self) -> List[int]:
    """
    Scan for connected servos.

    Returns:
        List[int]: The IDs from the response's ``ids`` field, copied into a
        plain list.
    """
    reply = self.__stub.Scan(hal_pb_pb2.Empty())
    found_ids = [*reply.ids]
    return found_ids
Scan for connected servos.
Returns:
List[int]: A list of IDs of the connected servos.
def change_id(self, old_id: int, new_id: int) -> bool:
    """
    Reassign a servo's ID.

    Args:
        old_id (int): The current ID of the servo.
        new_id (int): The new ID to assign to the servo.

    Returns:
        bool: The ``success`` value carried by the response.

    Raises:
        Exception: If the response has no ``success`` field; the message
            includes the error message and code from the response.
    """
    reply = self.__stub.ChangeId(
        hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
    )
    if not reply.HasField('success'):
        raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")
    return reply.success
Change the ID of a servo.
Arguments:
- old_id (int): The current ID of the servo.
- new_id (int): The new ID to assign to the servo.
Returns:
bool: True if the ID change was successful, False otherwise.
Raises:
- Exception: If there's an error changing the servo ID.
def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
    """
    Begin calibrating one servo.

    Args:
        servo_id (int): The ID of the servo to calibrate.
        calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
        current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.

    Returns:
        bool: The ``success`` value carried by the response.

    Raises:
        Exception: If the response has no ``success`` field; the message
            includes the error message and code from the response.
    """
    calibration = hal_pb_pb2.CalibrationRequest(
        servo_id=servo_id,
        calibration_speed=calibration_speed,
        current_threshold=current_threshold,
    )
    reply = self.__stub.StartCalibration(calibration)
    if not reply.HasField('success'):
        raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")
    return reply.success
Start calibration for a specific servo.
Arguments:
- servo_id (int): The ID of the servo to calibrate.
- calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
- current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
Returns:
bool: True if calibration started successfully, False otherwise.
Raises:
- Exception: If there's an error starting the calibration.
def cancel_calibration(self, servo_id: int) -> bool:
    """
    Cancel an in-progress calibration for one servo.

    Args:
        servo_id (int): The ID of the servo to cancel calibration for.

    Returns:
        bool: The ``success`` value carried by the response.

    Raises:
        Exception: If the response has no ``success`` field; the message
            includes the error message and code from the response.
    """
    reply = self.__stub.CancelCalibration(hal_pb_pb2.ServoId(id=servo_id))
    if not reply.HasField('success'):
        raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")
    return reply.success
Cancel ongoing calibration for a specific servo.
Arguments:
- servo_id (int): The ID of the servo to cancel calibration for.
Returns:
bool: True if calibration was successfully cancelled, False otherwise.
Raises:
- Exception: If there's an error cancelling the calibration.
def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
    """
    Report the current calibration status.

    Returns:
        Dict[str, Union[bool, int]]: A dictionary with the keys:

            is_calibrating: The ``is_calibrating`` value from the response

            calibrating_servo_id: The ``calibrating_servo_id`` value from the response

    Note:
        This method raises nothing itself; any exception comes from the
        underlying stub call.
    """
    status = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
    summary = {}
    summary['is_calibrating'] = status.is_calibrating
    summary['calibrating_servo_id'] = status.calibrating_servo_id
    return summary
Get the current calibration status.
Returns:
Dict[str, Union[bool, int]]: A dictionary containing calibration status information.
Raises:
- Exception: If there's an error retrieving the calibration status.
def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
    """
    Apply torque values to several servos in one request.

    Args:
        torque_settings (List[Tuple[int, float]]): ``(servo_id, torque)`` pairs.
    """
    entries = []
    for servo_id, torque_value in torque_settings:
        entries.append(hal_pb_pb2.TorqueSetting(id=servo_id, torque=torque_value))
    self.__stub.SetTorque(hal_pb_pb2.TorqueSettings(settings=entries))
Set torque for multiple servos.
Arguments:
- torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
    """
    Switch torque on or off for several servos in one request.

    Args:
        enable_settings (List[Tuple[int, bool]]): ``(servo_id, enable)`` pairs,
            where ``enable`` says whether torque should be enabled.
    """
    entries = []
    for servo_id, should_enable in enable_settings:
        entries.append(
            hal_pb_pb2.TorqueEnableSetting(id=servo_id, enable=should_enable)
        )
    self.__stub.SetTorqueEnable(hal_pb_pb2.TorqueEnableSettings(settings=entries))
Enable or disable torque for multiple servos.
Arguments:
- enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
def enable_movement(self) -> None:
    """Turn on continuous movement writes for all servos."""
    no_args = hal_pb_pb2.Empty()
    self.__stub.EnableMovement(no_args)
Enable continuous movement writes for all servos.
def disable_movement(self) -> None:
    """Turn off continuous movement writes for all servos."""
    no_args = hal_pb_pb2.Empty()
    self.__stub.DisableMovement(no_args)
Disable continuous movement writes for all servos.
def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
    """
    Move a single servo to a target position.

    Args:
        servo_id (int): The ID of the servo to control
        position (float): Target position in degrees
        speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
    """
    self.__stub.SetPosition(
        hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
    )
Set position for a single servo.
Arguments:
- servo_id (int): The ID of the servo to control
- position (float): Target position in degrees
- speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
class System:
    """Class for system-related operations."""

    def __init__(self, stub):
        self.__stub = stub

    def set_wifi_info(self, ssid: str, password: str) -> None:
        """
        Send WiFi credentials to the MilkV board.

        Args:
            ssid (str): The SSID of the WiFi network.
            password (str): The password for the WiFi network.
        """
        self.__stub.SetWifiInfo(
            hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
        )

    def start_video_stream(self) -> None:
        """Start the video stream."""
        no_args = hal_pb_pb2.Empty()
        self.__stub.StartVideoStream(no_args)

    def stop_video_stream(self) -> None:
        """Stop the video stream."""
        no_args = hal_pb_pb2.Empty()
        self.__stub.StopVideoStream(no_args)

    def get_video_stream_urls(self) -> Dict[str, List[str]]:
        """
        Fetch the URLs for each video stream format.

        Returns:
            Dict[str, List[str]]: A dictionary of URL lists keyed by stream format:

                webrtc: List of WebRTC stream URLs

                hls: List of HTTP Live Streaming (HLS) URLs

                hls_ll: List of Low-Latency HLS URLs

                mse: List of Media Source Extension (MSE) URLs

                rtsp: List of Real-Time Streaming Protocol (RTSP) URLs

            Each list is a plain-list copy of the matching response field.
        """
        reply = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
        urls = {}
        urls['webrtc'] = list(reply.webrtc)
        urls['hls'] = list(reply.hls)
        urls['hls_ll'] = list(reply.hls_ll)
        urls['mse'] = list(reply.mse)
        urls['rtsp'] = list(reply.rtsp)
        return urls
Class for system-related operations.
def set_wifi_info(self, ssid: str, password: str) -> None:
    """
    Send WiFi credentials to the MilkV board.

    Args:
        ssid (str): The SSID of the WiFi network.
        password (str): The password for the WiFi network.
    """
    self.__stub.SetWifiInfo(
        hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
    )
Set WiFi credentials for the MilkV board.
Arguments:
- ssid (str): The SSID of the WiFi network.
- password (str): The password for the WiFi network.
def start_video_stream(self) -> None:
    """Ask the board to start the video stream."""
    no_args = hal_pb_pb2.Empty()
    self.__stub.StartVideoStream(no_args)
Start the video stream.
def stop_video_stream(self) -> None:
    """Ask the board to stop the video stream."""
    no_args = hal_pb_pb2.Empty()
    self.__stub.StopVideoStream(no_args)
Stop the video stream.
def get_video_stream_urls(self) -> Dict[str, List[str]]:
    """
    Fetch the URLs for each video stream format.

    Returns:
        Dict[str, List[str]]: A dictionary of URL lists keyed by stream format:

            webrtc: List of WebRTC stream URLs

            hls: List of HTTP Live Streaming (HLS) URLs

            hls_ll: List of Low-Latency HLS URLs

            mse: List of Media Source Extension (MSE) URLs

            rtsp: List of Real-Time Streaming Protocol (RTSP) URLs

        Each list is a plain-list copy of the matching response field.
    """
    reply = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
    urls = {}
    urls['webrtc'] = list(reply.webrtc)
    urls['hls'] = list(reply.hls)
    urls['hls_ll'] = list(reply.hls_ll)
    urls['mse'] = list(reply.mse)
    urls['rtsp'] = list(reply.rtsp)
    return urls
Get the URLs for various video stream formats.
Returns:
Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:
- webrtc: List of WebRTC stream URLs
- hls: List of HTTP Live Streaming (HLS) URLs
- hls_ll: List of Low-Latency HLS URLs
- mse: List of Media Source Extension (MSE) URLs
- rtsp: List of Real-Time Streaming Protocol (RTSP) URLs
Each list may contain one or more URLs depending on the available streams.
class IMU:
    """Class for IMU-related operations."""

    def __init__(self, stub):
        self.__stub = stub

    def get_data(self) -> Dict[str, Dict[str, float]]:
        """
        Read the current gyroscope and accelerometer values from the IMU.

        Returns:
            Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:

                {
                    'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
                    'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
                }
        """
        reply = self.__stub.GetImuData(hal_pb_pb2.Empty())
        gyro_axes = {'x': reply.gyro.x, 'y': reply.gyro.y, 'z': reply.gyro.z}
        accel_axes = {'x': reply.accel.x, 'y': reply.accel.y, 'z': reply.accel.z}
        return {'gyro': gyro_axes, 'accel': accel_axes}
Class for IMU-related operations.
def get_data(self) -> Dict[str, Dict[str, float]]:
    """
    Read the current gyroscope and accelerometer values from the IMU.

    Returns:
        Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:

            {
                'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
                'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
            }
    """
    reply = self.__stub.GetImuData(hal_pb_pb2.Empty())
    gyro_axes = {'x': reply.gyro.x, 'y': reply.gyro.y, 'z': reply.gyro.z}
    accel_axes = {'x': reply.accel.x, 'y': reply.accel.y, 'z': reply.accel.z}
    return {'gyro': gyro_axes, 'accel': accel_axes}
Get current IMU sensor data including gyroscope and accelerometer readings.
Returns:
Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:
{
    'gyro': {'x': float, 'y': float, 'z': float},   # Angular velocity in degrees/second
    'accel': {'x': float, 'y': float, 'z': float}   # Linear acceleration in g
}
class Audio:
    """Class for audio-related operations."""

    def __init__(self, stub):
        # Size in bytes of each slice sent by upload_file() (32KB).
        self.CHUNK_SIZE = 32768
        self.__stub = stub

    def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
        """
        Upload audio data as a stream of fixed-size chunks.

        Args:
            audio_data (bytes): The audio data
            format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.

        Returns:
            Dict[str, Union[str, bool]]: A dictionary containing:
                - 'audio_id': Identifier for the uploaded audio
                - 'success': Boolean indicating upload success

        Raises:
            Exception: If the response has no ``success`` field; the message
                includes the error message and code from the response.
        """
        def stream_chunks():
            # Base time in milliseconds, taken when the stream is first consumed.
            base_ms = int(time.time() * 1000)
            step = self.CHUNK_SIZE
            for offset in range(0, len(audio_data), step):
                # The byte offset is added so each chunk's timestamp increases.
                yield hal_pb_pb2.AudioChunk(
                    data=audio_data[offset:offset + step],
                    format=format,
                    timestamp=base_ms + offset,
                )

        reply = self.__stub.UploadAudio(stream_chunks())
        if not reply.HasField('success'):
            raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")
        return {
            'audio_id': reply.audio_id,
            'success': reply.success,
        }

    def get_recording(self) -> Tuple[bytes, str, int]:
        """
        Collect the recorded audio stream into a single bytes object.

        Returns:
            Tuple[bytes, str, int]: Tuple containing:

                - bytes: The concatenated data of every received chunk
                - str: Audio format, taken from the first chunk
                - int: Timestamp of the first chunk in milliseconds

            The format and timestamp are ``None`` if no chunk is received.
        """
        payload = bytearray()
        first_format = None
        first_timestamp = None

        stream = self.__stub.GetRecordedAudio(hal_pb_pb2.Empty())
        for piece in stream:
            payload.extend(piece.data)
            # Format and timestamp are only captured while still unset.
            first_format = piece.format if first_format is None else first_format
            first_timestamp = piece.timestamp if first_timestamp is None else first_timestamp

        return bytes(payload), first_format, first_timestamp

    def play(self, audio_id: str, volume: float = 1.0) -> None:
        """
        Play previously uploaded audio.

        Args:
            audio_id (str): ID of the audio to play
            volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.

        Raises:
            ValueError: If volume is not between 0.0 and 1.0
        """
        volume_ok = 0.0 <= volume <= 1.0
        if not volume_ok:
            raise ValueError("Volume must be between 0.0 and 1.0")

        self.__stub.PlayAudio(
            hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
        )

    def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
        """
        Begin recording audio.

        Args:
            sample_rate (int): Sample rate in Hz. Defaults to 44100.
            format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
            channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
        """
        self.__stub.StartRecording(
            hal_pb_pb2.RecordingConfig(
                sample_rate=sample_rate,
                format=format,
                channels=channels,
            )
        )

    def stop_recording(self) -> None:
        """Stop audio recording."""
        no_args = hal_pb_pb2.Empty()
        self.__stub.StopRecording(no_args)
Class for audio-related operations.
def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
    """
    Upload audio data as a stream of fixed-size chunks.

    Args:
        audio_data (bytes): The audio data
        format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.

    Returns:
        Dict[str, Union[str, bool]]: A dictionary containing:
            - 'audio_id': Identifier for the uploaded audio
            - 'success': Boolean indicating upload success

    Raises:
        Exception: If the response has no ``success`` field; the message
            includes the error message and code from the response.
    """
    def stream_chunks():
        # Base time in milliseconds, taken when the stream is first consumed.
        base_ms = int(time.time() * 1000)
        step = self.CHUNK_SIZE
        for offset in range(0, len(audio_data), step):
            # The byte offset is added so each chunk's timestamp increases.
            yield hal_pb_pb2.AudioChunk(
                data=audio_data[offset:offset + step],
                format=format,
                timestamp=base_ms + offset,
            )

    reply = self.__stub.UploadAudio(stream_chunks())
    if not reply.HasField('success'):
        raise Exception(f"Error: {reply.error.message} (Code: {reply.error.code})")
    return {
        'audio_id': reply.audio_id,
        'success': reply.success,
    }
Upload audio data.
Arguments:
- audio_data (bytes): The audio data
- format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
Returns:
Dict[str, Union[str, bool]]: A dictionary containing:
- 'audio_id': Identifier for the uploaded audio
- 'success': Boolean indicating upload success
Raises:
- Exception: If there's an error during upload
def get_recording(self) -> Tuple[bytes, str, int]:
    """
    Collect the recorded audio stream into a single bytes object.

    Returns:
        Tuple[bytes, str, int]: Tuple containing:

            - bytes: The concatenated data of every received chunk
            - str: Audio format, taken from the first chunk
            - int: Timestamp of the first chunk in milliseconds

        The format and timestamp are ``None`` if no chunk is received.
    """
    payload = bytearray()
    first_format = None
    first_timestamp = None

    stream = self.__stub.GetRecordedAudio(hal_pb_pb2.Empty())
    for piece in stream:
        payload.extend(piece.data)
        # Format and timestamp are only captured while still unset.
        first_format = piece.format if first_format is None else first_format
        first_timestamp = piece.timestamp if first_timestamp is None else first_timestamp

    return bytes(payload), first_format, first_timestamp
Get recorded audio data as a bytes object.
Returns:
Tuple[bytes, str, int]: Tuple containing:
- bytes: The complete audio data
- str: Audio format
- int: Initial timestamp in milliseconds
def play(self, audio_id: str, volume: float = 1.0) -> None:
    """
    Play previously uploaded audio.

    Args:
        audio_id (str): ID of the audio to play
        volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.

    Raises:
        ValueError: If volume is not between 0.0 and 1.0
    """
    volume_ok = 0.0 <= volume <= 1.0
    if not volume_ok:
        raise ValueError("Volume must be between 0.0 and 1.0")

    self.__stub.PlayAudio(
        hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
    )
Play uploaded audio.
Arguments:
- audio_id (str): ID of the audio to play
- volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
Raises:
- ValueError: If volume is not between 0.0 and 1.0
def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
    """
    Begin recording audio.

    Args:
        sample_rate (int): Sample rate in Hz. Defaults to 44100.
        format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
        channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
    """
    self.__stub.StartRecording(
        hal_pb_pb2.RecordingConfig(
            sample_rate=sample_rate,
            format=format,
            channels=channels,
        )
    )
Start audio recording.
Arguments:
- sample_rate (int): Sample rate in Hz. Defaults to 44100.
- format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
- channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.