Commit 5a99ca62 authored by Ralf's avatar Ralf
Browse files

make it start without type errors

parent c6979ec0
from libtuer import ThreadFunction, logger
from busfahrer import Pin
import time
class Actor:
......@@ -20,16 +21,16 @@ class Actor:
def execute(self):
if self.verbose:
logger.info("Actor: Running command %s" % self.name)
logger.info("Actor: Running command {0}".format(self.name))
else:
logger.debug("Actor: Running command %s" % self.name)
logger.debug("Actor: Running command {0}".format(self.name))
for (value, delay) in self.todo:
if value is not None:
logger.debug("Actor: Setting pin %d to %d" % (self.pin, value))
logger.debug("Actor: Setting pin {0} to {1}".format(self.pin, value))
self.i2c.setPin(self.pin, self.translator(value))
if delay > 0:
time.sleep(delay)
''' todo list explanation
type:
......@@ -47,9 +48,9 @@ class Actor:
def __init__(self, i2c):
self._CMDs = {
CMD_UNLOCK: CMD("unlock", pin=(0x21,3), tid=0, todo=[(True, 0.3), (False, 0.1)], i2c=i2c, invert=True),
CMD_LOCK: CMD( "lock", pin=(0x21,2), tid=0, todo=[(True, 0.3), (False, 0.1)], i2c=i2c, invert=True),
CMD_BUZZ: CMD( "buzz", pin=(0x21,1), tid=1, todo=[(True, 2.5), (False, 0.1)], i2c=i2c, invert=False),
Actor.CMD_UNLOCK: Actor.CMD("unlock", pin=Pin(0x21,3), tid=0, todo=[(True, 0.3), (False, 0.1)], i2c=i2c, invert=True),
Actor.CMD_LOCK: Actor.CMD( "lock", pin=Pin(0x21,2), tid=0, todo=[(True, 0.3), (False, 0.1)], i2c=i2c, invert=True),
Actor.CMD_BUZZ: Actor.CMD( "buzz", pin=Pin(0x21,1), tid=1, todo=[(True, 2.5), (False, 0.1)], i2c=i2c, invert=False),
}
# launch threads, all running the "_execute" method
self.threads = {}
......
from smbus import SMBus
from threading import RLock
class Pin:
def __init__(self, addr, bit):
assert 0 <= bit and bit < 8, "There are only 8 bits per byte"
self.addr = addr
self.bit = bit
def __str__(self):
return "{0:x}.{1}".format(self.addr, self.bit)
class I2C:
'''This class manages the I²C bus.
It is thread-safe.
......@@ -22,7 +31,7 @@ class I2C:
def setPins(self, addr, stateByte):
'''Set all pins of the given address'''
with self._lock:
print("Setting {0:x} to {1:08b}".format(addr, stateByte))
#print("Setting 0x{0:x} to {1:08b}".format(addr, stateByte))
if self._active:
self._bus.write_byte(addr, stateByte)
self._bytes[addr] = stateByte
......@@ -30,26 +39,23 @@ class I2C:
def setPin(self, pin, state):
'''Set the gien pin: An (address, bit) pair'''
with self._lock:
(addr, bit) = pin
assert 0 <= bit and bit < 8
byte = self._bytes.get(addr, 0x00)
byte = self._bytes.get(pin.addr, 0x00)
# set the bit
if state:
byte |= (1 << bit)
byte |= (1 << pin.bit)
else:
byte &= ~(1 << bit)
byte &= ~(1 << pin.bit)
# now we know what to set
self.setPins(addr, byte)
self.setPins(pin.addr, byte)
def getPin(self, pin):
with self._lock:
if not self._active:
return None
# read the current value
(addr, bit) = pin
byte = self.bus.read_byte(addr)
byte = self._bus.read_byte(pin.addr)
# now we need to get the bit
mask = (1 << bit)
mask = (1 << pin.bit)
return bool(byte & mask)
......
from libtuer import logger
from busfahrer import Pin
import time
import threading
import signal
......@@ -17,7 +18,7 @@ class Blinker(threading.Thread):
self._terminate = False
def run(self):
logger.debug("""LED blinker thread for LED "%s" on pin %d started.""" % (self._name,self._pin))
logger.debug("""LED blinker thread for LED "{0}" on pin {1} started.""".format(self._name,self._pin))
while not self._terminate:
assert len(self._pattern) > 0
self._notifyer.clear()
......@@ -46,15 +47,15 @@ class Blinker(threading.Thread):
class LedActor:
ledPins = {
"red" : (0x21,4),
"green" : (0x21,5),
"red" : Pin(0x21,4),
"green" : Pin(0x21,5),
}
def __init__(self,i2c):
# launch threads, all running the "_execute" method
self._threads = {}
for (name,pin) in self.ledPins.items():
logger.debug("Setting pin %d to output as \"%s\" LED, initial state: False" % (pin,name))
logger.debug("Setting pin {0} to output as \"{1}\" LED, initial state: False".format(pin,name))
# set LED off initially
i2c.setPin(pin,True)
# yes, True means off
......
from collections import namedtuple
from libtuer import ThreadRepeater, logger
from statemachine import StateMachine
from busfahrer import Pin
class PinsState():
pass
......@@ -11,7 +12,7 @@ class PinWatcher():
self.pin = pin
self.i2c = i2c
self._histlen = histlen
self._translator = (lambda x:not x) if invert else bool
self._translate = (lambda x:not x) if invert else bool
# state change detection
self._state = None
self._newstate = None # != None iff we are currently seeing a state change
......@@ -50,10 +51,10 @@ class PinWatcher():
class PinsWatcher():
def __init__(self, state_machine, i2c):
self._pins = {
'bell_ringing': PinWatcher((0x20,2), 2, i2c, invert=True),
'door_closed': PinWatcher((0x20,1), 4, i2c, invert=True),
'door_locked': PinWatcher((0x20,0), 4, i2c, invert=True),
'space_active': PinWatcher((0x20,3), 4, i2c, invert=True),
'bell_ringing': PinWatcher(Pin(0x20,2), 2, i2c, invert=True),
'door_closed': PinWatcher(Pin(0x20,1), 4, i2c, invert=True),
'door_locked': PinWatcher(Pin(0x20,0), 4, i2c, invert=True),
'space_active': PinWatcher(Pin(0x20,3), 4, i2c, invert=True),
}
self._sm = state_machine
self._i2c = i2c
......@@ -67,7 +68,7 @@ class PinsWatcher():
pin = self._pins[name]
if pin.read():
saw_change = True
logger.debug("Pin %s changed to %d" % (name, pin.state()))
logger.debug("Pin {0} changed to {1}".format(name, pin.state()))
if not saw_change:
return None
# create return object
......
from busfahrer import I2C
from busfahrer import I2C, Pin
import time
......@@ -7,9 +7,9 @@ b = I2C(1)
b.activate()
i = 0
while True:
b.setPin((0x21, 4), i % 2 == 0)
b.setPin((0x21, 5), (i//2) % 2 == 0)
b.setPin((0x21, 6), (i//4) % 2 == 0)
b.setPin((0x21, 7), (i//8) % 2 == 0)
b.setPin(Pin(0x21, 4), i % 2 == 0)
b.setPin(Pin(0x21, 5), (i//2) % 2 == 0)
b.setPin(Pin(0x21, 6), (i//4) % 2 == 0)
b.setPin(Pin(0x21, 7), (i//8) % 2 == 0)
time.sleep(0.2)
i += 1
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment