Tim BishopVishnu Kumar
Created May 26, 2023 © GPL3+

LoraWAN AC Current Sensor

Measure real and apparent AC Current. 100A. Self-powered energy harvesting sensor. Easy install: Clip on 2 CTs and you are done.

IntermediateWork in progress4 hours118
LoraWAN AC Current Sensor

Things used in this project

Hardware components

Polycase WC-22F
×1
Seeed Studio - Grove I2C 50/60HZ AC current sensor module
×1
10 Pack 3.5mm Female Jack
×1
Grove - Wio-E5 (STM32WLE5JC)
Seeed Studio Grove - Wio-E5 (STM32WLE5JC)
×1
Seeed Studio XIAO ESP32C3
Seeed Studio XIAO ESP32C3
×1
Seeed Studio Grove Base for XIAO
×1
Seeed Studio Grove - Universal 4 Pin Buckled 5cm Cable (5 PCs Pack)
Seeed Studio Grove - Universal 4 Pin Buckled 5cm Cable (5 PCs Pack)
×1
Polymer Lithium Ion Battery - 2200mAh 3.7V
Seeed Studio Polymer Lithium Ion Battery - 2200mAh 3.7V
×1
Seeed Studio Non-invasive AC Current Sensor (100A max)
×2
PCB MOUNTING STANDOFF, NYLON 6.6, 12.5MM
×1
Cable Gland, M16
Cable Gland, M16
×1

Software apps and online services

PlatformIO Core
PlatformIO Core
Visual Studio Code Extension for Arduino
Microsoft Visual Studio Code Extension for Arduino
MicroPython
MicroPython

Story

Read more

Schematics

HCT21 Firmware

HCT21 Hardware

Code

ESP32C3 Micropython code - main.py

Python
Load this into main.py on the ESP32C3.
# main.py

import machine
import re
import time
import json
import sys
import struct
from hct21 import HCT21
import io
import os
from machine import ADC, Pin, RTC


# return number of days in a month
def days_in_month(month, year):
    if month == 2:
        if year % 4 == 0:
            return 29
        else:
            return 28
    elif month in [4,6,9,11]:
        return 30
    else:
        return 31
    
# RTC.datetime returns the following tuple
#(year, month, day, weekday, hours, minutes, seconds, subseconds)
# please convert to the number of seconds since 2000-01-01 00:00:00
def our_mktime_from_RTC(t):
    seconds = t[6] + t[5]*60 + t[4]*3600 + t[2]*86400
    for year in range(2000,t[0]):
        seconds += 31536000
        if year % 4 == 0:
            seconds += 86400
    for month in range(1,t[1]):
        seconds += days_in_month(month, t[0]) * 86400
    return seconds

def device_now():
    rtc = machine.RTC()
    t8 = rtc.datetime()
    return our_mktime_from_RTC(t8)

def device_bat_mv():
    p1 = Pin(3, Pin.IN)  # D1 / GPIO3
    adc_p1 = ADC(p1,atten=ADC.ATTN_11DB)  
    Vbat = adc_p1.read_uv() / (1000) * 4
    return int(Vbat)



class nvram:
    """NVRAM Class  - Store data in RTC memory"""
    def __init__(self, initial_state):
        self.inital_state = initial_state
        self.rtc = machine.RTC()

    def save(self, dat):
        """Save data to NVRAM"""
        self.rtc.memory(json.dumps(dat))

    def load(self):
        """Load data from NVRAM"""
        mem = self.rtc.memory()
        if mem == b'':
            return self.inital_state
        try:
            dat = json.loads(mem)
        except ValueError:
            return self.inital_state
        return dat
    
    def clear(self):
        """Clear NVRAM"""
        self.rtc.memory(json.dumps(self.inital_state))

class LorawanData:
    def __init__(self):
        self.dict = {}

    def get_lorawan_msg_hex(self):
        """Get binary hex"""
        bat_p = struct.pack('>H', self.dict['bat']) 
        temp_p = self.dict['temp']
        humid_p = self.dict['humid']
        count_p = struct.pack('>H', self.dict['count']) 
        hex = "{} {} {} {}".format(''.join('{:02x}'.format(x) for x in bat_p), ''.join('{:02x}'.format(x) for x in temp_p), ''.join('{:02x}'.format(x) for x in humid_p), ''.join('{:02x}'.format(x) for x in count_p))
        return hex
    

