Source code for dagger.analog

"""Get the Analog data from Pluto."""
import struct
import time
from dagger.utils import get_direction_in_bytes, get_header_bytes, calculate_crc, ZERO


[docs]class AnalogData: """Formats the Battery Voltage, Power Meter, RSSI and Amperage values. Attributes ---------- vbat : int Battery Voltage values of pluto in the ``1/10 Volts`` Units. int_power_meter_sum : int Power Meter values of pluto. rssi : int RSSI values of pluto. amperage : int Amperage values of pluto. timstamp : float timestamp values of pluto in the ``seconds`` Units. Example ------- >>> AnalogData.vbat """ def __init__(self, vbat, int_power_meter_sum, rssi, amperage): """Defines the class variables.""" self.vbat = vbat self.int_power_meter_sum = int_power_meter_sum self.rssi = rssi self.amperage = amperage self.timestamp = time.time()
[docs]class Analog: """Get the Analog data from Pluto. Parameters ---------- connection : PlutoConnection Pluto connection object for communicating with ``Pluto``. Examples -------- >>> t = dagger.PlutoConnection() >>> t.connect(('Pluto_IP', Pluto_port)) >>> analog = dagger.Analog(t) """ __msg_code = 110 __msg_length = 7 def __init__(self, connection): """Defines the class variable.""" self._connection = connection self.analog = {}
[docs] def get_analog_data(self): """Get the Analog OUT package from pluto. Returns ------- AnalogData The vbat, int_power_meter_sum, rssi, amperage and timestamp values. Examples -------- >>> data = analog.get_analog_data() >>> data.vbat """ header = get_header_bytes() direction = get_direction_in_bytes() length_bytes = bytearray(ZERO.to_bytes(1, byteorder="little")) code_bytes = bytearray(Analog.__msg_code.to_bytes(1, byteorder="little")) message = length_bytes + code_bytes crc = calculate_crc(message) packet = header + direction + message packet.append(crc) self._connection.send(packet) try: data = self.__response() return data except: print("Data not recieved.")
def __response(self): """Recieves the OUT packages.""" while True: header = struct.unpack("c", self._connection.recv(1))[0] if header.decode("utf-8") == "$": header_m = struct.unpack("c", self._connection.recv(1))[0] if header_m.decode("utf-8") == "M": direction = struct.unpack("c", self._connection.recv(1))[0] if direction.decode("utf-8") == ">": size = struct.unpack("B", self._connection.recv(1))[0] code = struct.unpack("B", self._connection.recv(1))[0] if size == self.__msg_length and code == self.__msg_code: data = self._connection.recv(size) temp = struct.unpack("<BHHH", data) vbat = float(temp[0]) int_power_meter_sum = float(temp[1]) rssi = float(temp[2]) amperage = float(temp[3]) analog_data = AnalogData(vbat, int_power_meter_sum, rssi, amperage) return analog_data