Skip to content

ruff format + github action to check formatting #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ruff.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: Ruff
on: [ push, pull_request ]
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3

26 changes: 16 additions & 10 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@

"""
Basic usage of PPK2 Python API.
The basic ampere mode sequence is:
1. read modifiers
2. set ampere mode
3. read stream of data
"""

import time
from ppk2_api.ppk2_api import PPK2_API

Expand All @@ -22,17 +22,20 @@
ppk2_test.get_modifiers()
ppk2_test.set_source_voltage(3300)

ppk2_test.use_source_meter() # set source meter mode
ppk2_test.toggle_DUT_power("ON") # enable DUT power
# set source meter mode
ppk2_test.use_source_meter()
# enable DUT power
ppk2_test.toggle_DUT_power("ON")
# start measuring
ppk2_test.start_measuring()

ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
# the number of measurements in one sampling period depends on the wait between serial reads
# it appears the maximum number of bytes received is 1024
# the sampling rate of the PPK2 is 100 samples per millisecond
for i in range(0, 1000):
read_data = ppk2_test.get_data()
if read_data != b'':
if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")

Expand All @@ -46,14 +49,15 @@
print()
time.sleep(0.01)

ppk2_test.toggle_DUT_power("OFF") # disable DUT power

ppk2_test.use_ampere_meter() # set ampere meter mode
# disable DUT power
ppk2_test.toggle_DUT_power("OFF")
# set ampere meter mode
ppk2_test.use_ampere_meter()

ppk2_test.start_measuring()
for i in range(0, 1000):
read_data = ppk2_test.get_data()
if read_data != b'':
if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")

Expand All @@ -65,6 +69,8 @@
# Print last 10 values of each channel
print(ch[-10:])
print()
time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period

# lower time between sampling -> less samples read in one sampling period
time.sleep(0.01)

ppk2_test.stop_measuring()
35 changes: 25 additions & 10 deletions example_mp.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@

"""
Basic usage of PPK2 Python API - multiprocessing version.
The basic ampere mode sequence is:
1. read modifiers
2. set ampere mode
3. read stream of data
"""

import time
from ppk2_api.ppk2_api import PPK2_MP as PPK2_API

Expand All @@ -18,24 +18,34 @@
print(f"Too many connected PPK2's: {ppk2s_connected}")
exit()

ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True)
ppk2_test = PPK2_API(
ppk2_port,
buffer_max_size_seconds=1,
buffer_chunk_seconds=0.01,
timeout=1,
write_timeout=1,
exclusive=True,
)
ppk2_test.get_modifiers()
ppk2_test.set_source_voltage(3300)

"""
Source mode example
"""
ppk2_test.use_source_meter() # set source meter mode
ppk2_test.toggle_DUT_power("ON") # enable DUT power
# set source meter mode
ppk2_test.use_source_meter()
# enable DUT power
ppk2_test.toggle_DUT_power("ON")
# start measuring
ppk2_test.start_measuring()

ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
# the number of measurements in one sampling period depends on the wait between serial reads
# it appears the maximum number of bytes received is 1024
# the sampling rate of the PPK2 is 100 samples per millisecond
while True:
read_data = ppk2_test.get_data()
if read_data != b'':
if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")

Expand All @@ -50,18 +60,21 @@

time.sleep(0.001)

ppk2_test.toggle_DUT_power("OFF") # disable DUT power

# disable DUT power
ppk2_test.toggle_DUT_power("OFF")
ppk2_test.stop_measuring()

"""
Ampere mode example
"""
ppk2_test.use_ampere_meter() # set ampere meter mode
# set ampere meter mode
ppk2_test.use_ampere_meter()

ppk2_test.start_measuring()
while True:
read_data = ppk2_test.get_data()
if read_data != b'':
if read_data != b"":
samples, raw_digital = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")

Expand All @@ -73,6 +86,8 @@
# Print last 10 values of each channel
print(ch[-10:])
print()
time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period

# lower time between sampling -> less samples read in one sampling period
time.sleep(0.001)

ppk2_test.stop_measuring()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ def read(*names, **kwargs):
"License :: OSI Approved :: GNU General Public License v2 (GPLv2)",
"Operating System :: OS Independent",
],
)
)
87 changes: 56 additions & 31 deletions src/power_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
import csv
import datetime
from threading import Thread

# import numpy as np
# import matplotlib.pyplot as plt
# import matplotlib
from ppk2_api.ppk2_api import PPK2_MP as PPK2_API

class PowerProfiler():

class PowerProfiler:
def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
"""Initialize PPK2 power profiler with serial"""
self.measuring = None
self.measurement_thread = None
self.ppk2 = None

print(f"Initing power profiler")
print("Initing power profiler")

# try:
if serial_port:
Expand All @@ -26,7 +28,8 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
self.ppk2 = PPK2_API(serial_port)

try:
ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct
# try to read modifiers, if it fails serial port is probably not correct
ret = self.ppk2.get_modifiers()
print(f"Initialized ppk2 api: {ret}")
except Exception as e:
print(f"Error initializing power profiler: {e}")
Expand All @@ -35,13 +38,16 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):

if not ret:
self.ppk2 = None
raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}")
raise Exception(
f"Error when initing PowerProfiler with serial port {serial_port}"
)
else:
self.ppk2.use_source_meter()

self.source_voltage_mV = source_voltage_mV

self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V
# set to 3.3V
self.ppk2.set_source_voltage(self.source_voltage_mV)

