QoS in vdsm
2024-02-26 15:13:18

https://tonydeng.github.io/sdn-handbook/linux/tc.html
In vdsm, VM traffic control and host traffic control use different interfaces and call chains.
fq_codel is a traffic-control (queue management) algorithm: https://queue.acm.org/detail.cfm?id=2209336

QoS

Quality of Service (QoS) is a term that, in the field of packet-switched networks, refers to the probability that a network meets a given traffic contract, or, in many cases, informally, the probability of a packet passing between two points in the network. QoS is a control mechanism: it assigns different priorities to different users or data flows, or guarantees a certain level of performance for a data flow according to the application's requirements. QoS guarantees matter most on networks with limited capacity, especially for streaming multimedia applications such as VoIP and IPTV, which often require a fixed transmission rate and are sensitive to delay.

QoS in oVirt

In oVirt, QoS can be configured for disks, host networks, VM networks, and CPUs.

Host network QoS

Code analysis

In oVirt, nearly all network configuration goes through a single entry-point function: setupNetworks.
https://www.ovirt.org/develop/developer-guide/vdsm/network.html

Vdsm defines a concept of “host network”. These networks are configured by the following Vdsm verbs:
addNetwork Add the required Linux networking devices for a new network, as well as the configuration files required to re-create these devices on next boot.
delNetwork Delete a previously-added network.
editNetwork Replace an existing network definition by a new one.
setSafeNetConfig Declare network configuration as “safe”, so it persists after host reboot.
setupNetworks

In the ovirt-engine code, setupNetworks is called in the following situations:
1. Creating a host, updating a host, or manually reinstalling a host;
2. Adding, removing, or editing network adapters on a host's network-editing page;
3. Copying host networks;
4. Enabling/disabling a host network;
5. Labeling a network.
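The verb takes three dictionaries: networks, bondings, and options. The following is a hypothetical payload sketch; the network names and values are invented, though the overall shape (three dicts, per-network attributes, a 'remove' flag, 'hostQos' with only an 'out' side) follows what the code below iterates over:

```python
# Hypothetical setupNetworks payload (illustrative names and values).
networks = {
    'ovirtmgmt': {                       # network name -> its attributes
        'nic': 'eth0',                   # backing device
        'bootproto': 'dhcp',
        'hostQos': {                     # host-network QoS: outbound only
            'out': {'ls': {'m2': 100}},  # link-share curve (assumed units)
        },
    },
    'old_net': {'remove': True},         # removals ride in the same call
}
bondings = {}                            # no bond changes in this example
options = {'connectivityCheck': True, 'commitOnSuccess': True}

request = {'networks': networks, 'bondings': bondings, 'options': options}
print(sorted(request))
```

With commitOnSuccess set, the configuration is persisted after a successful setup, which is exactly the branch handled in setupNetworks below.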
First, API.py exposes the RPC interface to the engine (relative path: lib/vdsm/API.py):

# Networking-related functions
@api.logged(on="api.network")
def setupNetworks(self, networks, bondings, options):
    """Add a new network to this vds, replacing an old one."""

    if not self._cif._networkSemaphore.acquire(blocking=False):
        self.log.warn('concurrent network verb already executing')
        return errCode['unavail']

    try:
        self._cif._netConfigDirty = True
        supervdsm.getProxy().setupNetworks(networks, bondings, options)
        if options.get('commitOnSuccess'):
            # This option ensures that persist is called after
            # setupNetworks
            self._cif._netConfigDirty = False
        return {'status': doneCode}
    except ConfigNetworkError as e:
        self.log.error('%s', e.msg, exc_info=True)
        return {'status': {'code': e.errCode, 'message': e.msg}}
    except exception.HookError as e:
        return response.error('hookError', 'Hook error: ' + str(e))
    finally:
        self._cif._networkSemaphore.release()

The networks API is exposed through supervdsm-api, so services other than supervdsm must call it via getProxy(). The called function is _setup_networks in lib/vdsm/network/api.py:

def _setup_networks(networks, bondings, options):
    # exec before_setupnetwork_hooks
    bondings, networks, options = _apply_hook(bondings, networks, options)

    in_rollback = options.get('_inRollback', False)
    with _rollback():
        netswitch.configurator.setup(networks, bondings, options, in_rollback)

Following setup further down leads to a function named _setup_nmstate in lib/vdsm/network/netswitch/configurator.py, which calls _setup_qos to configure the host network:

# https://cn.linux-console.net/?p=2621 declarative network configuration tool
# This note describes how vdsm's network module uses nmstate to apply the
# network configuration, with a transaction and rollback mechanism to keep
# the configuration correct and the host reachable.
def _setup_nmstate(networks, bondings, options, in_rollback):
    """
    Setup the networks using nmstate as the backend provider.
    nmstate handles the rollback by itself in case of an error during the
    transaction or in case the resulted state does not include the desired one.

    In order to support the connectivity check, the "regular" rollback is
    used (the Transaction context).
    """
    logging.info('Processing setup through nmstate')
    desired_state = nmstate.generate_state(networks, bondings)
    logging.info('Desired state: %s', desired_state)
    _setup_dynamic_src_routing(networks)
    nmstate.setup(desired_state, verify_change=not in_rollback)
    net_info = NetInfo(netinfo_get())

    with Transaction(in_rollback=in_rollback, persistent=False) as config:
        _setup_qos(networks, net_info, config.networks)
        for net_name, net_attrs in six.viewitems(networks):
            if net_attrs.get('remove'):
                config.removeNetwork(net_name)
        for net_name, net_attrs in six.viewitems(networks):
            # Networks marked 'remove' are deleted above; all others are
            # (re)set here, so effectively one delete pass and one update pass.
            if not net_attrs.get('remove'):
                config.setNetwork(net_name, net_attrs)
        for bond_name, bond_attrs in six.viewitems(bondings):
            if bond_attrs.get('remove'):
                config.removeBonding(bond_name)
        for bond_name, bond_attrs in six.viewitems(bondings):
            if not bond_attrs.get('remove'):
                config.setBonding(bond_name, bond_attrs)
        config.save()
    link_setup.setup_custom_bridge_opts(networks)
    connectivity.check(options)
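As an aside, the desired state handed to nmstate.setup() is a declarative description of the target network configuration. A minimal example in nmstate's public schema (the interface name and addresses are invented for illustration; vdsm's generate_state() produces a structure of this kind from the networks/bondings dicts):

```python
# A minimal desired state in nmstate's declarative schema (values are
# made up). nmstate compares this with the current state, applies the
# difference, and rolls back if verification fails.
desired_state = {
    'interfaces': [
        {
            'name': 'eth0',
            'type': 'ethernet',
            'state': 'up',
            'ipv4': {
                'enabled': True,
                'dhcp': False,
                'address': [{'ip': '192.0.2.10', 'prefix-length': 24}],
            },
        }
    ]
}

iface = desired_state['interfaces'][0]
print(iface['name'], iface['state'])
```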

_setup_qos in turn calls _configure_qos, which ultimately calls qos.configure_outbound in the qos module.

Note: only outbound is configured here; there is no inbound. Correspondingly, ovirt-engine also supports only outbound, not inbound, when configuring host-network QoS. The reason is that a host cannot stop external devices from sending packets to it, so there is very little an inbound rule can accomplish; the sensible approach is to throttle the sender instead.

def _configure_qos(net_attrs, out):
    vlan = net_attrs.get('vlan')
    base_iface = _get_base_iface(net_attrs)
    qos.configure_outbound(out, base_iface, vlan)

configure_outbound is where we finally reach traffic-control code.
root_qdisc fetches the root qdisc of the network device.

def configure_outbound(qosOutbound, device, vlan_tag):
    """Adds the qosOutbound configuration to the backing device (be it bond
    or nic). Adds a class and filter for default traffic if necessary. vlan_tag
    can be None"""
    root_qdisc = netinfo_qos.get_root_qdisc(tc.qdiscs(device))
    class_id = '%x' % (_NON_VLANNED_ID if vlan_tag is None else vlan_tag)
    if not root_qdisc or root_qdisc['kind'] != _SHAPING_QDISC_KIND:
        _fresh_qdisc_conf_out(device, vlan_tag, class_id, qosOutbound)
    else:
        _qdisc_conf_out(
            device, root_qdisc['handle'], vlan_tag, class_id, qosOutbound
        )

When there is no root qdisc, or it is not the HFSC shaping kind, _fresh_qdisc_conf_out builds a fresh qdisc configuration; otherwise _qdisc_conf_out updates the existing one.
Looking at _qdisc_conf_out: it assembles a set of rules and invokes tc commands to apply the host network configuration.
_fresh_qdisc_conf_out is similar: it assembles tc rules and applies the host network configuration.

def _qdisc_conf_out(dev, root_qdisc_handle, vlan_tag, class_id, qos):
    """Adds the traffic class and filtering to the current hfsc qdisc"""
    flow_id = _ROOT_QDISC_HANDLE + class_id

    def filt_flow_id(filt, kind):
        return filt.get(kind, {}).get('flowid')

    filters = [
        filt
        for filt in tc._filters(dev, parent=root_qdisc_handle)
        if flow_id in (filt_flow_id(filt, 'basic'), filt_flow_id(filt, 'u32'))
    ]

    # Clear up any previous filters to the class
    for filt in filters:
        try:
            tc.filter.delete(dev, filt['pref'], parent=root_qdisc_handle)
        except tc.TrafficControlException as tce:
            if tce.errCode != errno.EINVAL:  # no filters exist -> EINVAL
                raise

    # Clear the class in case it exists
    try:
        tc.cls.delete(dev, classid=root_qdisc_handle + class_id)
    except tc.TrafficControlException as tce:
        if tce.errCode != errno.ENOENT:
            raise

    _add_hfsc_cls(dev, root_qdisc_handle, class_id, **qos)
    if class_id == _DEFAULT_CLASSID:
        _add_non_vlanned_filter(dev, root_qdisc_handle)
    else:
        if not _is_explicit_defined_default_class(dev):
            (default_class,) = [
                c['hfsc']
                for c in tc.classes(dev)
                if c['handle'] == _ROOT_QDISC_HANDLE + _DEFAULT_CLASSID
            ]
            ls_max_rate = _max_hfsc_ls_rate(dev)
            default_class['ls']['m2'] = ls_max_rate

            tc.cls.delete(dev, classid=_ROOT_QDISC_HANDLE + _DEFAULT_CLASSID)
            _add_hfsc_cls(
                dev,
                _ROOT_QDISC_HANDLE,
                _DEFAULT_CLASSID,
                ls=default_class['ls'],
            )
            _add_fair_qdisc(dev, _ROOT_QDISC_HANDLE, _DEFAULT_CLASSID)

    _add_vlan_filter(dev, vlan_tag, root_qdisc_handle, class_id)
    _add_fair_qdisc(dev, root_qdisc_handle, class_id)

VM network QoS

VM QoS is defined in libvirt XML; see the official definition: https://libvirt.org/formatnetwork.html#quality-of-service
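Per that libvirt definition, the QoS lives in a `<bandwidth>` element on the interface. For example (average/peak are in kilobytes per second, burst in kibibytes; the bridge name and numbers here are only an example):

```xml
<interface type='bridge'>
  <source bridge='ovirtmgmt'/>
  <bandwidth>
    <inbound average='1000' peak='5000' burst='1024'/>
    <outbound average='128' peak='256' burst='256'/>
  </bandwidth>
</interface>
```

Note that, unlike host-network QoS, a VM interface supports inbound shaping too, because the host sits on the sending side of traffic destined for the guest.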

Code analysis

VM network QoS is configured by hotplugNic in lib/vdsm/API.py, which sets the inbound and outbound bandwidth of a network interface.
In ovirt-engine, the hotplugNic method is called when plugging in or unplugging a network interface.
First, hotplugNic in API.py calls self.vm.hotplugNic(params).
Looking at the code of self.vm.hotplugNic(params): it mainly parses the data in params, assembles an XML document from it, and calls libvirt to act on it.

@api.guard(_not_migrating)
def hotplugNic(self, params):
    xml = params['xml']
    nic = vmdevices.common.dev_from_xml(self, xml)
    dom = xmlutils.fromstring(xml)
    dom_devices = vmxml.find_first(dom, 'devices')
    nic_dom = next(iter(dom_devices))
    nicXml = xmlutils.tostring(nic_dom)
    nicXml = hooks.before_nic_hotplug(
        nicXml, self._custom, params=nic.custom
    )
    nic._deviceXML = nicXml
    # TODO: this is debug information. For 3.6.x we still need to
    # see the XML even with 'info' as default level.
    self.log.info("Hotplug NIC xml: %s", nicXml)

    try:
        nic.setup()
        self._dom.attachDevice(nicXml)
    except libvirt.libvirtError as e:
        self.log.exception("Hotplug failed")
        nicXml = hooks.after_nic_hotplug_fail(
            nicXml, self._custom, params=nic.custom)
        if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
            raise exception.NoSuchVM()
        return response.error('hotplugNic', str(e))
    else:
        # FIXME! We may have a problem here if vdsm dies right after
        # we sent command to libvirt and before save conf. In this case
        # we will gather almost all needed info about this NIC from
        # the libvirt during recovery process.
        device_conf = self._devices[hwclass.NIC]
        device_conf.append(nic)
        self._hotplug_device_metadata(hwclass.NIC, nic)
        self._updateDomainDescriptor()
        vmdevices.network.Interface.update_device_info(self, device_conf)
        hooks.after_nic_hotplug(nicXml, self._custom,
                                params=nic.custom)

    mirroredNetworks = []
    try:
        # pylint: disable=no-member
        for network in nic.portMirroring:
            supervdsm.getProxy().setPortMirroring(network, nic.name)
            mirroredNetworks.append(network)
    # The better way would be catch the proper exception.
    # One of such exceptions is TrafficControlException, but
    # I am not sure that we'll get it for all traffic control errors.
    # In any case we need below rollback for all kind of failures.
    except Exception as e:
        self.log.exception("setPortMirroring for network %s failed",
                           network)
        nic_element = xmlutils.fromstring(nicXml)
        vmxml.replace_first_child(dom_devices, nic_element)
        hotunplug_params = {'xml': xmlutils.tostring(dom)}
        self.hotunplugNic(hotunplug_params,
                          port_mirroring=mirroredNetworks)
        return response.error('hotplugNic', str(e))

    device_info = {'devices': [{'macAddr': nic.macAddr,
                                'alias': nic.alias,
                                }],
                   'xml': self._domain.xml,
                   }
    return {'status': doneCode, 'vmList': device_info}

dev_from_xml has a corresponding implementation, from_xml_tree, in lib/vdsm/virt/vmdevices/network.py.
attachDevice ultimately calls into libvirt; the corresponding method is documented at https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainAttachDevice
The attachDevice call goes through self._dom on the vm, and self._dom is set in vm._run:

def _run(self):
    self.log.info("VM wrapper has started")
    if not self.recovering and \
            self._altered_state.origin != _MIGRATION_ORIGIN:
        self._remove_domain_artifacts()

    if not self.recovering and not self._altered_state.origin:
        # We need to define the domain in order to save device metadata in
        # _make_devices(). It'll get redefined with the final version
        # later.
        domxml = libvirtxml.make_placeholder_domain_xml(self)
        dom = self._connection.defineXML(domxml)
        self._dom = virdomain.Defined(self.id, dom)
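Tying this back to QoS: the NIC XML that the engine hands to hotplugNic can carry the libvirt `<bandwidth>` element described earlier. Below is an illustrative builder, not vdsm's actual code; the element and attribute names follow libvirt's format, while the MAC, bridge, and rates are made up:

```python
import xml.etree.ElementTree as ET

def nic_xml_with_qos(mac, bridge, inbound_avg, outbound_avg):
    """Build a minimal libvirt <interface> carrying a <bandwidth> element.

    Rates are in KB/s, as libvirt's bandwidth element defines them.
    This illustrates the XML shape only; vdsm assembles its XML elsewhere.
    """
    iface = ET.Element('interface', type='bridge')
    ET.SubElement(iface, 'mac', address=mac)
    ET.SubElement(iface, 'source', bridge=bridge)
    ET.SubElement(iface, 'model', type='virtio')
    bw = ET.SubElement(iface, 'bandwidth')
    ET.SubElement(bw, 'inbound', average=str(inbound_avg))
    ET.SubElement(bw, 'outbound', average=str(outbound_avg))
    return ET.tostring(iface, encoding='unicode')

xml = nic_xml_with_qos('00:1a:4a:16:01:51', 'ovirtmgmt', 1000, 500)
print(xml)
```

A document of this shape is what ends up in attachDevice, after which libvirt programs the corresponding traffic shaping for the guest's interface.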

VM QoS (CPU limits and IO limits)

vdsm updates CPU limits and IO limits (disk IOPS) by calling libvirt to update the VM's XML. Several methods update the VM; one is updateVmPolicy, which is called when modifying an existing QoS.
Another is hotplugDisk, which is called when attaching a disk and also updates the VM's XML.
After configuring CPU QoS and disk QoS in the web UI, they must be attached to a VM before updateVmPolicy is called and the QoS takes effect. Until they are attached to a VM, these changes do not show up in the vdsm log.
updateVmPolicy:

  • Executed during VM liveness polling: the poll, which runs every 5 seconds, checks whether the data has changed (probably by comparing timestamps) and calls it when it has. Clicking update on the page triggers it directly.
  • Executed when QoS is removed manually; removing either storage or CPU QoS triggers it.

hotplugDisk:

  • Known to be called when attaching a disk. There is also AddDiskCommand, which presumably also ends up here when a new disk is created while creating a VM.

Code analysis

For VM resource limits, the entry point is updateVmPolicy in lib/vdsm/API.py, which then calls the updateVmPolicy function in lib/vdsm/virt/vm.py.
The updateVmPolicy function is as follows:
@api.guard(_not_migrating)
def updateVmPolicy(self, params):
    """
    Update the QoS policy settings for VMs.

    The params argument contains the actual properties we are about to
    set. It must not be empty.

    Supported properties are:

    vcpuLimit - the CPU usage hard limit
    ioTune - the IO limits

    In the case not all properties are provided, the missing properties'
    setting will be left intact.

    If there is an error during the processing, this function
    immediately stops and returns. Remaining properties are not
    processed.

    :param params: dictionary mapping property name to its value
    :type params: dict[str] -> anything

    :return: standard vdsm result structure
    """
    if not params:
        self.log.error("updateVmPolicy got an empty policy.")
        return response.error('MissParam',
                              'updateVmPolicy got an empty policy.')

    #
    # Get the current QoS block
    metadata_modified = False
    qos = self._getVmPolicy()
    if qos is None:
        return response.error('updateVmPolicyErr')

    #
    # Process provided properties, remove property after it is processed

    if 'vcpuLimit' in params:
        # Remove old value
        vcpuLimit = vmxml.find_first(qos, "vcpuLimit", None)
        if vcpuLimit is not None:
            vmxml.remove_child(qos, vcpuLimit)

        vcpuLimit = vmxml.Element("vcpuLimit")
        vcpuLimit.appendTextNode(str(params["vcpuLimit"]))
        vmxml.append_child(qos, vcpuLimit)

        metadata_modified = True
        self._vcpuLimit = params.pop('vcpuLimit')

    if 'ioTune' in params:
        ioTuneParams = params["ioTune"]

        for ioTune in ioTuneParams:
            if ("path" in ioTune) or ("name" in ioTune):
                continue

            self.log.debug("IoTuneParams: %s", str(ioTune))

            try:
                # All 4 IDs are required to identify a device
                # If there is a valid reason why not all 4 are required,
                # please change the code

                disk = self.findDriveByUUIDs({
                    'domainID': ioTune["domainID"],
                    'poolID': ioTune["poolID"],
                    'imageID': ioTune["imageID"],
                    'volumeID': ioTune["volumeID"]})

                self.log.debug("Device path: %s", disk.path)
                ioTune["name"] = disk.name
                ioTune["path"] = disk.path

            except LookupError as e:
                return response.error('updateVmPolicyErr', str(e))

        if ioTuneParams:
            io_tunes = []

            io_tune_element = vmxml.find_first(qos, "ioTune", None)
            if io_tune_element is not None:
                io_tunes = vmtune.io_tune_dom_all_to_list(io_tune_element)
                vmxml.remove_child(qos, io_tune_element)

            vmtune.io_tune_update_list(io_tunes, ioTuneParams)

            vmxml.append_child(qos, vmtune.io_tune_list_to_dom(io_tunes))

            metadata_modified = True

            self._ioTuneInfo = io_tunes

        del params['ioTune']

    # Check remaining fields in params and report the list of unsupported
    # params to the log

    if params:
        self.log.warning("updateVmPolicy got unknown parameters: %s",
                         ", ".join(six.iterkeys(params)))

    #
    # Save modified metadata

    if metadata_modified:
        metadata_xml = xmlutils.tostring(qos)

        try:
            self._dom.setMetadata(libvirt.VIR_DOMAIN_METADATA_ELEMENT,
                                  metadata_xml,
                                  xmlconstants.METADATA_VM_TUNE_PREFIX,
                                  xmlconstants.METADATA_VM_TUNE_URI)
        except libvirt.libvirtError as e:
            self.log.exception("updateVmPolicy failed")
            if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
                raise exception.NoSuchVM()
            else:
                return response.error('updateVmPolicyErr', str(e))

    return {'status': doneCode}
Ultimately vdsm calls libvirt's setMetadata method to update the VM's information. For the VM XML definition, see:
https://libvirt.org/formatdomain.html
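For reference, the QoS block that updateVmPolicy assembles and stores under the vdsm tune metadata namespace looks roughly like the sketch below. The vcpuLimit and ioTune element names mirror the code above; the nesting under device and the values are assumptions for illustration, not captured output:

```python
import xml.etree.ElementTree as ET

# Hand-built illustration of the metadata that setMetadata stores.
qos = ET.Element('qos')
vcpu = ET.SubElement(qos, 'vcpuLimit')
vcpu.text = '50'                               # CPU usage hard limit, percent

io_tune = ET.SubElement(qos, 'ioTune')
device = ET.SubElement(io_tune, 'device', name='vda')
maximum = ET.SubElement(device, 'maximum')     # assumed nesting, see lead-in
ET.SubElement(maximum, 'total_iops_sec').text = '400'

print(ET.tostring(qos, encoding='unicode'))
```

On the next policy update, vdsm reads this block back (_getVmPolicy), edits the relevant children in place, and writes it again via setMetadata.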