Commit 61a44a42 authored by Nikita Yurishev's avatar Nikita Yurishev

changed variables name and add service files

parent fdbf4366
......@@ -8,148 +8,155 @@ import sys
import socket
#from influxWriter import InfluxWriter
from influxConnection import InfluxConnection
CO2_USB_MFG = 'Holtek'
CO2_USB_PRD = 'USB-zyTemp'
CO2_USB_MANUFACTURER = 'Holtek'
CO2_USB_PRODUCT = 'USB-zyTemp'
# Ignore first 5 measurements during self-calibration after power-up
IGNORE_N_MEASUREMENTS = 5
IGNORE_FIRST_N_MEASUREMENTS = 5
l = log.getLogger('zytemp')
logger = log.getLogger('zytemp')
_CO2MON_MAGIC_WORD = b'Htemp99e'
_CO2MON_MAGIC_TABLE = (0, 0, 0, 0, 0, 0, 0, 0)
CO2MON_MAGIC_WORD = b'Htemp99e'
CO2MON_MAGIC_TABLE = (0, 0, 0, 0, 0, 0, 0, 0)
def list_to_longint(x):
return sum([val << (i * 8) for i, val in enumerate(x[::-1])])
def list_to_longint(byte_list):
return sum([val << (i * 8) for i, val in enumerate(byte_list[::-1])])
def longint_to_list(x):
return [(x >> i) & 0xFF for i in (56, 48, 40, 32, 24, 16, 8, 0)]
def longint_to_list(integer_value):
return [(integer_value >> i) & 0xFF for i in (56, 48, 40, 32, 24, 16, 8, 0)]
class ZyTemp():
MEASUREMENTS = {
0x50: {
'name': 'CO2',
'unit': 'ppm',
'conversion': lambda x: x,
'conversion': lambda value: value,
},
0x42: {
'name': 'Temperature',
'unit': '°C',
'conversion': lambda x: (x * 0.0625) - 273.15,
'conversion': lambda value: (value * 0.0625) - 273.15,
},
}
def __init__(self, hiddev, decrypt=True):
self.h = hiddev
self.measurements_to_ignore = IGNORE_N_MEASUREMENTS
self.values = {v['name']: None for v in ZyTemp.MEASUREMENTS.values()}
def __init__(self, hid_device, decrypt=True):
self.hid_device = hid_device
self.measurements_to_ignore = IGNORE_FIRST_N_MEASUREMENTS
self.sensor_values = {v['name']: None for v in ZyTemp.MEASUREMENTS.values()}
self._magic_word = [((w << 4) & 0xFF) | (w >> 4) for w in bytearray(_CO2MON_MAGIC_WORD)]
self._magic_table = _CO2MON_MAGIC_TABLE
self._magic_table_int = list_to_longint(_CO2MON_MAGIC_TABLE)
self.magic_word_bytes = [((word << 4) & 0xFF) | (word >> 4) for word in bytearray(CO2MON_MAGIC_WORD)]
self.magic_table = CO2MON_MAGIC_TABLE
self.magic_table_integer = list_to_longint(CO2MON_MAGIC_TABLE)
#self.influx_writer = InfluxWriter()
self.influx_writer = InfluxConnection() # Создаем объект для работы с InfluxDB
self.influx_connection = InfluxConnection() # Создаем объект для работы с InfluxDB
if decrypt:
self.h.send_feature_report(self._magic_table)
self.hid_device.send_feature_report(self.magic_table)
else:
self.h.send_feature_report(b'\xc4\xc6\xc0\x92\x40\x23\xdc\x96')
self.hid_device.send_feature_report(b'\xc4\xc6\xc0\x92\x40\x23\xdc\x96')
def __del__(self):
self.h.close()
self.hid_device.close()
def update(self, key, value, values):
values[key] = value
def update_sensor_value(self, measurement_key, measurement_value, sensor_values):
sensor_values[measurement_key] = measurement_value
if all(value is not None for value in values.values()):
if all(value is not None for value in sensor_values.values()):
sensor_id = socket.gethostname()
print(f"{sensor_id}: {values}")
#Проверка записи в InfluxDB
print(f"{sensor_id}: {sensor_values}")
# Проверка записи в InfluxDB
try:
self.influx_writer.write_data(sensor_id, values) # Записываем данные в InfluxDB
self.influx_connection.write_data(sensor_id, sensor_values) # Записываем данные в InfluxDB
except Exception as e:
l.error("failed to write data into InfluxDB: {e} ")
logger.error(f"Failed to write data into InfluxDB: {e}")
sys.exit(0)
#!
def run_once(self, decrypt=False):
while True:
try:
r = self.h.read(8, timeout_ms=5000)
report_data = self.hid_device.read(8, timeout_ms=5000)
except OSError as err:
l.error(f'OS error: {err}')
logger.error(f'OS error: {err}')
return
if not r:
l.error(f'Read error or timeout')
self.h.close()
if not report_data:
logger.error('Read error or timeout')
self.hid_device.close()
return
if decrypt:
msg = list_to_longint([r[i] for i in [2, 4, 0, 7, 1, 6, 5, 3]])
res = msg ^ self._magic_table_int
res = (res >> 3) | ((res << 61) & 0xFFFFFFFFFFFFFFFF)
res = longint_to_list(res)
r = [(r - mw) & 0xFF for r, mw in zip(res, self._magic_word)]
if len(r) < 5 or r[4] != 0x0d:
l.debug(f'Unexpected data from device: {r}\n')
message = list_to_longint([report_data[i] for i in [2, 4, 0, 7, 1, 6, 5, 3]])
result = message ^ self.magic_table_integer
result = (result >> 3) | ((result << 61) & 0xFFFFFFFFFFFFFFFF)
result = longint_to_list(result)
report_data = [(data - magic_word) & 0xFF for data, magic_word in zip(result, self.magic_word_bytes)]
if len(report_data) < 5 or report_data[4] != 0x0d:
logger.debug(f'Unexpected data from device: {report_data}\n')
continue
if r[3] != sum(r[0:3]) & 0xff:
if report_data[3] != sum(report_data[0:3]) & 0xff:
continue
m_type = r[0]
m_val = r[1] << 8 | r[2]
measurement_type = report_data[0]
measurement_value = report_data[1] << 8 | report_data[2]
try:
m = ZyTemp.MEASUREMENTS[m_type]
measurement = ZyTemp.MEASUREMENTS[measurement_type]
except KeyError:
l.debug(f'Unknown key {m_type:02x}\n')
logger.debug(f'Unknown key {measurement_type:02x}\n')
continue
m_name, m_unit, m_reading = m['name'], m['unit'], m['conversion'](m_val)
measurement_name, measurement_unit, measurement_converted_value = (
measurement['name'],
measurement['unit'],
measurement['conversion'](measurement_value)
)
ignore = self.measurements_to_ignore > 0
ignore_measurement = self.measurements_to_ignore > 0
l.debug(f'{m_name}: {m_reading:g} {m_unit}' + (' (ignored)' if ignore else '') + '\n')
logger.debug(f'{measurement_name}: {measurement_converted_value:g} {measurement_unit}' +
(' (ignored)' if ignore_measurement else '') + '\n')
if not ignore:
self.update(m_name, m_reading, self.values)
if not ignore_measurement:
self.update_sensor_value(measurement_name, measurement_converted_value, self.sensor_values)
if self.measurements_to_ignore:
self.measurements_to_ignore -= 1
def get_hiddev():
def get_hid_device():
hid_sensors = [
e for e in hid.enumerate()
if e['manufacturer_string'] == CO2_USB_MFG
and e['product_string'] == CO2_USB_PRD
sensor for sensor in hid.enumerate()
if sensor['manufacturer_string'] == CO2_USB_MANUFACTURER
and sensor['product_string'] == CO2_USB_PRODUCT
]
p = []
for s in hid_sensors:
intf, path, vid, pid = (s[k] for k in ('interface_number', 'path', 'vendor_id', 'product_id'))
sensor_paths = []
for sensor in hid_sensors:
interface_number, path, vendor_id, product_id = (sensor[key] for key in
('interface_number', 'path', 'vendor_id', 'product_id'))
path_str = path.decode('utf-8')
l.info(f'Found CO2 sensor at intf. {intf}, {path_str}, VID={vid:04x}, PID={pid:04x}')
p.append(path)
if not p:
l.error('No device found')
logger.info(f'Found CO2 sensor at interface {interface_number}, {path_str}, VID={vendor_id:04x}, PID={product_id:04x}')
sensor_paths.append(path)
if not sensor_paths:
logger.error('No device found')
return None
l.info(f'Using device at {p[0].decode("utf-8")}')
h = hid.device()
h.open_path(p[0])
return h
logger.info(f'Using device at {sensor_paths[0].decode("utf-8")}')
hid_device = hid.device()
hid_device.open_path(sensor_paths[0])
return hid_device
if __name__ == "__main__":
log.basicConfig(level=log.DEBUG)
hiddev = get_hiddev()
if hiddev is None:
hid_device = get_hid_device()
if hid_device is None:
sys.exit(1)
zytemp = ZyTemp(hiddev)
zytemp.run_once()
zytemp_sensor = ZyTemp(hid_device)
zytemp_sensor.run_once()
temperature = zytemp.values.get('Temperature', 'N/A')
co2 = zytemp.values.get('CO2', 'N/A')
temperature = zytemp_sensor.sensor_values.get('Temperature', 'N/A')
co2_level = zytemp_sensor.sensor_values.get('CO2', 'N/A')
[Unit]
Description=co2 Service
After=network.target
[Service]
WorkingDirectory=/srv/nikyur/Projects/co2git/co2
ExecStart=/usr/bin/python3 /srv/nikyur/Projects/co2git/co2/co2.py
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
\ No newline at end of file
[Unit]
Description=co2 Service every minute
[Timer]
OnCalendar=*:0/1
Unit=co2.service
[Install]
WantedBy=timers.target
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment