2
2
import csv
3
3
import datetime
4
4
from threading import Thread
5
+
5
6
# import numpy as np
6
7
# import matplotlib.pyplot as plt
7
8
# import matplotlib
8
9
from ppk2_api .ppk2_api import PPK2_MP as PPK2_API
9
10
10
- class PowerProfiler ():
11
+
12
+ class PowerProfiler :
11
13
def __init__ (self , serial_port = None , source_voltage_mV = 3300 , filename = None ):
12
14
"""Initialize PPK2 power profiler with serial"""
13
15
self .measuring = None
14
16
self .measurement_thread = None
15
17
self .ppk2 = None
16
18
17
- print (f "Initing power profiler" )
19
+ print ("Initing power profiler" )
18
20
19
21
# try:
20
22
if serial_port :
@@ -26,7 +28,8 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
26
28
self .ppk2 = PPK2_API (serial_port )
27
29
28
30
try :
29
- ret = self .ppk2 .get_modifiers () # try to read modifiers, if it fails serial port is probably not correct
31
+ # try to read modifiers, if it fails serial port is probably not correct
32
+ ret = self .ppk2 .get_modifiers ()
30
33
print (f"Initialized ppk2 api: { ret } " )
31
34
except Exception as e :
32
35
print (f"Error initializing power profiler: { e } " )
@@ -35,13 +38,16 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
35
38
36
39
if not ret :
37
40
self .ppk2 = None
38
- raise Exception (f"Error when initing PowerProfiler with serial port { serial_port } " )
41
+ raise Exception (
42
+ f"Error when initing PowerProfiler with serial port { serial_port } "
43
+ )
39
44
else :
40
45
self .ppk2 .use_source_meter ()
41
46
42
47
self .source_voltage_mV = source_voltage_mV
43
48
44
- self .ppk2 .set_source_voltage (self .source_voltage_mV ) # set to 3.3V
49
+ # set to 3.3V
50
+ self .ppk2 .set_source_voltage (self .source_voltage_mV )
45
51
46
52
print (f"Set power profiler source voltage: { self .source_voltage_mV } " )
47
53
@@ -62,7 +68,7 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
62
68
# write to csv
63
69
self .filename = filename
64
70
if self .filename is not None :
65
- with open (self .filename , 'w' , newline = '' ) as file :
71
+ with open (self .filename , "w" , newline = "" ) as file :
66
72
writer = csv .writer (file )
67
73
row = []
68
74
for key in ["ts" , "avg1000" ]:
@@ -71,10 +77,10 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
71
77
72
78
def write_csv_rows (self , samples ):
73
79
"""Write csv row"""
74
- with open (self .filename , 'a' , newline = '' ) as file :
80
+ with open (self .filename , "a" , newline = "" ) as file :
75
81
writer = csv .writer (file )
76
82
for sample in samples :
77
- row = [datetime .datetime .now ().strftime (' %d-%m-%Y %H:%M:%S.%f' ), sample ]
83
+ row = [datetime .datetime .now ().strftime (" %d-%m-%Y %H:%M:%S.%f" ), sample ]
78
84
writer .writerow (row )
79
85
80
86
def delete_power_profiler (self ):
@@ -85,26 +91,26 @@ def delete_power_profiler(self):
85
91
print ("Deleting power profiler" )
86
92
87
93
if self .measurement_thread :
88
- print (f "Joining measurement thread" )
94
+ print ("Joining measurement thread" )
89
95
self .measurement_thread .join ()
90
96
self .measurement_thread = None
91
97
92
98
if self .ppk2 :
93
- print (f "Disabling ppk2 power" )
99
+ print ("Disabling ppk2 power" )
94
100
self .disable_power ()
95
101
del self .ppk2
96
102
97
- print (f "Deleted power profiler" )
103
+ print ("Deleted power profiler" )
98
104
99
105
def discover_port (self ):
100
106
"""Discovers ppk2 serial port"""
101
107
ppk2s_connected = PPK2_API .list_devices ()
102
- if ( len (ppk2s_connected ) == 1 ) :
108
+ if len (ppk2s_connected ) == 1 :
103
109
ppk2_port = ppk2s_connected [0 ]
104
- print (f' Found PPK2 at { ppk2_port } ' )
110
+ print (f" Found PPK2 at { ppk2_port } " )
105
111
return ppk2_port
106
112
else :
107
- print (f' Too many connected PPK2\ ' s: { ppk2s_connected } ' )
113
+ print (f" Too many connected PPK2's: { ppk2s_connected } " )
108
114
return None
109
115
110
116
def enable_power (self ):
@@ -124,16 +130,21 @@ def disable_power(self):
124
130
def measurement_loop (self ):
125
131
"""Endless measurement loop will run in a thread"""
126
132
while True and not self .stop :
127
- if self .measuring : # read data if currently measuring
133
+ # read data if currently measuring
134
+ if self .measuring :
128
135
read_data = self .ppk2 .get_data ()
129
- if read_data != b'' :
136
+ if read_data != b"" :
130
137
samples = self .ppk2 .get_samples (read_data )
131
- self .current_measurements += samples # can easily sum lists, will append individual data
132
- time .sleep (0.001 ) # TODO figure out correct sleep duration
138
+ # can easily sum lists, will append individual data
139
+ self .current_measurements += samples
140
+ # TODO figure out correct sleep duration
141
+ time .sleep (0.001 )
133
142
134
143
def _average_samples (self , list , window_size ):
135
144
"""Average samples based on window size"""
136
- chunks = [list [val :val + window_size ] for val in range (0 , len (list ), window_size )]
145
+ chunks = [
146
+ list [val : val + window_size ] for val in range (0 , len (list ), window_size )
147
+ ]
137
148
avgs = []
138
149
for chunk in chunks :
139
150
avgs .append (sum (chunk ) / len (chunk ))
@@ -142,19 +153,24 @@ def _average_samples(self, list, window_size):
142
153
143
154
def start_measuring (self ):
144
155
"""Start measuring"""
145
- if not self .measuring : # toggle measuring flag only if currently not measuring
146
- self .current_measurements = [] # reset current measurements
147
- self .measuring = True # set internal flag
148
- self .ppk2 .start_measuring () # send command to ppk2
156
+ # toggle measuring flag only if currently not measuring
157
+ if not self .measuring :
158
+ # reset current measurements
159
+ self .current_measurements = []
160
+ # set internal flag
161
+ self .measuring = True
162
+ # send command to ppk2
163
+ self .ppk2 .start_measuring ()
149
164
self .measurement_start_time = time .time ()
150
165
151
166
def stop_measuring (self ):
152
167
"""Stop measuring and return average of period"""
153
168
self .measurement_stop_time = time .time ()
154
169
self .measuring = False
155
- self .ppk2 .stop_measuring () # send command to ppk2
170
+ # send command to ppk2
171
+ self .ppk2 .stop_measuring ()
156
172
157
- #samples_average = self._average_samples(self.current_measurements, 1000)
173
+ # samples_average = self._average_samples(self.current_measurements, 1000)
158
174
if self .filename is not None :
159
175
self .write_csv_rows (self .current_measurements )
160
176
@@ -172,24 +188,33 @@ def get_average_current_mA(self):
172
188
if len (self .current_measurements ) == 0 :
173
189
return 0
174
190
175
- average_current_mA = (sum (self .current_measurements ) / len (self .current_measurements )) / 1000 # measurements are in microamperes, divide by 1000
191
+ # measurements are in microamperes, divide by 1000
192
+ average_current_mA = (
193
+ sum (self .current_measurements ) / len (self .current_measurements )
194
+ ) / 1000
176
195
return average_current_mA
177
196
178
197
def get_average_power_consumption_mWh (self ):
179
198
"""Return average power consumption of last measurement in mWh"""
180
199
average_current_mA = self .get_average_current_mA ()
181
- average_power_mW = (self .source_voltage_mV / 1000 ) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
182
- measurement_duration_h = self .get_measurement_duration_s () / 3600 # duration in seconds, divide by 3600 to get hours
200
+ # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
201
+ average_power_mW = (self .source_voltage_mV / 1000 ) * average_current_mA
202
+ # duration in seconds, divide by 3600 to get hours
203
+ measurement_duration_h = self .get_measurement_duration_s () / 3600
183
204
average_consumption_mWh = average_power_mW * measurement_duration_h
184
205
return average_consumption_mWh
185
206
186
207
def get_average_charge_mC (self ):
187
208
"""Returns average charge in milli coulomb"""
188
209
average_current_mA = self .get_average_current_mA ()
189
- measurement_duration_s = self .get_measurement_duration_s () # in seconds
210
+ # in seconds
211
+ measurement_duration_s = self .get_measurement_duration_s ()
190
212
return average_current_mA * measurement_duration_s
191
213
192
214
def get_measurement_duration_s (self ):
193
215
"""Returns duration of measurement"""
194
- measurement_duration_s = (self .measurement_stop_time - self .measurement_start_time ) # measurement duration in seconds
195
- return measurement_duration_s
216
+ # measurement duration in seconds
217
+ measurement_duration_s = (
218
+ self .measurement_stop_time - self .measurement_start_time
219
+ )
220
+ return measurement_duration_s
0 commit comments