#!/usr/bin/python
#
# Monitor a bunch of GPIOs corresponding to manual switches.
# Deal with debounce. Send HTTP requests to perform an action.
#
# NOTE:
#   $ mpremote fs cp config.json :config.json
#

import time
import asyncio

if True:
    import micropython
    import machine
    import network
    import urequests

import homeutil

APP = homeutil.BaseServer()

FLUTTER_DELAY = 50  # milliseconds


class SwitchGPIO:
    """One momentary switch on a GPIO, debounced by sampling.

    The pin is configured as an input with a pull-up, so the switch is
    ACTIVE LOW. An edge IRQ schedules a task which samples the pin for a
    while, decides whether a human pressed the switch, invokes the
    callback, and then waits for the switch to be released.
    """

    SAMPLE_COUNT = 20  # how many samples to take
    SAMPLE_DELAY = 5  # milliseconds to wait between samples
    PRESS_THRESHOLD = 3  # "on" samples needed to call it a human press

    def __init__(self, gpio_id, onpress_cb):
        """Configure GPIO_ID as a switch input; call ONPRESS_CB on a press.

        ONPRESS_CB is invoked with no arguments.
        """
        self.gpio_id = gpio_id

        # A callback for when the specified GPIO/switch is pressed.
        self.onpress_cb = onpress_cb

        self._notice = self.notice_v2  # Create bound method object now.

        # Specifies whether we are currently processing a switch press.
        self.processing = False

        # Start, assuming a state of "not-pressed"
        ### old, v1 thing.
        self._pressed = False
        self._began_transition = None

        ### debug/stats/etc
        ### not used, until we find safe Integer increments
        self.num_irq = 0
        self.num_scheduled = 0

        self.pin = machine.Pin(gpio_id,
                               mode=machine.Pin.IN,
                               pull=machine.Pin.PULL_UP,
                               )
        # Register the handler LAST: it reads .processing and ._notice,
        # so those must exist before the first edge can arrive.
        self.pin.irq(self.irq_handler)

    def irq_handler(self, pin):
        """Pin IRQ: start processing a transition, unless one is underway."""
        ### this might overflow the pre-built integer objects
        #self.num_irq += 1

        if self.processing:
            # Saw a transition already. Now: residing inside the flutter.
            # Work was started at the transition. Nothing to do.
            return

        # Schedule some work outside of the IRQ
        self.processing = True
        try:
            micropython.schedule(self._notice, None)
        except RuntimeError:
            # The schedule queue is full, so nothing will ever run to
            # clear the flag. Drop this edge and let the next one retry.
            self.processing = False

        ### this might overflow the pre-built integer objects
        #self.num_scheduled += 1

    def notice_v2(self, ticks):
        """Scheduled from the IRQ; TICKS is the (unused) schedule argument."""
        # Move from synchronous to an async function to review the switch.
        asyncio.create_task(self.read_switch_v2())

    def notice_v1(self, ticks):
        """Legacy (v1) scheduled handler; not wired in by __init__."""
        # We can allocate the ticks_ms() result now. There will be a little
        # skew, but nothing of relevance to a human.
        self._began_transition = time.ticks_ms()

        log(f'[GP{self.gpio_id}] STATS:', self._began_transition, self.num_irq, self.num_scheduled)

        asyncio.create_task(self.read_switch_v1())

        # NOTE: we have a preference for moving from NOT-PRESSED to PRESSED.
        # This is based on the assumption that NOT-PRESSED is the at-rest
        # state of the (momentary) switches. That any signal of them being
        # touched is important. On the other hand, moving to NOT-PRESSED is
        # likely part of "flutter" or is the final release of a human press
        # on the switch.
        if not self._pressed:
            # We saw activity while NOT-PRESSED, which means that we saw:
            # * random electrical spike
            # * initial transition to "pressed", and the switch is now
            #   in the "flutter" state for a while.
            #
            # Move to the PRESSED state. This is the *intent* of the human
            # pressing the switch.
            self._pressed = True

            # Invoke the ONPRESS_CB handler; the flutter and switch-release
            # is an internal matter.
            ### maybe pass self, or self.pin?
            self.onpress_cb()

    async def read_switch_v2(self):
        """Debounce one transition; fire the callback on a real press.

        Always clears .processing on the way out, so the IRQ handler will
        accept the next transition.
        """
        ### add to .num_irq or .num_scheduled ... what stats do we want?
        try:
            if not await self.is_pressed():
                # Likely a line spike. All done here.
                return

            log('PRESSED: invoking callback')
            self.onpress_cb()

            # Now we wait for the human to let go of the switch.
            while await self.is_pressed():
                pass
            log('SWITCH RELEASED.')
        finally:
            # Done with processing. Look for future presses. This must
            # also happen for a spike, or if the callback raises;
            # otherwise the IRQ handler ignores every later edge.
            self.processing = False

    async def is_pressed(self):
        """Determine if the switch is being pressed by a human.

        Samples the pin roughly every SAMPLE_DELAY ms over a window of
        SAMPLE_DELAY * SAMPLE_COUNT ms. Returns True when at least
        PRESS_THRESHOLD samples read as pressed.
        """
        # We're going to record samples over a period of time. We don't
        # need to record the series, but just the totals either direction.
        sample_on = 0
        sample_off = 0

        # How many samples did we collect?
        count = 0

        # We prefer to get samples every 5ms. Other tasks might vary our
        # sampling rate, so wait for absolute times based from this
        # start time. Whether that is 5ms, 3ms, or we got delayed too
        # much and need to read straight-away.
        #
        # Tick values wrap around, so they are only ever combined with
        # ticks_add() and compared with ticks_diff().
        last_sample = time.ticks_ms()
        end_time = time.ticks_add(last_sample,
                                  self.SAMPLE_DELAY * self.SAMPLE_COUNT)

        ### some debug stuff. Let's see what happens.
        trace = ['\u2bfe']

        while time.ticks_diff(end_time, time.ticks_ms()) > 0:
            # The GPIO is ACTIVE LOW.
            if not self.pin.value():
                sample_on += 1
                trace.append('\u203e')
            else:
                sample_off += 1
                trace.append('_')

            # Record how many we actually sampled. Hopefully SAMPLE_COUNT.
            # Note: observation shows we usually get them all. Occasionally,
            # we get just a single sample.
            ### NOTE(review): Control.toggle() makes a blocking urequests
            ### call; while it runs, this task cannot be resumed, which
            ### would explain single-sample windows. Confirm.
            count += 1

            # Bump this forward, and wait until then.
            last_sample = time.ticks_add(last_sample, self.SAMPLE_DELAY)
            await wait_until(last_sample)

        # If we never observed the switched pressed within our observation
        # period, then we clearly just saw a single spike. Don't bother
        # with any further work to debounce.
        if sample_on == 0:
            return False

        # We've been sampling for a while.
        samples = ''.join(trace)
        log(f'[GP{self.gpio_id}] SAMPLE: on={sample_on} off={sample_off} count={count} TRACKED: {samples}\u221f')

        # If we saw at least PRESS_THRESHOLD samples, then let's call it
        # a human.
        return sample_on >= self.PRESS_THRESHOLD

    async def read_switch_v1(self):
        """Legacy (v1) debounce: wait out the flutter, then read the pin."""
        try:
            delay = time.ticks_diff(time.ticks_ms(), self._began_transition)
            print('DELAYED:', delay, self.pin, self.pin.value())
            print('NOW: t=', time.ticks_ms())
            until = time.ticks_add(self._began_transition, FLUTTER_DELAY)
            await wait_until(until)
            print('WAITED: t=', time.ticks_ms())

            # Pin is ACTIVE LOW.
            switch_state = not self.pin.value()
            if self._pressed and not switch_state:
                # Has the switch returned to stable not-pressed state?
                ### I don't think we need to signal this?
                print(' => released')
            self._pressed = switch_state

            # Start looking for another transition
            self._began_transition = None
        finally:
            # irq_handler() sets .processing before scheduling the notice
            # function; clear it so this path also sees later edges.
            self.processing = False


