#!/usr/bin/python # # Monitor a bunch of pins corresponding to manual switches. # Deal with debounce. Send HTTP requests to perform an action. # # NOTE: # $ mpremote fs cp config.json :config.json # import time import json if True: import micropython import machine import uasyncio import network import urequests import homeutil APP = homeutil.BaseServer() FLUTTER_DELAY = 50 # milliseconds class SwitchPin: def __init__(self, pin_id, onpress_cb): self.pin = machine.Pin(pin_id, mode=machine.Pin.IN, pull=machine.Pin.PULL_UP, ) self.pin.irq(self.irq_handler) # A callback for when the specified PIN/switch is pressed. self.onpress_cb = onpress_cb self._notice = self.notice # Create bound method object now. self._first_transition = None # Start, assuming a state of "not-pressed" self._pressed = False def irq_handler(self, pin): if self._first_transition: # Saw a transition already. Now: residing inside the flutter. # Work was started at the transition. Nothing to do. return # Schedule some work outside of the IRQ self._first_transition = time.ticks_ms() micropython.schedule(self._notice, None) def notice(self, ticks): print('TICKS-FIRST:', self._first_transition) uasyncio.create_task(self.read_switch()) # NOTE: we have a preference for moving from NOT-PRESSED to PRESSED. # This is based on the assumption that NOT-PRESSED is the at-rest # state of the (momentary) switches. That any signal of them being # touched is important. On the other hand, moving to NOT-PRESSED is # likely part of "flutter" or is the final release of a human press # on the switch. if not self._pressed: # We saw activity while NOT-PRESSED, which means that we saw: # * random electrical spike # * initial transition to "pressed", and the switch is now # in the "flutter" state for a while. # # Move to the PRESSED state. This is the *intent* of the human # pressing the switch. self._pressed = True # Invoke the ONPRESS_CB handler; the flutter and switch-release # is an internal matter. ### maybe pass self, or self.pin? self.onpress_cb() async def read_switch(self): delay = time.ticks_diff(time.ticks_ms(), self._first_transition) print('DELAYED:', delay, self.pin, self.pin.value()) print('NOW: t=', time.ticks_ms()) until = time.ticks_add(self._first_transition, FLUTTER_DELAY) await wait_until(until) print('WAITED: t=', time.ticks_ms()) # Pin is ACTIVE LOW. switch_state = not self.pin.value() if self._pressed and not switch_state: # Has the switch returned to stable not-pressed state? ### I don't think we need to signal this? print(' => released') self._pressed = switch_state # Start looking for another transition self._first_transition = None class Control: "Control a device." def __init__(self, device): self.device = device ### today: state is held in the device. We will guess/track. ### future: state is held within a Pico. self.state = False # assume "Off" def pressed(self): uasyncio.create_task(self.toggle()) async def toggle(self): if self.state: opcode = self.device.opcode_off() else: opcode = self.device.opcode_on() print('TOGGLE:', opcode) ### this should use uaiohttpclient r = urequests.get(f'http://192.168.0.3/i/{opcode}') ### do anything with r? self.state = not self.state async def run(cfg): # Activate our IP, waiting for DHCP/port to open. wlan = await homeutil.connect_to_network(cfg['network']['ssid'], cfg['network']['password']) async def onpress_work(): print('NOTIFY: pressed') r = urequests.get('http://192.168.0.3/i/282e') print(r.text) def onpress(): uasyncio.create_task(onpress_work()) pins = [ ] for pin, spec in ():#cfg['switches'].items(): if pin.startswith('#'): continue name, opcode = spec print('SWITCH:', pin, name, opcode) pins.append(SwitchPin(int(pin), onpress)) controls = set() d = I2C('281e', 'Hallway (Back)', 20, PORTC, 3) d = I2C('1a18', 'Powder Room', 13, PORTA, 5) c = Control(d) controls.add(c) pins.append(SwitchPin(15, c.pressed)) # Flash the LED, for alive status. uasyncio.create_task(homeutil.flash_LED()) # Create the web server, and wait for it to start. await APP.begin() # Spin, until the (web) server is closed. while not APP.server.task.done(): ### BUG? The main event loop cannot sleep for long. ### However, it appears that *TASKS* can sleep. await uasyncio.sleep(0.01) async def wait_until(target): while True: delta = time.ticks_diff(target, time.ticks_ms()) if delta <= 0: return await uasyncio.sleep_ms(1) # ---- # Pins are connected to which devices? PINS = { # PIN-NUMBER : DEVICE-IDENT 6 : '1618', # Upstairs powder 7 : '2809', # Home theater 8 : '2a10', # Exercise 9 : '1a03', # Marla's closet 10 : '2a09', # Master bathroom 11 : '1a01', # Master WC 12 : '1a18', # Powder room 13 : '2a03', # Master reading 14 : '2803', # Laundry 15 : '280b', # Pantry 16 : '1801', # Nook 17 : '1804', # Marla's Office 18 : 'gread', # Guest reading 19 : '1803', # Guest bedroom 20 : '2a01', # Guest bathroom 21 : '1601', # Office bathroom # UNUSED: 22, 26, 27 } ### for now, override with Powder room PINS[15] = '1a18' ### guest reading lights are out. Bedroom light instead. PINS[18] = '1803' # ---- # All the devices in the house. Likely: factor this out, in future. class I2C: def __init__(self, ident, name, address, port, pin): self.ident = ident self.name = name self.address = address self.port = port self.pin = pin def opcode_on(self): return hex(int(self.opcode_off(), 16) | 0x20)[2:] # Enable bit def opcode_off(self): return hex((self.address << 9) | (0x08 if self.pin&1 else 0) # Shift if pin is odd | (0x02 if self.port == PORTC else 0) | (1 << (self.pin & 0xE)) # Mask away LSB )[2:] def opcode_toggle(self): pass ### TBD: unrecognized PORTA = 0 PORTC = 1 class ViaHTTP: "Device operated via HTTP." ### future devices, not connected via I2C today. def __init__(self, ident, name, host): self.ident = ident self.name = name self.host = host HOST_24V = '24V' ### TBD HOST_110 = '110' ### TBD DEVICES = { I2C('2801', 'Kitchen', 20, PORTA, 0), I2C('1810', 'Kitchen Island', 12, PORTA, 4), I2C('280b', 'Pantry', 20, PORTC, 1), I2C('1801', 'Nook', 12, PORTA, 0), I2C('1604', 'Outdoor Kitchen', 11, PORTA, 2), I2C('1818', 'Front Door Sconces', 12, PORTA, 5), I2C('2a04', 'Foyer', 21, PORTA, 2), I2C('1a18', 'Powder Room', 13, PORTA, 5), I2C('1a10', 'Powder Room (Fan)', 13, PORTA, 4), ViaHTTP('hallF', 'Hallway (Front)', HOST_24V), ViaHTTP('hallM', 'Hallway (Midway)', HOST_24V), I2C('281e', 'Hallway (Back)', 20, PORTC, 3), I2C('1a06', 'Master Bedroom', 13, PORTC, 2), I2C('1a0b', 'Master Bathroom Sconces', 13, PORTC, 1), I2C('2a03', 'Master Reading', 21, PORTC, 0), I2C('2a09', 'Master Bathroom', 21, PORTA, 1), I2C('1a01', 'Master WC', 13, PORTA, 0), I2C('1a09', 'Master WC Fan', 13, PORTA, 1), I2C('1a04', 'Tub Pendant', 13, PORTA, 2), I2C('1a03', 'Marla\'s Closet', 13, PORTC, 0), ViaHTTP('gcloset', 'Greg\'s Closet', HOST_24V), I2C('2a10', 'Exercise', 21, PORTA, 4), I2C('2803', 'Laundry', 20, PORTC, 0), I2C('1803', 'Guest Bedroom', 12, PORTC, 0), ViaHTTP('gread', 'Guest Reading', HOST_24V), I2C('2a01', 'Guest Bathroom', 21, PORTA, 0), ### is the overhead fan/light one circuit? and one relay? I2C('180b', 'Guest Bathroom Fan', 12, PORTC, 1), ### maybe overhead light is tied to sconces? I2C('1806', 'Guest Bathroom Sconces', 11, PORTC, 2), I2C('2809', 'Home Theater', 20, PORTA, 1), ViaHTTP('bar', 'Bar', HOST_24V), I2C('1618', 'Bathroom', 11, PORTA, 5), I2C('1610', 'Bathroom Fan', 11, PORTA, 4), I2C('160b', 'Upstairs Sconces', 11, PORTC, 1), I2C('1606', 'Stair Sconces', 11, PORTC, 2), ViaHTTP('balcony', 'Balcony', HOST_110), I2C('1809', 'Greg\'s Office', 12, PORTA, 1), I2C('1804', 'Marla\'s Office', 12, PORTA, 2), ViaHTTP('server', 'Server Room', HOST_24V), I2C('1601', 'Office Bathroom', 11, PORTA, 0), I2C('1609', 'Office Bathroom Fan', 11, PORTA, 1), ViaHTTP('garageS', 'Garage Sconces', HOST_110), ViaHTTP('garageF', 'Garage Flourescents', HOST_110), ViaHTTP('patio', 'Patio', HOST_110), } # ---- def test_review(): import operator # Review the device opcodes. for d in sorted(DEVICES, key=operator.attrgetter('name')): if isinstance(d, I2C): print(f'{d.name:25s}: {d.ident} -- {d.opcode_off()} {d.opcode_on()}') else: # ViaHTTP print(f'{d.name:25s}: {d.ident} -- {d.host}') if __name__ == '__main__': #test_review(); import sys; sys.exit(0) cfg = homeutil.load_config() uasyncio.run(run(cfg))