1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext sessionContext) { super(identifier, sessionContext); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService; version = sessionContext.getPrimaryConductor.getVersion; final NotificationProviderService rpcNotificationProviderService = OFSessionUtil.getSessionManager.getNotificationProviderService; //绑定session和设置服务 这里的session就是上一篇提到的session,此处会把session // 与某一个底层交换机进行绑定 rpcTaskContext = new OFRpcTaskContext; rpcTaskContext.setSession(sessionContext); rpcTaskContext.setMessageService(messageService); rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService); //超时时间默认是1分钟 rpcTaskContext.setMaxTimeout(maxTimeout); rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); //设置rpc线程池,这个池中有10个线程,目的下发流表等 rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool); rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | @Override public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) { LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); // use primary connection SwitchConnectionDistinguisher cookie = null; //构造flowmod和barrier消息,表现形式是task OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie); //将task放到线程中执行 这个地方线程就是构造函数中提到的那个具有10个线程的线程池 ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit; // result是UpdateFlowOutput类型,返回值要求AddFlowOutput,因此需要将result // 进行转换。无论是updateflow,removeflow都需要进行一次转换。 // 此处的Future.transform是一个接口,我们只需要实现回调函数即可第二个参数 return Futures.transform(result, OFRpcFutureResultTransformFactory.createForAddFlowOutput); } public static Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>> createForAddFlowOutput { return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>{ @Override public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input){ // ODL在代码中进行封装统一,所有flowmod操作(Add,Update,Remove),结果均以 // 都是以UpdateFlowOutput形式返回 // 在调用方法getResult时候,如果结果已经存在则立即返回,如果没有结果则进行等待 UpdateFlowOutput updateFlowOutput = input.getResult; // 将updateflowoutput转成AddFlowOutput AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder; addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId); AddFlowOutput result = addFlowOutput.build; RpcResult<AddFlowOutput> rpcResult = assembleRpcResult(input, result); LOG.debug(MSG_ADD_FLOW_RPC); return rpcResult; } }; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | private static FlowModInputBuilder toFlowModInput(Flow flow, short version, BigInteger datapathid) { // 此处代码采用的设计模式--建造者设计模式 FlowModInputBuilder flowMod = new FlowModInputBuilder; // 填充flowmod消息头 salToOFFlowCookie(flow, flowMod); salToOFFlowCookieMask(flow, flowMod); salToOFFlowTableId(flow, flowMod); salToOFFlowCommand(flow, flowMod); salToOFFlowIdleTimeout(flow, flowMod); salToOFFlowHardTimeout(flow, flowMod); salToOFFlowPriority(flow, flowMod); salToOFFlowBufferId(flow, flowMod); salToOFFlowOutPort(flow, flowMod); salToOFFlowOutGroup(flow, flowMod); // convert and inject flowFlags FlowFlagReactor.getInstance.convert(flow.getFlags, version, flowMod, datapathid); // 下面的match域和instructions域往往是我们最关心的因为扩展flowmod消息就是修改这两个地方 这个地方需要特别注意!!! MatchReactor.getInstance.convert(flow.getMatch, version, flowMod, datapathid); if (flow.getInstructions != null) { flowMod.setInstruction(toInstructions(flow, version, datapathid)); flowMod.setAction(getActions(version, datapathid, flow)); } flowMod.setVersion(version); return flowMod; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | @Override public Future<RpcResult<UpdateFlowOutput>> flowMod(……) { LOG.debug("Calling OFLibrary flowMod"); Future<RpcResult<Void>> response = null; try {// 下发flowmod,返回值为Future,这个future是一个RPC,在下面代码会进行解析 response = getConnectionAdapter(cookie).flowMod(input); } catch (ConnectionException e) { return RpcResultUtil.getRpcErrorFuture(e); } // 将future放到线程中,如果接收到返回值,则调用apply方法。Apply方法会把RPC结果转成UpdateOutput类型 ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform( JdkFutureAdapters.listenInPoolThread(response), new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>> { @Override public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) { // 实例化UpdateFlowOutput,将RPC结果放到这个实例中,返回给业务层,由业务进行后续处理(如果是addflow则后续处理就是把UpdateFlowOutput转成AddFlowOutput) UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder; BigInteger bigIntXid = BigInteger.valueOf(input.getXid); flowModOutput.setTransactionId(new TransactionId(bigIntXid)); UpdateFlowOutput result = flowModOutput.build; RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder .<UpdateFlowOutput>status(inputArg.isSuccessful) .withResult(result).withRpcErrors(inputArg.getErrors) .build; return rpcResult; } }); return xidResult; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; // byte数组用于保存序列化后数据 try { if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf); // encode这个接口用于序列化操作,也就是有openflowjava实现,实际指向为OFEncoder.encode。 } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable) { ctx.write(buf, promise); // 如果上面序列化操作正确,则会进入此分支,netty会把bytebuf放到队列中,等待时机将数据发送到网络中。 } else { buf.release; ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } …. } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | @Override protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out) throws Exception { LOGGER.trace("Encoding"); try { // 根据报文内容,在找序列化工厂中查找对应序列化对象,然后进行序列化操作。 // 其中参数wrapper是就是来自netty流水线处理后的数据 // 参数out就是用于保存序列化后数据 serializationFactory.messageToBuffer(wrapper.getMsg.getVersion, out, wrapper.getMsg); if(wrapper.getMsg instanceof FlowModInput){ statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT); } statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS); } catch(Exception e) { LOGGER.warn("Message serialization failed ", e); statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL); Future<Void> newFailedFuture = ctx.newFailedFuture(e); wrapper.getListener.operationComplete(newFailedFuture); out.clear; return; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext sessionContext) { super(identifier, sessionContext); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService; version = sessionContext.getPrimaryConductor.getVersion; final NotificationProviderService rpcNotificationProviderService = OFSessionUtil.getSessionManager.getNotificationProviderService; // rpcTaskContext实例化 rpcTaskContext = new OFRpcTaskContext; rpcTaskContext.setSession(sessionContext); // 关联session rpcTaskContext.setMessageService(messageService); //关联消息分发服务 //关联notifaction服务 rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService); rpcTaskContext.setMaxTimeout(maxTimeout); rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); //设置线程池,默认线程池中有10个线程 rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager.getRpcPool); rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager.getMessageSpy); } |
欢迎光临 51学通信论坛2017新版 (http://bbs.51xuetongxin.com/) | Powered by Discuz! X3 |