class Control:
    "Control a light."

    # Prefix of the URL which receives the opcode.
    BASE_URL = 'http://192.168.0.3/i/'

    def __init__(self, light, base_url=BASE_URL):
        """Track LIGHT; opcodes are sent via GET to BASE_URL + opcode."""
        self.light = light
        self.base_url = base_url

        ### today: state is held in the light controller. We will guess/track.
        ### future: state is held within a Pico.
        self.state = False  # assume "Off"

    def pressed(self):
        """Synchronous switch callback: toggle the light in a new task."""
        asyncio.create_task(self.toggle())

    async def toggle(self):
        """Send the on/off opcode for the light, then flip the tracked state.

        The tracked state only changes if the request completed.
        """
        if self.state:
            opcode = self.light.opcode_off()
        else:
            opcode = self.light.opcode_on()
        log(f'[{self.light.ident}] TOGGLE:', opcode)

        ### this should use uaiohttpclient
        try:
            r = urequests.get(f'{self.base_url}{opcode}')
        except OSError as e:
            # Could not reach the controller. The light did not change, so
            # neither does our notion of its state.
            log(f'[{self.light.ident}] TOGGLE FAILED:', e)
            return

        ### do anything with r?
        # Release the underlying socket; otherwise each press leaks one.
        r.close()

        self.state = not self.state


