Commit e4fd944b authored by Agrigor's avatar Agrigor

Merge branch 'master' of gitlab:hacksaar/Jeopardy

parents f2ed0301 898f6980
*.kde*
.kde*/
.idea
\ No newline at end of file
.idea
*.kate*
#!/usr/bin/python
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ IMPORT LIBRARIES ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import sys
import threading
import socket
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ GLOBAL CONSTANTS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
PORT = 50000
IP = "127.0.0.1"
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ CLASS DEFINITION ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Comunication:
def __init__(self, ip, port):
self.__ip = ip
self.__port = port
def check_ip(self):
try:
socket.inet_aton(self.__ip)
return True
except OSError:
return False
def create_client_socket(self):
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__socket.bind(("", self.__port))
def receive(self, bytes):
return self.__socket.recvfrom(bytes)
def cleanup(self):
self.__socket.close()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ MAIN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Client:
def __init__(self, ip, port):
self.UDP = Comunication(ip, port)
self.UDP.create_client_socket()
def main(self):
try:
threading.Event().wait()
finally:
self.cleanup()
def cleanup(self):
self.UDP.cleanup()
print("\n Bye Bye")
sys.exit(0)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PROGRAM ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Start Client
client = Client(IP, PORT)
client.main()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ END ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......@@ -9,6 +9,8 @@ import RPi.GPIO as GPIO
import sys
import socket
import threading
import time
import random
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......@@ -18,7 +20,6 @@ import threading
BUZZER_PINS = [35, 36, 37, 38]
BOUNCE = 300
PORT = 50000
......@@ -44,20 +45,20 @@ class Communication:
buzzer.register_observer(self)
def notify(self, buzzer):
self.__msg = "Button " + str(buzzer.number) + " pressed"
self.send(self.msg)
print(self.msg + " -> " + self.__target_ip)
msg = "Button " + str(buzzer.number) + " pressed"
self.send(msg)
print(msg + " -> " + self.__target_ip)
class Buzzer:
def __init__(self, number, board_pin, bounce, observers=set()):
def __init__(self, number, board_pin, bounce):
self.number = number
self.pin = board_pin
self.__bounce = bounce
GPIO.setmode(GPIO.BOARD)
GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
self.create_event()
self.observers = observers
self.observers = set()
def create_event(self):
GPIO.add_event_detect(self.pin,
......@@ -82,10 +83,11 @@ class Buzzer:
class Server:
def __init__(self, ip, port):
def __init__(self, ip, port, simulation=0):
self.UDP = Communication(ip, port)
self.buzzers = [Buzzer(i, pin, BOUNCE) for i, pin in enumerate(BUZZER_PINS, start=1)]
self.UDP.observe_buzzers(self.buzzers)
self.sim = int(simulation)
# Initialize target ip address
try:
socket.inet_aton(ip)
......@@ -99,20 +101,35 @@ class Server:
print("\n Bye Bye")
sys.exit(0)
def send_simulation(self):
msg = "Button " + str(random.randint(1,4)) + " pressed"
print(msg)
self.UDP.send(msg)
def main(self):
try:
threading.Event().wait()
if self.sim == 1:
while True:
self.send_simulation()
time.sleep(1)
else:
threading.Event().wait()
finally:
self.cleanup()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PROGRAM ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Test if second script argument exists and
# set simulation mode with "1" or unset with "0"
if len(sys.argv) == 3:
SIM = sys.argv[2]
else:
SIM = 0
# Start Server
server = Server(sys.argv[1], PORT)
server = Server(sys.argv[1], PORT, SIM)
server.main()
......
#!/usr/bin/python
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ IMPORT LIBRARIES ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import sys
import socket
import time
import random
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ GLOBAL CONSTANTS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
IP = "127.0.0.1"
PORT = 50000
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~ CLASS DEFINITION ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Communication:
def __init__(self, ip, port):
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__target_ip = ip
self.__target_port = port
def send(self, message):
self.__socket.sendto(str.encode(message), (self.__target_ip, self.__target_port))
def cleanup(self):
self.__socket.close()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ MAIN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Server:
def __init__(self, ip, port):
self.UDP = Communication(ip, port)
def cleanup(self):
self.UDP.cleanup()
print("\n Bye Bye")
sys.exit(0)
def send_simulation(self):
msg = "Button " + str(random.randint(1, 4)) + " pressed"
print(msg)
self.UDP.send(msg)
def main(self):
try:
while True:
self.send_simulation()
time.sleep(1)
finally:
self.cleanup()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PROGRAM ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Start Server
server = Server(IP, PORT)
server.main()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ END ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment