openlch.hal

  1import grpc
  2from typing import List, Tuple, Dict, Union, Iterator
  3from . import hal_pb_pb2
  4from . import hal_pb_pb2_grpc
  5import time
  6
  7__all__ = ['HAL']
  8
  9__pdoc__ = {}
 10__pdoc__["hal_pb_pb2"] = None
 11__pdoc__["hal_pb_pb2_grpc"] = None
 12
 13class HAL:
 14    """
 15    Hardware Abstraction Layer for interacting with the MilkV board.
 16
 17    Args:
 18        host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
 19        port (int): The port number for gRPC communication. Defaults to 50051.
 20    """
 21
 22    def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
 23        self.__channel = grpc.insecure_channel(f'{host}:{port}')
 24        self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)
 25        self.servo = self.Servo(self.__stub)
 26        self.system = self.System(self.__stub)
 27        self.imu = self.IMU(self.__stub)
 28        self.audio = self.Audio(self.__stub)
 29
 30    def close(self) -> None:
 31        """Close the gRPC channel."""
 32        self.__channel.close()
 33
 34    class Servo:
 35        """Class for servo-related operations."""
 36
 37        def __init__(self, stub):
 38            self.__stub = stub
 39
 40        def get_positions(self) -> List[Tuple[int, float, float]]:
 41            """
 42            Get current positions and speeds of all servos.
 43
 44            Returns:
 45                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
 46            """
 47            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
 48            return [(pos.id, pos.position, pos.speed) for pos in response.positions]
 49
 50        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
 51            """
 52            Set positions for multiple servos.
 53
 54            Args:
 55                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
 56            """
 57            joint_positions = [
 58                hal_pb_pb2.JointPosition(id=id, position=position, speed=0)
 59                for id, position in positions
 60            ]
 61            request = hal_pb_pb2.JointPositions(positions=joint_positions)
 62            self.__stub.SetPositions(request)
 63
 64        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
 65            """
 66            Get detailed information about a specific servo.
 67
 68            Args:
 69                servo_id (int): The ID of the servo to query.
 70
 71            Returns:
 72                Dict[str, Union[int, float]]: A dictionary containing servo information:
 73
 74                    id: The ID of the servo
 75
 76                    temperature: Current temperature of the servo (in degrees Celsius)
 77
 78                    current: Current draw of the servo (in mAmps)
 79
 80                    voltage: Voltage supplied to the servo (in volts)
 81
 82                    speed: Current speed of the servo (in degrees per second)
 83
 84                    current_position: Current position of the servo (in degrees)
 85
 86                    min_position: Minimum allowed position of the servo (in degrees)
 87
 88                    max_position: Maximum allowed position of the servo (in degrees)
 89
 90            Raises:
 91                Exception: If there's an error retrieving the servo information.
 92            """
 93            request = hal_pb_pb2.ServoId(id=servo_id)
 94            response = self.__stub.GetServoInfo(request)
 95            if response.HasField('info'):
 96                info = response.info
 97                return {
 98                    'id': info.id,
 99                    'temperature': info.temperature,
100                    'current': info.current,
101                    'voltage': round(info.voltage, 2),
102                    'speed': info.speed,
103                    'current_position': info.current_position,
104                    'min_position': info.min_position,
105                    'max_position': info.max_position
106                }
107            else:
108                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
109
110        def scan(self) -> List[int]:
111            """
112            Scan for connected servos.
113
114            Returns:
115                List[int]: A list of IDs of the connected servos.
116            """
117            response = self.__stub.Scan(hal_pb_pb2.Empty())
118            return list(response.ids)
119
120        def change_id(self, old_id: int, new_id: int) -> bool:
121            """
122            Change the ID of a servo.
123
124            Args:
125                old_id (int): The current ID of the servo.
126                new_id (int): The new ID to assign to the servo.
127
128            Returns:
129                bool: True if the ID change was successful, False otherwise.
130
131            Raises:
132                Exception: If there's an error changing the servo ID.
133            """
134            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
135            response = self.__stub.ChangeId(request)
136            if response.HasField('success'):
137                return response.success
138            else:
139                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
140
141        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
142            """
143            Start calibration for a specific servo.
144
145            Args:
146                servo_id (int): The ID of the servo to calibrate.
147                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
148                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
149
150            Returns:
151                bool: True if calibration started successfully, False otherwise.
152
153            Raises:
154                Exception: If there's an error starting the calibration.
155            """
156            request = hal_pb_pb2.CalibrationRequest(
157                servo_id=servo_id,
158                calibration_speed=calibration_speed,
159                current_threshold=current_threshold
160            )
161            response = self.__stub.StartCalibration(request)
162            if response.HasField('success'):
163                return response.success
164            else:
165                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
166
167        def cancel_calibration(self, servo_id: int) -> bool:
168            """
169            Cancel ongoing calibration for a specific servo.
170
171            Args:
172                servo_id (int): The ID of the servo to cancel calibration for.
173
174            Returns:
175                bool: True if calibration was successfully cancelled, False otherwise.
176
177            Raises:
178                Exception: If there's an error cancelling the calibration.
179            """
180            request = hal_pb_pb2.ServoId(id=servo_id)
181            response = self.__stub.CancelCalibration(request)
182            if response.HasField('success'):
183                return response.success
184            else:
185                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
186
187        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
188            """
189            Get the current calibration status.
190
191            Returns:
192                Dict[str, Union[bool, int]]: A dictionary containing calibration status information.
193
194            Raises:
195                Exception: If there's an error retrieving the calibration status.
196            """
197            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
198            return {
199                'is_calibrating': response.is_calibrating,
200                'calibrating_servo_id': response.calibrating_servo_id
201            }
202
203        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
204            """
205            Set torque for multiple servos.
206
207            Args:
208                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
209            """
210            settings = [
211                hal_pb_pb2.TorqueSetting(id=id, torque=torque)
212                for id, torque in torque_settings
213            ]
214            request = hal_pb_pb2.TorqueSettings(settings=settings)
215            self.__stub.SetTorque(request)
216
217        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
218            """
219            Enable or disable torque for multiple servos.
220
221            Args:
222                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
223            """
224            settings = [
225                hal_pb_pb2.TorqueEnableSetting(id=id, enable=enable)
226                for id, enable in enable_settings
227            ]
228            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
229            self.__stub.SetTorqueEnable(request)
230
231        def enable_movement(self) -> None:
232            """Enable continuous movement writes for all servos."""
233            self.__stub.EnableMovement(hal_pb_pb2.Empty())
234
235        def disable_movement(self) -> None:
236            """Disable continuous movement writes for all servos."""
237            self.__stub.DisableMovement(hal_pb_pb2.Empty())
238
239        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
240            """
241            Set position for a single servo.
242
243            Args:
244                servo_id (int): The ID of the servo to control
245                position (float): Target position in degrees
246                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
247            """
248            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
249            self.__stub.SetPosition(request)
250
251    class System:
252        """Class for system-related operations."""
253
254        def __init__(self, stub):
255            self.__stub = stub
256
257        def set_wifi_info(self, ssid: str, password: str) -> None:
258            """
259            Set WiFi credentials for the MilkV board.
260
261            Args:
262                ssid (str): The SSID of the WiFi network.
263                password (str): The password for the WiFi network.
264            """
265            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
266            self.__stub.SetWifiInfo(request)
267
268        def start_video_stream(self) -> None:
269            """Start the video stream."""
270            self.__stub.StartVideoStream(hal_pb_pb2.Empty())
271
272        def stop_video_stream(self) -> None:
273            """Stop the video stream."""
274            self.__stub.StopVideoStream(hal_pb_pb2.Empty())
275
276        def get_video_stream_urls(self) -> Dict[str, List[str]]:
277            """
278            Get the URLs for various video stream formats.
279
280            Returns:
281                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:
282
283                    webrtc: List of WebRTC stream URLs
284
285                    hls: List of HTTP Live Streaming (HLS) URLs
286
287                    hls_ll: List of Low-Latency HLS URLs
288
289                    mse: List of Media Source Extension (MSE) URLs
290
291                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs
292
293            Each list may contain one or more URLs depending on the available streams.
294            """
295            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
296            return {
297                'webrtc': list(response.webrtc),
298                'hls': list(response.hls),
299                'hls_ll': list(response.hls_ll),
300                'mse': list(response.mse),
301                'rtsp': list(response.rtsp)
302            }
303
304    class IMU:
305        """Class for IMU-related operations."""
306
307        def __init__(self, stub):
308            self.__stub = stub
309
310        def get_data(self) -> Dict[str, Dict[str, float]]:
311            """
312            Get current IMU sensor data including gyroscope and accelerometer readings.
313
314            Returns:
315                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:
316
317                    {
318                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
319                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
320                    }
321            """
322            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
323            return {
324                'gyro': {'x': response.gyro.x, 'y': response.gyro.y, 'z': response.gyro.z},
325                'accel': {'x': response.accel.x, 'y': response.accel.y, 'z': response.accel.z}
326            }
327
328    class Audio:
329        """Class for audio-related operations."""
330
331        def __init__(self, stub):
332            self.__stub = stub
333            self.CHUNK_SIZE = 32768  # 32KB chunks
334
335        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
336            """
337            Upload audio data.
338
339            Args:
340                audio_data (bytes): The audio data
341                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
342
343            Returns:
344                Dict[str, Union[str, bool]]: A dictionary containing:
345                - 'audio_id': Identifier for the uploaded audio
346                - 'success': Boolean indicating upload success
347
348            Raises:
349                Exception: If there's an error during upload
350            """
351            def chunk_generator():
352                timestamp = int(time.time() * 1000)  # Current time in milliseconds
353                for i in range(0, len(audio_data), self.CHUNK_SIZE):
354                    chunk = hal_pb_pb2.AudioChunk(
355                        data=audio_data[i:i + self.CHUNK_SIZE],
356                        format=format,
357                        timestamp=timestamp + i  # Incrementing timestamp for ordering
358                    )
359                    yield chunk
360
361            response = self.__stub.UploadAudio(chunk_generator())
362            if response.HasField('success'):
363                return {
364                    'audio_id': response.audio_id,
365                    'success': response.success
366                }
367            else:
368                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
369
370        def get_recording(self) -> Tuple[bytes, str, int]:
371            """
372            Get recorded audio data as a bytes object.
373
374            Returns:
375                Tuple[bytes, str, int]: Tuple containing:
376
377                - bytes: The complete audio data
378                - str: Audio format
379                - int: Initial timestamp in milliseconds
380            """
381            audio_data = bytearray()
382            format_type = None
383            timestamp = None
384            
385            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
386                audio_data.extend(chunk.data)
387                if format_type is None:
388                    format_type = chunk.format
389                if timestamp is None:
390                    timestamp = chunk.timestamp
391            
392            return bytes(audio_data), format_type, timestamp
393
394        def play(self, audio_id: str, volume: float = 1.0) -> None:
395            """
396            Play uploaded audio.
397
398            Args:
399                audio_id (str): ID of the audio to play
400                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
401
402            Raises:
403                ValueError: If volume is not between 0.0 and 1.0
404            """
405            if not 0.0 <= volume <= 1.0:
406                raise ValueError("Volume must be between 0.0 and 1.0")
407            
408            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
409            self.__stub.PlayAudio(request)
410
411        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
412            """
413            Start audio recording.
414
415            Args:
416                sample_rate (int): Sample rate in Hz. Defaults to 44100.
417                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
418                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
419            """
420            config = hal_pb_pb2.RecordingConfig(
421                sample_rate=sample_rate,
422                format=format,
423                channels=channels
424            )
425            self.__stub.StartRecording(config)
426
427        def stop_recording(self) -> None:
428            """Stop audio recording."""
429            self.__stub.StopRecording(hal_pb_pb2.Empty())
class HAL:
 14class HAL:
 15    """
 16    Hardware Abstraction Layer for interacting with the MilkV board.
 17
 18    Args:
 19        host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
 20        port (int): The port number for gRPC communication. Defaults to 50051.
 21    """
 22
 23    def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
 24        self.__channel = grpc.insecure_channel(f'{host}:{port}')
 25        self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)
 26        self.servo = self.Servo(self.__stub)
 27        self.system = self.System(self.__stub)
 28        self.imu = self.IMU(self.__stub)
 29        self.audio = self.Audio(self.__stub)
 30
 31    def close(self) -> None:
 32        """Close the gRPC channel."""
 33        self.__channel.close()
 34
 35    class Servo:
 36        """Class for servo-related operations."""
 37
 38        def __init__(self, stub):
 39            self.__stub = stub
 40
 41        def get_positions(self) -> List[Tuple[int, float, float]]:
 42            """
 43            Get current positions and speeds of all servos.
 44
 45            Returns:
 46                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
 47            """
 48            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
 49            return [(pos.id, pos.position, pos.speed) for pos in response.positions]
 50
 51        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
 52            """
 53            Set positions for multiple servos.
 54
 55            Args:
 56                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
 57            """
 58            joint_positions = [
 59                hal_pb_pb2.JointPosition(id=id, position=position, speed=0)
 60                for id, position in positions
 61            ]
 62            request = hal_pb_pb2.JointPositions(positions=joint_positions)
 63            self.__stub.SetPositions(request)
 64
 65        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
 66            """
 67            Get detailed information about a specific servo.
 68
 69            Args:
 70                servo_id (int): The ID of the servo to query.
 71
 72            Returns:
 73                Dict[str, Union[int, float]]: A dictionary containing servo information:
 74
 75                    id: The ID of the servo
 76
 77                    temperature: Current temperature of the servo (in degrees Celsius)
 78
 79                    current: Current draw of the servo (in mAmps)
 80
 81                    voltage: Voltage supplied to the servo (in volts)
 82
 83                    speed: Current speed of the servo (in degrees per second)
 84
 85                    current_position: Current position of the servo (in degrees)
 86
 87                    min_position: Minimum allowed position of the servo (in degrees)
 88
 89                    max_position: Maximum allowed position of the servo (in degrees)
 90
 91            Raises:
 92                Exception: If there's an error retrieving the servo information.
 93            """
 94            request = hal_pb_pb2.ServoId(id=servo_id)
 95            response = self.__stub.GetServoInfo(request)
 96            if response.HasField('info'):
 97                info = response.info
 98                return {
 99                    'id': info.id,
100                    'temperature': info.temperature,
101                    'current': info.current,
102                    'voltage': round(info.voltage, 2),
103                    'speed': info.speed,
104                    'current_position': info.current_position,
105                    'min_position': info.min_position,
106                    'max_position': info.max_position
107                }
108            else:
109                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
110
111        def scan(self) -> List[int]:
112            """
113            Scan for connected servos.
114
115            Returns:
116                List[int]: A list of IDs of the connected servos.
117            """
118            response = self.__stub.Scan(hal_pb_pb2.Empty())
119            return list(response.ids)
120
121        def change_id(self, old_id: int, new_id: int) -> bool:
122            """
123            Change the ID of a servo.
124
125            Args:
126                old_id (int): The current ID of the servo.
127                new_id (int): The new ID to assign to the servo.
128
129            Returns:
130                bool: True if the ID change was successful, False otherwise.
131
132            Raises:
133                Exception: If there's an error changing the servo ID.
134            """
135            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
136            response = self.__stub.ChangeId(request)
137            if response.HasField('success'):
138                return response.success
139            else:
140                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
141
142        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
143            """
144            Start calibration for a specific servo.
145
146            Args:
147                servo_id (int): The ID of the servo to calibrate.
148                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
149                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
150
151            Returns:
152                bool: True if calibration started successfully, False otherwise.
153
154            Raises:
155                Exception: If there's an error starting the calibration.
156            """
157            request = hal_pb_pb2.CalibrationRequest(
158                servo_id=servo_id,
159                calibration_speed=calibration_speed,
160                current_threshold=current_threshold
161            )
162            response = self.__stub.StartCalibration(request)
163            if response.HasField('success'):
164                return response.success
165            else:
166                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
167
168        def cancel_calibration(self, servo_id: int) -> bool:
169            """
170            Cancel ongoing calibration for a specific servo.
171
172            Args:
173                servo_id (int): The ID of the servo to cancel calibration for.
174
175            Returns:
176                bool: True if calibration was successfully cancelled, False otherwise.
177
178            Raises:
179                Exception: If there's an error cancelling the calibration.
180            """
181            request = hal_pb_pb2.ServoId(id=servo_id)
182            response = self.__stub.CancelCalibration(request)
183            if response.HasField('success'):
184                return response.success
185            else:
186                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
187
188        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
189            """
190            Get the current calibration status.
191
192            Returns:
193                Dict[str, Union[bool, int]]: A dictionary containing calibration status information.
194
195            Raises:
196                Exception: If there's an error retrieving the calibration status.
197            """
198            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
199            return {
200                'is_calibrating': response.is_calibrating,
201                'calibrating_servo_id': response.calibrating_servo_id
202            }
203
204        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
205            """
206            Set torque for multiple servos.
207
208            Args:
209                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
210            """
211            settings = [
212                hal_pb_pb2.TorqueSetting(id=id, torque=torque)
213                for id, torque in torque_settings
214            ]
215            request = hal_pb_pb2.TorqueSettings(settings=settings)
216            self.__stub.SetTorque(request)
217
218        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
219            """
220            Enable or disable torque for multiple servos.
221
222            Args:
223                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
224            """
225            settings = [
226                hal_pb_pb2.TorqueEnableSetting(id=id, enable=enable)
227                for id, enable in enable_settings
228            ]
229            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
230            self.__stub.SetTorqueEnable(request)
231
232        def enable_movement(self) -> None:
233            """Enable continuous movement writes for all servos."""
234            self.__stub.EnableMovement(hal_pb_pb2.Empty())
235
236        def disable_movement(self) -> None:
237            """Disable continuous movement writes for all servos."""
238            self.__stub.DisableMovement(hal_pb_pb2.Empty())
239
240        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
241            """
242            Set position for a single servo.
243
244            Args:
245                servo_id (int): The ID of the servo to control
246                position (float): Target position in degrees
247                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
248            """
249            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
250            self.__stub.SetPosition(request)
251
252    class System:
253        """Class for system-related operations."""
254
255        def __init__(self, stub):
256            self.__stub = stub
257
258        def set_wifi_info(self, ssid: str, password: str) -> None:
259            """
260            Set WiFi credentials for the MilkV board.
261
262            Args:
263                ssid (str): The SSID of the WiFi network.
264                password (str): The password for the WiFi network.
265            """
266            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
267            self.__stub.SetWifiInfo(request)
268
269        def start_video_stream(self) -> None:
270            """Start the video stream."""
271            self.__stub.StartVideoStream(hal_pb_pb2.Empty())
272
273        def stop_video_stream(self) -> None:
274            """Stop the video stream."""
275            self.__stub.StopVideoStream(hal_pb_pb2.Empty())
276
277        def get_video_stream_urls(self) -> Dict[str, List[str]]:
278            """
279            Get the URLs for various video stream formats.
280
281            Returns:
282                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:
283
284                    webrtc: List of WebRTC stream URLs
285
286                    hls: List of HTTP Live Streaming (HLS) URLs
287
288                    hls_ll: List of Low-Latency HLS URLs
289
290                    mse: List of Media Source Extension (MSE) URLs
291
292                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs
293
294            Each list may contain one or more URLs depending on the available streams.
295            """
296            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
297            return {
298                'webrtc': list(response.webrtc),
299                'hls': list(response.hls),
300                'hls_ll': list(response.hls_ll),
301                'mse': list(response.mse),
302                'rtsp': list(response.rtsp)
303            }
304
305    class IMU:
306        """Class for IMU-related operations."""
307
308        def __init__(self, stub):
309            self.__stub = stub
310
311        def get_data(self) -> Dict[str, Dict[str, float]]:
312            """
313            Get current IMU sensor data including gyroscope and accelerometer readings.
314
315            Returns:
316                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:
317
318                    {
319                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
320                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
321                    }
322            """
323            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
324            return {
325                'gyro': {'x': response.gyro.x, 'y': response.gyro.y, 'z': response.gyro.z},
326                'accel': {'x': response.accel.x, 'y': response.accel.y, 'z': response.accel.z}
327            }
328
329    class Audio:
330        """Class for audio-related operations."""
331
332        def __init__(self, stub):
333            self.__stub = stub
334            self.CHUNK_SIZE = 32768  # 32KB chunks
335
336        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
337            """
338            Upload audio data.
339
340            Args:
341                audio_data (bytes): The audio data
342                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
343
344            Returns:
345                Dict[str, Union[str, bool]]: A dictionary containing:
346                - 'audio_id': Identifier for the uploaded audio
347                - 'success': Boolean indicating upload success
348
349            Raises:
350                Exception: If there's an error during upload
351            """
352            def chunk_generator():
353                timestamp = int(time.time() * 1000)  # Current time in milliseconds
354                for i in range(0, len(audio_data), self.CHUNK_SIZE):
355                    chunk = hal_pb_pb2.AudioChunk(
356                        data=audio_data[i:i + self.CHUNK_SIZE],
357                        format=format,
358                        timestamp=timestamp + i  # Incrementing timestamp for ordering
359                    )
360                    yield chunk
361
362            response = self.__stub.UploadAudio(chunk_generator())
363            if response.HasField('success'):
364                return {
365                    'audio_id': response.audio_id,
366                    'success': response.success
367                }
368            else:
369                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
370
371        def get_recording(self) -> Tuple[bytes, str, int]:
372            """
373            Get recorded audio data as a bytes object.
374
375            Returns:
376                Tuple[bytes, str, int]: Tuple containing:
377
378                - bytes: The complete audio data
379                - str: Audio format
380                - int: Initial timestamp in milliseconds
381            """
382            audio_data = bytearray()
383            format_type = None
384            timestamp = None
385            
386            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
387                audio_data.extend(chunk.data)
388                if format_type is None:
389                    format_type = chunk.format
390                if timestamp is None:
391                    timestamp = chunk.timestamp
392            
393            return bytes(audio_data), format_type, timestamp
394
395        def play(self, audio_id: str, volume: float = 1.0) -> None:
396            """
397            Play uploaded audio.
398
399            Args:
400                audio_id (str): ID of the audio to play
401                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
402
403            Raises:
404                ValueError: If volume is not between 0.0 and 1.0
405            """
406            if not 0.0 <= volume <= 1.0:
407                raise ValueError("Volume must be between 0.0 and 1.0")
408            
409            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
410            self.__stub.PlayAudio(request)
411
412        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
413            """
414            Start audio recording.
415
416            Args:
417                sample_rate (int): Sample rate in Hz. Defaults to 44100.
418                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
419                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
420            """
421            config = hal_pb_pb2.RecordingConfig(
422                sample_rate=sample_rate,
423                format=format,
424                channels=channels
425            )
426            self.__stub.StartRecording(config)
427
428        def stop_recording(self) -> None:
429            """Stop audio recording."""
430            self.__stub.StopRecording(hal_pb_pb2.Empty())

Hardware Abstraction Layer for interacting with the MilkV board.

Arguments:
  • host (str): The IP address of the MilkV board. Defaults to '192.168.42.1'.
  • port (int): The port number for gRPC communication. Defaults to 50051.
HAL(host: str = '192.168.42.1', port: int = 50051)
23    def __init__(self, host: str = '192.168.42.1', port: int = 50051) -> None:
24        self.__channel = grpc.insecure_channel(f'{host}:{port}')
25        self.__stub = hal_pb_pb2_grpc.ServoControlStub(self.__channel)
26        self.servo = self.Servo(self.__stub)
27        self.system = self.System(self.__stub)
28        self.imu = self.IMU(self.__stub)
29        self.audio = self.Audio(self.__stub)
servo
system
imu
audio
def close(self) -> None:
31    def close(self) -> None:
32        """Close the gRPC channel."""
33        self.__channel.close()

Close the gRPC channel.

class HAL.Servo:
 35    class Servo:
 36        """Class for servo-related operations."""
 37
 38        def __init__(self, stub):
 39            self.__stub = stub
 40
 41        def get_positions(self) -> List[Tuple[int, float, float]]:
 42            """
 43            Get current positions and speeds of all servos.
 44
 45            Returns:
 46                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
 47            """
 48            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
 49            return [(pos.id, pos.position, pos.speed) for pos in response.positions]
 50
 51        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
 52            """
 53            Set positions for multiple servos.
 54
 55            Args:
 56                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
 57            """
 58            joint_positions = [
 59                hal_pb_pb2.JointPosition(id=id, position=position, speed=0)
 60                for id, position in positions
 61            ]
 62            request = hal_pb_pb2.JointPositions(positions=joint_positions)
 63            self.__stub.SetPositions(request)
 64
 65        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
 66            """
 67            Get detailed information about a specific servo.
 68
 69            Args:
 70                servo_id (int): The ID of the servo to query.
 71
 72            Returns:
 73                Dict[str, Union[int, float]]: A dictionary containing servo information:
 74
 75                    id: The ID of the servo
 76
 77                    temperature: Current temperature of the servo (in degrees Celsius)
 78
 79                    current: Current draw of the servo (in mAmps)
 80
 81                    voltage: Voltage supplied to the servo (in volts)
 82
 83                    speed: Current speed of the servo (in degrees per second)
 84
 85                    current_position: Current position of the servo (in degrees)
 86
 87                    min_position: Minimum allowed position of the servo (in degrees)
 88
 89                    max_position: Maximum allowed position of the servo (in degrees)
 90
 91            Raises:
 92                Exception: If there's an error retrieving the servo information.
 93            """
 94            request = hal_pb_pb2.ServoId(id=servo_id)
 95            response = self.__stub.GetServoInfo(request)
 96            if response.HasField('info'):
 97                info = response.info
 98                return {
 99                    'id': info.id,
100                    'temperature': info.temperature,
101                    'current': info.current,
102                    'voltage': round(info.voltage, 2),
103                    'speed': info.speed,
104                    'current_position': info.current_position,
105                    'min_position': info.min_position,
106                    'max_position': info.max_position
107                }
108            else:
109                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
110
111        def scan(self) -> List[int]:
112            """
113            Scan for connected servos.
114
115            Returns:
116                List[int]: A list of IDs of the connected servos.
117            """
118            response = self.__stub.Scan(hal_pb_pb2.Empty())
119            return list(response.ids)
120
121        def change_id(self, old_id: int, new_id: int) -> bool:
122            """
123            Change the ID of a servo.
124
125            Args:
126                old_id (int): The current ID of the servo.
127                new_id (int): The new ID to assign to the servo.
128
129            Returns:
130                bool: True if the ID change was successful, False otherwise.
131
132            Raises:
133                Exception: If there's an error changing the servo ID.
134            """
135            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
136            response = self.__stub.ChangeId(request)
137            if response.HasField('success'):
138                return response.success
139            else:
140                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
141
142        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
143            """
144            Start calibration for a specific servo.
145
146            Args:
147                servo_id (int): The ID of the servo to calibrate.
148                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
149                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
150
151            Returns:
152                bool: True if calibration started successfully, False otherwise.
153
154            Raises:
155                Exception: If there's an error starting the calibration.
156            """
157            request = hal_pb_pb2.CalibrationRequest(
158                servo_id=servo_id,
159                calibration_speed=calibration_speed,
160                current_threshold=current_threshold
161            )
162            response = self.__stub.StartCalibration(request)
163            if response.HasField('success'):
164                return response.success
165            else:
166                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
167
168        def cancel_calibration(self, servo_id: int) -> bool:
169            """
170            Cancel ongoing calibration for a specific servo.
171
172            Args:
173                servo_id (int): The ID of the servo to cancel calibration for.
174
175            Returns:
176                bool: True if calibration was successfully cancelled, False otherwise.
177
178            Raises:
179                Exception: If there's an error cancelling the calibration.
180            """
181            request = hal_pb_pb2.ServoId(id=servo_id)
182            response = self.__stub.CancelCalibration(request)
183            if response.HasField('success'):
184                return response.success
185            else:
186                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
187
188        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
189            """
190            Get the current calibration status.
191
192            Returns:
193                Dict[str, Union[bool, int]]: A dictionary containing calibration status information.
194
195            Raises:
196                Exception: If there's an error retrieving the calibration status.
197            """
198            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
199            return {
200                'is_calibrating': response.is_calibrating,
201                'calibrating_servo_id': response.calibrating_servo_id
202            }
203
204        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
205            """
206            Set torque for multiple servos.
207
208            Args:
209                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
210            """
211            settings = [
212                hal_pb_pb2.TorqueSetting(id=id, torque=torque)
213                for id, torque in torque_settings
214            ]
215            request = hal_pb_pb2.TorqueSettings(settings=settings)
216            self.__stub.SetTorque(request)
217
218        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
219            """
220            Enable or disable torque for multiple servos.
221
222            Args:
223                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
224            """
225            settings = [
226                hal_pb_pb2.TorqueEnableSetting(id=id, enable=enable)
227                for id, enable in enable_settings
228            ]
229            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
230            self.__stub.SetTorqueEnable(request)
231
232        def enable_movement(self) -> None:
233            """Enable continuous movement writes for all servos."""
234            self.__stub.EnableMovement(hal_pb_pb2.Empty())
235
236        def disable_movement(self) -> None:
237            """Disable continuous movement writes for all servos."""
238            self.__stub.DisableMovement(hal_pb_pb2.Empty())
239
240        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
241            """
242            Set position for a single servo.
243
244            Args:
245                servo_id (int): The ID of the servo to control
246                position (float): Target position in degrees
247                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
248            """
249            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
250            self.__stub.SetPosition(request)