async def run(cfg):
    """Bring up the network, wire switches to lights, and serve until done.

    CFG is the configuration mapping; this reads cfg['network'],
    cfg['switches'] and (via build_lights) cfg['lights'].
    """
    # Activate our IP, waiting for DHCP/port to open.
    wlan = await homeutil.connect_to_network(cfg['network']['ssid'],
                                             cfg['network']['password'])

    # Map shortnames to light instances
    ### fix terminology. s/ident/shortname/
    map_short = {light.ident: light for light in build_lights(cfg)}

    # Hold references to everything we create, for the life of the loop.
    pins = set()
    controls = set()
    for gpio_id, name in cfg['switches'].items():
        light = map_short[name]
        c = Control(light)
        controls.add(c)
        pins.add(SwitchGPIO(gpio_id, c.pressed))
        log(f'GP{gpio_id} -> {name} -> {getattr(light, "i2c", "n/a")}')

    # Create the web server, and wait for it to start.
    await APP.begin()

    # Spin, until the (web) server is closed.
    while not APP.server.task.done():
        ### BUG? The main event loop cannot sleep for long.
        ### However, it appears that *TASKS* can sleep.
        await asyncio.sleep(0.01)


def build_lights(cfg):
    """Return a set of light objects built from cfg['lights'].

    An entry with a truthy 'i2c' opcode becomes an I2C light; any other
    entry becomes a ViaHTTP light.

    Raises ValueError if an 'i2c' opcode has no pin bit set.
    """
    import math  ### not sure how long we need this

    lights = set()
    for ident, info in cfg['lights'].items():
        if i2c := info.get('i2c'):
            # See: pic/relay-board/main.asm for OPCODE format. We
            # deconstruct that here.
            ### Obviously, it means those values will match up if
            ### we compare the two schemes. That's fine.
            addr = i2c >> 9
            pinmask = i2c & 0b00010101
            if not pinmask:
                # log2(0) would fail with an unhelpful "math domain error".
                raise ValueError(f'light {ident!r}: opcode {i2c:04x} has no pin bit')
            if i2c & 0b00001000:  # the SHIFT bit
                pinmask <<= 1
            pin = int(math.log2(pinmask))
            if i2c & 0b00000010:
                port = PORTC
            else:
                port = PORTA
            light = I2C(ident, f'{i2c:04x}', info['label'], addr, port, pin)
        else:
            light = ViaHTTP(ident, info['label'], HOST_24V)  ### check HOST_24V
        lights.add(light)

    return lights


async def wait_until(target):
    """Sleep in 1ms steps until the ticks_ms() value TARGET has passed."""
    while True:
        delta = time.ticks_diff(target, time.ticks_ms())
        if delta <= 0:
            return
        await asyncio.sleep_ms(1)


def log(*args):
    """Print ARGS, prefixed with the local time as [HH:MM:SS]."""
    t = time.localtime()
    ts = f'[{t[3]:02}:{t[4]:02}:{t[5]:02}]'
    print(ts, *args)


# ----

# All the lights in the house. Likely: factor this out, in future.

class I2C:
    """A light on the I2C relay board, addressed by board/port/pin."""

    def __init__(self, ident, i2c, name, address, port, pin):
        self.ident = ident
        self.i2c = i2c  # I2C address + opcode
        self.name = name
        self.address = address
        self.port = port
        self.pin = pin

    def opcode_on(self):
        """Return the "on" opcode as hex digits (no '0x' prefix)."""
        return hex(int(self.opcode_off(), 16) | 0x20)[2:]  # Enable bit

    def opcode_off(self):
        """Return the "off" opcode as hex digits (no '0x' prefix)."""
        return hex((self.address << 9)
                   | (0x08 if self.pin & 1 else 0)  # Shift if pin is odd
                   | (0x02 if self.port == PORTC else 0)
                   | (1 << (self.pin & 0xE))  # Mask away LSB
                   )[2:]

    def opcode_toggle(self):
        pass  ### TBD: unrecognized


PORTA = 0
PORTC = 1


class ViaHTTP:
    "Light operated via HTTP."
    ### future lights, not connected via I2C today.
    ### NOTE(review): has no opcode_on()/opcode_off(), which
    ### Control.toggle() calls.

    def __init__(self, ident, name, host):
        self.ident = ident
        self.name = name
        self.host = host


HOST_24V = '24V'  ### TBD
HOST_110 = '110'  ### TBD


# ----

def test_review():
    """Developer hook: load the translated config for manual review."""
    ### maybe load YAML instead of the translated
    # SECURITY: eval() executes whatever the file contains. Only ever
    # point this at a trusted, locally generated file.
    with open('../config.repr') as fp:
        cfg = eval(fp.read())

    ### nothing to review right now.
    print('NOTHING TO REVIEW.')


def main():
    """Run the switch monitor using the server's loaded configuration."""
    asyncio.run(run(APP.cfg))


if __name__ == '__main__':
    #test_review(); import sys; sys.exit(0)
    main()