【框架学习】Apache Camel 进阶和实践 - hippowc/hippowc.github.io GitHub Wiki

camel运行原理

主要包含路由信息的构建、组件查找、和路由启动过程三方面

路由构建

路由构建其实是构建路由定义,对应Camel中的RouteDefinition类,一个RouteDefinition对象规定了或者说指定了一个消息从哪里产生,中间要经过什么样的处理,最后路由到什么地方。RouteDefinition由RouteBuilder进行构建,再具体点就是调用RouteBuilder的configure()方法,构建完成后再添加到CamelContext中,那么该路由定义就可以运行了。RouteBuilder的configure()方法是一个抽象方法,所以该方法要由开发者进行实现,其实就是在进行路由定义的构建过程。

首先,调用RouteBuilder的from方法获取RouteDefinition,并且根据from传入的参数将一个FromDefinition类型的实例设置给RouteDefinition的inputs(输入)列表,然后调用RouteDefinition的process()和to()方法,分别生成ProcessDefinition和ToDefinition对象,因为ProcessDefinition和ToDefinition都继承自ProcessorDefinition,所以都可以看出输出放到RouteDefinition的outputs(输出)列表中,至此,整个路由定义就构建完成了,其实该路由定义最重要的就是输入与输出,输入与输出都可以有多个,默认情况下,在路由运行后,Camel会依赖调用这些输出处理器并最终将消息路由到指定目的地。

组件查找

Camel的运行是由组件(component)进行组织的,我们前面的调用from()方法传个字符串,camel是要根据uri来查找到对应的组件,即要维护uri到组件之间的映射关系,查找组件的过程是调用DefaultCamelContext中的getComponent(String name)方法来完成的,是在DefaultCamelContext启动的时候调用,获取相应的组件,生成对应的endPoint,组件的查找是从DefaultCamelContext上下文缓存的components中去找,如果没有找到,就会去根据uri前缀生成一个component

其中的findComponent方法是在camel包中默认路径META-INF/services/org/apache/camel/component/+uri前缀名找对应的Properties(不带.properties后缀)文件,文件中有Component组件实例化的类名,找到后通过反射机制生成一个component组件,放到缓存中。

事务处理

camel事务支持

camel没有重复造轮子,去实现一个事务管理器,camel直接使用的spring的事务来支持本地事务

如果要支持全局事务,则需要使用三方事务工具来支持

<from uri="sql:{{sql­from}}?consumer.onConsume={{sql­delete}}
               &amp;dataSource=#myDataSource&amp;transacted=true"/>
    <transacted/>
    <log message="*** transacted ***"/>
    <to uri="log:row"/>
    <to uri="activemq:queue:order"/>

Camel路由问题

多个consumer不能同时消费一个Endpoint

在有些场景下,希望把camel当做一个消息中间件用,想把从一个地方获取的信息 to到一个抽象端点,譬如:direct:test,然后后续的路由都从这个端点消费,但实际camel是不支持主要做的。消费端不可以复用

但是可以to到多个端点,譬如大家可以都 to到direct:test

如果想要实现路由从一个端点发往多个端点,那么可以考虑使用一些集成模式,譬如muticast,或者dynamic等等都可以

Camel 使用实践

camel测试的一些便捷操作

setBody方法

可以在from后面直接使用setBody方案设置body内容,方法内容是一些表达式

setBody(constant("test"))

加载特定的Component

譬如使用sql组件时,可能需要定制DataSource,但是需要将DataSource以注入的方式添加到组件,可能需要和springboot一块使用,为了简便可以这样用

DriverManagerDataSource ds = new DriverManagerDataSource();
ds.setDriverClassName("org.sqlite.JDBC");
ds.setUrl("jdbc:sqlite:/Users/chenwen.wcw/test.db");
SqlComponent sqlComponent = new SqlComponent();
sqlComponent.setDataSource(ds);
camelContext.addComponent("sql", sqlComponent);

EIP -- setHeader和setProperty

在路由处理过程中,一些关键点的信息可以存储在消息的properties和header中,以便于在后续路由处理过程中直接使用

from("direct:a")
            .setHeader("myHeader", constant("test"))
            .to("direct:b");
from("direct:a")
            .setProperty("myProperty", constant("test"))
            .to("direct:b");

可以直接在simple表达式中或者路由中使用这些值

simple("${header.bar} == 100")
// 在路由组件使用需要组件支持,譬如sql组件
"sql:select * from projects where license = :#${body} and id > :#${property.min} order by id"
// sql组件默认支持从body或者从header中取值
sql:select * from table where id=:#myId order by name

camel的扩展语言支持

在camel的DSL中,除了可以使用EIP的各种from,to,choice等等分支过程,还可以增加扩展的流程支持,是的路由处理更加灵活方便

simple

simple表达式是camel提供的一种表达式语言,类似与mvel以及ongl,camel自带,非常方便;主要用于读取以及比较

// 如果body中是一个pojo,并有getAddress方法
simple("${body.address}")
simple("${body?.address?.street}")
// 如果body是一个map或list
simple("${body[foo].name}")
simple("${body[this.is.foo]}")
simple("${body[foo]?.name}")
simple("${body.address.lines[0]}")
simple("${body.address.lines[last-1]}")

bean method

可以在流程中随意调用某个bean的某个方法进行处理

from("activemq:topic:OrdersTopic")
  .filter().method("myBean", "isGoldCustomer")
    .to("activemq:BigSpendersQueue");

constant

设置静态值

from("seda:a")
  .setHeader("theHeader", constant("the value"))
  .to("mock:b");