|
基于ODL开发已经有一段时间了,对于一个全新的平台,总是喜欢每隔一段时间就总结一番,本篇算是第一篇吧。下面会介绍以下内容:
1.openflow服务注册
2.netty服务创建
3.序列化、反序列化工作原理
4.从hello到of-session
</p>一、前提
Openflowjava是基于netty进行的开发,在阅读Openflowjava的源码的时候需要简单了解一下netty原理。
Openflowjava是一个独立的线程,功能入口函数是SwitchConnectionProviderImpl.java中startup函数。因此以类SwitchConnectionProviderImpl为起点进行展开分析。
二、Openflow服务注册
openflowjava使用了多种设计模式,例如生产者-消费者、工厂模式等。通过类名SwitchConnectionProviderImpl便知,连接管理服务使用的生产者-消费者模式(openflowjava是生成者,openflowplugin是消费者)。我们先看一下构造函数:
Shell
/** Constructor */
public SwitchConnectionProviderImpl {
// 序列化工厂注册 底层实现就是初始化map
serializerRegistry = new SerializerRegistryImpl;
serializerRegistry.init;
serializationFactory = new SerializationFactory;
serializationFactory.setSerializerTable(serializerRegistry);
// 反序列化工厂注册 底层实现就是初始化map
deserializerRegistry = new DeserializerRegistryImpl;
deserializerRegistry.init;
deserializationFactory = new DeserializationFactory;
deserializationFactory.setRegistry(deserializerRegistry);
}
通过成员变量名称知道,序列化和反序列化采用工厂模式,由于openflow消息很多,因此采用工厂模式即可针对不同的消息,实例化不同的对象,以达到统一处理。下面是消息注册的整体组织架构图:
由此结构图可知,序列化和反序列化工厂注册消息结构类似,里面的消息类型基本是对应的,如:hellorequest和helloreply,barrierrequest和barrierreply等。
上面这个结构图信息来自serializerRegistry.init和deserializerRegistry.init;方法,在进行展开说明。这里想说一下,大多数都基于odl进行二次开发,因此很多时候需要扩展openflow消息,通过上面结构图,如果扩展openflow消息,则需要在openflow message type增加对应代码,如果扩展flowmod则需要在match entry增加对应代码。
通过上图我们能做到,当我们扩展openflow协议的时候,在哪里增加代码即可,内部代码实现需要我们自己去深入分析。
三、 netty服务创建
上面已经分析过,startup是openflowjava入口函数,此函数比较简单,如下所示:
Shell
@Override
public ListenableFuture<Boolean> startup {
LOGGER.debug("Startup summoned");
ListenableFuture<Boolean> result = null;
try {
serverFacade = createAndConfigureServer; //这个函数是重点,根据配置创建netty服务
if (switchConnectionHandler == null) {
throw new IllegalStateException("SwitchConnectionHandler is not set");
}
new Thread(serverFacade).start;//线程启动
result = serverFacade.getIsOnlineFuture;
} catch (Exception e) {
SettableFuture<Boolean> exResult = SettableFuture.create;
exResult.setException(e);
result = exResult;
}
return result;
}
方法createAndConfigureServer主要流程图框架,如下图所示:
我们在来看一下代码,这部分需要有一定的netty框架的基础知识:
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
/**
* @return
*/
private ServerFacade createAndConfigureServer {
LOGGER.debug("Configuring ..");
ServerFacade server = null;
//实例化Channel,主要初始化序列化和反序列化工厂对象
ChannelInitializerFactory factory = new ChannelInitializerFactory;
factory.setSwitchConnectionHandler(switchConnectionHandler);
factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout);
factory.setTlsConfig(connConfig.getTlsConfiguration);
factory.setSerializationFactory(serializationFactory);
factory.setDeserializationFactory(deserializationFactory);
//根据不同协议类型,创建不同netty的服务,针对openflow协议来说,我们是TCP服务
TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol;
if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) {//创建tcp服务
server = new TcpHandler(connConfig.getAddress, connConfig.getPort); //实例化TcpHandler
//初始化服务线程
TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer;
((TcpHandler) server).setChannelInitializer(channelInitializer);
((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration);
//初始化工作线程
NioEventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup;
connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run; //这个地方很重要,调用run方法。下面会进行分析。
} else if (transportProtocol.equals(TransportProtocol.UDP)){//创建UDP服务
server = new UdpHandler(connConfig.getAddress, connConfig.getPort);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer);
} else {
throw new IllegalStateException("Unknown transport protocol received: " + transportProtocol);
}
server.setThreadConfig(connConfig.getThreadConfiguration);
return server;
}
|
上面这些内容,大部分都是基于netty接口实现的,本人也只是了解一点netty,这不能做深入分析。我们接下来看一下上面提到的run方法,其实run方法很简单,里面只是调用了set方法:
public void run {
b = new Bootstrap;
b.group(workerGroup).channel(NioSocketChannel.class)
.handler(channelInitializer); // handler设置,业务处理逻辑层
}
这个地方的channelInitializer是主要作用是注册业务处理,即当接收到一个网络数据包时应该如何对其进行解析。在查看类TcpChannelInitializer的具体实现时候主要看方法initChannel,这个方法是一个接口,主要用于netty框架调用。先简单介绍一下netty处理报文机制:
Netty中的所有handler都实现自ChannelHandler接口,按照输入输出方向来划分,可分为ChannelInboundHandler、ChannelOutboundHandler两大类。
ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;
ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
inbound 的处理顺序是与 add 顺序一致的, 而 outbound 的处理顺序是跟 add 顺序相反的。
有两点需要特别说明:
1)inbound格式的数据流处理完成后不会进入outbound处理流程。也就是说一个流要么进入inbound要进入outbound。
2)进入流水线之后,上一个处理流程的输出结果,可能会作为下一个处理流程的输入。注意是可能而不是必须。
我们来看一下代码:
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
@Override
protected void initChannel(final SocketChannel ch) {
if (ch.remoteAddress != null) {
// 当一个client请求连接到来时,进行判断是否允许访问
// 在第二次开发的时候可以在这里进行访问控制。
// 比如说:黑名单ip不允许访问
InetAddress switchAddress = ch.remoteAddress.getAddress;
int port = ch.localAddress.getPort;
int remotePort = ch.remoteAddress.getPort;
LOGGER.debug("Incoming connection from (remote address): {}:{} --> :{}",
switchAddress.toString, remotePort, port);
if (!getSwitchConnectionHandler.accept(switchAddress)) {
ch.disconnect;
LOGGER.debug("Incoming connection rejected");
return;
}
}
//通过上面if判断后则表示允许连接
LOGGER.debug("Incoming connection accepted - building pipeline");
allChannels.add(ch);
ConnectionFacade connectionFacade = null;
//设置连接属性,如超时时间
connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null);
try {
LOGGER.debug("calling plugin: {}", getSwitchConnectionHandler);
getSwitchConnectionHandler.onSwitchConnected(connectionFacade);
connectionFacade.checkListeners;
ch.pipeline.addLast(PipelineHandlers.IDLE_HANDLER.name, new IdleHandler(getSwitchIdleTimeout, TimeUnit.MILLISECONDS));
boolean tlsPresent = false;
// If this channel is configured to support SSL it will only support SSL
if (getTlsConfiguration != null) {// TLS连接处理
…
}
// 流水线处理注册 注册ChannelInboundHandlerAdapter
ch.pipeline.addLast(PipelineHandlers.OF_FRAME_DECODER.name,
new OFFrameDecoder(connectionFacade, tlsPresent));// openflow头解析
ch.pipeline.addLast(PipelineHandlers.OF_VERSION_DETECTOR.name, new OFVersionDetector); // openflow版本号解析
OFDecoder ofDecoder = new OFDecoder;//实例化反序列化对象并加到
ofDecoder.setDeserializationFactory(getDeserializationFactory);
ch.pipeline.addLast(PipelineHandlers.OF_DECODER.name, ofDecoder);
//流水线处理注册 注册ChannelOutboundHandlerAdapter
OFEncoder ofEncoder = new OFEncoder;//实例化序列化对象
ofEncoder.setSerializationFactory(getSerializationFactory);
ch.pipeline.addLast(PipelineHandlers.OF_ENCODER.name, ofEncoder);
//流水线处理注册 注册ChannelInboundHandlerAdapter 后面会用到
ch.pipeline.addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name, new DelegatingInboundHandler(connectionFacade));
if (!tlsPresent) {
connectionFacade.fireConnectionReadyNotification;
}
} catch (Exception e) {
LOGGER.warn("Failed to initialize channel", e);
ch.close;
}
}
|
假如控制器收到交换机的一个消息,那么会经过OF_FRAME_DECODER、OF_VERSION_DETECTOR、OF_DECODER处理。其中在OF_VERSION_DETECTOR确定openflow版本号(1.0还是1.3等),OF_DECODER是针对某个特定消息处理,如packet-in消息。
经过这个函数里之后,仅仅代表tcp建立成功,openflow会话还未建立。即没有处理hello消息,下面我们会分析hello处理流程。
四、序列化、反序列化工作原理
反序列化类是OFDecoder,这个只有三个方法,其中最重要的方法是decode。该方法是由上层调用,后面会在进行分析的,这里先看一下decode代码:
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@Override
protected void decode(ChannelHandlerContext ctx, VersionMessageWrapper msg,
List<Object> out) throws Exception {
statisticsCounter.incrementCounter(CounterEventTypes.US_RECEIVED_IN_OFJAVA);
if (LOGGER.isDebugEnabled) {
LOGGER.debug("VersionMessageWrapper received");
LOGGER.debug("<< {}", ByteBufUtils.byteBufToHexString(msg.getMessageBuffer));
}
try {
// 经过上一个流水线处理后得到of版本号即msg.getVersion,然后再反序列化工厂中查找
// 对应的实例。
final DataObject dataObject = deserializationFactory.deserialize(msg.getMessageBuffer,
msg.getVersion);
if (dataObject == null) {
LOGGER.warn("Translated POJO is null");
statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_FAIL);
} else {
out.add(dataObject);//把反序列化后的对象放到list中,由业务层进行后续处理
statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_SUCCESS);
}
} catch (Exception e) {
LOGGER.warn("Message deserialization failed", e);
statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_FAIL);
} finally {
msg.getMessageBuffer.release; //消息buffer释放
}
}
|
通过方法deserializationFactory.deserialize,此方法实际是根据版本号及openflowtype生成key在map中查找,实现如下:
Shell
public DataObject deserialize(final ByteBuf rawMessage, final short version) {
DataObject dataObject = null;
int type = rawMessage.readUnsignedByte;
//从map中获取具体实例
Class<?> clazz = messageClassMap.get(new TypeToClassKey(version, type));
rawMessage.skipBytes(EncodeConstants.SIZE_OF_SHORT_IN_BYTES);
OFDeserializer<DataObject> deserializer = registry.getDeserializer(
new MessageCodeKey(version, type, clazz));
//调用具体实例的deserialize方法
dataObject = deserializer.deserialize(rawMessage);
return dataObject;
}
假如此消息是hello消息且openflow协议版本号1.3,则具体实例为HelloMessageFactory,那么反序列化方法则为:
public HelloMessage deserialize(ByteBuf rawMessage) {
HelloMessageBuilder builder = new HelloMessageBuilder;
builder.setVersion((short) EncodeConstants.OF13_VERSION_ID);
builder.setXid(rawMessage.readUnsignedInt);
if (rawMessage.readableBytes > 0) {
builder.setElements(readElement(rawMessage));
}
return builder.build;
}
所以上面decode方法中dataobject实际指向是HelloMessageBuilder
五、从hello消息到of-Session
从hello反序列化到of-session(openflowplugin)层次比较深入,hello返序列化方法处理结束后仅仅代表数据包处理完成,但是业务层还未处理,即还需要创建session。方法处理完会回到netty的方法:MessageToMessageDecoder.channelRead,通过下图展示出从netty业务层:
业务层中有一个类是DelegatingInboundHandler,这个在注册netty流水线时已经提到了。我们直接看consume方法,这个地方是所有数据包处理入口。代码如下:
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
@Override
public void consume(final DataObject message) {
LOG.debug("ConsumeIntern msg on {}", channel);
if (disconnectOccured ) {
return;
}
if (message instanceof Notification) {
...
} else if (message instanceof HelloMessage) {// hello 消息
LOG.info("Hello received / branch");
messageListener.onHelloMessage((HelloMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof MultipartReplyMessage) {// multipart消息
if (outputManager != null) {
outputManager.onMessage((OfHeader) message);
}
messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof PacketInMessage) {//packet in消息
LOG.debug(((PacketInMessage) message).getXid.toString);
messageListener.onPacketInMessage((PacketInMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
....
} else if (message instanceof OfHeader) {// 非notification消息 如果barrier-reply消息
LOG.debug("OFheader msg received");
if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
RpcResponseKey key = createRpcResponseKey((OfHeader) message);
LOG.debug("Created RpcResponseKey is {} ",key);
final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
if (listener != null) {
LOG.debug("corresponding rpcFuture found");
listener.completed((OfHeader)message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
LOG.debug("after setting rpcFuture");
responseCache.invalidate(key);
} else {
LOG.warn("received unexpected rpc response: {}", key);
}
}
} else {
LOG.warn("message listening not supported for type: {}", message.getClass);
}
}
|
由上面代码可知,当时hello消息则会进入onHelloMessage,进入业务逻辑处理。下面这张图显示出,业务处理流程:
这个流程图调用关系很深,需要一点一点阅读代码,这里主要介绍几个关键方法,不一一介绍:
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
@Override
public synchronized void shake(HelloMessage receivedHello) {
if (version != null) {
// Some switches respond with a second HELLO acknowledging our HELLO
// but we&#39;ve already completed the handshake based on the negotiated
// version and have registered this switch.
LOG.debug("Hello recieved after handshake already settled ... ignoring.");
return;
}
LOG.trace("handshake STARTED");
setActiveXid(20L);
try {
if (receivedHello == null) {//进入这个分支表示作为客户端连接服务器
// first Hello sending
sendHelloMessage(highestVersion, getNextXid);
lastProposedVersion = highestVersion;
LOG.trace("ret - firstHello+wait");
return;
}
// process the 2. and later hellos 处理接收到hello握手消息
Short remoteVersion = receivedHello.getVersion;
List<Elements> elements = receivedHello.getElements;
setActiveXid(receivedHello.getXid);
List<Boolean> remoteVersionBitmap = MessageFactory.digVersions(elements);
LOG.debug("Hello message: version={}, xid={}, bitmap={}", remoteVersion,
receivedHello.getXid, remoteVersionBitmap);
// 下面这个if-else分支,进入那个都无所谓,最终都会调用到postHandshake
// 为了简单假设进入if分支
if (useVersionBitmap && remoteVersionBitmap != null) {
// versionBitmap on both sides -> ONE STEP DECISION
handleVersionBitmapNegotiation(elements);
} else {
// versionBitmap missing at least on one side -> STEP-BY-STEP NEGOTIATION applying
handleStepByStepVersionNegotiation(remoteVersion);
}
} catch (Exception ex) {
errorHandler.handleException(ex, null);
LOG.trace("ret - shake fail - closing");
handshakeListener.onHandshakeFailure;
}
}
private void handleVersionBitmapNegotiation(List<Elements> elements) throws Exception {
final Short proposedVersion = proposeCommonBitmapVersion(elements);
if (lastProposedVersion == null) {//先发送hello reply消息,采用futures方式
// first hello has not been sent yet
Long nexHelloXid = getNextXid;
ListenableFuture<Void> helloDone = sendHelloMessage(proposedVersion, nexHelloXid);
Futures.addCallback(helloDone, new FutureCallback<Void> {
@Override
public void onSuccess(Void result) {
LOG.trace("ret - DONE - versionBitmap");
postHandshake(proposedVersion, getNextXid); // 握手成功进入session创建
}
@Override
public void onFailure(Throwable t) {
// NOOP
}
});
LOG.trace("next proposal [{}] with versionBitmap hooked ..", nexHelloXid);
} else {
LOG.trace("ret - DONE - versionBitmap");
postHandshake(proposedVersion, getNextXid);
}
}
@Override
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion); // 创建session会话
// session创建完成之后向交换机发送端口统计消息
if (version == OFConstants.OFP_VERSION_1_3_x) {
requestPorts;
} else if (version == OFConstants.OFP_VERSION_1_0) {
requestDesc;
}
}
public static void registerSession(ConnectionConductorImpl connectionConductor,
GetFeaturesOutput features, short version) {
SwitchSessionKeyOF sessionKey = createSwitchSessionKey(features
.getDatapathId);
SessionContext sessionContext = getSessionManager.getSessionContext(sessionKey);
if (LOG.isDebugEnabled) {
LOG.debug("registering sessionKey: {}", Arrays.toString(sessionKey.getId));
}
if (features.getAuxiliaryId == null || features.getAuxiliaryId == 0) {
// handle primary 创建session并且注册到mdsal中
if (sessionContext != null) {
LOG.warn("duplicate datapathId occured while registering new switch session: "
+ dumpDataPathId(features.getDatapathId));
getSessionManager.invalidateSessionContext(sessionKey);
}
// register new session context (based primary conductor)
SessionContextOFImpl context = new SessionContextOFImpl;
context.setPrimaryConductor(connectionConductor);
context.setNotificationEnqueuer(connectionConductor);
context.setFeatures(features);
context.setSessionKey(sessionKey);
context.setSeed((int) System.currentTimeMillis);
connectionConductor.setSessionContext(context);
getSessionManager.addSessionContext(sessionKey, context);
} else {
...
}
// check registration result
SessionContext resulContext = getSessionManager.getSessionContext(sessionKey);
if (resulContext == null) {
throw new IllegalStateException("session context registration failed");
} else {
if (!resulContext.isValid) {
throw new IllegalStateException("registered session context is invalid");
}
}
}
@Override
public void onSessionAdded(final SwitchSessionKeyOF sessionKey, final SessionContext context) {
GetFeaturesOutput features = context.getFeatures;
BigInteger datapathId = features.getDatapathId;
InstanceIdentifier<Node> identifier = identifierFromDatapathId(datapathId);
NodeRef nodeRef = new NodeRef(identifier);
NodeId nodeId = nodeIdFromDatapathId(datapathId);
// 针对每一个有效连接,都会创建一个对象ModelDrivenSwitchImpl, 用于描述底层交换机
// 这个对象是日后操作源头,比如说下发flowmod、barrier等
ModelDrivenSwitchImpl ofSwitch = new ModelDrivenSwitchImpl(nodeId, identifier, context);
CompositeObjectRegistration<ModelDrivenSwitch> registration =
ofSwitch.register(rpcProviderRegistry);
context.setProviderRegistration(registration);
LOG.debug("ModelDrivenSwitch for {} registered to MD-SAL.", datapathId);
NotificationQueueWrapper wrappedNotification = new NotificationQueueWrapper(
nodeAdded(ofSwitch, features, nodeRef),
context.getFeatures.getVersion);
context.getNotificationEnqueuer.enqueueNotification(wrappedNotification);
}
|
总结一下:
1、从openflowjava到openflowplugin流程比较繁琐,需要耐心阅读代码。
2、代码中有很多设计模式,需要了解设计模式后在阅读代码时候才能做到“知其然而知其所以然”
3、openflowjava底层是用netty作为连接管理层,因此需要简单了解netty。
4、有生以来第一次写java相关文章,很多逻辑可能存在"断篇",如有不合适之处,请批评指出。
5、接下来会介绍flowmod下发流程。
作者简介:
徐小冰:毕业于河北大学,主要从事嵌入式软件开发,虚拟化,SDN。目前基于ODL和Open vSwitch进行二次开发,希望与广大网友一起探讨学习。作者系OpenDaylihgt群(194240432)资深活跃用户,@IT难人。
--------------华丽的分割线------------------
本文系《SDNLAB原创文章奖励计划》投稿文章,该计划旨在鼓励广大从业人员在SDN/NFV/Cloud网络领域创新技术、开源项目、产业动态等方面进行经验和成果的文字传播、分享、交流。有意向投稿的同学请通过官方唯一指定投稿通道进行文章投递,投稿细则请参考《SDNLAB原创文章奖励计划》
声明:本文转载自网络。版权归原作者所有,如有侵权请联系删除。 |
扫描并关注51学通信微信公众号,获取更多精彩通信课程分享。
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
|