架构模式
Pipe-Filter 模式
非常适合与数据处理及处理分析系统
Filter封装数据处理的功能
松耦合:Filter只跟数据(格式)耦合
Pipe 用于链接Filter传递数据或者在异步处理过程中缓冲数据流。
进程同步调用时,pipe演变为数据在方法调用间传递。
package pipe_filter
// Request is the input of the filter
type Request interface {}
// Response is the uotput of the filter
type Response interface {} // 接受各种数据类型
// filter interface is the deifnition of the data processing components
// Pipe-Filter structure
type Filter interface {
Process(data Request) (Request, error)
}
package pipe_filter
import (
"errors"
"strings"
)
var SplitFilterWrongFormatError = errors.New("input data should be string")
type SplitFilter struct {
delimiter string
}
func NewSplitFilter(delimiter string) *SplitFilter {
return &SplitFilter{delimiter}
}
func (sf *SplitFilter) Process(data Request) (Request, error){
str,ok := data.(string) // 检查数据格式,类型,是否可以处理
if !ok{
return nil, SplitFilterWrongFormatError
}
parts := strings.Split(str, sf.delimiter)
return parts, nil
}
package pipe_filter
import "errors"
var SumFilterWrongFormatError = errors.New("input data should be []int")
type SumFilter struct {
}
func NewSumFilter() *SumFilter {
return &SumFilter{}
}
func (sf *SumFilter) Process(data Request) (Request, error){
elems, ok:= data.([]int)
if !ok {
return nil, SplitFilterWrongFormatError
}
ret := 0
for _,elem := range elems{
ret += elem
}
return ret, nil
}
package pipe_filter
import (
"errors"
"strconv"
)
var ToIntFilterWrongFormatError = errors.New("input date should be []string")
type ToIntFilter struct {
}
func NewToTinFilter() *ToIntFilter{
return &ToIntFilter{}
}
func (tif *ToIntFilter) Process(data Request) (Request, error) {
parts, ok:=data.([]string)
if !ok{
return nil, ToIntFilterWrongFormatError
}
ret := []int{}
for _,part := range parts{
s, err:=strconv.Atoi(part)
if err != nil{
return nil, err
}
ret = append(ret, s)
}
return ret, nil
}
package pipe_filter
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline{
return &StraightPipeline{
Name: name,
Filters: &filters,
}
}
type StraightPipeline struct {
Name string
Filters *[]Filter
}
func (f *StraightPipeline) Process(data Request) (Response, error){
var ret interface{}
var err error
for _, filter := range *f.Filters{
ret, err = filter.Process(data)
if err != nil{
return ret, err
}
data = ret
}
return ret, err
}
package pipe_filter
import "testing"
func TestStraightPipeline(t *testing.T){
spliter := NewSplitFilter(",")
converter := NewToTinFilter()
sum := NewSumFilter()
sp := NewStraightPipeline("p1",spliter,converter,sum)
ret,err:= sp.Process("1,2,3")
if err !=nil{
t.Fatal(err)
}
if ret != 6{
t.Fatalf("The expected is 6, but the actual is %d", ret)
}
}