def bytes_to_hex(bytes):
    """Convert bytes to hex"""
    return ''.join('{:02x}'.format(x) for x in bytes)

def config_sensors():
    """Configre Sensors"""
    # configure sensors
    pass

def read_sensors():
    """Read Sensors"""
    # read sensors
    sht = SHT20(resolution=SHT20.TEMP_RES_14bit)
    temp = sht.read_temp_raw()
    humid = sht.read_humid_raw()
    bat = 3.3
    return {'temp': temp, 'humid': humid, 'bat': int(bat*1000)}


class LoraWAN_E5(object):
    """Lorawan Class"""
    def __init__(self):
        pass

    def begin(self,baud):
        self.uart = machine.UART(1, baudrate=baud, tx=21, rx=20)

    def set_new_appkey(self, appkey=None):
        if appkey is None:
            appkey = os.urandom(16).hex()
            print("New AppKey: {}".format(appkey))
        s,_ = self.run('AT+KEY=APPKEY,"{}"'.format(appkey), '.*KEY: APPKEY.*', '.*ERROR.*',300)
        return s

    #AT+ID=DevEui // Read DevEui
    #AT+ID=AppEui
    def get_dev_eui(self):
        s,_ = self.run('AT+ID=DevEui', '.*ID: DevEui.*', '.*ERROR.*',300)
        return s
    
    def get_app_eui(self):
        s,_ = self.run('AT+ID=AppEui', '.*ID: AppEui.*', '.*ERROR.*',300)
        return s

    def config(self):
        """Configure LoRaWAN"""
        self.begin(115200)
        s,_=self.run('AT', '.*OK.*', '.*ERROR.*',100)
        while s is False:
            self.begin(9600)
            self.run('AT+UART=BR, 115200', '.*UART: BR.*', '.*ERROR.*',300)
            self.run('AT+RESET', '.* RESET.*', '.*ERROR.*',300)
            self.begin(115200)
            time.sleep_ms(2000)
            s,_ = self.run('AT', '.*OK.*', '.*ERROR.*',300)

        # Set to low power mode, turn on debugging info.
        self.run("AT+LOWPOWER=AUTOON", '.*LOWPOWER: AUTOON.*', '.*ERROR.*',300)
        self.run("AT+LOG=DEBUG", '.*LOG: DEBUG.*', '.*ERROR.*',300)
        self.run("AT+ID", '.*ID: AppEui.*', '.*ERROR.*',300)

        # Set up for US915.
        self.run("AT+DR=US915", '.*DR: US915.*', '.*ERROR.*',300)
        self.run("AT+CH=NUM,8-15", '.*CH: NUM,.*', '.*ERROR.*',300)
        self.run("AT+DR=1", '.*DR:.*', '.*ERROR.*',300)

        # OTA, ADR ON.
        self.run("AT+LW=LEN", '.*LW:.*', '.*ERROR.*',300)
        self.run("AT+MODE=LWOTAA", '.*MODE:.*', '.*ERROR.*',300)
        self.run("AT+ADR=ON", '.*ADR: ON.*', '.*ERROR.*',300)

    def join(self,force=False):
        """Connect to LoRaWAN"""
        if force:
            ok, _ = self.run("AT+JOIN=FORCE", '.*JOIN: Done', '.*JOIN: Join failed.*',20_000)
        else:
            ok, _ = self.run("AT+JOIN", '.*JOIN: Done', '.*JOIN: Join failed.*',20_000)

        return ok

    def send_data(self, port, byte_dat):
        assert port < 250
        assert port > 0
        msg_hex = ''.join('{:02x}'.format(x) for x in byte_dat)
        self.run("AT+PORT={}".format(port), '.*PORT.*', '.*ERROR.*',100)
        self.run("AT+DR", '.*DR:.*', '.*ERROR.*',100)
        msg = str("AT+MSGHEX=\"{}\"".format(msg_hex))
        ok, _ =self.run(msg, '.*MSGHEX: Done.*', '.*MSGHEX: Please join network first.*|.*ERROR.*',10_000)
        self.run("AT+DR", '.*DR:.*', '.*ERROR.*',100)

        return ok

    def send_status(self):
        self.run("AT+PORT=99", '.*PORT.*', '.*ERROR.*',300)
        self.run("AT+DR", '.*DR:.*', '.*ERROR.*',300)
        msg = str("AT+MSGHEX=\"{0:02x} {1:02x} {2:02x}\"".format(1,2,3))
        ok, _ = self.run(msg, '.*MSGHEX: Done.*', '.*MSGHEX: Please join network first.*|.*ERROR.*',10_000)
        self.run("AT+DR", '.*DR:.*', '.*ERROR.*',300)

        return ok

    def max_payload(self):
        """Get max payload"""
        ok, resp = self.run("AT+LW=LEN", '.*LW: LEN, (\d+)', '.*ERROR.*',100)
        if ok:
            return int(resp)
        else:
            return 0

    def connected(self):
        return True


    def send_command(self, command, ok_resp, error_resp, timeout=200):
        """Send AT Command"""
        self.uart.write('\xff\xff\xff\xff' + command + '\r\n')
        print(str(command))
        start = time.ticks_ms()
        ok_re = re.compile(ok_resp)
        error_re = re.compile(error_resp)
        while True:
            time.sleep_us(500)  
            if self.uart.any():
                resp = self.uart.readline()
                resp = resp.replace(b'\xff', b'')
                ok_m = ok_re.match(resp)
                error_m = error_re.match(resp)
                line = resp.strip()
                if ok_m:
                    print(str(line + " SUCCESS"))
                    try:
                        m_str = ok_m.group(1)
                    except :
                        m_str = ok_m.group(0)
                    return [True, m_str]
                elif error_m:
                    print(str(line + " ERROR"))
                    try:
                        m_str = error_m.group(1)
                    except :
                        m_str = error_m.group(0)
                    return [False, m_str]
                if time.ticks_ms() - start > timeout:
                    print(str(line + " TIMEOUT"))
                    return [False, None]
                elif line != b'':
                    print(str(line))
            if time.ticks_ms() - start > timeout:
                print(str("TIMEOUT"))
                return [False, None]

    def run(self, command, ok_resp, error_resp, timeout=200):
        # send command, and look for ok or error response
        extra_time = 2  # ms
        result = self.send_command(command, ok_resp, error_resp, timeout-extra_time)
        # slurp any extra lines after the response has been found
        start = time.ticks_ms()
        while True:
            time.sleep_us(500)
            if self.uart.any():
                line = self.uart.readline().strip()
                line = line.replace(b'\xff', b'')
                if line != b'':
                    print(line)
            if time.ticks_ms() - start > extra_time:
                return result
        