Class for servo-related operations.

HAL.Servo(stub)
38        def __init__(self, stub):
39            self.__stub = stub
def get_positions(self) -> List[Tuple[int, float, float]]:
41        def get_positions(self) -> List[Tuple[int, float, float]]:
42            """
43            Get current positions and speeds of all servos.
44
45            Returns:
46                List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.
47            """
48            response = self.__stub.GetPositions(hal_pb_pb2.Empty())
49            return [(pos.id, pos.position, pos.speed) for pos in response.positions]

Get current positions and speeds of all servos.

Returns:

List[Tuple[int, float, float]]: A list of tuples containing servo IDs, their positions, and speeds.

def set_positions(self, positions: List[Tuple[int, float]]) -> None:
51        def set_positions(self, positions: List[Tuple[int, float]]) -> None:
52            """
53            Set positions for multiple servos.
54
55            Args:
56                positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
57            """
58            joint_positions = [
59                hal_pb_pb2.JointPosition(id=id, position=position, speed=0)
60                for id, position in positions
61            ]
62            request = hal_pb_pb2.JointPositions(positions=joint_positions)
63            self.__stub.SetPositions(request)

Set positions for multiple servos.

Arguments:
  • positions (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target position.
def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
 65        def get_servo_info(self, servo_id: int) -> Dict[str, Union[int, float]]:
 66            """
 67            Get detailed information about a specific servo.
 68
 69            Args:
 70                servo_id (int): The ID of the servo to query.
 71
 72            Returns:
 73                Dict[str, Union[int, float]]: A dictionary containing servo information:
 74
 75                    id: The ID of the servo
 76
 77                    temperature: Current temperature of the servo (in degrees Celsius)
 78
 79                    current: Current draw of the servo (in mAmps)
 80
 81                    voltage: Voltage supplied to the servo (in volts)
 82
 83                    speed: Current speed of the servo (in degrees per second)
 84
 85                    current_position: Current position of the servo (in degrees)
 86
 87                    min_position: Minimum allowed position of the servo (in degrees)
 88
 89                    max_position: Maximum allowed position of the servo (in degrees)
 90
 91            Raises:
 92                Exception: If there's an error retrieving the servo information.
 93            """
 94            request = hal_pb_pb2.ServoId(id=servo_id)
 95            response = self.__stub.GetServoInfo(request)
 96            if response.HasField('info'):
 97                info = response.info
 98                return {
 99                    'id': info.id,
100                    'temperature': info.temperature,
101                    'current': info.current,
102                    'voltage': round(info.voltage, 2),
103                    'speed': info.speed,
104                    'current_position': info.current_position,
105                    'min_position': info.min_position,
106                    'max_position': info.max_position
107                }
108            else:
109                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")

Get detailed information about a specific servo.

Arguments:
  • servo_id (int): The ID of the servo to query.
Returns:

Dict[str, Union[int, float]]: A dictionary containing servo information:

id: The ID of the servo

temperature: Current temperature of the servo (in degrees Celsius)

current: Current draw of the servo (in mAmps)

voltage: Voltage supplied to the servo (in volts)

speed: Current speed of the servo (in degrees per second)

current_position: Current position of the servo (in degrees)

min_position: Minimum allowed position of the servo (in degrees)

max_position: Maximum allowed position of the servo (in degrees)
Raises:
  • Exception: If there's an error retrieving the servo information.
def scan(self) -> List[int]:
111        def scan(self) -> List[int]:
112            """
113            Scan for connected servos.
114
115            Returns:
116                List[int]: A list of IDs of the connected servos.
117            """
118            response = self.__stub.Scan(hal_pb_pb2.Empty())
119            return list(response.ids)

Scan for connected servos.

Returns:

List[int]: A list of IDs of the connected servos.

def change_id(self, old_id: int, new_id: int) -> bool:
121        def change_id(self, old_id: int, new_id: int) -> bool:
122            """
123            Change the ID of a servo.
124
125            Args:
126                old_id (int): The current ID of the servo.
127                new_id (int): The new ID to assign to the servo.
128
129            Returns:
130                bool: True if the ID change was successful, False otherwise.
131
132            Raises:
133                Exception: If there's an error changing the servo ID.
134            """
135            request = hal_pb_pb2.IdChange(old_id=old_id, new_id=new_id)
136            response = self.__stub.ChangeId(request)
137            if response.HasField('success'):
138                return response.success
139            else:
140                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")

Change the ID of a servo.

Arguments:
  • old_id (int): The current ID of the servo.
  • new_id (int): The new ID to assign to the servo.
Returns:

bool: True if the ID change was successful, False otherwise.

Raises:
  • Exception: If there's an error changing the servo ID.
def start_calibration( self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
142        def start_calibration(self, servo_id: int, calibration_speed: int = 300, current_threshold: float = 600.0) -> bool:
143            """
144            Start calibration for a specific servo.
145
146            Args:
147                servo_id (int): The ID of the servo to calibrate.
148                calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
149                current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
150
151            Returns:
152                bool: True if calibration started successfully, False otherwise.
153
154            Raises:
155                Exception: If there's an error starting the calibration.
156            """
157            request = hal_pb_pb2.CalibrationRequest(
158                servo_id=servo_id,
159                calibration_speed=calibration_speed,
160                current_threshold=current_threshold
161            )
162            response = self.__stub.StartCalibration(request)
163            if response.HasField('success'):
164                return response.success
165            else:
166                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")

Start calibration for a specific servo.

Arguments:
  • servo_id (int): The ID of the servo to calibrate.
  • calibration_speed (int, optional): Speed of calibration movement in degrees per second. Defaults to 300.
  • current_threshold (float, optional): Current threshold in mA to detect end stops. Defaults to 600.0.
Returns:

bool: True if calibration started successfully, False otherwise.

Raises:
  • Exception: If there's an error starting the calibration.
def cancel_calibration(self, servo_id: int) -> bool:
168        def cancel_calibration(self, servo_id: int) -> bool:
169            """
170            Cancel ongoing calibration for a specific servo.
171
172            Args:
173                servo_id (int): The ID of the servo to cancel calibration for.
174
175            Returns:
176                bool: True if calibration was successfully cancelled, False otherwise.
177
178            Raises:
179                Exception: If there's an error cancelling the calibration.
180            """
181            request = hal_pb_pb2.ServoId(id=servo_id)
182            response = self.__stub.CancelCalibration(request)
183            if response.HasField('success'):
184                return response.success
185            else:
186                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")

Cancel ongoing calibration for a specific servo.

Arguments:
  • servo_id (int): The ID of the servo to cancel calibration for.
Returns:

bool: True if calibration was successfully cancelled, False otherwise.

Raises:
  • Exception: If there's an error cancelling the calibration.
def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
188        def get_calibration_status(self) -> Dict[str, Union[bool, int]]:
189            """
190            Get the current calibration status.
191
192            Returns:
193                Dict[str, Union[bool, int]]: A dictionary containing calibration status information.
194
195            Raises:
196                Exception: If there's an error retrieving the calibration status.
197            """
198            response = self.__stub.GetCalibrationStatus(hal_pb_pb2.Empty())
199            return {
200                'is_calibrating': response.is_calibrating,
201                'calibrating_servo_id': response.calibrating_servo_id
202            }

Get the current calibration status.

Returns:

Dict[str, Union[bool, int]]: A dictionary containing calibration status information.

Raises:
  • Exception: If there's an error retrieving the calibration status.
def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
204        def set_torque(self, torque_settings: List[Tuple[int, float]]) -> None:
205            """
206            Set torque for multiple servos.
207
208            Args:
209                torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
210            """
211            settings = [
212                hal_pb_pb2.TorqueSetting(id=id, torque=torque)
213                for id, torque in torque_settings
214            ]
215            request = hal_pb_pb2.TorqueSettings(settings=settings)
216            self.__stub.SetTorque(request)

Set torque for multiple servos.

Arguments:
  • torque_settings (List[Tuple[int, float]]): A list of tuples, each containing a servo ID and its target torque.
def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
218        def set_torque_enable(self, enable_settings: List[Tuple[int, bool]]) -> None:
219            """
220            Enable or disable torque for multiple servos.
221
222            Args:
223                enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
224            """
225            settings = [
226                hal_pb_pb2.TorqueEnableSetting(id=id, enable=enable)
227                for id, enable in enable_settings
228            ]
229            request = hal_pb_pb2.TorqueEnableSettings(settings=settings)
230            self.__stub.SetTorqueEnable(request)

Enable or disable torque for multiple servos.

Arguments:
  • enable_settings (List[Tuple[int, bool]]): A list of tuples, each containing a servo ID and a boolean indicating whether to enable torque.
def enable_movement(self) -> None:
232        def enable_movement(self) -> None:
233            """Enable continuous movement writes for all servos."""
234            self.__stub.EnableMovement(hal_pb_pb2.Empty())

Enable continuous movement writes for all servos.

def disable_movement(self) -> None:
236        def disable_movement(self) -> None:
237            """Disable continuous movement writes for all servos."""
238            self.__stub.DisableMovement(hal_pb_pb2.Empty())

Disable continuous movement writes for all servos.

def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
240        def set_position(self, servo_id: int, position: float, speed: float = 0) -> None:
241            """
242            Set position for a single servo.
243
244            Args:
245                servo_id (int): The ID of the servo to control
246                position (float): Target position in degrees
247                speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
248            """
249            request = hal_pb_pb2.JointPosition(id=servo_id, position=position, speed=speed)
250            self.__stub.SetPosition(request)

Set position for a single servo.

Arguments:
  • servo_id (int): The ID of the servo to control
  • position (float): Target position in degrees
  • speed (float, optional): Movement speed in degrees per second. Defaults to 0 (maximum speed).
class HAL.System:
252    class System:
253        """Class for system-related operations."""
254
255        def __init__(self, stub):
256            self.__stub = stub
257
258        def set_wifi_info(self, ssid: str, password: str) -> None:
259            """
260            Set WiFi credentials for the MilkV board.
261
262            Args:
263                ssid (str): The SSID of the WiFi network.
264                password (str): The password for the WiFi network.
265            """
266            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
267            self.__stub.SetWifiInfo(request)
268
269        def start_video_stream(self) -> None:
270            """Start the video stream."""
271            self.__stub.StartVideoStream(hal_pb_pb2.Empty())
272
273        def stop_video_stream(self) -> None:
274            """Stop the video stream."""
275            self.__stub.StopVideoStream(hal_pb_pb2.Empty())
276
277        def get_video_stream_urls(self) -> Dict[str, List[str]]:
278            """
279            Get the URLs for various video stream formats.
280
281            Returns:
282                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:
283
284                    webrtc: List of WebRTC stream URLs
285
286                    hls: List of HTTP Live Streaming (HLS) URLs
287
288                    hls_ll: List of Low-Latency HLS URLs
289
290                    mse: List of Media Source Extension (MSE) URLs
291
292                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs
293
294            Each list may contain one or more URLs depending on the available streams.
295            """
296            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
297            return {
298                'webrtc': list(response.webrtc),
299                'hls': list(response.hls),
300                'hls_ll': list(response.hls_ll),
301                'mse': list(response.mse),
302                'rtsp': list(response.rtsp)
303            }

Class for system-related operations.

HAL.System(stub)
255        def __init__(self, stub):
256            self.__stub = stub
def set_wifi_info(self, ssid: str, password: str) -> None:
258        def set_wifi_info(self, ssid: str, password: str) -> None:
259            """
260            Set WiFi credentials for the MilkV board.
261
262            Args:
263                ssid (str): The SSID of the WiFi network.
264                password (str): The password for the WiFi network.
265            """
266            request = hal_pb_pb2.WifiCredentials(ssid=ssid, password=password)
267            self.__stub.SetWifiInfo(request)

Set WiFi credentials for the MilkV board.

Arguments:
  • ssid (str): The SSID of the WiFi network.
  • password (str): The password for the WiFi network.
def start_video_stream(self) -> None:
269        def start_video_stream(self) -> None:
270            """Start the video stream."""
271            self.__stub.StartVideoStream(hal_pb_pb2.Empty())

Start the video stream.

def stop_video_stream(self) -> None:
273        def stop_video_stream(self) -> None:
274            """Stop the video stream."""
275            self.__stub.StopVideoStream(hal_pb_pb2.Empty())

Stop the video stream.

def get_video_stream_urls(self) -> Dict[str, List[str]]:
277        def get_video_stream_urls(self) -> Dict[str, List[str]]:
278            """
279            Get the URLs for various video stream formats.
280
281            Returns:
282                Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:
283
284                    webrtc: List of WebRTC stream URLs
285
286                    hls: List of HTTP Live Streaming (HLS) URLs
287
288                    hls_ll: List of Low-Latency HLS URLs
289
290                    mse: List of Media Source Extension (MSE) URLs
291
292                    rtsp: List of Real-Time Streaming Protocol (RTSP) URLs
293
294            Each list may contain one or more URLs depending on the available streams.
295            """
296            response = self.__stub.GetVideoStreamUrls(hal_pb_pb2.Empty())
297            return {
298                'webrtc': list(response.webrtc),
299                'hls': list(response.hls),
300                'hls_ll': list(response.hls_ll),
301                'mse': list(response.mse),
302                'rtsp': list(response.rtsp)
303            }

Get the URLs for various video stream formats.

Returns:

Dict[str, List[str]]: A dictionary containing lists of URLs for different stream formats:

webrtc: List of WebRTC stream URLs

hls: List of HTTP Live Streaming (HLS) URLs

hls_ll: List of Low-Latency HLS URLs

mse: List of Media Source Extension (MSE) URLs

rtsp: List of Real-Time Streaming Protocol (RTSP) URLs

Each list may contain one or more URLs depending on the available streams.

class HAL.IMU:
305    class IMU:
306        """Class for IMU-related operations."""
307
308        def __init__(self, stub):
309            self.__stub = stub
310
311        def get_data(self) -> Dict[str, Dict[str, float]]:
312            """
313            Get current IMU sensor data including gyroscope and accelerometer readings.
314
315            Returns:
316                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:
317
318                    {
319                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
320                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
321                    }
322            """
323            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
324            return {
325                'gyro': {'x': response.gyro.x, 'y': response.gyro.y, 'z': response.gyro.z},
326                'accel': {'x': response.accel.x, 'y': response.accel.y, 'z': response.accel.z}
327            }

Class for IMU-related operations.

HAL.IMU(stub)
308        def __init__(self, stub):
309            self.__stub = stub
def get_data(self) -> Dict[str, Dict[str, float]]:
311        def get_data(self) -> Dict[str, Dict[str, float]]:
312            """
313            Get current IMU sensor data including gyroscope and accelerometer readings.
314
315            Returns:
316                Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:
317
318                    {
319                        'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
320                        'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
321                    }
322            """
323            response = self.__stub.GetImuData(hal_pb_pb2.Empty())
324            return {
325                'gyro': {'x': response.gyro.x, 'y': response.gyro.y, 'z': response.gyro.z},
326                'accel': {'x': response.accel.x, 'y': response.accel.y, 'z': response.accel.z}
327            }

Get current IMU sensor data including gyroscope and accelerometer readings.

Returns:

Dict[str, Dict[str, float]]: A dictionary containing gyroscope and accelerometer data:

{
    'gyro': {'x': float, 'y': float, 'z': float},  # Angular velocity in degrees/second
    'accel': {'x': float, 'y': float, 'z': float}  # Linear acceleration in g
}
class HAL.Audio:
329    class Audio:
330        """Class for audio-related operations."""
331
332        def __init__(self, stub):
333            self.__stub = stub
334            self.CHUNK_SIZE = 32768  # 32KB chunks
335
336        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
337            """
338            Upload audio data.
339
340            Args:
341                audio_data (bytes): The audio data
342                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
343
344            Returns:
345                Dict[str, Union[str, bool]]: A dictionary containing:
346                - 'audio_id': Identifier for the uploaded audio
347                - 'success': Boolean indicating upload success
348
349            Raises:
350                Exception: If there's an error during upload
351            """
352            def chunk_generator():
353                timestamp = int(time.time() * 1000)  # Current time in milliseconds
354                for i in range(0, len(audio_data), self.CHUNK_SIZE):
355                    chunk = hal_pb_pb2.AudioChunk(
356                        data=audio_data[i:i + self.CHUNK_SIZE],
357                        format=format,
358                        timestamp=timestamp + i  # Incrementing timestamp for ordering
359                    )
360                    yield chunk
361
362            response = self.__stub.UploadAudio(chunk_generator())
363            if response.HasField('success'):
364                return {
365                    'audio_id': response.audio_id,
366                    'success': response.success
367                }
368            else:
369                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")
370
371        def get_recording(self) -> Tuple[bytes, str, int]:
372            """
373            Get recorded audio data as a bytes object.
374
375            Returns:
376                Tuple[bytes, str, int]: Tuple containing:
377
378                - bytes: The complete audio data
379                - str: Audio format
380                - int: Initial timestamp in milliseconds
381            """
382            audio_data = bytearray()
383            format_type = None
384            timestamp = None
385            
386            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
387                audio_data.extend(chunk.data)
388                if format_type is None:
389                    format_type = chunk.format
390                if timestamp is None:
391                    timestamp = chunk.timestamp
392            
393            return bytes(audio_data), format_type, timestamp
394
395        def play(self, audio_id: str, volume: float = 1.0) -> None:
396            """
397            Play uploaded audio.
398
399            Args:
400                audio_id (str): ID of the audio to play
401                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
402
403            Raises:
404                ValueError: If volume is not between 0.0 and 1.0
405            """
406            if not 0.0 <= volume <= 1.0:
407                raise ValueError("Volume must be between 0.0 and 1.0")
408            
409            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
410            self.__stub.PlayAudio(request)
411
412        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
413            """
414            Start audio recording.
415
416            Args:
417                sample_rate (int): Sample rate in Hz. Defaults to 44100.
418                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
419                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
420            """
421            config = hal_pb_pb2.RecordingConfig(
422                sample_rate=sample_rate,
423                format=format,
424                channels=channels
425            )
426            self.__stub.StartRecording(config)
427
428        def stop_recording(self) -> None:
429            """Stop audio recording."""
430            self.__stub.StopRecording(hal_pb_pb2.Empty())

Class for audio-related operations.

HAL.Audio(stub)
332        def __init__(self, stub):
333            self.__stub = stub
334            self.CHUNK_SIZE = 32768  # 32KB chunks
CHUNK_SIZE
def upload_file( self, audio_data: bytes, format: str = 'wav') -> Dict[str, Union[str, bool]]:
336        def upload_file(self, audio_data: bytes, format: str = "wav") -> Dict[str, Union[str, bool]]:
337            """
338            Upload audio data.
339
340            Args:
341                audio_data (bytes): The audio data
342                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
343
344            Returns:
345                Dict[str, Union[str, bool]]: A dictionary containing:
346                - 'audio_id': Identifier for the uploaded audio
347                - 'success': Boolean indicating upload success
348
349            Raises:
350                Exception: If there's an error during upload
351            """
352            def chunk_generator():
353                timestamp = int(time.time() * 1000)  # Current time in milliseconds
354                for i in range(0, len(audio_data), self.CHUNK_SIZE):
355                    chunk = hal_pb_pb2.AudioChunk(
356                        data=audio_data[i:i + self.CHUNK_SIZE],
357                        format=format,
358                        timestamp=timestamp + i  # Incrementing timestamp for ordering
359                    )
360                    yield chunk
361
362            response = self.__stub.UploadAudio(chunk_generator())
363            if response.HasField('success'):
364                return {
365                    'audio_id': response.audio_id,
366                    'success': response.success
367                }
368            else:
369                raise Exception(f"Error: {response.error.message} (Code: {response.error.code})")

Upload audio data.

Arguments:
  • audio_data (bytes): The audio data
  • format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
Returns:

Dict[str, Union[str, bool]]: A dictionary containing:

  • 'audio_id': Identifier for the uploaded audio
  • 'success': Boolean indicating upload success
Raises:
  • Exception: If there's an error during upload
def get_recording(self) -> Tuple[bytes, str, int]:
371        def get_recording(self) -> Tuple[bytes, str, int]:
372            """
373            Get recorded audio data as a bytes object.
374
375            Returns:
376                Tuple[bytes, str, int]: Tuple containing:
377
378                - bytes: The complete audio data
379                - str: Audio format
380                - int: Initial timestamp in milliseconds
381            """
382            audio_data = bytearray()
383            format_type = None
384            timestamp = None
385            
386            for chunk in self.__stub.GetRecordedAudio(hal_pb_pb2.Empty()):
387                audio_data.extend(chunk.data)
388                if format_type is None:
389                    format_type = chunk.format
390                if timestamp is None:
391                    timestamp = chunk.timestamp
392            
393            return bytes(audio_data), format_type, timestamp

Get recorded audio data as a bytes object.

Returns:

Tuple[bytes, str, int]: Tuple containing:

  • bytes: The complete audio data
  • str: Audio format
  • int: Initial timestamp in milliseconds
def play(self, audio_id: str, volume: float = 1.0) -> None:
395        def play(self, audio_id: str, volume: float = 1.0) -> None:
396            """
397            Play uploaded audio.
398
399            Args:
400                audio_id (str): ID of the audio to play
401                volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
402
403            Raises:
404                ValueError: If volume is not between 0.0 and 1.0
405            """
406            if not 0.0 <= volume <= 1.0:
407                raise ValueError("Volume must be between 0.0 and 1.0")
408            
409            request = hal_pb_pb2.PlayRequest(audio_id=audio_id, volume=volume)
410            self.__stub.PlayAudio(request)

Play uploaded audio.

Arguments:
  • audio_id (str): ID of the audio to play
  • volume (float): Playback volume from 0.0 to 1.0. Defaults to 1.0.
Raises:
  • ValueError: If volume is not between 0.0 and 1.0
def start_recording( self, sample_rate: int = 44100, format: str = 'wav', channels: int = 1) -> None:
412        def start_recording(self, sample_rate: int = 44100, format: str = "wav", channels: int = 1) -> None:
413            """
414            Start audio recording.
415
416            Args:
417                sample_rate (int): Sample rate in Hz. Defaults to 44100.
418                format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
419                channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
420            """
421            config = hal_pb_pb2.RecordingConfig(
422                sample_rate=sample_rate,
423                format=format,
424                channels=channels
425            )
426            self.__stub.StartRecording(config)

Start audio recording.

Arguments:
  • sample_rate (int): Sample rate in Hz. Defaults to 44100.
  • format (str): Audio format (e.g., 'wav'). Defaults to 'wav'.
  • channels (int): Number of audio channels (1 for mono, 2 for stereo). Defaults to 1.
def stop_recording(self) -> None:
428        def stop_recording(self) -> None:
429            """Stop audio recording."""
430            self.__stub.StopRecording(hal_pb_pb2.Empty())

Stop audio recording.