๋ชฉ์ฐจ
๊ฐ๋จํ ๋ฒ์
import serial
import time
import signal
import threading
line = []
# ๋ผ์ธ ๋จ์๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ๋ฆฌ์คํธ ๋ณ์
port = 'COM1'
# ์๋ฆฌ์ผ ํฌํธ
baud = 9600
# ์๋ฆฌ์ผ ๋ณด๋๋ ์ดํธ
exitThread = False
# ์ฐ๋ ๋ ์ข
๋ฃ์ฉ ๋ณ์
# ์ฐ๋ ๋ ์ข
๋ฃ์ฉ ์๊ทธ๋ ํจ์
def handler(signum, frame):
exitThread = True
# ๋ฐ์ดํฐ ์ฒ๋ฆฌํ ํจ์
def parsing_data(data):
tmp = ''.join(data)
# ๋ฆฌ์คํธ๋ก ๋ค์ด์๊ธฐ์ ์คํธ๋ง์ผ๋ก ํฉ์น๋ค.
print(tmp)
# ๋ณธ ์ฐ๋ ๋
def readThread(ser):
global line
global exitThread
while not exitThread:
for c in ser.read():
line.append(chr(c))
if c == 10:
# 10์ด ๊ฐํ๋ฌธ์ ๋ท ๋ถ๋ถ(CRLF)
parsing_data(line)
del line[:]
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
# signal.SIGINT(ctrl+c)์ ๋ํ signal์ฒ๋ฆฌ
ser = serial.Serial(port, baud, timeout=0)
# ์๋ฆฌ์ผ ํฌํธ ์ด๊ธฐ
thread = threading.Thread(target=readThread, args=(ser,))
thread.start()
๋ณต์กํ ๋ฒ์
main.py
from Serial_Read import Protocol, ReaderThread
import serial
import time
# ํค ์ธ๋ฑ์ค
KEY_LEFT = 0
KEY_UP = 1
KEY_RIGHT = 2
KEY_DOWN = 3
KEY_SELECT = 4
KEY_START = 5
KEY_SQUARE = 6
KEY_TRIANGLE = 7
KEY_X = 8
KEY_CIRCLE = 9
# ํค ๋งคํ
Keymap = {
b'a': [KEY_LEFT, 'KEY_LEFT'],
b'w': [KEY_UP, 'KEY_UP'],
b'd': [KEY_RIGHT, 'KEY_RIGHT'],
b's': [KEY_DOWN, 'KEY_DOWN'],
b'1': [KEY_SELECT, 'KEY_SELECT'],
b'2': [KEY_START, 'KEY_START'],
b'y': [KEY_SQUARE, 'KEY_SQUARE'],
b'u': [KEY_TRIANGLE, 'KEY_TRIANGLE'],
b'h': [KEY_X, 'KEY_X'],
b'j': [KEY_CIRCLE, 'KEY_CIRCLE'],
}
# ํ๋กํ ์ฝ
class rawProtocal(Protocol):
# ์ฐ๊ฒฐ ์์์ ๋ฐ์
def connection_made(self, transport):
self.transport = transport
self.running = True
# ์ฐ๊ฒฐ ์ข
๋ฃ์ ๋ฐ์
def connection_lost(self, exc):
self.transport = None
# ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ์ด๊ณณ์์ ์ฒ๋ฆฌํจ.
def data_received(self, data):
# ์
๋ ฅ๋ ๋ฐ์ดํฐ์ ํค๋งต์ ๋น๊ตํด์ ์๋ค๋ฉด
if data in Keymap:
# ๋งคํ๋ ํค์ ๋ฌธ์์ด ์ถ๋ ฅ
print(Keymap[data][1])
# Keymap[data][0]์ ํค ์ธ๋ฑ์ค
# ๋งค์นญ๋๋ key ์ธ๋ฑ์ค ๊ฐ์ ธ์ด
key = Keymap[data][0]
# ๋งคํ ํค๊ฐ CIRCLE ํค์ด๋ฉด ํ๋ก๊ทธ๋จ ์ข
๋ฃ
if key == KEY_CIRCLE:
self.running = False
else:
print('Unknown data', data)
# ๋ฐ์ดํฐ ๋ณด๋ผ ๋ ํจ์
def write(self, data):
print(data)
self.transport.write(data)
# ์ข
๋ฃ ์ฒดํฌ
def isDone(self):
return self.running
# ํฌํธ ์ค์
PORT = '/dev/ttyUSB0'
# ์ฐ๊ฒฐ
ser = serial.serial_for_url(PORT, baudrate=9600, timeout=1)
# ์ฐ๋ ๋ ์์
with ReaderThread(ser, rawProtocal) as p:
while p.isDone():
time.sleep(1)
Serial_Read.py
import serial
import threading
class Protocol(object):
"""\
Protocol as used by the ReaderThread. This base class provides empty
implementations of all methods.
"""
def connection_made(self, transport):
"""Called when reader thread is started"""
def data_received(self, data):
"""Called with snippets received from the serial port"""
def connection_lost(self, exc):
"""\
Called when the serial port is closed or the reader loop terminated
otherwise.
"""
if isinstance(exc, Exception):
raise exc
class ReaderThread(threading.Thread):
"""\
Implement a serial port read loop and dispatch to a Protocol instance (like
the asyncio.Protocol) but do it with threads.
Calls to close() will close the serial port but it is also possible to just
stop() this thread and continue the serial port instance otherwise.
"""
def __init__(self, serial_instance, protocol_factory):
"""\
Initialize thread.
Note that the serial_instance' timeout is set to one second!
Other settings are not changed.
"""
super(ReaderThread, self).__init__()
self.daemon = True
self.serial = serial_instance
self.protocol_factory = protocol_factory
self.alive = True
self._lock = threading.Lock()
self._connection_made = threading.Event()
self.protocol = None
def stop(self):
"""Stop the reader thread"""
self.alive = False
if hasattr(self.serial, 'cancel_read'):
self.serial.cancel_read()
self.join(2)
def run(self):
"""Reader loop"""
if not hasattr(self.serial, 'cancel_read'):
self.serial.timeout = 1
self.protocol = self.protocol_factory()
try:
self.protocol.connection_made(self)
except Exception as e:
self.alive = False
self.protocol.connection_lost(e)
self._connection_made.set()
return
error = None
self._connection_made.set()
while self.alive and self.serial.is_open:
try:
# read all that is there or wait for one byte (blocking)
data = self.serial.read(self.serial.in_waiting or 1)
except serial.SerialException as e:
# probably some I/O problem such as disconnected USB serial
# adapters -> exit
error = e
break
else:
if data:
# make a separated try-except for called used code
try:
self.protocol.data_received(data)
except Exception as e:
error = e
break
self.alive = False
self.protocol.connection_lost(error)
self.protocol = None
def write(self, data):
"""Thread safe writing (uses lock)"""
with self._lock:
print(data)
self.serial.write(data)
def close(self):
"""Close the serial port and exit reader thread (uses lock)"""
# use the lock to let other threads finish writing
with self._lock:
# first stop reading, so that closing can be done on idle port
self.stop()
self.serial.close()
def connect(self):
"""
Wait until connection is set up and return the transport and protocol
instances.
"""
if self.alive:
self._connection_made.wait()
if not self.alive:
raise RuntimeError('connection_lost already called')
return (self, self.protocol)
else:
raise RuntimeError('already stopped')
# - - context manager, returns protocol
def __enter__(self):
"""\
Enter context handler. May raise RuntimeError in case the connection
could not be created.
"""
self.start()
self._connection_made.wait()
if not self.alive:
raise RuntimeError('connection_lost already called')
return self.protocol
def __exit__(self, exc_type, exc_val, exc_tb):
"""Leave context: close port"""
self.close()