class Device_LCT(object):
    """Device LCT Class"""
    def __init__(self,lorawan,hct):
        self.lorawan = lorawan
        self.hct = hct
        self.nvram = nvram({'count':0, 'uptime_ms':0})
        self.state = self.nvram.load()
        self.hct_bytes = None
        self.v_bytes1 = None
        self.b_bytes1 = None
        pass

    def sample_channel(self):
        hct.start_channel_sampling()  # takes 90ms
        hct_bytes = hct.read_channel_data()
        i = 0
        while hct_bytes is None and i < 10:
            hct_bytes = hct.read_channel_data()
            i += 1
        hct_dict = None
        if hct_bytes is not None:
            self.hct_bytes = hct_bytes
            hct_dict = hct.decode_channel_data(hct_bytes)
        return hct_dict

    def sample_all(self):
        hct.device_reset()
        self.sample_channel()
        if self.hct_bytes is not None:
            v_series1, v_bytes1 = hct.get_series("V")
            print(v_series1)
            print(v_bytes1)
            b_series1, b_bytes1 = hct.get_series("B")
            print(b_series1)
            print(b_bytes1)
        self.v_bytes1 = v_bytes1
        self.b_bytes1 = b_bytes1

    def send_uplink(self):
        hct_bytes = self.hct_bytes
        v_bytes1 = self.v_bytes1
        b_bytes1 = self.b_bytes1
        lorawan = self.lorawan
        maxb = lorawan.max_payload()
        if hct_bytes is None:
            return
        ok = lorawan.send_data(10, hct_bytes)
        while not ok:
            lorawan.join()
            lorawan.send_status()
            ok = lorawan.send_data(10, hct_bytes)
        if v_bytes1 and (len(v_bytes1) <= maxb):
            ok = lorawan.send_data(11, v_bytes1)
        if b_bytes1 and (len(b_bytes1) <= maxb):
            ok = lorawan.send_data(11, b_bytes1)
        return ok

    def status_message(self):
        return { 'uptime_ms': self.state['uptime_ms'] + time.ticks_diff(time.ticks_ms(), self.start_ms), 
                'Vbat': device_bat_mv()}
    
    def send_status_message(self,msg):
        self.status_message_bin = struct.pack('>IH', msg['uptime_ms'],msg['Vbat'])  # 32bit uptime_ms, 16 bit voltage_mv
        ok = self.lorawan.send_data(99, self.status_message_bin)
        while not ok:
            self.lorawan.join()
            ok = self.lorawan.send_data(99, self.status_message_bin)
        return ok

    def run_awake(self):
        """Run Awake function - should last 50ms or less"""
        self.start_ms = time.ticks_ms()
        time.sleep(3) # wait for the USB serial port to become ready
        lorawan = self.lorawan
        lorawan.begin(115200)
        hct = self.hct
        nvm = self.nvram
        if machine.reset_cause() != machine.DEEPSLEEP_RESET:  # == 5
            # FIRST BOOT
            print('first boot')
            lorawan.config()
            hct.device_reset()
            nvm.clear()
        else:
            # WAKE FROM DEEP SLEEP
            print('awake')

        self.sample_all()

        if self.state['count'] % (12*600) == 0:  # every 600 minutes
            i = 0
            ok = lorawan.join(True) # force join
            while not ok and i < 10:
                ok = lorawan.join(True)

        if self.state['count'] % (12*5) == 0:  # every 5 minutes
            #lorawan.join(True) # force join
            msg = device.status_message()
            print(msg)
            device.send_status_message(msg)

        if self.state['count'] % 6 == 0:  # every 30 seconds
            self.send_uplink()

        self.state['count'] += 1
        self.state['uptime_ms'] += time.ticks_diff(time.ticks_ms(), self.start_ms) + 50
        if self.state['uptime_ms'] > 2^31:
            self.state['uptime_ms'] = 0
        print(self.state)
        nvm.save(self.state)
        

    def main(self):
        """Main function"""
        # run main and deep sleep
        led = machine.Pin(10, machine.Pin.OUT, drive=machine.Pin.DRIVE_0)  # 5mA 130R
        boot_button = machine.Pin(9, machine.Pin.IN) # BOOT button - right button - hold to keep from sleeping.
        led.on()
        self.run_awake()
        if boot_button.value() == 1:
            print("going to sleep")
            led.off()
            return True
        else:
            print("BOOT button pressed, staying awake")
            return False


