【框架学习】Apache Camel 入门 - hippowc/hippowc.github.io GitHub Wiki
以下内容参考:
- Camel in Action
消息模型
有两个抽象消息模型,一个是Message:基础实体,包含了要被传输的data;一个是Exchange:包含一个in Message和一个out Message
Message
消息有一个唯一的String标识。
- Header中保存消息相关信息,譬如发送者标识,内容编码,授权信息,是name-value对。还可以有attachment用于web service和email组件
- Body 类型为Object,可以存任意类型任意大小
- Fault Flag 区别返回值和错误信息
Exchange
是消息的容器,为多种交互类型提供支持,或者叫message exchange patterns(MEPs),常常用来区分one-way和request-response消息风格,支持如下风格:
- InOnly one-way消息或者事件消息,譬如JMS消息经常是one-way消息
- InOut request-response消息,譬如基于http传输的就是这种
Exchange可以通过getIn和getOut获取进来和返回的消息,每一步的out消息被用作下一步的in消息,如果没有out消息,in消息就会被复用传递到下一步。
在InOut模式下,最后一步的out消息会被返回给最初的消息生产者,在InOnly模式下,最后一步的out消息会被扔掉
模型结构:
- Exchange ID 标识,Camel自动生成
- MEP 定义模式,是InOnly还是InOut
- Exception 路由的过程中发生错误,异常信息会放到这里
- Properties 类似于Message的headers,在整个交换期间都存在,存放global级别的信息,Message的header只是特定的某个消息
- In message
- Out message 当MeP为InOut时存在
Camel架构
CamelContext
可以看做Camel的运行时系统,把所有功能都组合到了一起
- Components,使用的组件
- Endpoint,包含已经被使用的endpoint
- Routes,所有被加入和routes
- Type converters,被加载的type converters,camel可以自动或手动转换类型
- Data formats,格式转换
- Registry, 可以使用registry查找bean
- Languages,使用很多语言创建表达式
routing engine
路由引擎在底层移动消息,没有暴露给开发者,保证消息被正常的路由,不建议在一个route中使用多个input,虽然在2.X是支持的,但后续会移除掉。
DSL
在camel中DSL是一个流式的java api,其中包含的方法使用EIP命名
Processor
camel中一个核心的概念,表示一个可以使用,创建或者修改一个exchange的节点,一个路由开始于一个consumer(外部系统的信息消费者),大部分情况,processor不设置out信息,这是in message会被复用,在route的最后,如果exchange的MEP是InOut,那么就需要发送一个reply到最开始的route调用方
所有Camel中的EIPs都是使用processor实现的,自己也可以实现一个简单的Processor接口
Component
Component是Camel的主要扩展点,目前有280多个components(具体在chapter 8),在开发者的角度,component和uri中使用的名字关联,他们实际是一个endpoints的工厂。譬如:FileComponent 和uri中的file关联,它会创建FileEndpoints
Endpoint
Endpoint是哪些一个系统中可以收发消息的抽象模型,在运行时,camel根据uri的表达式查找Endpoint,格式 -- scheme:context path ? options
- scheme标识Endpoint的类型,譬如file选择FileComponent
- 然后FileComponent作为一个工厂类,根据uri创建FileEndpoint
Producer
表示一个可以发送消息到Endpoint的实体。当一个一个消息发送到一个Endpoint时,producer要做些事情是的消息数据和特定的Endpoint匹配,譬如:FileProducer会把消息数据写到一个file中,JmsProducer会将Camel的消息和jms的消息做映射,这个是camel重要的功能,它隐藏了和其他特定传输的复杂性。
也就是说在该组件使用 to 的时候回用到producer,在producer中可以拿到exchanger原始消息,并可以通过process方法将他处理成我们需要的形式,还可以根据Endpoint拿到用户相关信息。
Consumer
consumer接受外部系统的消息(不是流程内部从to接受的消息,而是流程外部),包装到exchange中,在调用processor处理他们
- 事件驱动型consumer:和cs架构以及web服务有关,也被认为是异步的接受者,监听特定的消息通道,譬如:tcpIP端口,jms队列,websocket等
- 拉取型consumer:它主动向特定源获取消息,也被认为是同步接受者,一个特色的就是scheduled polling consumer,像File,FTP和email组件都是使用的scheduled polling consumer
目前理解,该组件使用到from的时候回用到该组件,而不是前面有个消息to这个组件时会使用到consumer
一个例子
从ftp服务器中下载订单:ftp://xxx.com/orders?username=xx&password=xx
camel先从component registry中查找FTPComponent组件,然后组件根据context path和options创建FTPEndpoint,FTPComponent不是camel-core的模块,需要增加camel-ftp依赖。
使用jms需要先配置component,将其加入到camelContext中
和spring整合
可以不需要手动将component加到camelContext中,通过spring的bean声明,会自动的将component注入到其中去
EIP
the Content-Based Router(cbr)
Using message filters
Using multicasting
Using recipient lists
Using the wireTap method
需要一个第二目的地,保存日志使用
自定义组件
一般来说,首先建议考虑使用Bean或者Processor的方式来完成自定义Camel组件的目的。所以如果能够使用一个简单的Class来封装逻辑,那是最好的方式。 Bean和Processor简单易用。但是使用Bean或者Processor有一个问题,就是一个Processor没有办法支持用户的自定义配置,因为Processor的参数只有Exchanger。所以当面临这样的问题时:你要定义一个Mapping处理组件,不同用户会有自定义的映射配置,需要根据不同的映射配置去进行处理,这时就需要自定义组件了,组件的Endpoint中的uri配置可以用于保存用户的特殊配置信息。
实际上所有的组件都是Processor。自定义组件的开发一般会涉及到四个概念:Component,Endpoint,Producer,Consumer。
Component是组件的抽象,用于创建一个Endpoint,一个Endpoint就是一个具有用户自定义参数的处理器。Producer和Consumer都是可以看做处理器,可以这么理解:Consumer要比Producer多一些处理动作,Consumer要负责从camel流程外部获取信息输入(通过监听,定时任务等方式),然后调用Processor处理外部信息,to到下一个节点。而Producer主要工作就是Processor,它从上一个节点获得上一个节点产出的原始消息Exchanger,通过调用Processor处理消息,并把消息传递给下一个或者输出
开发自定义组件其实也很简单,脚手架: mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-component -DarchetypeVersion=2.24.2 -DgroupId=com.alibaba.open -DartifactId=camel-ocean-api -Dname=oceanAPI -Dscheme=oceanapi
会创建出一个component项目:
- resources/META-INF/services/org/apache/camel/component/oceanapi - 允许组件自动发现的配置文件,dingbot是自定义组件的uri 格式,文件内容定义了组件类。
- OceanAPIComponent.java - 创建 “oceanapi:”端点的组件类
- OceanAPIEndpoin Endpoint的实现类,通过其URI(“oceanapi:” 在本例中)在DSL中引用。端点负责创建与之关联的Producer和Consumer实例。
- OceanAPIProducer.java - 用于将消息交换发送到端点的Producer实现。
- OceanAPIConsumer.java - 用于从端点消费消息的Consumer实现。
然后发布二方库后,使用者直接使用二方库就可以了
以下内容参考:
Apache Camel介绍
Apache Camel的官网并没有把Camel定义成一个ESB中间件服务,Apache Camel在编排模式中依托URI描述规则,实现了传输协议和消息格式的转换:HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF等等。Camel还可以嵌入到任何java应用程序中:看到了吧,Apache Camel不是ESB中间件服务,它需要依赖于相应的二次开发才能被当成ESB服务的核心部分进行使用。
Apache Camel支持使用JAVA语言和Scala语言进行DSL规则描述,也支持使用XML文件进行的规则描述。提一下,JBOSS提供了一套工具“Tools for Apache Camel”可以图形化Apache Camel的规则编排过程。
Camel要素
Endpoint 控制端点
Apache Camel中关于Endpoint最直白的解释就是,Camel作为系统集成的基础服务组件,在已经编排好的路由规则中,和其它系统进行通信的设定点。这个“其它系统”,可以是存在于本地或者远程的文件系统,可以是进行业务处理的订单系统,可以是消息队列服务,可以是提供了访问地址、访问ip、访问路径的任何服务。
Camel中的Endpoint控制端点使用URI的方式描述对目标系统的通信。例如以下URI描述了对外部MQ服务的通信,消息格式是Stomp:
// 以下代码表示从名为test的MQ队列中接收消息,消息格式为stomp
// 用户名为username,监听本地端口61613
from("stomp:queue:test?tcp://localhost:61613&login=username")
// 以下代码表示将消息发送到名为test的MQ队列中,消息格式为stomp
to("stomp:queue:test?tcp://localhost:61613&login=username");
再例如,我们可以使用Http协议和某一个外部系统进行通信,更多关于Http控制端点的说明也可参见Camel中的说明:http://camel.apache.org/http.html:
// 主动向http URI描述的路径发出请求(http的URI笔者不需要再介绍了吧)
from("http://localhost:8080/dbk.manager.web/queryOrgDetailById")
// 将上一个路由元素上Message Out中消息作为请求内容,
// 向http URI描述的路径发出请求
// 注意,Message Out中的Body内容将作为数据流映射到Http Request Body中
to("http://localhost:8080/dbk.manager.web/queryOrgDetailById")
请注意“from”部分的说明。它并不是等待某个Http请求匹配描述的URI发送到路由路径上,而是主动向http URI描述的路径发送请求。如果想要达到前者的效果,请使用Jetty/Servlet开头的相关通信方式:http://camel.apache.org/servlet.html 和 http://camel.apache.org/jetty.html。而通过Apache Camel官网中 http://camel.apache.org/uris.html 路径可以查看大部分Camel通过URI格式所支持的Endpoint。
不同的endpoint都是通过URI格式进行描述的,并且通过Camel中的org.apache.camel.Component(endpoint构建器)接口的响应实现进行endpoint实例的创建。需要注意的是,Camel通过plug方式提供对某种协议的endpoint支持,所以如果读者需要使用某种Camel的endpoint,就必须确定自己已经在工程中引入了相应的plug。例如,如果要使用Camel对Netty4-Endpoint的支持,就需要在工程中引入Camel对Netty4的支持
特殊的Endpoint Direct
Endpoint Direct用于在两个编排好的路由间实现Exchange消息的连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)。注意,两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。
从以上执行效果我们可以看到,被连接的两个路由使用的Exchange对象是同一个,也就是说在DirectRouteB路由中如果Exchange对象中的内容发生了变化就会在随后继续执行的DirectRouteA路由中产生影响。Endpoint Direct元素在我们实际使用Camel进行路由编排时,应用频度非常高。因为它可以把多个已编排好的路由按照业务要求连接起来,形成一个新的路由,保持原有路由的良好重用。
Exchange和Message消息格式
消息在我们已经编排好的业务路径上进行传递,通过我们自定义的消息转换方式或者Apache Camel提供的消息转换方式进行消息格式转换。那么为了完成这些消息传递、消息转换过程Camel中的消息必须使用统一的消息描述格式,并且保证路径上的控制端点都能存取消息。
Camel提供的Exchange要素帮助开发人员在控制端点到处理器、处理器到处理器的路由过程中完成消息的统一描述。一个Exchange元素的结构如下图所示:
Processor 处理器
Camel中另一个重要的元素是Processor处理器,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。Camel核心包和各个Plugin组件都提供了很多Processor的实现,开发人员也可以通过实现org.apache.camel.Processor接口自定义处理器(后者是通常做法)。
既然是做编码,那么我们自然可以在自定义的Processor处理器中做很多事情。这些事情可能包括处理业务逻辑、建立数据库连接去做业务数据存储、建立和某个第三方业务系统的RPC连接,但是我们一般不会那样做——那是Endpoint的工作。Processor处理器中最主要的工作是进行业务数据格式的转换和中间数据的临时存储。这样做是因为Processor处理器是Camel编排的路由中,主要进行Exchange输入输出消息交换的地方。
处理器Processor是和控制端点平级的概念。要看一个URI对应的实现是否是一个控制端点,最根本的就是看这个实现类是否实现了org.apache.camel.Endpoint接口;而要看一个路由中的元素是否是Processor处理器,最根本的就是看这个类是否实现了org.apache.camel.Processor接口。
Routing路由条件
在控制端点和处理器之间、处理器和处理器之间,Camel允许开发人员进行路由条件设置。例如开发人员可以拥有当Exchange In Message的内容为A的情况下将消息送入下一个处理器A,当Exchange In Message的内容为B时将消息送入下一个处理器B的处理能力。又例如,无论编排的路由中上一个元素的处理消息如何,都将携带消息的Exchange对象复制 多份,分别送入下一处理器X、Y、Z。开发人员甚至还可以通过路由规则完成Exchange到多个Endpoint的负载传输。
Camel中支持的路由规则非常丰富,包括:Message Filter、Based Router、Dynamic Router、Splitter、Aggregator、Resequencer等等。在Camel的官方文档中使用了非常形象化的图形来表示这些路由功能(http://camel.apache.org/enterprise-integration-patterns.html):
Service与生命周期
在Apache Camel中有一个比Endpoint、Component、CamelContext等元素更基础的概念元素:Service。 包括Endpoint、Component、CamelContext等元素在内的大多数工作在Camel中的元素,都是一个一个的Service。例如,我们虽然定义了一个JettyHttpComponent(就是在代码中使用DSL定义的”jetty:http://0.0.0.0:8282/directCamel“头部所表示的Component),但是我们想要在Camel应用程序运行阶段使用这个Component,就需要利用start方法将这个Component启动起来。
实际上通过阅读org.apache.camel.component.jetty.JettyHttpComponent的源代码,读者可以发现JettyHttpComponent的启动过程起始大多数情况下什么都不会做,只是在org.apache.camel.support.ServiceSupport中更改了JettyHttpComponent对象的一些状态属性。倒是HttpConsumer这个Service,在启动的过程中启动了JettyHttpComponent对象的连接监听,并建立了若干个名为【qtp-*】的处理线程。
Service有且只有两个接口方法定义:start()和stop(),这两个方法的含义显而易见,启动服务和终止服务。另外继承自Service的另外两个子级接口SuspendableService、ShutdownableService分别还定义了另外几个方法:suspend()、resume()和shutdown()方法,分别用来暂停服务、恢复服务和彻底停止服务(彻底停止服务意味着在Camel应用程序运行的有生之年不能再次启动了)。
Camel应用程序中的每一个Service都是独立运行的,各个Service的关联衔接通过CamelContext上下文对象完成。每一个Service通过调用start()方法被激活并参与到Camel应用程序的工作中,直到它的stop()方法被调用。也就是说,每个Service都有独立的生命周期。(http://camel.apache.org/lifecycle.html)
那么问题来了,既然每个Service都有独立的生命周期,我们启动Camel应用程序时就要启动包括Route、Endpoint、Component、Producer、Consumer、LifecycleStrategy等概念元素在内的无数多个Service实现,那么作为开发人员不可能编写代码一个一个的Service来进行启动(大多数开发人员不了解Camel的内部结构,也根本不知道要启动哪些Service)。那么作为Camel应用程序肯定需要提供一个办法,在应用程序启动时分析应用程序所涉及到的所有的Service,并统一管理这些Service启动和停止的动作。这就是CamelContext所设计的另一个功能。
CamelContext上下文
CamelContext从英文字面上理解,是Camel服务上下文的意思。CamelContext在Apache Camel中的重要性,就像ApplicationContext之于Spring、ServletContext之于Servlet…… 但是包括Camel官方文档在内的,所有读者能够在互联网上找到的资料对于CamelContext的介绍都只有聊聊数笔。
如果我们翻阅DefaultCamelContext的源代码,首先就会发现在其中定义了许多全局变量,数量在70个左右(实际上根据《代码大全》的描述,一个类中不应该有这么多全局变量。究竟这个类的作者当时是怎样的想法,就不清楚了)。其中一些变量负责记录CamelContext的状态属性、一些负责引用辅助工具还有一些记录关联的顶层工作对象(例如Endpoint、Servcie、Routes、)Components等等)。
看来CamelContext是挺重要的,它基本将Camel应用程序运行所需要的所有基本信息都记录在案。另外,Apache Camel中还有一个名叫org.apache.camel.CamelContextAware的接口,只要实现该接口的就必须实现这个接口定义的两个方法:setCamelContext和getCamelContext。而实际上在Camel中的大多数元素都实现了这个接口,所以我们在阅读代码时可以发现DefaultCamelContext在一边启动各个Service的时候,顺便将自己所为参数赋给了正在启动的Service,最终实现了各个Service之间的共享上下文信息的效果