Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
C
co2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikita Yurishev
co2
Commits
c7499761
Commit
c7499761
authored
Jul 22, 2024
by
Nikita Yurishev
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
created new project
parents
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
171 additions
and
0 deletions
+171
-0
co2.py
co2.py
+171
-0
No files found.
co2.py
0 → 100644
View file @
c7499761
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
import
os
import
logging
as
log
import
hid
import
time
import
sys
CO2_USB_MFG
=
'Holtek'
CO2_USB_PRD
=
'USB-zyTemp'
# Ignore first 5 measurements during self-calibration after power-up
IGNORE_N_MEASUREMENTS
=
5
l
=
log
.
getLogger
(
'zytemp'
)
_CO2MON_MAGIC_WORD
=
b
'Htemp99e'
_CO2MON_MAGIC_TABLE
=
(
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
)
LOG_FILE
=
'co2data.log'
def
list_to_longint
(
x
):
return
sum
([
val
<<
(
i
*
8
)
for
i
,
val
in
enumerate
(
x
[::
-
1
])])
def
longint_to_list
(
x
):
return
[(
x
>>
i
)
&
0xFF
for
i
in
(
56
,
48
,
40
,
32
,
24
,
16
,
8
,
0
)]
class
ZyTemp
():
MEASUREMENTS
=
{
0x50
:
{
'name'
:
'CO2'
,
'unit'
:
'ppm'
,
'conversion'
:
lambda
x
:
x
,
},
0x42
:
{
'name'
:
'Temperature'
,
'unit'
:
'°C'
,
'conversion'
:
lambda
x
:
(
x
*
0.0625
)
-
273.15
,
},
}
def
__init__
(
self
,
hiddev
,
decrypt
=
True
):
self
.
h
=
hiddev
self
.
measurements_to_ignore
=
IGNORE_N_MEASUREMENTS
self
.
values
=
{
v
[
'name'
]:
None
for
v
in
ZyTemp
.
MEASUREMENTS
.
values
()}
self
.
_magic_word
=
[((
w
<<
4
)
&
0xFF
)
|
(
w
>>
4
)
for
w
in
bytearray
(
_CO2MON_MAGIC_WORD
)]
self
.
_magic_table
=
_CO2MON_MAGIC_TABLE
self
.
_magic_table_int
=
list_to_longint
(
_CO2MON_MAGIC_TABLE
)
if
decrypt
:
self
.
h
.
send_feature_report
(
self
.
_magic_table
)
else
:
self
.
h
.
send_feature_report
(
b
'
\xc4\xc6\xc0\x92\x40\x23\xdc\x96
'
)
def
__del__
(
self
):
self
.
h
.
close
()
def
update
(
self
,
key
,
value
,
values
):
values
[
key
]
=
value
# Проверяем, если обе переменные имеют значения
if
all
(
value
is
not
None
for
value
in
values
.
values
()):
# Запись значений в лог-файл с меткой времени
try
:
with
open
(
LOG_FILE
,
'a'
)
as
f
:
timestamp
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
f
.
write
(
f
"{timestamp} - {values}
\n
"
)
l
.
debug
(
f
"Successfully wrote data to {LOG_FILE}"
)
except
Exception
as
e
:
l
.
error
(
f
"Failed to write data to {LOG_FILE}: {e}"
)
print
(
values
)
# Выход после записи в лог
sys
.
exit
(
0
)
#Функция получения данных с детектора
def
run_once
(
self
,
decrypt
=
False
):
while
True
:
try
:
r
=
self
.
h
.
read
(
8
,
timeout_ms
=
5000
)
except
OSError
as
err
:
l
.
error
(
f
'OS error: {err}'
)
return
if
not
r
:
l
.
error
(
f
'Read error or timeout'
)
self
.
h
.
close
()
return
if
decrypt
:
msg
=
list_to_longint
([
r
[
i
]
for
i
in
[
2
,
4
,
0
,
7
,
1
,
6
,
5
,
3
]])
res
=
msg
^
self
.
_magic_table_int
res
=
(
res
>>
3
)
|
((
res
<<
61
)
&
0xFFFFFFFFFFFFFFFF
)
res
=
longint_to_list
(
res
)
r
=
[(
r
-
mw
)
&
0xFF
for
r
,
mw
in
zip
(
res
,
self
.
_magic_word
)]
if
len
(
r
)
<
5
or
r
[
4
]
!=
0x0d
:
l
.
debug
(
f
'Unexpected data from device: {r}
\n
'
)
continue
if
r
[
3
]
!=
sum
(
r
[
0
:
3
])
&
0xff
:
# l.error(f'Checksum error')
continue
m_type
=
r
[
0
]
m_val
=
r
[
1
]
<<
8
|
r
[
2
]
try
:
m
=
ZyTemp
.
MEASUREMENTS
[
m_type
]
except
KeyError
:
l
.
debug
(
f
'Unknown key {m_type:02x}
\n
'
)
continue
m_name
,
m_unit
,
m_reading
=
m
[
'name'
],
m
[
'unit'
],
m
[
'conversion'
](
m_val
)
ignore
=
self
.
measurements_to_ignore
>
0
l
.
debug
(
f
'{m_name}: {m_reading:g} {m_unit}'
+
(
' (ignored)'
if
ignore
else
''
)
+
'
\n
'
)
if
not
ignore
:
self
.
update
(
m_name
,
m_reading
,
self
.
values
)
if
self
.
measurements_to_ignore
:
self
.
measurements_to_ignore
-=
1
def
get_hiddev
():
hid_sensors
=
[
e
for
e
in
hid
.
enumerate
()
if
e
[
'manufacturer_string'
]
==
CO2_USB_MFG
and
e
[
'product_string'
]
==
CO2_USB_PRD
]
p
=
[]
for
s
in
hid_sensors
:
intf
,
path
,
vid
,
pid
=
(
s
[
k
]
for
k
in
(
'interface_number'
,
'path'
,
'vendor_id'
,
'product_id'
))
path_str
=
path
.
decode
(
'utf-8'
)
l
.
info
(
f
'Found CO2 sensor at intf. {intf}, {path_str}, VID={vid:04x}, PID={pid:04x}'
)
p
.
append
(
path
)
if
not
p
:
l
.
error
(
'No device found'
)
return
None
l
.
info
(
f
'Using device at {p[0].decode("utf-8")}'
)
h
=
hid
.
device
()
h
.
open_path
(
p
[
0
])
return
h
if
__name__
==
"__main__"
:
log
.
basicConfig
(
level
=
log
.
DEBUG
)
# Проверка на наличие файла co2data.log, создание при отсутствии
if
not
os
.
path
.
exists
(
LOG_FILE
):
with
open
(
LOG_FILE
,
'w'
)
as
f
:
f
.
write
(
"CO2 and Temperature data log
\n
"
)
hiddev
=
get_hiddev
()
if
hiddev
is
None
:
sys
.
exit
(
1
)
zytemp
=
ZyTemp
(
hiddev
)
zytemp
.
run_once
()
# Вывод значений переменных temperature и co2 в консоль
temperature
=
zytemp
.
values
.
get
(
'Temperature'
,
'N/A'
)
co2
=
zytemp
.
values
.
get
(
'CO2'
,
'N/A'
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment