Python设计模式 - littleboy12580/learning_python GitHub Wiki
设计模式概念
设计模式是针对程序设计中可能遇到的一些常见问题(Known Uses),根据面向对象的程序设计原则(单一职责原则、开放封闭原则、依赖倒置原则、里氏替换原则、接口隔离原则、迪米特原则)以及先人的程序设计经验,所给出的一套优雅解决方案
设计模式的初衷是解决问题,不是创造问题,所以不要为了套用设计模式而去设计模式
创建型模式
创建型模式,就是创建对象的模式,抽象了实例化的过程。它帮助一个系统独立于如何创建、组合和表示它的那些对象。关注的是对象的创建,创建型模式将创建对象的过程进行了抽象,也可以理解为将创建对象的过程进行了封装,作为客户程序仅仅需要去使用对象,而不再关心创建对象过程中的逻辑
单例模式
目的:保证一个类仅有一个实例,并提供一个访问它的全局访问点;Python 实现单例模式有两个常用的方法。
方法一:思路是使用 module 替代 class 来使用
class Singleton(type):
def __init__(cls, name, bases, attrs):
super(Singleton, cls).__init__(name, bases, attrs)
cls._instance = None
def __call__(cls, *args, **kwargs):
raise NotImplementedError('Call `get_instance()` to get instance')
def get_instance(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
class db(object):
__metaclass__ = Singleton
class MySQL(db): pass
obj1, obj2 = db.get_instance(), db.get_instance()
obj3, obj4 = MySQL.get_instance(), MySQL.get_instance()
print id(obj1), id(obj2), id(obj3), id(obj4)
### OUTPUT ###
# 4391229776 4391229776 4391229840 4391229840
方法二:使用元类(metaclass)
class db(object):
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_inst'):
cls._inst = super(db, cls).__new__(cls, *args, **kwargs)
return cls._inst
class MySQL(db): pass
obj1, obj2, obj3 = db(), db(), MySQL()
print id(obj1), id(obj2), id(obj3)
### OUTPUT ###
# 4527400912 4527400912 4527400912
工厂模式
目的:提供一个创建一系列相关或互相依赖对象的接口,而无需指定它们具体的类;工厂模式的核心思想是对象实例化过程的封装
class EnglishTranslator(object):
def get(self, msgid):
return str(msgid)
class ChineseTranslator(object):
def __init__(self):
self.trans = dict(Alipay="支付宝")
def get(self, msgid):
return self.trans.get(msgid, str(msgid))
def i18n_factory(language="Chinese"):
languages = dict(English=EnglishTranslator, Chinese=ChineseTranslator)
return languages[language]()
for local in ["English", "Chinese"]:
""" Duck Typing """
translator = i18n_factory(language=local)
print translator.get("Alipay")
### OUTPUT ###
# Alipay
# 支付宝
对象池模式
目的:将对象集合池化,实现对象集合的管理以及对象的分发
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
class WorkThread(threading.Thread):
"""
线程池中的工作线程
"""
def __init__(self, work_queue, result_queue, timeout, **kwargs):
super(WorkThread, self).__init__(kwargs=kwargs)
# 线程在结束前等待任务队列多长时间
self.timeout = timeout
# 将主线程设置为守护进程
self.setDaemon(True)
self.work_queue = work_queue
self.result_queue = result_queue
self.start()
def run(self):
"""
重写threading.Thread的run函数
"""
while True:
try:
# 轮询工作队列,并获取一个任务,最大等待时间设置为timeout
callable, args, kwargs = self.work_queue.get(timeout=self.timeout)
res = callable(*args, **kwargs)
# 任务返回的结果放在结果队列中
self.result_queue.put(res)
self.work_queue.task_done()
except Queue.Empty:
# 超时返回Queue.Empty,任务队列空的时候结束此线程
break
except Exception as e:
print "Error: ", str(e)
raise
class ThreadPool(object):
"""
线程池
num_of_workthread: 工作线程个数
timeout: 轮询等待时间(秒)
"""
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_inst'):
cls._inst = super(ThreadPool, cls).__new__(cls, *args, **kwargs)
return cls._inst
def __init__(self, num_of_workthread, timeout):
# Queue.Queue() 创建一个线程安全队列
self.work_queue = Queue.Queue()
self.result_queue = Queue.Queue()
self.workthread_list = []
self._createThreadPool(num_of_workthread, timeout)
def _createThreadPool(self, num_of_workthread, timeout):
"""
创建工作线程组
"""
for i in range(num_of_workthread):
thread = WorkThread(self.work_queue, self.result_queue, timeout)
self.workthread_list.append(thread)
def wait_for_complete(self):
"""
等待所有工作线程完成
"""
while len(self.workthread_list):
thread = self.workthread_list.pop()
# 等待线程结束
if thread.isAlive():
# 判断线程是否还存活来决定是否调用join
thread.join()
def add_job(self, callable, *args, **kwargs):
"""
向工作队列中添加任务
"""
self.work_queue.put((callable, args, kwargs))
if __name__ == '__main__':
"""
分段计算sum(9999)
注:演示用例,实际上 Python 多线程处理计算密集型任务反而效率更低
"""
def demo_task(seq):
return sum(seq)
threadpool_inst = ThreadPool(num_of_workthread=10, timeout=1)
for i in range(100):
threadpool_inst.add_job(demo_task, range(100*i, 100*i+100))
threadpool_inst.wait_for_complete()
result = []
while threadpool_inst.result_queue.qsize():
result.append(threadpool_inst.result_queue.get())
print sum(result)
### OUTPUT ###
# 49995000
结构型模式
结构型模式是为解决怎样组装现有的类,设计他们的交互方式,从而达到实现一定的功能的目的。结构型模式包容了对很多问题的解决。例如:扩展性(外观、组成、代理、装饰)封装性(适配器,桥接)
装饰器模式
目的:动态地给一个对象添加一些额外的职责
def PermissionChecker(fn):
def wrapped(*args, **kwargs):
# 访问权限判断
if session in permission_group:
# 通过校验
return fn(*args, **kwargs)
else:
# 未通过校验
return Response(status=403)
return wrapped
@route(...)
@PermissionChecker
def controller():
pass
代理模式
目的:为其他对象提供一种代理以控制对这个对象的访问
代理模式常用于以下场景:
- 远程代理:以本地对象来代表不同地址空间的其它对象,例如 Python 的 RPyC 库
- 安全代理:为对象的访问添加访问控制
- 虚拟代理:用一个轻量级的对象来替代创建开销非常大的一个对象,延缓真实对象的创建,实现性能优化。(例如在网速不佳时,浏览器渲染页面时只会引入大图片的路径和尺寸信息,图片的载入被延迟以加快页面渲染的速度)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from abc import ABCMeta, abstractmethod
class Subject(object):
__metaclass__ = ABCMeta
@abstractmethod
def get_secret(self): pass
class Secret(Subject):
_secret = ""
def get_secret(self):
return self._secret
class Proxy(Subject):
def __init__(self):
self.real_subject = None
def get_secret(self):
permission check here...
self.real_subject = Secret()
return self.real_subject.get_secret()
外观模式
目的:为子系统中的一组接口提供一个一致的界面,Facade 模式定义了一个高层接口,这个接口使得这一子系统更加容易使用;如果你正在设计一套 API ,那么你就是在实现一个外观模式
# Complex Parts
class CPU(object):
def freeze(self): pass
def jump(self, position): pass
def execute(self): pass
class Memory(object):
def load(self, position, data): pass
class HardDrive(object):
def read(self, lba, size): pass
# Facade
class ComputerFacade(object):
def __init__(self):
self.processor = CPU()
self.ram = Memory()
self.hd = HardDrive()
def start(self):
self.processor.freeze()
self.ram.load(BOOT_ADDRESS, self.hd.read(BOOT_SECTOR, SECTOR_SIZE))
self.processor.jump(BOOT_ADDRESS)
self.processor.execute()
# Client
computer_facade = ComputerFacade()
computer_facade.start()
适配器模式
目的:将一个类的接口转换成客户希望的另外一个接口;与外观模式比较类似的都是包装接口,区别在于外观模式是包装子系统、或者说是下层系统的接口,而适配器模式包装的是其他服务的标准接口
class CommonService(object):
def business(self, *args, **kwargs):
print "Call business"
class SpecialService(object):
def special_business(self, param, *args, **kwargs):
print "Call special_business, param: {0}".format(param)
class Adapter(object):
def __init__(self, obj, **adapted_methods):
""" 适配方法 """
self.obj = obj
self.__dict__.update(adapted_methods)
def __getattr__(self, attr):
""" 其它未适配方法的处理 """
return getattr(self.obj, attr)
def original_dict(self):
""" 被适配方法 """
return self.obj.__dict__
common_service = CommonService()
special_service = SpecialService()
objects = list()
objects.append(common_service)
objects.append(Adapter(obj=special_service,
business=lambda: special_service.special_business("special_param")))
for obj in objects:
obj.business()
### OUTPUT ###
# Call business
# Call special_business, param: special_param
行为型模式
行为型模式涉及到算法和对象间职责的分配,行为模式描述了对象和类的模式,以及它们之间的通信模式,行为型模式刻划了在程序运行时难以跟踪的复杂的控制流可分为行为类模式和行为对象模式1.行为模式使用继承机制在类间分派行为2.行为对象模式使用对象聚合来分配行为。一些行为对象模式描述了一组对等的对象怎样相互协作以完成其中任何一个对象都无法单独完成的任务
状态模式
目的:允许一个对象在其内部状态改变时改变它的行为(状态机)
下面是一个简单的FSM Python实现
import itertools
class FSMException(Exception): pass
class DuplicateRule(FSMException):
""" 状态机转换规则冲突 """
pass
class InvalidStateTransition(FSMException):
""" 无效的输入事件 """
pass
class State(object):
""" 装饰器:状态机中的常规状态 """
def __init__(self, func):
self._func = func
def __call__(self, event, old_state):
return self._func(event, old_state)
@property
def name(self):
return self._func.__name__
class TerminalState(State):
""" 装饰器:状态机中的终止状态 """
pass
class Event(dict):
""" 输入事件 """
def __init__(self, name, *args, **kwargs):
self._name = name
super(Event, self).__init__(*args, **kwargs)
@property
def name(self):
return self._name
class FiniteStateMachine(object):
""" 状态机生成器 """
def __init__(self):
self._state = None
self._rules = {}
def add_rule(self, event_types, valid_states, end_state):
"""
event_types: 输入的事件信息
valid_states: 对该事件生效的状态机状态
end_state:该事件生效后的状态机状态
"""
for trigger in list(itertools.product(event_types, valid_states)):
if trigger in self._rules:
raise DuplicateRule(*trigger)
self._rules[trigger] = end_state
def start(self, event):
while self._state is None or not isinstance(self._state, TerminalState):
trigger = (event.name, self.state)
if trigger not in self._rules:
raise InvalidStateTransition(*trigger)
old_state = self.state
self._state = self._rules[trigger]
# 执行当前事件输入的行为,并产生下一个事件输入
event = self._state(event, old_state)
def reset(self):
""" 清空当前状态机状态 """
self._state = None
@property
def state(self):
return self._state
下面是该FSM的使用方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from fsm import *
import time
@State
def state_a(event, old_state):
if event["count"] > 0:
print "State A, Count: {}".format(event["count"])
time.sleep(1)
event["count"] -= 1
return Event("to_state_b", **event)
else:
return Event("to_state_terminal", **event)
@State
def state_b(event, old_state):
if event["count"] > 0:
print "State B, Count: {}".format(event["count"])
time.sleep(1)
event["count"] -= 1
return Event("to_state_c", **event)
else:
return Event("to_state_terminal", **event)
@State
def state_c(event, old_state):
if event["count"] > 0:
print "State C, Count: {}".format(event["count"])
time.sleep(1)
event["count"] -= 1
return Event("to_state_a", **event)
else:
return Event("to_state_terminal", **event)
@TerminalState
def state_terminal(event, old_state):
print "Stop."
def build_fsm():
f = FiniteStateMachine()
# add_rule([输入事件], [该输入事件有效的当前状态], 输入事件后的下一个状态)
f.add_rule(["to_state_a"], [None, state_c], state_a)
f.add_rule(["to_state_b"], [state_a], state_b)
f.add_rule(["to_state_c"], [state_b], state_c)
f.add_rule(["to_state_terminal"], [state_a, state_b, state_c], state_terminal)
return f
fsm = build_fsm()
fsm.start(Event("to_state_a", count=10))
### OUTPUT ###
# State A, Count: 10
# State B, Count: 9
# State C, Count: 8
# State A, Count: 7
# State B, Count: 6
# State C, Count: 5
# State A, Count: 4
# State B, Count: 3
# State C, Count: 2
# State A, Count: 1
# Stop.
观察者模式
目的:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""http://code.activestate.com/recipes/131499-observer-pattern/"""
class Subject(object):
def __init__(self):
self._observers = []
def attach(self, observer):
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer):
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self):
for observer in self._observers:
observer.update(self)
# Example usage
class Data(Subject):
def __init__(self, name=''):
Subject.__init__(self)
self.name = name
self._data = 0
@property
def data(self):
return self._data
@data.setter
def data(self, value):
self._data = value
self.notify()
class Viewer(object):
def update(self, subject):
print 'Viewer: Subject {0} has data {1}'.format(subject.name, subject.data)
# Example usage...
data = Data('Data')
view = Viewer()
data.attach(view)
print("Setting Data = 10")
data.data = 10
print("Setting Data = 3")
data.detach(view)
print("Setting Data = 10")
data.data = 10
### OUTPUT ###
# Setting Data = 10
# Viewer: Subject Data has data 10
# Setting Data = 3
# Setting Data = 10
发布订阅模式
目的:订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态
class Provider(object):
""" 消息产生者 """
def __init__(self):
self.msg_queue = []
self.subscribers = {}
def notify(self, msg):
""" 添加发布的消息 """
self.msg_queue.append(msg)
def subscribe(self, msg, subscriber):
""" 添加对 msg 消息的订阅者 subscriber """
self.subscribers.setdefault(msg, []).append(subscriber)
def unsubscribe(self, msg, subscriber):
""" 删除对 msg 消息的订阅者 subscriber """
self.subscribers[msg].remove(subscriber)
def update(self):
for msg in self.msg_queue:
if msg in self.subscribers.keys():
for sub in self.subscribers[msg]:
sub.run(msg)
self.msg_queue = []
class Publisher(object):
""" 消息发布者 """
def __init__(self, msg_center):
self.provider = msg_center
def publish(self, msg):
""" 发布消息 """
self.provider.notify(msg)
class Subscriber(object):
""" 消息的订阅者 """
def __init__(self, name, msg_center):
self.name = name
self.provider = msg_center
def subscribe(self, msg):
""" 订阅消息 """
self.provider.subscribe(msg, self)
def run(self, msg):
print("{} got {}".format(self.name, msg))
message_center = Provider()
cctv = Publisher(message_center)
s1 = Subscriber("s1", message_center)
s1.subscribe("movie")
s2 = Subscriber("s2", message_center)
s2.subscribe("music")
cctv.publish("movie")
cctv.publish("music")
cctv.publish("ads")
message_center.update()
### OUTPUT ###
# s1 got movie
# s2 got music
生产消费者模式
目的:引入阻塞队列来解决生产者和消费者的耦合问题,使生产者和消费者不直接通讯,而是使用阻塞队列来进行通讯
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""http://wilbeibi.com/2015/03/2015-03-02-coroutine-producer-comsumer/"""
import time
import random
def coroutine(func):
# A wrapper to convert function into generator
# From David Beazley
def start(*args,**kwargs):
cr = func(*args,**kwargs)
cr.next()
return cr
return start
def producer(target):
while True:
time.sleep(1)
data = random.randint(1, 10)
print ("# producer: sending data {}".format(data))
target.send(data)
@coroutine
def consumer():
while True:
data = yield
print ("# consumer: receving data {}".format(data))
if __name__ == '__main__':
g_consumer = consumer()
producer(g_consumer)
责任链模式
目的:使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止;现实中的一个典型责任链模式应用就是工单的审批流。此外 Web 开发中稍微复杂一点的过滤器和拦截器也都可以用责任链模式实现
class Handler(object):
def __init__(self, successor=None):
self._successor = successor
def handle(self, request):
res = self._handle(request)
if not res:
self._successor.handle(request)
def _handle(self, request):
raise NotImplementedError('Must provide implementation in subclass.')
class ConcreteHandler1(Handler):
def _handle(self, request):
if 0 < request <= 10:
print('request {} handled in handler 1'.format(request))
return True
class ConcreteHandler2(Handler):
def _handle(self, request):
if 10 < request <= 20:
print('request {} handled in handler 2'.format(request))
return True
class DefaultHandler(Handler):
def _handle(self, request):
print('end of chain, no handler for {}'.format(request))
return True
class Client(object):
def __init__(self):
default_handler = DefaultHandler()
concrete_handler_2 = ConcreteHandler2(default_handler)
self.handler = ConcreteHandler1(concrete_handler_2)
def delegate(self, requests):
for request in requests:
self.handler.handle(request)
if __name__ == "__main__":
import random
client1 = Client()
requests = [random.randint(1, 30) for i in xrange(10)]
client1.delegate(requests)