51学通信论坛2017新版

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 1283|回复: 0
打印 上一主题 下一主题

二、Flowmod下发流程

[复制链接]

 成长值: 15613

  • TA的每日心情
    开心
    2022-7-17 17:50
  • 2444

    主题

    2544

    帖子

    7万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    74104
    跳转到指定楼层
    楼主
    发表于 2017-9-17 14:38:57 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    上一篇(http://www.sdnlab.com/16316.html)简单分析了openflowjava到openflowplugin(介绍的hello消息),本篇介绍如何从openflowplugin到openflowjava,即介绍flowmod消息。
    </p>一、ModelDrivenSwitchImpl核心类

    在上篇中提到了类ModelDrivenSwitchImpl。可以说这个类是openflow交换机的抽象。几乎所有操作都需要调用类的方法,比如:addflow,addgroup等。简单看一下构造函数:
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
    final SessionContext sessionContext) {
    super(identifier, sessionContext);
    this.nodeId = nodeId;
    messageService = sessionContext.getMessageDispatchService;
    version = sessionContext.getPrimaryConductor.getVersion;
    final NotificationProviderService rpcNotificationProviderService =
    OFSessionUtil.getSessionManager.getNotificationProviderService;
    //绑定session和设置服务 这里的session就是上一篇提到的session,此处会把session
    // 与某一个底层交换机进行绑定
    rpcTaskContext = new OFRpcTaskContext;
    rpcTaskContext.setSession(sessionContext);
    rpcTaskContext.setMessageService(messageService);
    rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
    //超时时间默认是1分钟
    rpcTaskContext.setMaxTimeout(maxTimeout);
    rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
    //设置rpc线程池,这个池中有10个线程,目的下发流表等
    rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool);
    rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy);
    }

    Flowmod分为三种,add、update、remove。这三种类型操作类似,这里以addflow为例进行分析。
    代码addflow非常简单,但是有几点需要说明:
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34

    @Override
    public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
    LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
    // use primary connection
    SwitchConnectionDistinguisher cookie = null;
    //构造flowmod和barrier消息,表现形式是task
    OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
    OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
    //将task放到线程中执行 这个地方线程就是构造函数中提到的那个具有10个线程的线程池
    ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit;
    // result是UpdateFlowOutput类型,返回值要求AddFlowOutput,因此需要将result
    // 进行转换。无论是updateflow,removeflow都需要进行一次转换。
    // 此处的Future.transform是一个接口,我们只需要实现回调函数即可第二个参数
    return Futures.transform(result, OFRpcFutureResultTransformFactory.createForAddFlowOutput);
    }
    public static Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>> createForAddFlowOutput {
    return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>{
    @Override
    public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input){
    // ODL在代码中进行封装统一,所有flowmod操作(Add,Update,Remove),结果均以
    // 都是以UpdateFlowOutput形式返回
    // 在调用方法getResult时候,如果结果已经存在则立即返回,如果没有结果则进行等待
    UpdateFlowOutput updateFlowOutput = input.getResult;
    // 将updateflowoutput转成AddFlowOutput
    AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder;
    addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId);
    AddFlowOutput result = addFlowOutput.build;
    RpcResult<AddFlowOutput> rpcResult = assembleRpcResult(input, result);
    LOG.debug(MSG_ADD_FLOW_RPC);
    return rpcResult;
    }
    };
    }

    上面简单分析了一下最外层逻辑处理,归为两点:
    1)创建返回类型UpdateOutput的task并提交到线程池。
    2) 接收返回类型UpdateOutput的future并且转成对应的Output,如AddOutput、RemoveOutput。
    下面开始flowmod下发流程分析,逻辑层次比较深入还是先放一张流程图:


    由上图可知,flowmod下发流程层次很深,这里只分析一下几个方法:
    1、toFlowModInput方法,该方法可以说是将业务数据转成标准flowmod消息入口。如果需要扩展flowmod消息,则需要从这个函数着手修改:
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    private static FlowModInputBuilder toFlowModInput(Flow flow, short version, BigInteger datapathid) {
    // 此处代码采用的设计模式--建造者设计模式
    FlowModInputBuilder flowMod = new FlowModInputBuilder;
    // 填充flowmod消息头
    salToOFFlowCookie(flow, flowMod);
    salToOFFlowCookieMask(flow, flowMod);
    salToOFFlowTableId(flow, flowMod);
    salToOFFlowCommand(flow, flowMod);
    salToOFFlowIdleTimeout(flow, flowMod);
    salToOFFlowHardTimeout(flow, flowMod);
    salToOFFlowPriority(flow, flowMod);
    salToOFFlowBufferId(flow, flowMod);
    salToOFFlowOutPort(flow, flowMod);
    salToOFFlowOutGroup(flow, flowMod);
    // convert and inject flowFlags
    FlowFlagReactor.getInstance.convert(flow.getFlags, version, flowMod, datapathid);
    // 下面的match域和instructions域往往是我们最关心的因为扩展flowmod消息就是修改这两个地方 这个地方需要特别注意!!!
    MatchReactor.getInstance.convert(flow.getMatch, version, flowMod, datapathid);
    if (flow.getInstructions != null) {
    flowMod.setInstruction(toInstructions(flow, version, datapathid));
    flowMod.setAction(getActions(version, datapathid, flow));
    }
    flowMod.setVersion(version);
    return flowMod;
    }

    代码MatchReactor.getInstance.convert(…),最终会调用到MatchConvertorImpl.convert这个函数中,就有我们常见的协议字段,如:inport,ipv4等。这个函数主要是组建oxm。这些内容没有什么逻辑,只要按照标准协议填写即可,不再累赘阐述。
    方法toInstructions,getActions也类似,按照标准协议填写即可。
    以上内容主要想说明,当我们扩展flowmod消息的时候,能够知道在哪里修改即可。
    2、方法MessageDispatchServiceImpl.flowMod
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33

    @Override
    public Future<RpcResult<UpdateFlowOutput>> flowMod(……) {
    LOG.debug("Calling OFLibrary flowMod");
    Future<RpcResult<Void>> response = null;
    try {// 下发flowmod,返回值为Future,这个future是一个RPC,在下面代码会进行解析
    response = getConnectionAdapter(cookie).flowMod(input);
    } catch (ConnectionException e) {
    return RpcResultUtil.getRpcErrorFuture(e);
    }
    // 将future放到线程中,如果接收到返回值,则调用apply方法。Apply方法会把RPC结果转成UpdateOutput类型
    ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
    JdkFutureAdapters.listenInPoolThread(response),
    new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>> {
    @Override
    public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
    // 实例化UpdateFlowOutput,将RPC结果放到这个实例中,返回给业务层,由业务进行后续处理(如果是addflow则后续处理就是把UpdateFlowOutput转成AddFlowOutput)
    UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder;
    BigInteger bigIntXid = BigInteger.valueOf(input.getXid);
    flowModOutput.setTransactionId(new TransactionId(bigIntXid));
    UpdateFlowOutput result = flowModOutput.build;
    RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder
    .<UpdateFlowOutput>status(inputArg.isSuccessful)
    .withResult(result).withRpcErrors(inputArg.getErrors)
    .build;
    return rpcResult;
    }
    });
    return xidResult;
    }

    3、简单分析netty中write方法,此方法功能是将数据包发到网络中。
    在说write方法之前,简单说一下netty背景知识吧。netty为了降低应用程开发帮我们做了很多东西,比如说常见的tcp粘包、丢包问题,最简编解码,异步IO等。应用程序基本不要关心这些底层内容,不像C/C++语言开发时,需要由应用程序自己处理粘包,丢包以及自己构造异步IO,这些内容如果处理不好,会带来很大问题。
    然而关于编解码问题,netty只是做了最基本的序列化工作,比如说分割符,定长编解码等,对于咱们Openflow协议这种编解码,netty是没有提供的,而且也不应该由netty提供。
    我们来看一下write方法:
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null; // byte数组用于保存序列化后数据
    try {
    if (acceptOutboundMessage(msg)) {
    @SuppressWarnings("unchecked")
    I cast = (I) msg;
    buf = allocateBuffer(ctx, cast, preferDirect);
    try {
    encode(ctx, cast, buf); // encode这个接口用于序列化操作,也就是有openflowjava实现,实际指向为OFEncoder.encode。
    } finally {
    ReferenceCountUtil.release(cast);
    }
    if (buf.isReadable) {
    ctx.write(buf, promise); // 如果上面序列化操作正确,则会进入此分支,netty会把bytebuf放到队列中,等待时机将数据发送到网络中。
    } else {
    buf.release;
    ctx.write(Unpooled.EMPTY_BUFFER, promise);
    }
    buf = null;
    } else {
    ctx.write(msg, promise);
    }
    }
    ….
    }

    针对上面的ctx.write(buf, promise);这行代码再多说两句,这个函数只是将消息放到netty队列中,择机发到网络中。那么择机发送是什么时候发送呢?一般有两个条件:
    1、队列已满。
    2、超时。
    然而应用程序若想立即将数据包发送到网络中则需要调用方法writeAndFlush。在这里讨论这些内容,主要因为某些应用要求实时性比较高的时候,所以可以考虑这种方式。

    三、再谈序列化OFEncoder.encode

    在上一篇中,我们提到了序列化,这个在简单过一下吧。OFEncoder是序列化入口,所有序列化操作都需要调用该类的方法,或者是调用encode方法。
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    @Override
    protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out)
    throws Exception {
    LOGGER.trace("Encoding");
    try {
    // 根据报文内容,在找序列化工厂中查找对应序列化对象,然后进行序列化操作。
    // 其中参数wrapper是就是来自netty流水线处理后的数据
    // 参数out就是用于保存序列化后数据
    serializationFactory.messageToBuffer(wrapper.getMsg.getVersion, out, wrapper.getMsg);
    if(wrapper.getMsg instanceof FlowModInput){
    statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT);
    }
    statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS);
    } catch(Exception e) {
    LOGGER.warn("Message serialization failed ", e);
    statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL);
    Future<Void> newFailedFuture = ctx.newFailedFuture(e);
    wrapper.getListener.operationComplete(newFailedFuture);
    out.clear;
    return;
    }
    }

    关于序列化工厂注册内容,请参考上一篇文章(http://www.sdnlab.com/16316.html)。

    四、Session存储关系图


    控制器所有操作都是建立在session基础之上的,换句话说就是建立在连接之上的,下图就是session相关组织结构图。


    1.单例模式保存所有session对象
    SessionManagerOFImpl是一个单例,保存当前有效session会话。从hello握手成功session一直存在,直到交换机与控制器断链后。
    2.SessionListener有两个方法,onSessionAdded,onSessionRemoved。这两方法从便可知道用于管理添加、删除session对象。这两个方法主要是操作HashMap即ConcurrentHashMap<SwitchSessionKeyOF, SessionContext> sessionLot。这个hashmap中每一个元素都是一个有效的连接,也就是说有一个元素存在就对应着一个交换机。
    3.Hashmap中键值value是SessionContext,这个上下文包含了一个ModelDrivenSwitchImpl,实则是ODL对交换机的抽象。也就是说ODL用类ModelDrivenSwitchImpl来表示一个交换机。抽象出来的ModelDrivenSwitchImpl进行再次包装,插入到SessionContext的notifaction队列当中。以便能够处理后续的notification消息。
    4.在ModelDrivenSwitchImpl中有一个属性OFRpcTaskContext,这个属性是关联SessionContext以及Notification服务等。控制器给交换机发送的消息中大部分都是采用RPC形式,所以rpcTaskContext是至关重要的。
    Java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
    final SessionContext sessionContext) {
    super(identifier, sessionContext);
    this.nodeId = nodeId;
    messageService = sessionContext.getMessageDispatchService;
    version = sessionContext.getPrimaryConductor.getVersion;
    final NotificationProviderService rpcNotificationProviderService =
    OFSessionUtil.getSessionManager.getNotificationProviderService;
    // rpcTaskContext实例化
    rpcTaskContext = new OFRpcTaskContext;
    rpcTaskContext.setSession(sessionContext); // 关联session
    rpcTaskContext.setMessageService(messageService); //关联消息分发服务
    //关联notifaction服务
    rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
    rpcTaskContext.setMaxTimeout(maxTimeout);
    rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
    //设置线程池,默认线程池中有10个线程
    rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool);
    rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy);
    }

    以上全部内容基本上(加上前一篇)就是ODL代码从上到下,从下到上,openflow消息处理流程。有些内容分析不到位,请各位网友多多批评指出,欢迎大家及时讨论。
    作者简介:
    徐小冰:毕业于河北大学,主要从事嵌入式软件开发,虚拟化,SDN。目前基于ODL和Open vSwitch进行二次开发,希望与广大网友一起探讨学习。作者系OpenDaylihgt群(194240432)资深活跃用户,@IT难人
    --------------华丽的分割线------------------
    本文系《SDNLAB原创文章奖励计划》投稿文章,该计划旨在鼓励广大从业人员在SDN/NFV/Cloud网络领域创新技术、开源项目、产业动态等方面进行经验和成果的文字传播、分享、交流。有意向投稿的同学请通过官方唯一指定投稿通道进行文章投递,投稿细则请参考《SDNLAB原创文章奖励计划》
    声明:本文转载自网络。版权归原作者所有,如有侵权请联系删除。
    扫描并关注51学通信微信公众号,获取更多精彩通信课程分享。

    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有帐号?立即注册

    x
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    Archiver|手机版|小黑屋|51学通信技术论坛

    GMT+8, 2025-1-31 19:53 , Processed in 0.060198 second(s), 33 queries .

    Powered by Discuz! X3

    © 2001-2013 Comsenz Inc.

    快速回复 返回顶部 返回列表