1. 下载nova源码
从 github 下载 Victoria 版本的 Nova 源码
git clone https://github.com/openstack/nova.git --branch stable/victoria --single-branch
nova/ 文件夹下的目录
accelerator/ # Cyborg 加速器
api/ # Nova API 服务
cmd/ # 各个 Nova 服务的入口程序
compute/ # Nova Compute 服务
conductor/ # Nova Conductor 服务
conf/ # 所有的配置选项
console/ # nova-console 服务
db/ # 封装数据库操作
hacking/ # 编码规范检查
image/ # 封装镜像操作,Glance 接口抽象
keymgr/ # 密钥管理器实现
locale/ # 国际化相关文件
network/ # nova-network 服务
notifications/ # 通知相关功能
objects/ # 封装实体对象的 CURD 操作
pci/ # PCI/SR-IOV 支持
policies/ # 所有 Policy 的默认规则
privsep/ # oslo_privsep 相关
scheduler/ # Nova Scheduler 服务
servicegroup/ # 成员服务(membership),服务组
storage/ # Ceph 存储支持
tests/ # 单元测试
virt/ # 支持的 hypervisor 驱动
volume/ # 封装卷访问接口,Cinder 接口抽象
最新版的源码如下:
nova/ 文件夹下的 python 文件:
__init__.py
availability_zones.py # 区域设置的工具函数
baserpc.py # 基础 RPC 客户端/服务端实现
block_device.py # 块设备映射
cache_utils.py # oslo_cache 封装
config.py # 解析命令行参数
context.py # 贯穿 Nova 的所有请求的上下文
crypto.py # 包装标准加密数据元素
debugger.py # pydev 调试
exception.py # 基础异常类
exception_wrapper.py # 封装异常类
filters.py # 基础过滤器
i18n.py # 集成 oslo_i18n
loadables.py # 可加载类
manager.py # 基础 Manager 类
middleware.py # 更新 oslo_middleware 的默认配置选项
monkey_patch.py # eventlet 猴子补丁
policy.py # 策略引擎
profiler.py # 调用 OSProfiler
quota.py # 每个项目的资源配额
rpc.py # RPC 操作相关的工具函数
safe_utils.py # 不会导致循环导入的工具函数
service.py # 通用节点基类,用于在主机上运行的所有工作者
service_auth.py # 身份认证插件
test.py # 单元测试基础类
utils.py # 工具函数
version.py # 版本号管理
weights.py # 权重插件
wsgi.py # 管理 WSGI 应用的服务器类
setup.cfg 配置文件,[entry_points] 小节指定了 nova 各个组件入口
console_scripts =
nova-api = nova.cmd.api:main
nova-api-metadata = nova.cmd.api_metadata:main
nova-api-os-compute = nova.cmd.api_os_compute:main
nova-compute = nova.cmd.compute:main
nova-conductor = nova.cmd.conductor:main
nova-manage = nova.cmd.manage:main
nova-novncproxy = nova.cmd.novncproxy:main
nova-policy = nova.cmd.policy:main
nova-rootwrap = oslo_rootwrap.cmd:main
nova-rootwrap-daemon = oslo_rootwrap.cmd:daemon
nova-scheduler = nova.cmd.scheduler:main
nova-serialproxy = nova.cmd.serialproxy:main
nova-spicehtml5proxy = nova.cmd.spicehtml5proxy:main
nova-status = nova.cmd.status:main
wsgi_scripts =
nova-api-wsgi = nova.api.openstack.compute.wsgi:init_application
nova-metadata-wsgi = nova.api.metadata.wsgi:init_application
2. nova-api
nova-api 对外提供 RESTful API,没有对内的 RPC 。
nova/api/ 目录结构
__init__.py
auth.py # 身份认证中间件
compute_req_id.py # x-compute-request-id 中间件(oslo_middleware)
metadata/ # Metadata API
openstack/ # Nova v2.1 API
validation/ # 请求体验证
wsgi.py # WSGI 原语(请求、应用、中间件、路由、加载器)
openstack 目录中包含 WSGI 基础架构的代码,一些 WSGI 中间件,以及如何解析请求与分发请求的核心代码。
nova/api/openstack/compute/ 包含 Controller 实现,Resource 对象将 API 映射到相应的 Controller 方法上。
__init__.py
api_version_request.py # 版本验证
auth.py # noauth 中间件
common.py # 信息查询的工具函数
compute/ # 每个 API 的入口点
identity.py # 验证项目是否存在
requestlog.py # 请求日志中间件
urlmap.py # url 映射
versioned_method.py # 版本信息
wsgi.py # WSGI 相关抽象类
wsgi_app.py # WSGI 应用程序初始化方法
API 请求路由
nova-api 读取 etc/nova/api-paste.ini
并加载 WSGI 程序,最终 API 入口点都位于 nova.api.openstack.compute
中
[composite:osapi_compute]
use = call:nova.api.openstack.urlmap:urlmap_factory
/: oscomputeversions # version API
/v2: oscomputeversion_legacy_v2 # v2 API
/v2.1: oscomputeversion_v2 # v2.1 API
# v21 is an exactly feature match for v2, except it has more stringent
# input validation on the wsgi surface (prevents fuzzing early on the
# API). It also provides new features via API microversions which are
# opt into for clients. Unaware clients will receive the same frozen
# v2 API feature set, but with some relaxed validation
/v2/+: openstack_compute_api_v21_legacy_v2_compatible
/v2.1/+: openstack_compute_api_v21
[composite:openstack_compute_api_v21]
use = call:nova.api.auth:pipeline_factory_v21 # 加载中间件
keystone = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler authtoken keystonecontext osapi_compute_app_v21
# DEPRECATED: The [api]auth_strategy conf option is deprecated and will be
# removed in a subsequent release, whereupon this pipeline will be unreachable.
noauth2 = cors http_proxy_to_wsgi compute_req_id faultwrap request_log sizelimit osprofiler bees_profiler noauth2 osapi_compute_app_v21
[app:osapi_compute_app_v21]
paste.app_factory = nova.api.openstack.compute:APIRouterV21.factory # 入口
nova/api/openstack/compute/routes.py
中的 APIRouterV21 主要用来完成路由规则的创建,其中 ROUTE_LIST 保存了 URL 与 Controller 之间的映射关系。
APIRouterV21 基于 ROUTE_LIST,使用 Routes 模块作为 URL 映射的工具,将各个模块所实现的 API 对应的 URL 注册到 mapper 中,并把每个资源都封装成 nova.api.openstack.wsgi.Resource
对象,当解析 URL 请求时,可以通过 URL 映射找到 API 对应的 Resource 对象。
# Router 类对 WSGI routes 模块进行了简单的封装
class APIRouterV21(base_wsgi.Router):
"""Routes requests on the OpenStack API to the appropriate controller
and method. The URL mapping based on the plain list `ROUTE_LIST` is built
at here.
"""
def __init__(self, custom_routes=None):
""":param custom_routes: the additional routes can be added by this
parameter. This parameter is used to test on some fake routes
primarily.
"""
super(APIRouterV21, self).__init__(nova.api.openstack.ProjectMapper())
if custom_routes is None:
custom_routes = tuple()
for path, methods in ROUTE_LIST + custom_routes:
# NOTE(alex_xu): The variable 'methods' is a dict in normal, since
# the dict includes all the methods supported in the path. But
# if the variable 'method' is a string, it means a redirection.
# For example, the request to the '' will be redirect to the '/' in
# the Nova API. To indicate that, using the target path instead of
# a dict. The route entry just writes as "('', '/)".
if isinstance(methods, six.string_types):
self.map.redirect(path, methods)
continue
for method, controller_info in methods.items():
# TODO(alex_xu): In the end, I want to create single controller
# instance instead of create controller instance for each
# route.
controller = controller_info[0]()
action = controller_info[1]
self.map.create_route(path, method, controller, action)
@classmethod
def factory(cls, global_config, **local_config):
"""Simple paste factory, :class:`nova.wsgi.Router` doesn't have one."""
return cls()
nova/api/wsgi.py
解析 URL 映射,通过 _dispatch
回调,调用 Resource 对象的 call 方法,最终通过请求调用 API 对应的模块中的方法。
# 路由
class Router(object):
"""WSGI middleware that maps incoming requests to WSGI apps."""
def __init__(self, mapper):
"""Create a router for the given routes.Mapper.
Each route in `mapper` must specify a 'controller', which is a
WSGI app to call. You'll probably want to specify an 'action' as
well and have your controller be an object that can route
the request to the action-specific method.
Examples:
mapper = routes.Mapper()
sc = ServerController()
# Explicit mapping of one route to a controller+action
mapper.connect(None, '/svrlist', controller=sc, action='list')
# Actions are all implicitly defined
mapper.resource('server', 'servers', controller=sc)
# Pointing to an arbitrary WSGI app. You can specify the
# {path_info:.*} parameter so the target app can be handed just that
# section of the URL.
mapper.connect(None, '/v1.0/{path_info:.*}', controller=BlogApp())
"""
self.map = mapper
# 使用 routes 模块关联 mapper 和 _dispatch
# routes.middleware.RoutesMiddleware 设置 environ 信息
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
self.map)
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
"""Route the incoming request to a controller based on self.map.
If no match, return a 404.
"""
# 根据 mapper 将请求路由到 WSGI 应用(资源)
# 每个资源会在 __call__ 方法中根据 HTTP 请求的 URL 路由到对应 Controller 上的方法(Action)
return self._router
@staticmethod
@webob.dec.wsgify(RequestClass=Request)
def _dispatch(req):
"""Dispatch the request to the appropriate controller.
Called by self._router after matching the incoming request to a route
and putting the information into req.environ. Either returns 404
or the routed WSGI app's response.
"""
# 根据 HTTP 请求的 environ 信息找到 URL 对应的 Controller
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return webob.exc.HTTPNotFound()
app = match['controller']
return app
API 实现
nova/api/openstack/compute/
目录包含每个 API 对应的 Controller 实现,Resource 对象将请求的 API 映射到相应的 Controller 方法上。
以 keypairs.py
(密钥对管理扩展)为例,公共方法包含 create、delete、show、index
,多个实现对应不同的 Microversion(使用 @wsgi.Controller.api_version
装饰器)
@wsgi.expected_errors:API 允许的错误返回码
@validation.query_schema:请求对应的 json schema
@wsgi.response:API 请求正常返回码
@wsgi.action:注册 action
Microversion 用于实现兼容性。nova/api/openstack/compute/schemas
包含允许的 json schema
,表示接受的键值对及其类型。
通过方法接口可以得到 webob.Request 对象,从 Request 对象中可以获取其他请求参数,用于执行对应的操作。keypairs.py
class KeypairController(wsgi.Controller):
"""Keypair API controller for the OpenStack API."""
_view_builder_class = keypairs_view.ViewBuilder
def __init__(self):
super(KeypairController, self).__init__()
self.api = compute_api.KeypairAPI()
@wsgi.Controller.api_version("2.10")
@wsgi.response(201)
@wsgi.expected_errors((400, 403, 409))
@validation.schema(keypairs.create_v210)
def create(self, req, body):
...
@wsgi.Controller.api_version("2.2", "2.9") # noqa
@wsgi.response(201)
@wsgi.expected_errors((400, 403, 409))
@validation.schema(keypairs.create_v22)
def create(self, req, body): # noqa
...
nova-conductor
使用 RPC 的子组件通常包含以下文件:
- api.py 对 RPC 接口进行封装,类似提供 SDK
- rpcapi.py 暴露给其他内部组件的 RPC 接口,RPC 客户端
- manager.py 处理 RPC API 调用
nova-compute 访问数据库的操作都要由 nova-conductor 代理,用 nova/conductor/manager.py
的 ConductorManager 类完成,出于安全性考虑,nova-conductor 和 nova-compute 不能部署在同一服务器上。
nova/objects
定义了 nova object,封装数据库 CURD 操作,每个类对应数据库中的一张表。
nova-scheduler
nova-scheduler 执行调度决策,nova-compute 收集并更新主机数据,实时写入数据库(周期任务)。
nova/scheduler/filters
包含所有的过滤器实现,用于过滤不符合条件的主机;nova/scheduler/weights
包含所有的权重实现,用于计算权重并排序。
3. 启动流程
nova-api 启动入口 nova.cmd.api:main
def main():
config.parse_args(sys.argv) # 解析参数
logging.setup(CONF, "nova") # 设置日志
objects.register_all() # 注册 nova object
gmr_opts.set_defaults(CONF) # 设置 oslo_reports
if 'osapi_compute' in CONF.enabled_apis:
# NOTE(mriedem): This is needed for caching the nova-compute service
# version.
objects.Service.enable_min_version_cache()
log = logging.getLogger(__name__)
# 生成报告的机制 Guru Meditation Report (GMR)
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
# oslo_service.ProcessLauncher
launcher = service.process_launcher()
started = 0
# 根据 paste-ini 文件创建 WSGI 应用
for api in CONF.enabled_apis:
should_use_ssl = api in CONF.enabled_ssl_apis
try:
# nova.service.WSGIService 初始化 WSGI 程序
server = service.WSGIService(api, use_ssl=should_use_ssl)
# oslo_service.ProcessLauncher 创建子进程启动服务
launcher.launch_service(server, workers=server.workers or 1)
started += 1
except exception.PasteAppNotFound as ex:
log.warning("%s. ``enabled_apis`` includes bad values. "
"Fix to remove this warning.", ex)
if started == 0:
log.error('No APIs were started. '
'Check the enabled_apis config option.')
sys.exit(1)
# 等待子进程终止
launcher.wait()
nova.service.WSGIService
的初始化函数实例化 nova.wsgi.Server
,启动函数实际调用了 nova.wsgi.Server
的 start 方法。
其中的 self._socket 使用 eventlet.listen
创建,最后使用 utils 中封装的 spawn 函数启动 WSGI 程序
class Server(service.ServiceBase):
"""Server class to manage a WSGI server, serving a WSGI application."""
...
def start(self):
"""Start serving a WSGI application.
:returns: None
"""
# The server socket object will be closed after server exits,
# but the underlying file descriptor will remain open, and will
# give bad file descriptor error. So duplicating the socket object,
# to keep file descriptor usable.
dup_socket = self._socket.dup()
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
...
self._server = utils.spawn(**wsgi_kwargs)
nova-conductor 启动入口 nova.cmd.conductor:main
def main():
config.parse_args(sys.argv)
logging.setup(CONF, "nova")
objects.register_all()
gmr_opts.set_defaults(CONF)
objects.Service.enable_min_version_cache()
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
# nova.service.Service 实例化 Service 对象
server = service.Service.create(binary='nova-conductor',
topic=rpcapi.RPC_TOPIC)
workers = CONF.conductor.workers or processutils.get_worker_count()
# oslo_service.launch 创建 launcher
service.serve(server, workers=workers)
# 调用 launcher.wait 等待子进程终止
service.wait()
nova.service.Service 初始化函数接受 manager 对象,通过监听消息队列启用 RPC 服务;设置定期任务报告状态,并写入数据库。
- nova-compute
- nova-conductor
- nova-scheduler
RPC 服务启动时创建 rpc_client 用于发送消息,创建 rpc_server 用于接收消息,分派执行。
1. rpc_client
nova/cmd/conductor.py
实际创建 Service 实例
server = service.Service.create(binary='nova-conductor',
topic=rpcapi.RPC_TOPIC)
nova/service.py
初始化函数
# 创建 _driver
self.servicegroup_api = servicegroup.API()
# 动态导入 manager 类
manager_class = importutils.import_class(self.manager_class_name)
if objects_base.NovaObject.indirection_api:
# 创建 RPCClient
conductor_api = conductor.API()
# 等待 nova-conductor 启动
conductor_api.wait_until_ready(context.get_admin_context())
nova/servicegroup/api.py
创建 _driver
driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
self._driver = importutils.import_object(driver_class,
*args, **kwargs)
nova/conductor/rpcapi.py
设置 rpc_client
def __init__(self):
super(ConductorAPI, self).__init__()
target = messaging.Target(topic=RPC_TOPIC, version='3.0')
version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.conductor,
CONF.upgrade_levels.conductor)
serializer = objects_base.NovaObjectSerializer()
# rpc client
self.client = rpc.get_client(target,
version_cap=version_cap,
serializer=serializer)
nova/baserpc.py
设置 rpc_client
def __init__(self, topic):
super(BaseAPI, self).__init__()
target = messaging.Target(topic=topic,
namespace=_NAMESPACE,
version='1.0')
version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.baseapi,
CONF.upgrade_levels.baseapi)
self.client = rpc.get_client(target, version_cap=version_cap)
2. rpc_server
nova/cmd/conductor.py
使用 Service 实例启动服务
# oslo_service.launch 创建 launcher
service.serve(server, workers=workers)
# 调用 launcher.wait 等待子进程终止
service.wait()
nova/service.py
实际调用 oslo_service 的 launch 函数,创建绿色线程(greenthread)或进程,最终调用 Service 实例的 start 方法
def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = service.launch(CONF, server, workers=workers,
restart_method='mutate')
nova/service.py
Service 实例的 start 方法创建 rpc_server 和 dispatcher;设置周期任务
# 创建 rpc server 以及 dispatcher
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
...
if self.periodic_enable:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
self.tg.add_dynamic_timer(self.periodic_tasks,
initial_delay=initial_delay,
periodic_interval_max=
self.periodic_interval_max)
收到消息后主要由 oslo_messaging 进行解析和处理,核心是 oslo_messaging/rpc/dispatcher.py
incoming 是 AMQP 消息格式
def dispatch(self, incoming):
"""Dispatch an RPC message to the appropriate endpoint method.
:param incoming: incoming message
:type incoming: IncomingMessage
:raises: NoSuchMethod, UnsupportedVersion
"""
message = incoming.message
ctxt = incoming.ctxt
method = message.get('method')
args = message.get('args', {})
namespace = message.get('namespace')
version = message.get('version', '1.0')
# NOTE(danms): This event and watchdog thread are used to send
# call-monitoring heartbeats for this message while the call
# is executing if it runs for some time. The thread will wait
# for the event to be signaled, which we do explicitly below
# after dispatching the method call.
completion_event = eventletutils.Event()
watchdog_thread = threading.Thread(target=self._watchdog,
args=(completion_event, incoming))
if incoming.client_timeout:
# NOTE(danms): The client provided a timeout, so we start
# the watchdog thread. If the client is old or didn't send
# a timeout, we just never start the watchdog thread.
watchdog_thread.start()
found_compatible = False
for endpoint in self.endpoints:
target = getattr(endpoint, 'target', None)
if not target:
target = self._default_target
if not (self._is_namespace(target, namespace) and
self._is_compatible(target, version)):
continue
if hasattr(endpoint, method):
if self.access_policy.is_allowed(endpoint, method):
try:
# 分派,调用函数
return self._do_dispatch(endpoint, method, ctxt, args)
finally:
completion_event.set()
if incoming.client_timeout:
watchdog_thread.join()
found_compatible = True
if found_compatible:
raise NoSuchMethod(method)
else:
raise UnsupportedVersion(version, method=method)
oslo_messaging/rpc/dispatcher.py
调用函数
def _do_dispatch(self, endpoint, method, ctxt, args):
ctxt = self.serializer.deserialize_context(ctxt)
new_args = dict()
for argname, arg in args.items():
new_args[argname] = self.serializer.deserialize_entity(ctxt, arg)
func = getattr(endpoint, method)
result = func(ctxt, **new_args)
return self.serializer.serialize_entity(ctxt, result)
发送消息的实现都在 nova/conductor/rpcapi.py
中,cctxt.call 同步调用,cctxt.cast 异步调用
def object_class_action_versions(self, context, objname, objmethod,
object_versions, args, kwargs):
cctxt = self.client.prepare()
return cctxt.call(context, 'object_class_action_versions',
objname=objname, objmethod=objmethod,
object_versions=object_versions,
args=args, kwargs=kwargs)
def cache_images(self, ctxt, aggregate, image_ids):
version = '1.21'
if not self.client.can_send_version(version):
raise exception.NovaException('Conductor RPC version pin does not '
'allow cache_images() to be called')
cctxt = self.client.prepare(version=version)
cctxt.cast(ctxt, 'cache_images', aggregate=aggregate,
image_ids=image_ids)
由 oslo_messaging/rpc/client.py
实现
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately. See RPCClient.cast()."""
msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, msg_ctxt, msg,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
if self.target.fanout:
raise exceptions.InvalidTarget('A call cannot be used with fanout',
self.target)
msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)
timeout = self.timeout
if self.timeout is None:
timeout = self.conf.rpc_response_timeout
cm_timeout = self.call_monitor_timeout
self._check_version_cap(msg.get('version'))
try:
result = \
self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
call_monitor_timeout=cm_timeout,
retry=self.retry,
transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
return self.serializer.deserialize_entity(ctxt, result)
关于周期任务,nova/scheduler/manager.py
中使用 @periodic_task.periodic_task
装饰的方法将会被周期调用,从 scheduler 的调试日志可以看到周期任务的运行
********************************************************************* log_opt_values /home/jck/.local/lib/python3.6/site-packages/oslo_config/cfg.py:2591
2021-05-18 05:53:17.030 3501 DEBUG oslo_service.periodic_task [req-66b43add-49c7-4f33-8f6b-1e33cb9f0123 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
2021-05-18 05:53:39.072 3500 DEBUG oslo_service.periodic_task [req-8436b3e2-96d1-4f15-8ae8-b596cee05536 - - - - -] Running periodic task SchedulerManager._run_periodic_tasks run_periodic_tasks /home/jck/.local/lib/python3.6/site-packages/oslo_service/periodic_task.py:211
...
对应于 nova/scheduler/manager.py
文章来源:https://www.toymoban.com/news/detail-479810.html
@periodic_task.periodic_task(spacing=CONF.scheduler.periodic_task_interval,
run_immediately=True)
def _run_periodic_tasks(self, context):
self.driver.run_periodic_tasks(context)
执行周期任务的有 nova-scheduler 和 nova-compute ,主要功能是计算节点 nova-compute 上报资源信息,nova-scheduler 读取数据库,更新资源信息缓存。文章来源地址https://www.toymoban.com/news/detail-479810.html
到了这里,关于openstack nova 源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!