if __name__ == '__main__':
    try:
        lorawan = LoraWAN_E5()
        hct = HCT21()
        device = Device_LCT(lorawan,hct)
        if device.main():
            machine.deepsleep(5000)  # 6 seconds
            #machine.reset()
            #micropython.schedule(machine.deepsleep,1)
    except Exception as e:
        e_file = io.open("exceptionFile.txt", "wt")
        ex_str = e.__class__.__name__
        e_file.write(ex_str+'\n')
        sys.print_exception(e, e_file)
        e_file.close()
        print(e.__class__.__name__)
        raise e

ESP32C3 Micropython code - hct21.py

Python
Library to access the HCT21 I2C AC Current Sensor
from machine import SoftI2C, Pin
import time
import struct



def bytes_to_int(bytes):
    result = 0
    for b in bytes:
        result = result * 256 + int(b)
    return result

def calculateCRC(input):
    crc = 0x0
    for i in range (0, 2):
        crc = crc ^ input[i]
        for j in range(8, 0, -1):
            if crc & 0x80:
                crc = (crc << 1) ^ 0x31
            else:
                crc = crc << 1
    crc = crc & 0x0000FF
    return crc

def checkCRC(result):
    for i in range(2, len(result), 3):
        data = []
        data.append(result[i-2])
        data.append(result[i-1])

        crc = result[i]

        if crc == calculateCRC(data):
            crc_result = True
        else:
            crc_result = False
    return crc_result