print(f"Set power profiler source voltage: {self.source_voltage_mV}")

Expand All @@ -62,7 +68,7 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):
# write to csv
self.filename = filename
if self.filename is not None:
with open(self.filename, 'w', newline='') as file:
with open(self.filename, "w", newline="") as file:
writer = csv.writer(file)
row = []
for key in ["ts", "avg1000"]:
Expand All @@ -71,10 +77,10 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None):

def write_csv_rows(self, samples):
"""Write csv row"""
with open(self.filename, 'a', newline='') as file:
with open(self.filename, "a", newline="") as file:
writer = csv.writer(file)
for sample in samples:
row = [datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S.%f'), sample]
row = [datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f"), sample]
writer.writerow(row)

def delete_power_profiler(self):
Expand All @@ -85,26 +91,26 @@ def delete_power_profiler(self):
print("Deleting power profiler")

if self.measurement_thread:
print(f"Joining measurement thread")
print("Joining measurement thread")
self.measurement_thread.join()
self.measurement_thread = None

if self.ppk2:
print(f"Disabling ppk2 power")
print("Disabling ppk2 power")
self.disable_power()
del self.ppk2

print(f"Deleted power profiler")
print("Deleted power profiler")

def discover_port(self):
"""Discovers ppk2 serial port"""
ppk2s_connected = PPK2_API.list_devices()
if(len(ppk2s_connected) == 1):
if len(ppk2s_connected) == 1:
ppk2_port = ppk2s_connected[0]
print(f'Found PPK2 at {ppk2_port}')
print(f"Found PPK2 at {ppk2_port}")
return ppk2_port
else:
print(f'Too many connected PPK2\'s: {ppk2s_connected}')
print(f"Too many connected PPK2's: {ppk2s_connected}")
return None

def enable_power(self):
Expand All @@ -124,16 +130,21 @@ def disable_power(self):
def measurement_loop(self):
"""Endless measurement loop will run in a thread"""
while True and not self.stop:
if self.measuring: # read data if currently measuring
# read data if currently measuring
if self.measuring:
read_data = self.ppk2.get_data()
if read_data != b'':
if read_data != b"":
samples = self.ppk2.get_samples(read_data)
self.current_measurements += samples # can easily sum lists, will append individual data
time.sleep(0.001) # TODO figure out correct sleep duration
# can easily sum lists, will append individual data
self.current_measurements += samples
# TODO figure out correct sleep duration
time.sleep(0.001)

def _average_samples(self, list, window_size):
"""Average samples based on window size"""
chunks = [list[val:val + window_size] for val in range(0, len(list), window_size)]
chunks = [
list[val : val + window_size] for val in range(0, len(list), window_size)
]
avgs = []
for chunk in chunks:
avgs.append(sum(chunk) / len(chunk))
Expand All @@ -142,19 +153,24 @@ def _average_samples(self, list, window_size):

def start_measuring(self):
"""Start measuring"""
if not self.measuring: # toggle measuring flag only if currently not measuring
self.current_measurements = [] # reset current measurements
self.measuring = True # set internal flag
self.ppk2.start_measuring() # send command to ppk2
# toggle measuring flag only if currently not measuring
if not self.measuring:
# reset current measurements
self.current_measurements = []
# set internal flag
self.measuring = True
# send command to ppk2
self.ppk2.start_measuring()
self.measurement_start_time = time.time()

def stop_measuring(self):
"""Stop measuring and return average of period"""
self.measurement_stop_time = time.time()
self.measuring = False
self.ppk2.stop_measuring() # send command to ppk2
# send command to ppk2
self.ppk2.stop_measuring()

#samples_average = self._average_samples(self.current_measurements, 1000)
# samples_average = self._average_samples(self.current_measurements, 1000)
if self.filename is not None:
self.write_csv_rows(self.current_measurements)

Expand All @@ -172,24 +188,33 @@ def get_average_current_mA(self):
if len(self.current_measurements) == 0:
return 0

average_current_mA = (sum(self.current_measurements) / len(self.current_measurements)) / 1000 # measurements are in microamperes, divide by 1000
# measurements are in microamperes, divide by 1000
average_current_mA = (
sum(self.current_measurements) / len(self.current_measurements)
) / 1000
return average_current_mA

def get_average_power_consumption_mWh(self):
"""Return average power consumption of last measurement in mWh"""
average_current_mA = self.get_average_current_mA()
average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts
measurement_duration_h = self.get_measurement_duration_s() / 3600 # duration in seconds, divide by 3600 to get hours
# divide by 1000 as source voltage is in millivolts - this gives us milliwatts
average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA
# duration in seconds, divide by 3600 to get hours
measurement_duration_h = self.get_measurement_duration_s() / 3600
average_consumption_mWh = average_power_mW * measurement_duration_h
return average_consumption_mWh

def get_average_charge_mC(self):
"""Returns average charge in milli coulomb"""
average_current_mA = self.get_average_current_mA()
measurement_duration_s = self.get_measurement_duration_s() # in seconds
# in seconds
measurement_duration_s = self.get_measurement_duration_s()
return average_current_mA * measurement_duration_s

def get_measurement_duration_s(self):
"""Returns duration of measurement"""
measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds
return measurement_duration_s
# measurement duration in seconds
measurement_duration_s = (
self.measurement_stop_time - self.measurement_start_time
)
return measurement_duration_s
Loading