1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | /** * @return */ private ServerFacade createAndConfigureServer { LOGGER.debug("Configuring .."); ServerFacade server = null; //实例化Channel,主要初始化序列化和反序列化工厂对象 ChannelInitializerFactory factory = new ChannelInitializerFactory; factory.setSwitchConnectionHandler(switchConnectionHandler); factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout); factory.setTlsConfig(connConfig.getTlsConfiguration); factory.setSerializationFactory(serializationFactory); factory.setDeserializationFactory(deserializationFactory); //根据不同协议类型,创建不同netty的服务,针对openflow协议来说,我们是TCP服务 TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol; if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) {//创建tcp服务 server = new TcpHandler(connConfig.getAddress, connConfig.getPort); //实例化TcpHandler //初始化服务线程 TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer; ((TcpHandler) server).setChannelInitializer(channelInitializer); ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration); //初始化工作线程 NioEventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup; connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler); connectionInitializer.setChannelInitializer(channelInitializer); connectionInitializer.run; //这个地方很重要,调用run方法。下面会进行分析。 } else if (transportProtocol.equals(TransportProtocol.UDP)){//创建UDP服务 server = new UdpHandler(connConfig.getAddress, connConfig.getPort); ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer); } else { throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol); } server.setThreadConfig(connConfig.getThreadConfiguration); return server; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | @Override protected void initChannel(final SocketChannel ch) { if (ch.remoteAddress != null) { // 当一个client请求连接到来时,进行判断是否允许访问 // 在第二次开发的时候可以在这里进行访问控制。 // 比如说:黑名单ip不允许访问 InetAddress switchAddress = ch.remoteAddress.getAddress; int port = ch.localAddress.getPort; int remotePort = ch.remoteAddress.getPort; LOGGER.debug("Incoming connection from (remote address): {}:{} --> :{}", switchAddress.toString, remotePort, port); if (!getSwitchConnectionHandler.accept(switchAddress)) { ch.disconnect; LOGGER.debug("Incoming connection rejected"); return; } } //通过上面if判断后则表示允许连接 LOGGER.debug("Incoming connection accepted - building pipeline"); allChannels.add(ch); ConnectionFacade connectionFacade = null; //设置连接属性,如超时时间 connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null); try { LOGGER.debug("calling plugin: {}", getSwitchConnectionHandler); getSwitchConnectionHandler.onSwitchConnected(connectionFacade); connectionFacade.checkListeners; ch.pipeline.addLast(PipelineHandlers.IDLE_HANDLER.name, new IdleHandler(getSwitchIdleTimeout, TimeUnit.MILLISECONDS)); boolean tlsPresent = false; // If this channel is configured to support SSL it will only support SSL if (getTlsConfiguration != null) {// TLS连接处理 … } // 流水线处理注册 注册ChannelInboundHandlerAdapter ch.pipeline.addLast(PipelineHandlers.OF_FRAME_DECODER.name, new OFFrameDecoder(connectionFacade, tlsPresent));// openflow头解析 ch.pipeline.addLast(PipelineHandlers.OF_VERSION_DETECTOR.name, new OFVersionDetector); // openflow版本号解析 OFDecoder ofDecoder = new OFDecoder;//实例化反序列化对象并加到 ofDecoder.setDeserializationFactory(getDeserializationFactory); ch.pipeline.addLast(PipelineHandlers.OF_DECODER.name, ofDecoder); //流水线处理注册 注册ChannelOutboundHandlerAdapter OFEncoder ofEncoder = new OFEncoder;//实例化序列化对象 ofEncoder.setSerializationFactory(getSerializationFactory); ch.pipeline.addLast(PipelineHandlers.OF_ENCODER.name, ofEncoder); //流水线处理注册 注册ChannelInboundHandlerAdapter 后面会用到 ch.pipeline.addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name, new DelegatingInboundHandler(connectionFacade)); if (!tlsPresent) { connectionFacade.fireConnectionReadyNotification; } } catch (Exception e) { LOGGER.warn("Failed to initialize channel", e); ch.close; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | @Override protected void decode(ChannelHandlerContext ctx, VersionMessageWrapper msg, List<Object> out) throws Exception { statisticsCounter.incrementCounter(CounterEventTypes.US_RECEIVED_IN_OFJAVA); if (LOGGER.isDebugEnabled) { LOGGER.debug("VersionMessageWrapper received"); LOGGER.debug("<< {}", ByteBufUtils.byteBufToHexString(msg.getMessageBuffer)); } try { // 经过上一个流水线处理后得到of版本号即msg.getVersion,然后再反序列化工厂中查找 // 对应的实例。 final DataObject dataObject = deserializationFactory.deserialize(msg.getMessageBuffer, msg.getVersion); if (dataObject == null) { LOGGER.warn("Translated POJO is null"); statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_FAIL); } else { out.add(dataObject);//把反序列化后的对象放到list中,由业务层进行后续处理 statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_SUCCESS); } } catch (Exception e) { LOGGER.warn("Message deserialization failed", e); statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_FAIL); } finally { msg.getMessageBuffer.release; //消息buffer释放 } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | @Override public void consume(final DataObject message) { LOG.debug("ConsumeIntern msg on {}", channel); if (disconnectOccured ) { return; } if (message instanceof Notification) { ... } else if (message instanceof HelloMessage) {// hello 消息 LOG.info("Hello received / branch"); messageListener.onHelloMessage((HelloMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof MultipartReplyMessage) {// multipart消息 if (outputManager != null) { outputManager.onMessage((OfHeader) message); } messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof PacketInMessage) {//packet in消息 LOG.debug(((PacketInMessage) message).getXid.toString); messageListener.onPacketInMessage((PacketInMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); .... } else if (message instanceof OfHeader) {// 非notification消息 如果barrier-reply消息 LOG.debug("OFheader msg received"); if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { RpcResponseKey key = createRpcResponseKey((OfHeader) message); LOG.debug("Created RpcResponseKey is {} ",key); final ResponseExpectedRpcListener<?> listener = findRpcResponse(key); if (listener != null) { LOG.debug("corresponding rpcFuture found"); listener.completed((OfHeader)message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); LOG.debug("after setting rpcFuture"); responseCache.invalidate(key); } else { LOG.warn("received unexpected rpc response: {}", key); } } } else { LOG.warn("message listening not supported for type: {}", message.getClass); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | @Override public synchronized void shake(HelloMessage receivedHello) { if (version != null) { // Some switches respond with a second HELLO acknowledging our HELLO // but we&#39;ve already completed the handshake based on the negotiated // version and have registered this switch. LOG.debug("Hello recieved after handshake already settled ... ignoring."); return; } LOG.trace("handshake STARTED"); setActiveXid(20L); try { if (receivedHello == null) {//进入这个分支表示作为客户端连接服务器 // first Hello sending sendHelloMessage(highestVersion, getNextXid); lastProposedVersion = highestVersion; LOG.trace("ret - firstHello+wait"); return; } // process the 2. and later hellos 处理接收到hello握手消息 Short remoteVersion = receivedHello.getVersion; List<Elements> elements = receivedHello.getElements; setActiveXid(receivedHello.getXid); List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements); LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion, receivedHello.getXid, remoteVersionBitmap); // 下面这个if-else分支,进入那个都无所谓,最终都会调用到postHandshake // 为了简单假设进入if分支 if (useVersionBitmap && remoteVersionBitmap != null) { // versionBitmap on both sides -> ONE STEP DECISION handleVersionBitmapNegotiation(elements); } else { // versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying handleStepByStepVersionNegotiation(remoteVersion); } } catch (Exception ex) { errorHandler.handleException(ex, null); LOG.trace("ret - shake fail - closing"); handshakeListener.onHandshakeFailure; } } private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception { final Short proposedVersion = proposeCommonBitmapVersion(elements); if (lastProposedVersion == null) {//先发送hello reply消息,采用futures方式 // first hello has not been sent yet Long nexHelloXid = getNextXid; ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid); Futures.addCallback(helloDone, new FutureCallback<Void> { @Override public void onSuccess(Void result) { LOG.trace("ret - DONE - versionBitmap"); postHandshake(proposedVersion, getNextXid); // 握手成功进入session创建 } @Override public void onFailure(Throwable t) { // NOOP } }); LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid); } else { LOG.trace("ret - DONE - versionBitmap"); postHandshake(proposedVersion, getNextXid); } } @Override public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput, Short negotiatedVersion) { postHandshakeBasic(featureOutput, negotiatedVersion); // 创建session会话 // session创建完成之后向交换机发送端口统计消息 if (version == OFConstants.OFP_VERSION_1_3_x) { requestPorts; } else if (version == OFConstants.OFP_VERSION_1_0) { requestDesc; } } public static void registerSession(ConnectionConductorImpl connectionConductor, GetFeaturesOutput features, short version) { SwitchSessionKeyOF sessionKey = createSwitchSessionKey(features .getDatapathId); SessionContext sessionContext = getSessionManager.getSessionContext(sessionKey); if (LOG.isDebugEnabled) { LOG.debug("registering sessionKey: {}", Arrays.toString(sessionKey.getId)); } if (features.getAuxiliaryId == null || features.getAuxiliaryId == 0) { // handle primary 创建session并且注册到mdsal中 if (sessionContext != null) { LOG.warn("duplicate datapathId occured while registering new switch session: " + dumpDataPathId(features.getDatapathId)); getSessionManager.invalidateSessionContext(sessionKey); } // register new session context (based primary conductor) SessionContextOFImpl context = new SessionContextOFImpl; context.setPrimaryConductor(connectionConductor); context.setNotificationEnqueuer(connectionConductor); context.setFeatures(features); context.setSessionKey(sessionKey); context.setSeed((int) System.currentTimeMillis); connectionConductor.setSessionContext(context); getSessionManager.addSessionContext(sessionKey, context); } else { ... } // check registration result SessionContext resulContext = getSessionManager.getSessionContext(sessionKey); if (resulContext == null) { throw new IllegalStateException("session context registration failed"); } else { if (!resulContext.isValid) { throw new IllegalStateException("registered session context is invalid"); } } } @Override public void onSessionAdded(final SwitchSessionKeyOF sessionKey, final SessionContext context) { GetFeaturesOutput features = context.getFeatures; BigInteger datapathId = features.getDatapathId; InstanceIdentifier<Node> identifier = identifierFromDatapathId(datapathId); NodeRef nodeRef = new NodeRef(identifier); NodeId nodeId = nodeIdFromDatapathId(datapathId); // 针对每一个有效连接,都会创建一个对象ModelDrivenSwitchImpl, 用于描述底层交换机 // 这个对象是日后操作源头,比如说下发flowmod、barrier等 ModelDrivenSwitchImpl ofSwitch = new ModelDrivenSwitchImpl(nodeId, identifier, context); CompositeObjectRegistration<ModelDrivenSwitch> registration = ofSwitch.register(rpcProviderRegistry); context.setProviderRegistration(registration); LOG.debug("ModelDrivenSwitch for {} registered to MD-SAL.", datapathId); NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper( nodeAdded(ofSwitch, features, nodeRef), context.getFeatures.getVersion); context.getNotificationEnqueuer.enqueueNotification(wrappedNotification); } |
欢迎光临 51学通信论坛2017新版 (http://bbs.51xuetongxin.com/) | Powered by Discuz! X3 |