51学通信论坛2017新版

标题: 二、Flowmod下发流程 [打印本页]

作者: admin    时间: 2017-9-17 14:38
标题: 二、Flowmod下发流程
上一篇(http://www.sdnlab.com/16316.html)简单分析了openflowjava到openflowplugin(介绍的hello消息),本篇介绍如何从openflowplugin到openflowjava,即介绍flowmod消息。
</p>一、ModelDrivenSwitchImpl核心类

在上篇中提到了类ModelDrivenSwitchImpl。可以说这个类是openflow交换机的抽象。几乎所有操作都需要调用类的方法,比如:addflow,addgroup等。简单看一下构造函数:
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
final SessionContext sessionContext) {
super(identifier, sessionContext);
this.nodeId = nodeId;
messageService = sessionContext.getMessageDispatchService;
version = sessionContext.getPrimaryConductor.getVersion;
final NotificationProviderService rpcNotificationProviderService =
OFSessionUtil.getSessionManager.getNotificationProviderService;
//绑定session和设置服务 这里的session就是上一篇提到的session,此处会把session
// 与某一个底层交换机进行绑定
rpcTaskContext = new OFRpcTaskContext;
rpcTaskContext.setSession(sessionContext);
rpcTaskContext.setMessageService(messageService);
rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
//超时时间默认是1分钟
rpcTaskContext.setMaxTimeout(maxTimeout);
rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
//设置rpc线程池,这个池中有10个线程,目的下发流表等
rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool);
rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy);
}

Flowmod分为三种,add、update、remove。这三种类型操作类似,这里以addflow为例进行分析。
代码addflow非常简单,但是有几点需要说明:
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

@Override
public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
// use primary connection
SwitchConnectionDistinguisher cookie = null;
//构造flowmod和barrier消息,表现形式是task
OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
//将task放到线程中执行 这个地方线程就是构造函数中提到的那个具有10个线程的线程池
ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit;
// result是UpdateFlowOutput类型,返回值要求AddFlowOutput,因此需要将result
// 进行转换。无论是updateflow,removeflow都需要进行一次转换。
// 此处的Future.transform是一个接口,我们只需要实现回调函数即可第二个参数
return Futures.transform(result, OFRpcFutureResultTransformFactory.createForAddFlowOutput);
}
public static Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>> createForAddFlowOutput {
return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>{
@Override
public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input){
// ODL在代码中进行封装统一,所有flowmod操作(Add,Update,Remove),结果均以
// 都是以UpdateFlowOutput形式返回
// 在调用方法getResult时候,如果结果已经存在则立即返回,如果没有结果则进行等待
UpdateFlowOutput updateFlowOutput = input.getResult;
// 将updateflowoutput转成AddFlowOutput
AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder;
addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId);
AddFlowOutput result = addFlowOutput.build;
RpcResult<AddFlowOutput> rpcResult = assembleRpcResult(input, result);
LOG.debug(MSG_ADD_FLOW_RPC);
return rpcResult;
}
};
}

上面简单分析了一下最外层逻辑处理,归为两点:
1)创建返回类型UpdateOutput的task并提交到线程池。
2) 接收返回类型UpdateOutput的future并且转成对应的Output,如AddOutput、RemoveOutput。
下面开始flowmod下发流程分析,逻辑层次比较深入还是先放一张流程图:

[attach]1317[/attach]

由上图可知,flowmod下发流程层次很深,这里只分析一下几个方法:
1、toFlowModInput方法,该方法可以说是将业务数据转成标准flowmod消息入口。如果需要扩展flowmod消息,则需要从这个函数着手修改:
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

private static FlowModInputBuilder toFlowModInput(Flow flow, short version, BigInteger datapathid) {
// 此处代码采用的设计模式--建造者设计模式
FlowModInputBuilder flowMod = new FlowModInputBuilder;
// 填充flowmod消息头
salToOFFlowCookie(flow, flowMod);
salToOFFlowCookieMask(flow, flowMod);
salToOFFlowTableId(flow, flowMod);
salToOFFlowCommand(flow, flowMod);
salToOFFlowIdleTimeout(flow, flowMod);
salToOFFlowHardTimeout(flow, flowMod);
salToOFFlowPriority(flow, flowMod);
salToOFFlowBufferId(flow, flowMod);
salToOFFlowOutPort(flow, flowMod);
salToOFFlowOutGroup(flow, flowMod);
// convert and inject flowFlags
FlowFlagReactor.getInstance.convert(flow.getFlags, version, flowMod, datapathid);
// 下面的match域和instructions域往往是我们最关心的因为扩展flowmod消息就是修改这两个地方 这个地方需要特别注意!!!
MatchReactor.getInstance.convert(flow.getMatch, version, flowMod, datapathid);
if (flow.getInstructions != null) {
flowMod.setInstruction(toInstructions(flow, version, datapathid));
flowMod.setAction(getActions(version, datapathid, flow));
}
flowMod.setVersion(version);
return flowMod;
}

代码MatchReactor.getInstance.convert(…),最终会调用到MatchConvertorImpl.convert这个函数中,就有我们常见的协议字段,如:inport,ipv4等。这个函数主要是组建oxm。这些内容没有什么逻辑,只要按照标准协议填写即可,不再累赘阐述。
方法toInstructions,getActions也类似,按照标准协议填写即可。
以上内容主要想说明,当我们扩展flowmod消息的时候,能够知道在哪里修改即可。
2、方法MessageDispatchServiceImpl.flowMod
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

@Override
public Future<RpcResult<UpdateFlowOutput>> flowMod(……) {
LOG.debug("Calling OFLibrary flowMod");
Future<RpcResult<Void>> response = null;
try {// 下发flowmod,返回值为Future,这个future是一个RPC,在下面代码会进行解析
response = getConnectionAdapter(cookie).flowMod(input);
} catch (ConnectionException e) {
return RpcResultUtil.getRpcErrorFuture(e);
}
// 将future放到线程中,如果接收到返回值,则调用apply方法。Apply方法会把RPC结果转成UpdateOutput类型
ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
JdkFutureAdapters.listenInPoolThread(response),
new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>> {
@Override
public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
// 实例化UpdateFlowOutput,将RPC结果放到这个实例中,返回给业务层,由业务进行后续处理(如果是addflow则后续处理就是把UpdateFlowOutput转成AddFlowOutput)
UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder;
BigInteger bigIntXid = BigInteger.valueOf(input.getXid);
flowModOutput.setTransactionId(new TransactionId(bigIntXid));
UpdateFlowOutput result = flowModOutput.build;
RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder
.<UpdateFlowOutput>status(inputArg.isSuccessful)
.withResult(result).withRpcErrors(inputArg.getErrors)
.build;
return rpcResult;
}
});
return xidResult;
}

3、简单分析netty中write方法,此方法功能是将数据包发到网络中。
在说write方法之前,简单说一下netty背景知识吧。netty为了降低应用程开发帮我们做了很多东西,比如说常见的tcp粘包、丢包问题,最简编解码,异步IO等。应用程序基本不要关心这些底层内容,不像C/C++语言开发时,需要由应用程序自己处理粘包,丢包以及自己构造异步IO,这些内容如果处理不好,会带来很大问题。
然而关于编解码问题,netty只是做了最基本的序列化工作,比如说分割符,定长编解码等,对于咱们Openflow协议这种编解码,netty是没有提供的,而且也不应该由netty提供。
我们来看一下write方法:
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null; // byte数组用于保存序列化后数据
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf); // encode这个接口用于序列化操作,也就是有openflowjava实现,实际指向为OFEncoder.encode。
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable) {
ctx.write(buf, promise); // 如果上面序列化操作正确,则会进入此分支,netty会把bytebuf放到队列中,等待时机将数据发送到网络中。
} else {
buf.release;
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
}
….
}

针对上面的ctx.write(buf, promise);这行代码再多说两句,这个函数只是将消息放到netty队列中,择机发到网络中。那么择机发送是什么时候发送呢?一般有两个条件:
1、队列已满。
2、超时。
然而应用程序若想立即将数据包发送到网络中则需要调用方法writeAndFlush。在这里讨论这些内容,主要因为某些应用要求实时性比较高的时候,所以可以考虑这种方式。

三、再谈序列化OFEncoder.encode

在上一篇中,我们提到了序列化,这个在简单过一下吧。OFEncoder是序列化入口,所有序列化操作都需要调用该类的方法,或者是调用encode方法。
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

@Override
protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out)
throws Exception {
LOGGER.trace("Encoding");
try {
// 根据报文内容,在找序列化工厂中查找对应序列化对象,然后进行序列化操作。
// 其中参数wrapper是就是来自netty流水线处理后的数据
// 参数out就是用于保存序列化后数据
serializationFactory.messageToBuffer(wrapper.getMsg.getVersion, out, wrapper.getMsg);
if(wrapper.getMsg instanceof FlowModInput){
statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT);
}
statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS);
} catch(Exception e) {
LOGGER.warn("Message serialization failed ", e);
statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL);
Future<Void> newFailedFuture = ctx.newFailedFuture(e);
wrapper.getListener.operationComplete(newFailedFuture);
out.clear;
return;
}
}

关于序列化工厂注册内容,请参考上一篇文章(http://www.sdnlab.com/16316.html)。

四、Session存储关系图


控制器所有操作都是建立在session基础之上的,换句话说就是建立在连接之上的,下图就是session相关组织结构图。

[attach]1318[/attach]

1.单例模式保存所有session对象
SessionManagerOFImpl是一个单例,保存当前有效session会话。从hello握手成功session一直存在,直到交换机与控制器断链后。
2.SessionListener有两个方法,onSessionAdded,onSessionRemoved。这两方法从便可知道用于管理添加、删除session对象。这两个方法主要是操作HashMap即ConcurrentHashMap<SwitchSessionKeyOF, SessionContext> sessionLot。这个hashmap中每一个元素都是一个有效的连接,也就是说有一个元素存在就对应着一个交换机。
3.Hashmap中键值value是SessionContext,这个上下文包含了一个ModelDrivenSwitchImpl,实则是ODL对交换机的抽象。也就是说ODL用类ModelDrivenSwitchImpl来表示一个交换机。抽象出来的ModelDrivenSwitchImpl进行再次包装,插入到SessionContext的notifaction队列当中。以便能够处理后续的notification消息。
4.在ModelDrivenSwitchImpl中有一个属性OFRpcTaskContext,这个属性是关联SessionContext以及Notification服务等。控制器给交换机发送的消息中大部分都是采用RPC形式,所以rpcTaskContext是至关重要的。
Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
final SessionContext sessionContext) {
super(identifier, sessionContext);
this.nodeId = nodeId;
messageService = sessionContext.getMessageDispatchService;
version = sessionContext.getPrimaryConductor.getVersion;
final NotificationProviderService rpcNotificationProviderService =
OFSessionUtil.getSessionManager.getNotificationProviderService;
// rpcTaskContext实例化
rpcTaskContext = new OFRpcTaskContext;
rpcTaskContext.setSession(sessionContext); // 关联session
rpcTaskContext.setMessageService(messageService); //关联消息分发服务
//关联notifaction服务
rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
rpcTaskContext.setMaxTimeout(maxTimeout);
rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
//设置线程池,默认线程池中有10个线程
rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool);
rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy);
}

以上全部内容基本上(加上前一篇)就是ODL代码从上到下,从下到上,openflow消息处理流程。有些内容分析不到位,请各位网友多多批评指出,欢迎大家及时讨论。
作者简介:
徐小冰:毕业于河北大学,主要从事嵌入式软件开发,虚拟化,SDN。目前基于ODL和Open vSwitch进行二次开发,希望与广大网友一起探讨学习。作者系OpenDaylihgt群(194240432)资深活跃用户,@IT难人
--------------华丽的分割线------------------
本文系《SDNLAB原创文章奖励计划》投稿文章,该计划旨在鼓励广大从业人员在SDN/NFV/Cloud网络领域创新技术、开源项目、产业动态等方面进行经验和成果的文字传播、分享、交流。有意向投稿的同学请通过官方唯一指定投稿通道进行文章投递,投稿细则请参考《SDNLAB原创文章奖励计划》
声明:本文转载自网络。版权归原作者所有,如有侵权请联系删除。




欢迎光临 51学通信论坛2017新版 (http://bbs.51xuetongxin.com/) Powered by Discuz! X3