class HCT21():
    SHT20_ADDR = 0x40

    I2C_GET_DEVICE_CONFIG = 0xD0
    I2C_START_SAMPLING = 0xD1
    I2C_GET_CHANNEL_DATA = 0xD3
    I2C_GET_VOLTAGE_SERIES_HEADER = 0xD4
    I2C_GET_VOLTAGE_SERIES_DATA = 0xD5
    I2C_GET_A_SERIES_HEADER = 0xD6
    I2C_GET_A_SERIES_DATA = 0xD7
    I2C_GET_B_SERIES_HEADER = 0xD8
    I2C_GET_B_SERIES_DATA = 0xD9
    I2C_GET_C_SERIES_HEADER = 0xDA
    I2C_GET_C_SERIES_DATA = 0xDB

    SOFT_RESET = 0xFE

    NO_ERROR = 0
    ERROR = -1

    def __init__(self):
        self.bus = SoftI2C(scl=Pin(7), sda=Pin(6), freq=100000, timeout=5000)  # microseconds timeout 5ms
        self.sht20_init()

    def sht20_init(self):
        #self.bus.writeto(self.SHT20_ADDR, bytes([self.SOFT_RESET]))
        #time.sleep_ms(15)
        # convert a constant self.SOFT_RESET into a buffer
        #self.bus.writeto(self.SHT20_ADDR, bytes([self.SOFT_RESET]))

        # userReg =  self.bus.readfrom_mem(self.SHT20_ADDR, self.READ_USER_REG, 1)
        # userReg &= 0x38 # we must not change "Reserved bits"
        # userReg |= inits + self.DISABLE_OTP_RELOAD # add our settings + always disable OTP

        # self.bus.writeto_mem(self.SHT20_ADDR, self.WRITE_USER_REG, bytes([userReg]) )
        pass

    def start_channel_sampling(self):
        try: 
            self.bus.writeto(self.SHT20_ADDR, bytes([self.I2C_START_SAMPLING]))
            time.sleep_ms(500)
            return True
        except OSError:
            return False


   #{ FIRMWARE_VERSION, ADC_SAMPLES_PER_SECOND, samples,
   # A_apparent_amps*10.0, B_apparent_amps*10.0, C_apparent_amps*10.0,
   # A_active_amps*10.0, B_active_amps*10.0, C_active_amps*10.0,
   # V_rms*10.0, temp_C, 0 };  // 16bit CRC8  (top 8 bits are always 0 for 16bit CRC8)

    def read_channel_data(self):
        try:
            self.bus.writeto(self.SHT20_ADDR, bytes([self.I2C_GET_CHANNEL_DATA]), False)
            byte_dat = self.bus.readfrom(self.SHT20_ADDR, 12*2)
            return byte_dat
        except OSError:
            return None
      

    def decode_channel_data(self, byte_dat):
        self.data = {}
        self.data['FIRMWARE_VERSION'] = bytes_to_int(byte_dat[0:2])
        self.data['ADC_SAMPLES_PER_SECOND'] = bytes_to_int(byte_dat[2:4])
        self.data['samples'] = bytes_to_int(byte_dat[4:6])
        self.data['A_apparent_amps'] = struct.unpack('>h',byte_dat[6:8])[0] / 100.0  
        self.data['B_apparent_amps'] = struct.unpack('>h',byte_dat[8:10])[0]/ 100.0
        self.data['C_apparent_amps'] = struct.unpack('>h',byte_dat[10:12])[0] / 100.0
        self.data['A_active_amps'] = struct.unpack('>h',byte_dat[12:14])[0] / 100.0
        self.data['B_active_amps'] = struct.unpack('>h',byte_dat[14:16])[0] / 100.0
        self.data['C_active_amps'] = struct.unpack('>h',byte_dat[16:18])[0] / 100.0
        self.data['V_rms'] = bytes_to_int(byte_dat[18:20]) / 10.0
        self.data['temp_C'] = struct.unpack('>h',byte_dat[20:22])[0]
        self.data['crc'] = bytes_to_int(byte_dat[22:24])
        return self.data

    #   Wire.write((uint8_t)(chV.get_num_samples() >> 8)); 
    #   Wire.write((uint8_t) chV.get_num_samples());
    #   Wire.write((uint8_t)(chV._samples8._firstSample >> 8)); 
    #   Wire.write((uint8_t)(chV._samples8._firstSample));
    #   for (int i = 1; i < chV._samples8._tickNum; i++) {  // zero is unused. starts at 1.
    #     Wire.write(chV._samples8._samples[i]);
    #   }


    def get_series(self, channel):
        if channel == 'V':
            header_cmd = self.I2C_GET_VOLTAGE_SERIES_HEADER
            data_cmd = self.I2C_GET_VOLTAGE_SERIES_DATA
        elif channel == 'A':
            header_cmd = self.I2C_GET_A_SERIES_HEADER
            data_cmd = self.I2C_GET_A_SERIES_DATA
        elif channel == 'B':
            header_cmd = self.I2C_GET_B_SERIES_HEADER
            data_cmd = self.I2C_GET_B_SERIES_DATA
        elif channel == 'C':
            header_cmd = self.I2C_GET_C_SERIES_HEADER
            data_cmd = self.I2C_GET_C_SERIES_DATA
        else:
            return [None,None]
        
        try: 
            self.bus.writeto(self.SHT20_ADDR, bytes([header_cmd]))
            byte_dat_a = self.bus.readfrom(self.SHT20_ADDR, 10)
            name_char = chr(byte_dat_a[0])
            sample_sequence = byte_dat_a[1]
            num_samples = bytes_to_int(byte_dat_a[2:4])
            num_samples = min(num_samples, 600)  # upper bound, abitruary.
            samples_per_second = bytes_to_int(byte_dat_a[4:6])
            channel_avg = struct.unpack('>h',byte_dat_a[6:8])[0]
            index_offset = bytes_to_int(byte_dat_a[8:10])

            print(byte_dat_a)
            if num_samples == 0:
                return [None,None]

            self.bus.writeto(self.SHT20_ADDR, bytes([data_cmd]))
            byte_dat_b = self.bus.readfrom(self.SHT20_ADDR, num_samples+1)
            series = [0] * (num_samples)
            series[0] = struct.unpack('>h',byte_dat_b[0:2])[0]
            last_sample = series[0]

            print(name_char, sample_sequence, num_samples, samples_per_second, channel_avg, index_offset, series[0])

            self.series_bin = byte_dat_a + byte_dat_b
            for i in range(len(byte_dat_b[2:])):
                delta = struct.unpack('b',bytes([byte_dat_b[i+2]]))[0]
                last_sample = last_sample + delta
                series[i+1] = last_sample
            return [series, self.series_bin ]
        except OSError:
            return [None,None]



    def start_temp_measurement(self):
        self.bus.writeto(self.SHT20_ADDR, bytes([self.TRIG_TEMP_NOHOLD]), False) # no stop
        time.sleep_ms(75)

    def start_humid_measurement(self):
        self.bus.writeto(self.SHT20_ADDR, bytes([self.TRIG_HUMID_NOHOLD]), False) # no stop
        time.sleep_ms(75)

    def read_temp(self):
        temp_list = []
        self.start_temp_measurement()
        #print(str(time.ticks_ms())+":2")
        readTemp = self.bus.readfrom(self.SHT20_ADDR, 3)

        #print(str(time.ticks_ms())+":3")
        for i in range(len(readTemp)):
            temp_list.append(readTemp[i])
        #print(str(time.ticks_ms())+":4")
        if checkCRC(temp_list):
            temp_list[1] &= 0xFC
            temp = (((temp_list[0] * pow(2, 8) + temp_list[1]) * 175.72) / pow(2, 16)) - 46.85
            return temp
        else:
            return self.ERROR
        
    def read_temp_raw(self):
        temp_list = []
        self.start_temp_measurement()
        #print(str(time.ticks_ms())+":2")
        readTemp = self.bus.readfrom(self.SHT20_ADDR, 3)

        #print(str(time.ticks_ms())+":3")
        for i in range(len(readTemp)):
            temp_list.append(readTemp[i])
        #print(str(time.ticks_ms())+":4")
        if checkCRC(temp_list):
            temp_list[1] &= 0xFC
            temp = struct.pack('BB',temp_list[0], temp_list[1])
            return temp
        else:
            return self.ERROR

    def read_humid(self):
        humid_list = []

        self.start_humid_measurement()

        readHumid = self.bus.readfrom(self.SHT20_ADDR, 3)

        for i in range(len(readHumid)):
            humid_list.append(readHumid[i])

        if checkCRC(humid_list):
            humid_list[1] &= 0xFC
            humid = (((humid_list[0] * pow(2, 8) + humid_list[1]) * 125) / pow(2, 16)) - 6
            return humid
        else:
            return self.ERROR
        

    def read_humid_raw(self):
        humid_list = []

        self.start_humid_measurement()

        readHumid = self.bus.readfrom(self.SHT20_ADDR, 3)

        for i in range(len(readHumid)):
            humid_list.append(readHumid[i])

        if checkCRC(humid_list):
            humid_list[1] &= 0xFC
            humid = struct.pack('BB',humid_list[0], humid_list[1])
            return humid
        else:
            return self.ERROR

    def read_all(self):
        data_list = []
        data_list.append(self.read_temp())
        data_list.append(self.read_humid())

        return data_list

    def device_reset(self):
        try: 
            self.bus.writeto(self.SHT20_ADDR, bytes([self.SOFT_RESET]))
            time.sleep_ms(100)
            return True
        except OSError:
            pass

        try:
            self.bus.writeto(self.SHT20_ADDR, bytes([self.SOFT_RESET]))
            time.sleep_ms(100)
            return True
        except OSError:
            pass

        

The Things Network - Uplink Payload Formatter

JavaScript
function signed16_big(b0,b1) {
  
  let num = b0 << 8 | b1;
  if (num >= 0x8000) {
    num = -(0x10000 - num);
  }
  return num
}

function unsigned16_big(b0,b1) {
  
  let num = b0 << 8 | b1;
  return num
}

function unsigned32_big(b0,b1,b2,b3) {
  
  let num = b0 << 24 | b1 << 16 | b2 << 8 | b3;
  return num
}

function decodeUplink(input) {
  
  if (input.fPort == 1) {
    return {
      data: {
        _port: input.fPort,
        voltage: (input.bytes[0] << 8 | input.bytes[1])/1000.0,
        temperature_c: round2((input.bytes[2] << 8 | input.bytes[3]) * 175.0/65535.0 - 45.0),
        humidity: round2((input.bytes[4] << 8 | input.bytes[5]) * 125.0/65535.0 - 6.0),
        count: (input.bytes[6] << 8 | input.bytes[7]),
        // bytes: input.bytes,//0E1F63077DE0
  
      },
      warnings: [],
      errors: []
    };
  }
  
  // PORT 10 - HCT21 I2C_GET_CHANNEL_DATA
  if (input.fPort == 10) {
      d = {}
      by = input.bytes
      d['FIRMWARE_VERSION'] = unsigned16_big(by[0],by[1])
      d['ADC_SAMPLES_PER_SECOND'] = unsigned16_big(by[2],by[3])
      d['samples'] = unsigned16_big(by[4],by[5])
      d['A_apparent_amps'] = signed16_big(by[6],by[7]) / 100.0  
      d['B_apparent_amps'] = signed16_big(by[8],by[9])/ 100.0
      d['C_apparent_amps'] = signed16_big(by[10],by[11]) / 100.0
      d['A_active_amps'] = signed16_big(by[12],by[13]) / 100.0
      d['B_active_amps'] = signed16_big(by[14],by[15]) / 100.0
      d['C_active_amps'] = signed16_big(by[16],by[17]) / 100.0
      d['V_rms'] = unsigned16_big(by[18],by[19]) / 10.0
      d['temp_C'] = signed16_big(by[20],by[21])
      d['crc'] = unsigned16_big(by[22],by[23])
   
      return {
        data: d,
        warnings: [],
        errors: []
      };
    
  }
  
  if (input.fPort == 99) {
      d = {}
      by = input.bytes
      d['uptime_ms'] = unsigned32_big(by[0],by[1],by[2],by[3])
      d['V_bat'] = unsigned16_big(by[4],by[5]) / 1000
  
       return {
        data: d,
        warnings: [],
        errors: []
      };
    
  }
  
  // unknown message
      return {
      data: {
        bytes: input.bytes,//0E1F63077DE0
  
      },
      warnings: ['unknown message fPort'],
      errors: []
    };
  
  
  
}


function round2(num) {
  return Math.round((num + Number.EPSILON) * 100) / 100
}


//float _convertTicksToCelsius(uint16_t ticks) 
//{
//    return static_cast<float>(ticks * 175.0 / 65535.0 - 45.0);
//}
//float _convertTicksToPercentRH(uint16_t ticks) 
//{
//    return static_cast<float>(ticks * 125.0 / 65535.0 - 6);
//}

Credits

Tim Bishop

Tim Bishop

3 projects • 1 follower
Electrical engineer, my first project was an apple cider press, since then I'm interested in electronics, sustainability, music, and hiking.
Vishnu Kumar

Vishnu Kumar

2 projects • 0 followers

Comments