Here is the story of how I got to this moment...
Step 1: Writing the Image on the Raspberry Pi Zero 2 W
- Download the official Raspberry Pi Imager:
- https://www.raspberrypi.com/software/
- Install and start the program.
- Flash Raspberry Pi OS (Lite) to Your SD Card
- Device: Raspberry Pi Zero 2W
- OS: Raspberry Pi OS Lite (32-bit) (you find it under "other software")
- Storage: your SD card
See image 1
Click NEXT → Imager will ask to customize settings.
Press EDIT SETTINGS.
See image 2
- In File Explorer, you’ll see a small drive called something like boot (FAT32).
- Open that drive.
- In the root of that drive (where you see files like config.txt, cmdline.txt, etc.): Right-click → New → Text Document
- Name it: ssh
- Then remove the .txt extension, so the file is just called ssh (no extension at all).
- If Windows warns you about changing the extension, click Yes.
- The file should now be exactly: boot\ssh (Empty file, no extension.)
- Eject the SD card safely from Windows.
- Put the SD card into the Pi Zero 2 W.
- Power the Pi with your 5V supply and wait ~60–90 seconds (I used the Micro USB port labeled PWR IN).
- Open a PowerShell window as administrator and run this command to check that the Pi is reachable over the wireless connection:
ping piss.local
You should get something back like this:
PS C:\Users\Michael> ping piss.local
Pinging PISS.local [2a02:1811:e52e:d400:9366:6c4e:93be:a465] with 32 bytes of data:
Reply from 2a02:1811:e52e:d400:9366:6c4e:93be:a465: time=3ms
Reply from 2a02:1811:e52e:d400:9366:6c4e:93be:a465: time=3ms
Reply from 2a02:1811:e52e:d400:9366:6c4e:93be:a465: time=3ms
Reply from 2a02:1811:e52e:d400:9366:6c4e:93be:a465: time=3ms
Ping statistics for 2a02:1811:e52e:d400:9366:6c4e:93be:a465:
Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
Minimum = 3ms, Maximum = 3ms, Average = 3ms
You are now ready to start installing and coding the Pi Zero!
Step 2: Installing All Software on the Raspberry Pi Zero 2 W
- Log in to the Pi by running this in the PowerShell window (as administrator). If you changed the username in the Raspberry Pi Imager, replace my username with the one you picked.
ssh seafoxc@piss.local
- Enter your password.
- First, enable the I2C interface for the display:
sudo raspi-config
Interface Options -> I2C -> Enable
- Then reboot:
sudo reboot
- Wait about 1 minute for the reboot, then reconnect:
ssh seafoxc@piss.local
- Then update your Raspberry Pi by running these commands:
sudo apt update
sudo apt full-upgrade -y
- Then reboot:
sudo reboot
- Wait about 1 minute for the reboot, then reconnect:
ssh seafoxc@piss.local
- Next, install the Python tooling (pip, headers, venv) along with the build tools and git:
sudo apt install -y python3-pip python3-dev python3-venv build-essential git
- Next, create a project folder and a virtual environment, then activate it, by running these 4 commands:
mkdir ~/piss_project
cd ~/piss_project
python3 -m venv --system-site-packages venv
source venv/bin/activate
- Upgrade pip inside the virtual environment:
pip install --upgrade pip
- Install the libraries you will need (this will take some time):
RPi.GPIO gives access to the I/O pins of the Raspberry Pi Zero 2 W
hx711 is for the weight module
lightstreamer-client-lib connects to the NASA data
luma.oled is for the display, and i2c-tools is for troubleshooting
pip install RPi.GPIO
pip install hx711
pip install lightstreamer-client-lib
pip install --no-build-isolation luma.oled
sudo apt install -y i2c-tools
sudo apt install -y python3-pil python3-smbus
Once these are installed, the next step is to put the code on the device.
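Before moving on, it can be worth confirming that the libraries are visible from inside the venv. The following is a minimal sketch and not part of the original project: the helper name missing_modules is hypothetical, and the module list is simply taken from the import lines of the scripts later in this guide.

```python
import importlib.util

# Module names taken from the import lines of the scripts in this guide.
REQUIRED_MODULES = [
    "RPi.GPIO",
    "lightstreamer.client",
    "luma.core.interface.serial",
    "luma.oled.device",
    "PIL",
]


def missing_modules(names):
    """Return the names from `names` that cannot be located for import."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is itself missing.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules were found.")
```

If anything is reported as missing, re-running the matching pip or apt command above (with the venv still active) is the first thing to try.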
Still in ~/piss_project with the venv active:
nano iss_urine_test.py
Paste this:
from lightstreamer.client import LightstreamerClient, Subscription, SubscriptionListener
from datetime import datetime, timezone
import time
import sys

SERVER_URL = "https://push.lightstreamer.com"
ADAPTER_SET = "ISSLIVE"
ITEM_NAME = "NODE3000005"  # Urine Tank [%]
FIELDS = ["TimeStamp", "Value"]

last_value = None


class UrineSubListener(SubscriptionListener):
    def onItemUpdate(self, update):
        global last_value
        try:
            ts_str = update.getValue("TimeStamp")
            value_str = update.getValue("Value")
            if value_str is None:
                return
            try:
                value = float(value_str)
            except ValueError:
                return
            # Only print when the value actually changed
            if last_value is not None and value == last_value:
                return
            last_value = value
            now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
            print(f"[{now_utc}] ISS Urine Tank: {value:.1f}% (ISS TimeStamp: {ts_str})")
            sys.stdout.flush()
        except Exception as e:
            print("Error in onItemUpdate:", e, file=sys.stderr)


def main():
    client = LightstreamerClient(SERVER_URL, ADAPTER_SET)
    client.connectionOptions.setRequestedMaxBandwidth("1.0")
    sub = Subscription("MERGE", [ITEM_NAME], FIELDS)
    sub.setRequestedSnapshot("yes")
    listener = UrineSubListener()
    sub.addListener(listener)
    client.subscribe(sub)
    client.connect()
    print("Connected to ISS Live stream. Waiting for urine tank updates...")
    print("Press Ctrl+C to stop.")
    sys.stdout.flush()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        client.unsubscribe(sub)
        client.disconnect()
        print("Disconnected.")


if __name__ == "__main__":
    main()
Save: Ctrl+O, Enter, Ctrl+X.
Run it:
python3 iss_urine_test.py
You should start seeing lines like:
Connected to ISS Live stream. Waiting for urine tank updates...
[2025-11-23 15:42:10 UTC] ISS Urine Tank: 60.0% (ISS TimeStamp: GMT 321/13:55:34)
...
A new line is printed only when the value changes.
If this works, you can build the rest.
Step 3: 3D-printing
Some parts of this project need to be 3D printed. I used PLA for all components. Since my printer has a maximum build volume of 220 × 220 × 250 mm, I split the base into two separate pieces.
The top part includes added lines to reduce warping, as this was an issue during printing.
When removing the supports from the tower, proceed carefully to avoid damaging the part.
To attach the parts to the base, I used threaded inserts installed with a soldering iron. Ensure proper ventilation during this process to avoid inhaling fumes. The required inserts and bolts are listed in the description.
You also need to drill a hole in the bottom of the bottle for the hose; I sealed it with hot glue.
Create the script hx711_calibrate.py:
"""
hx711_calibrate.py
- Reads HX711 load cell (bit-banged)
- Supports tare + calibration with a known weight (grams)
- Shows weight in grams on SH1106 OLED (optional but enabled by default)
- Saves calibration to hx711_cal.json
Pins (BCM):
  DT/DOUT = GPIO5
  SCK/CLK = GPIO6
"""
from __future__ import annotations
import json
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
import RPi.GPIO as GPIO
# OLED (same stack you used before)
from luma.core.interface.serial import i2c
from luma.oled.device import sh1106
from PIL import Image, ImageDraw
# -------------------- CONFIG --------------------
DT_PIN = 5 # HX711 DT/DOUT (BCM)
SCK_PIN = 6 # HX711 SCK/CLK (BCM)
SAMPLES = 20 # averaging samples per reading
READ_HZ = 5 # console/OLED update rate
OLED_ENABLED = True
I2C_PORT = 1
I2C_ADDR = 0x3C
OLED_ROTATE = 0
OLED_CONTRAST = 255
CAL_FILE = "hx711_cal.json"
# ------------------------------------------------
def utc_str() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


@dataclass
class Calibration:
    offset: int = 0      # raw offset (tare)
    scale: float = 1.0   # raw units per gram

    def to_dict(self):
        return {"offset": self.offset, "scale": self.scale}

    @staticmethod
    def from_dict(d: dict) -> "Calibration":
        return Calibration(offset=int(d.get("offset", 0)), scale=float(d.get("scale", 1.0)))


class HX711:
    """
    Minimal HX711 reader.
    Assumes channel A gain 128 (default) by sending 25 clock pulses total per conversion read.
    """

    def __init__(self, dt_pin: int, sck_pin: int):
        self.dt = dt_pin
        self.sck = sck_pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.sck, GPIO.OUT, initial=GPIO.LOW)
        GPIO.setup(self.dt, GPIO.IN)
        self._power_up()

    def cleanup(self):
        try:
            GPIO.cleanup()
        except Exception:
            pass

    def _power_down(self):
        GPIO.output(self.sck, GPIO.LOW)
        GPIO.output(self.sck, GPIO.HIGH)
        time.sleep(0.001)

    def _power_up(self):
        GPIO.output(self.sck, GPIO.LOW)
        time.sleep(0.001)

    def is_ready(self) -> bool:
        return GPIO.input(self.dt) == 0

    def read_raw(self, timeout_s: float = 1.0) -> int:
        """
        Returns signed 24-bit value extended to Python int.
        """
        t0 = time.time()
        while not self.is_ready():
            if time.time() - t0 > timeout_s:
                raise TimeoutError("HX711 not ready (DT stayed high). Check wiring/power.")
            time.sleep(0.001)
        # Read 24 bits
        value = 0
        for _ in range(24):
            GPIO.output(self.sck, GPIO.HIGH)
            value = (value << 1) | (1 if GPIO.input(self.dt) else 0)
            GPIO.output(self.sck, GPIO.LOW)
        # 25th pulse to set gain=128 for next conversion
        GPIO.output(self.sck, GPIO.HIGH)
        GPIO.output(self.sck, GPIO.LOW)
        # Convert from 24-bit two's complement
        if value & 0x800000:  # negative
            value -= 1 << 24
        return value

    def read_average(self, samples: int = 10) -> int:
        total = 0
        for _ in range(samples):
            total += self.read_raw()
        return int(total / samples)

def load_calibration(path: str) -> Calibration:
    if not os.path.exists(path):
        return Calibration()
    with open(path, "r", encoding="utf-8") as f:
        return Calibration.from_dict(json.load(f))


def save_calibration(path: str, cal: Calibration) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(cal.to_dict(), f, indent=2)


def init_oled():
    serial = i2c(port=I2C_PORT, address=I2C_ADDR)
    device = sh1106(serial, rotate=OLED_ROTATE)
    device.contrast(OLED_CONTRAST)
    return device


def oled_render(device, lines: list[str]):
    img = Image.new("1", (device.width, device.height), 0)
    draw = ImageDraw.Draw(img)
    y = 0
    for line in lines:
        draw.text((0, y), line, fill=1)
        y += 11
    device.display(img)


def grams_from_raw(raw: int, cal: Calibration) -> float:
    # grams = (raw - offset) / scale
    if cal.scale == 0:
        return 0.0
    return (raw - cal.offset) / cal.scale


def prompt_float(prompt: str) -> float:
    while True:
        s = input(prompt).strip().replace(",", ".")
        try:
            return float(s)
        except ValueError:
            print("Enter a number (example: 250).")

def main() -> int:
    cal = load_calibration(CAL_FILE)
    hx = HX711(DT_PIN, SCK_PIN)
    device = None
    try:
        if OLED_ENABLED:
            try:
                device = init_oled()
                oled_render(device, ["HX711 Scale", "", "Starting...", "", f"UTC {utc_str()}"])
            except Exception as e:
                print(f"[WARN] OLED init failed ({e}); continuing without OLED.", flush=True)
                device = None
        print("\nHX711 calibration tool")
        print("Commands:")
        print(" t = tare (empty scale)")
        print(" c = calibrate (place known weight, enter grams)")
        print(" r = run (continuous readout)")
        print(" q = quit\n")
        while True:
            cmd = input("Enter command [t/c/r/q]: ").strip().lower()
            if cmd == "q":
                break
            if cmd == "t":
                print("[INFO] Tare: remove all weight. Reading...", flush=True)
                raw = hx.read_average(SAMPLES)
                cal.offset = raw
                save_calibration(CAL_FILE, cal)
                print(f"[OK] Tare set. offset={cal.offset}", flush=True)
                if device:
                    oled_render(device, ["HX711 Scale", "", "Tare OK", "", f"UTC {utc_str()}"])
            elif cmd == "c":
                print("[INFO] Calibration: first tare is recommended.", flush=True)
                known_g = prompt_float("Enter known weight in grams (from kitchen scale): ")
                print("[INFO] Place that weight on the load cell now. Reading...", flush=True)
                raw = hx.read_average(SAMPLES)
                # scale = (raw - offset) / grams => raw units per gram
                delta = raw - cal.offset
                if abs(delta) < 1000:
                    print("[WARN] Delta is very small. Increase weight or check wiring/mechanics.")
                cal.scale = delta / known_g if known_g != 0 else cal.scale
                save_calibration(CAL_FILE, cal)
                g_now = grams_from_raw(raw, cal)
                print(f"[OK] Calibrated. scale={cal.scale:.6f} raw/gram (reading now: {g_now:.1f} g)", flush=True)
                if device:
                    oled_render(device, ["HX711 Scale", f"{g_now:.1f} g", "", "Calibrated", f"UTC {utc_str()}"])
            elif cmd == "r":
                print("[INFO] Running. Ctrl+C to stop.\n", flush=True)
                if device:
                    oled_render(device, ["HX711 Scale", "", "Running...", "", f"UTC {utc_str()}"])
                try:
                    period = 1.0 / max(1, READ_HZ)
                    while True:
                        raw = hx.read_average(SAMPLES)
                        g = grams_from_raw(raw, cal)
                        print(f"[{utc_str()}] raw={raw:>9d} grams={g:>8.1f} (offset={cal.offset}, scale={cal.scale:.6f})", flush=True)
                        if device:
                            oled_render(device, [
                                "HX711 Scale",
                                f"{g:,.1f} g".replace(",", " "),
                                "",
                                f"raw {raw}",
                                f"UTC {utc_str()}",
                            ])
                        time.sleep(period)
                except KeyboardInterrupt:
                    print("\n[INFO] Stopped.\n", flush=True)
            else:
                print("Unknown command. Use t/c/r/q.", flush=True)
        return 0
    except TimeoutError as e:
        print(f"[ERROR] {e}", file=sys.stderr, flush=True)
        return 2
    finally:
        if device:
            try:
                oled_render(device, ["HX711 Scale", "", "Bye", "", f"UTC {utc_str()}"])
            except Exception:
                pass
        hx.cleanup()


if __name__ == "__main__":
    raise SystemExit(main())
Run the script.
First put an empty bottle on the scale, press 't', and write down the offset value. Then fill the bottle with exactly 500 ml of water (about 500 g) and press 'c'. Enter the weight in grams, '500' in this case, and write down the scale value.
These are the values you need to put in the main script.
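The arithmetic behind these two numbers is short enough to check by hand. The sketch below mirrors the formulas used in hx711_calibrate.py (scale = (raw - offset) / grams, and grams = (raw - offset) / scale). The function names and the loaded raw reading of -1201441 are made up for illustration; the offset is the one that appears in the main script further down.

```python
def compute_scale(raw_loaded: int, offset: int, known_grams: float) -> float:
    """Raw units per gram, from one reading taken with a known weight."""
    if known_grams == 0:
        raise ValueError("known_grams must be non-zero")
    return (raw_loaded - offset) / known_grams


def grams_from_raw(raw: int, offset: int, scale: float) -> float:
    """Convert a raw HX711 reading to grams using offset and scale."""
    return (raw - offset) / scale


if __name__ == "__main__":
    offset = -671229          # tare reading with the empty bottle on the scale
    raw_loaded = -1201441     # hypothetical reading with 500 g of water added
    scale = compute_scale(raw_loaded, offset, 500.0)
    print(f"scale = {scale:.3f} raw units per gram")
    print(f"check = {grams_from_raw(raw_loaded, offset, scale):.1f} g")
```

Because the tare is taken with the empty bottle in place, the grams computed this way are the weight of the liquid only. A negative scale simply means the raw reading decreases as weight is added, which depends on how the load cell is wired and mounted.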
Now you can run the main script. You can copy and paste it from here, but make sure to replace the weight calibration values with the ones from the previous step (they are under -------------------- Bottle calibration values --------------------).
#!/usr/bin/env python3
"""
isslive_monitor.py
ISS urine tank monitor (OLED + console) + bottle scale control (HX711 + 2 pumps)
Changes requested:
- Left-edge artifact line on SH1106:
- Use an X crop + paste method to hide the first columns cleanly (avoids uneven 1–2px stripe).
- Adjust OLED_CROP_LEFT if needed (default 2).
- Pump overtime threshold: 18 seconds (was 60)
- Pump pin change:
- PUMP_FILL_PIN = 27 (GPIO27)
- PUMP_EMPTY_PIN remains GPIO11
- OLED content stays original (no bottle info on OLED), except overtime warning screen.
"""
from __future__ import annotations
import sys
import time
import threading
import traceback
from datetime import datetime, timezone
import RPi.GPIO as GPIO
from lightstreamer.client import LightstreamerClient, Subscription, SubscriptionListener
from luma.core.interface.serial import i2c
from luma.oled.device import sh1106
from PIL import Image, ImageDraw
# -------------------- CONFIG --------------------
# Lightstreamer
SERVER_URL = "https://push.lightstreamer.com"
ADAPTER_SET = "ISSLIVE"
ITEM_NAME = "NODE3000005"
FIELDS = ["TimeStamp", "Value"]
# OLED
I2C_PORT = 1
I2C_ADDR = 0x3C
OLED_ROTATE = 0
OLED_CONTRAST = 255
SCREEN_REFRESH_SEC = 1.0
# SH1106 left-edge artifact handling:
# Many SH1106 modules show an uneven bright stripe on the far-left.
# We draw normally, then crop-left by N pixels and paste back at x=0.
OLED_X_OFFSET = 0 # keep at 0 when using crop method
OLED_CROP_LEFT = 2 # hide 1–4 px; try 2 first, then 3 or 4 if needed
# HX711 (BCM pins)
HX711_DT_PIN = 5 # GPIO5 (Pin 29)
HX711_SCK_PIN = 6 # GPIO6 (Pin 31)
HX711_SAMPLES = 20
# Bottle calibration / control
BOTTLE_FULL_G = 675.0
DEADZONE_G = 50.0
# Pumps (BCM pins)
PUMP_FILL_PIN = 27 # GPIO27 (Physical 13) -> adds liquid
PUMP_EMPTY_PIN = 11 # GPIO11 (Physical 23) -> removes liquid
# MOSFET trigger modules are often "active HIGH" (signal HIGH = ON).
ACTIVE_HIGH = True
# Optional: safety minimum time between pump toggles (seconds)
MIN_SWITCH_INTERVAL_S = 0.5
# Pump overtime safety
PUMP_MAX_CONTINUOUS_SEC = 18.0 # <-- changed from 60 to 18 seconds
OVERTIME_RESET_OFF_SEC = 5.0 # pumps must be OFF this long to auto-clear latch
# ------------------------------------------------
def utc_now() -> datetime:
    return datetime.now(timezone.utc)


def utc_str(dt: datetime | None = None) -> str:
    if dt is None:
        dt = utc_now()
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def clamp(x: float, lo: float, hi: float) -> float:
    return lo if x < lo else hi if x > hi else x


# -------------------- Shared ISS State --------------------
class SharedState:
    def __init__(self):
        self.lock = threading.Lock()
        self.value: float | None = None  # ISS tank percent
        self.iss_ts: str | None = None
        self.last_rx_utc: datetime | None = None

    def update(self, value: float, iss_ts: str | None):
        with self.lock:
            self.value = value
            self.iss_ts = iss_ts
            self.last_rx_utc = utc_now()

    def snapshot(self):
        with self.lock:
            return self.value, self.iss_ts, self.last_rx_utc


STATE = SharedState()

# -------------------- OLED --------------------
def init_oled():
    serial = i2c(port=I2C_PORT, address=I2C_ADDR)
    device = sh1106(serial, rotate=OLED_ROTATE)
    device.contrast(OLED_CONTRAST)
    return device


def oled_render(device, lines: list[str]):
    # Draw full buffer
    img = Image.new("1", (device.width, device.height), 0)
    draw = ImageDraw.Draw(img)
    x = OLED_X_OFFSET
    y = 0
    for line in lines:
        draw.text((x, y), line, fill=1)
        y += 11
    # Crop-left trick to hide uneven left stripe cleanly
    if OLED_CROP_LEFT > 0:
        cropped = img.crop((OLED_CROP_LEFT, 0, device.width, device.height))
        img2 = Image.new("1", (device.width, device.height), 0)
        img2.paste(cropped, (0, 0))
        img = img2
    device.display(img)


def build_lines(overtime: bool = False) -> list[str]:
    if overtime:
        return [
            "ISS Urine Tank",
            "",
            "Pump overtime",
            "",
            f"UTC: {utc_str()}",
        ]
    value, _iss_ts, last_rx = STATE.snapshot()
    if value is None:
        return [
            "ISS Urine Tank",
            "",
            "Waiting for data",
            "",
            f"UTC {utc_str()}",
        ]
    age = int((utc_now() - last_rx).total_seconds()) if last_rx else 0
    return [
        "ISS Urine Tank",
        f"{value:.1f} %",
        "",
        f"Age: {age}s",
        f"UTC: {utc_str()}",
    ]

# -------------------- HX711 (bit-banged) --------------------
class HX711:
    def __init__(self, dt_pin: int, sck_pin: int):
        self.dt = dt_pin
        self.sck = sck_pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.sck, GPIO.OUT, initial=GPIO.LOW)
        GPIO.setup(self.dt, GPIO.IN)
        GPIO.output(self.sck, GPIO.LOW)
        time.sleep(0.001)

    def is_ready(self) -> bool:
        return GPIO.input(self.dt) == 0

    def read_raw(self, timeout_s: float = 1.0) -> int:
        t0 = time.time()
        while not self.is_ready():
            if time.time() - t0 > timeout_s:
                raise TimeoutError("HX711 not ready (DT stayed high). Check wiring/power.")
            time.sleep(0.001)
        value = 0
        for _ in range(24):
            GPIO.output(self.sck, GPIO.HIGH)
            value = (value << 1) | (1 if GPIO.input(self.dt) else 0)
            GPIO.output(self.sck, GPIO.LOW)
        # 25th pulse => gain 128
        GPIO.output(self.sck, GPIO.HIGH)
        GPIO.output(self.sck, GPIO.LOW)
        if value & 0x800000:
            value -= 1 << 24
        return value

    def read_average(self, samples: int) -> int:
        total = 0
        for _ in range(samples):
            total += self.read_raw()
        return int(total / samples)


# -------------------- Bottle calibration values --------------------
HX711_OFFSET = -671229
HX711_SCALE = -1060.423246  # raw units per gram


def bottle_grams_from_raw(raw: int) -> float:
    if HX711_SCALE == 0:
        return 0.0
    return (raw - HX711_OFFSET) / HX711_SCALE


def bottle_percent_from_grams(g: float) -> float:
    return clamp((g / BOTTLE_FULL_G) * 100.0, 0.0, 100.0)

# -------------------- Pump control --------------------
class PumpController:
    def __init__(self, fill_pin: int, empty_pin: int, active_high: bool = True):
        self.fill_pin = fill_pin
        self.empty_pin = empty_pin
        self.active_high = active_high
        self.lock = threading.Lock()
        self.state = "OFF"  # OFF / FILL / EMPTY
        self.last_switch = 0.0
        self._run_start_ts: float | None = None
        self.overtime_latched: bool = False
        self._off_since_ts: float | None = None
        GPIO.setup(self.fill_pin, GPIO.OUT)
        GPIO.setup(self.empty_pin, GPIO.OUT)
        self._write(False, False)

    def _write(self, fill: bool, empty: bool):
        if self.active_high:
            GPIO.output(self.fill_pin, GPIO.HIGH if fill else GPIO.LOW)
            GPIO.output(self.empty_pin, GPIO.HIGH if empty else GPIO.LOW)
        else:
            GPIO.output(self.fill_pin, GPIO.LOW if fill else GPIO.HIGH)
            GPIO.output(self.empty_pin, GPIO.LOW if empty else GPIO.HIGH)

    def _trip_overtime(self, now: float):
        self.overtime_latched = True
        self._write(False, False)
        self.state = "OFF"
        self._off_since_ts = now
        self._run_start_ts = None

    def _update_overtime_state(self, now: float):
        if self.state in ("FILL", "EMPTY"):
            if self._run_start_ts is None:
                self._run_start_ts = now
            if (now - self._run_start_ts) >= PUMP_MAX_CONTINUOUS_SEC:
                self._trip_overtime(now)
        else:
            self._run_start_ts = None
            if self._off_since_ts is None:
                self._off_since_ts = now
            if self.overtime_latched and (now - self._off_since_ts) >= OVERTIME_RESET_OFF_SEC:
                self.overtime_latched = False  # auto-clear after cooldown

    def set(self, mode: str):
        mode = mode.upper()
        if mode not in ("OFF", "FILL", "EMPTY"):
            return
        with self.lock:
            now = time.time()
            self._update_overtime_state(now)
            if self.overtime_latched:
                # Forced OFF while latched
                self._write(False, False)
                self.state = "OFF"
                return
            if mode != self.state and (now - self.last_switch) < MIN_SWITCH_INTERVAL_S:
                return
            if mode == "OFF":
                self._write(False, False)
                self._off_since_ts = now
                self._run_start_ts = None
            elif mode == "FILL":
                self._write(True, False)
                self._off_since_ts = None
                # Start the run timer only when a new run begins. Resetting it on
                # every call (the main loop calls set() each cycle) would keep the
                # overtime limit from ever being reached.
                if self.state != "FILL":
                    self._run_start_ts = now
            elif mode == "EMPTY":
                self._write(False, True)
                self._off_since_ts = None
                if self.state != "EMPTY":
                    self._run_start_ts = now
            if mode != self.state:
                self.state = mode
                self.last_switch = now

    def tick(self):
        with self.lock:
            self._update_overtime_state(time.time())
            if self.overtime_latched:
                self._write(False, False)
                self.state = "OFF"

    def get(self) -> str:
        with self.lock:
            return self.state

    def is_overtime(self) -> bool:
        with self.lock:
            return self.overtime_latched

    def shutdown(self):
        with self.lock:
            self._write(False, False)
            self.state = "OFF"
            self._run_start_ts = None
            self._off_since_ts = time.time()

# -------------------- Lightstreamer listener --------------------
class ISSListener(SubscriptionListener):
    def onItemUpdate(self, update):
        val_str = update.getValue("Value")
        iss_ts = update.getValue("TimeStamp")
        if not val_str:
            return
        try:
            value = float(val_str)
        except ValueError:
            print(f"[WARN] Non-numeric value: {val_str!r}", flush=True)
            return
        STATE.update(value, iss_ts)
        print(f"[{utc_str()}] Urine Tank = {value:.1f}% (ISS TS: {iss_ts})", flush=True)

    def onSubscriptionError(self, code, message):
        print(f"[SUB ERROR] {code}: {message}", flush=True)

# -------------------- Main --------------------
def main() -> int:
    # Ensure prints show immediately
    try:
        sys.stdout.reconfigure(line_buffering=True)
    except Exception:
        pass
    print(f"[{utc_str()}] [BOOT] isslive_monitor starting...", flush=True)
    # OLED init
    try:
        device = init_oled()
    except Exception as e:
        print(f"[ERROR] OLED init failed: {e}", file=sys.stderr, flush=True)
        return 2
    oled_render(device, ["ISS Urine Tank", "", "Connecting...", "", f"UTC {utc_str()}"])
    # Init HX711 and pumps
    try:
        hx = HX711(HX711_DT_PIN, HX711_SCK_PIN)
    except Exception as e:
        print(f"[ERROR] HX711 init failed: {e}", file=sys.stderr, flush=True)
        return 3
    pumps = PumpController(PUMP_FILL_PIN, PUMP_EMPTY_PIN, active_high=ACTIVE_HIGH)
    # Lightstreamer
    client = LightstreamerClient(SERVER_URL, ADAPTER_SET)
    client.connectionOptions.setRequestedMaxBandwidth("1.0")
    sub = Subscription("MERGE", [ITEM_NAME], FIELDS)
    sub.setRequestedSnapshot("yes")
    sub.addListener(ISSListener())
    print(f"[{utc_str()}] [INFO] Connecting to Lightstreamer...", flush=True)
    client.connect()
    client.subscribe(sub)
    print(f"[{utc_str()}] [INFO] Subscribed. Waiting for updates...", flush=True)
    last_overtime_print = 0.0
    try:
        while True:
            pumps.tick()
            overtime = pumps.is_overtime()
            # OLED: show overtime message if active, else original screen
            oled_render(device, build_lines(overtime=overtime))
            if overtime:
                now = time.time()
                if now - last_overtime_print >= 1.0:
                    print(f"[{utc_str()}] Pump overtime", flush=True)
                    last_overtime_print = now
                time.sleep(SCREEN_REFRESH_SEC)
                continue
            # Read bottle scale
            try:
                raw = hx.read_average(HX711_SAMPLES)
                g = bottle_grams_from_raw(raw)
                bottle_pct = bottle_percent_from_grams(g)
            except Exception as e:
                pumps.set("OFF")
                print(f"[{utc_str()}] [WARN] HX711 read failed: {e} -> pumps OFF", flush=True)
                time.sleep(SCREEN_REFRESH_SEC)
                continue
            # Determine target based on ISS %
            iss_pct, _iss_ts, _last_rx = STATE.snapshot()
            if iss_pct is None:
                pumps.set("OFF")
                print(
                    f"[{utc_str()}] Bottle={bottle_pct:.1f}% ({g:.1f} g) | ISS=NA -> pumps OFF",
                    flush=True,
                )
                time.sleep(SCREEN_REFRESH_SEC)
                continue
            iss_pct = clamp(iss_pct, 0.0, 100.0)
            target_g = (iss_pct / 100.0) * BOTTLE_FULL_G
            diff_g = g - target_g  # + => bottle heavier than target
            # Deadzone control
            if diff_g > DEADZONE_G:
                pumps.set("EMPTY")
            elif diff_g < -DEADZONE_G:
                pumps.set("FILL")
            else:
                pumps.set("OFF")
            # Console output: bottle % required (not on OLED)
            print(
                f"[{utc_str()}] Bottle={bottle_pct:.1f}% ({g:.1f} g) "
                f"| ISS={iss_pct:.1f}% (target {target_g:.1f} g) "
                f"| diff={diff_g:+.1f} g | pump={pumps.get()}",
                flush=True,
            )
            time.sleep(SCREEN_REFRESH_SEC)
    except KeyboardInterrupt:
        print("\n[INFO] Stopping...", flush=True)
    finally:
        pumps.shutdown()
        try:
            client.unsubscribe(sub)
            client.disconnect()
        except Exception:
            pass
        try:
            GPIO.cleanup()
        except Exception:
            pass
    return 0

if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except SystemExit:
        raise
    except Exception:
        print("[FATAL] Unhandled exception:", flush=True)
        traceback.print_exc()
        raise
Run the script and troubleshoot any issues:
python3 isslive_monitor.py
The last step is to set the Pi up so that the script starts automatically whenever it restarts. I did not write down how I did this step, but Google and even ChatGPT guided me to the solution.
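Since the exact steps were not recorded, what follows is only one common approach and not necessarily the one used for this build: a systemd service. The sketch below is a small Python helper that prints a unit file. The service name isslive-monitor.service, the helper name render_unit, and the paths (which assume the seafoxc user and the ~/piss_project folder with its venv from Step 2) are all assumptions to adapt.

```python
from pathlib import PurePosixPath


def render_unit(user: str, project_dir: str, script: str = "isslive_monitor.py") -> str:
    """Build the text of a systemd unit that runs the script from the project venv."""
    project = PurePosixPath(project_dir)
    python = project / "venv" / "bin" / "python"  # interpreter inside the venv
    lines = [
        "[Unit]",
        "Description=ISS urine tank monitor",
        "After=network-online.target",
        "Wants=network-online.target",
        "",
        "[Service]",
        "Type=simple",
        f"User={user}",
        f"WorkingDirectory={project}",
        f"ExecStart={python} {project / script}",
        "Restart=on-failure",
        "RestartSec=5",
        "",
        "[Install]",
        "WantedBy=multi-user.target",
        "",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    # Assumed user name and project path; change them to match your setup.
    print(render_unit("seafoxc", "/home/seafoxc/piss_project"), end="")
```

One way to use it, assuming the helper is saved as make_unit.py: run `python3 make_unit.py | sudo tee /etc/systemd/system/isslive-monitor.service`, then `sudo systemctl daemon-reload` followed by `sudo systemctl enable --now isslive-monitor.service`. Afterwards, `journalctl -u isslive-monitor.service -f` shows the script's console output.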
Now you have a functional ISS sewage monitor, and you can hear when somebody takes a leak on the ISS!

