neutron-server是目前接触的比较多的一个项目,抽空分析一下它是如何启动的。

服务启动方式:service neutron-server start
服务脚本路劲:/usr/lib/systemd/system/neutron-server.service
服务脚本neutron-server.service内容:

[Unit]
Description=OpenStack Neutron Server
After=syslog.target network.target

[Service]
Type=notify
User=neutron
ExecStart=/usr/bin/neutron-server --config-file /usr/share/neutron/neutron-dist.conf --config-dir /usr/share/neutron/server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugin.ini --config-dir /etc/neutron/conf.d/common --config-dir /etc/neutron/conf.d/neutron-server --log-file /var/log/neutron/server.log
PrivateTmp=true
NotifyAccess=all
KillMode=process
Restart=on-failure
TimeoutStartSec=0

[Install]
WantedBy=multi-user.target

ExecStart就是启动该服务时执行的命令:

/usr/bin/neutron-server --config-file /usr/share/neutron/neutron-dist.conf --config-dir /usr/share/neutron/server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugin.ini --config-dir /etc/neutron/conf.d/common --config-dir /etc/neutron/conf.d/neutron-server --log-file /var/log/neutron/server.log

/usr/bin/neutron-server中的内容为:

#!/usr/bin/python2
# PBR Generated from u'console_scripts'

import sys

from neutron.cmd.eventlet.server import main


if __name__ == "__main__":
    sys.exit(main())

来查看一下调用的main方法:

def main():
    server.boot_server(_main_neutron_server)

boot_server方法中主要是解析了传入的命令行参数和配置文件以及配置了程序的日志打印。在完成上述的操作后执行了传入的_main_neutron_server方法:
(解析命令行参数和配置文件的过程可以参照博客中另外一篇博文:http://910216.com/archives/oslo_config_code.html

def _main_neutron_server():
    if cfg.CONF.web_framework == 'legacy':
        wsgi_eventlet.eventlet_wsgi_server()
    else:
        wsgi_pecan.pecan_wsgi_server()

根据前面解析的选项web_framework的值决定启用哪一种类型的wsgi server。这里讲述legacy对应的wsgi server,至于pecan框架的wsgi server暂不讨论。

WSGI是一个web服务的接口规范,在WSGI规范下,web组件被分成三类:application, server, middleware。
WSGI的server端所做的工作仅仅是接收请求,传给application(做处理),然后将结果返回。

在eventlet_wsgi_server方法中:

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

第一行代码加载了用来处理neutron api的wsgi应用,在serve_wsgi方法中会调用NeutronApiService的create方法和start方法:

    @classmethod
    def create(cls, app_name='neutron'):
        # Setup logging early
        config.setup_logging()
        service = cls(app_name)
        return service

    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

create方法创建了一个NeutronApiService(自身)对象,这里面主要是指定了一个app_name为neutron。
start方法用来启动wsgi应用:

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    return run_wsgi_app(app)

_run_wsgi方法也可以分为两个部分来看,一部分是配置部署wsgi app,一部分是启动wsgi app。
先来看部署的方法:

    def load_paste_app(app_name):
        """Builds and returns a WSGI app from a paste config file.
    
        :param app_name: Name of the application to load
        """
        loader = wsgi.Loader(cfg.CONF)
        app = loader.load_app(app_name)
        return app
    
    
    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.

        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound

        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception(_LE("Couldn't lookup app: %s"), name)
            raise PasteAppNotFound(name=name, path=self.config_path)

load_paste_app中创建了一个loader对象,通过这个对象的load_app方法真正实现部署。load_app方法调用了paste.deploy工具的loadapp方法,该方法需要指定一个用来部署wsgi app的配置文件,这里指定的是/etc/neutron.conf中api_paste_config指定的值或是从neutron配置目录中查找文件名为api-paste.ini的文件。一般该文件是保存在/usr/share/neutron/api-paste.ini。
该配置文件的内容如下所示:

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[composite:neutronversions_composite]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi neutronversions
keystone = cors http_proxy_to_wsgi neutronversions

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:http_proxy_to_wsgi]
paste.filter_factory = oslo_middleware.http_proxy_to_wsgi:HTTPProxyToWSGI.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

[filter:osprofiler]
paste.filter_factory = osprofiler.web:WsgiMiddleware.factory

在这个配置文件中配置了一个组合应用:neutron composite(一组应用的组合)
先看一下neutron composite的内容:

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

use = egg:Paste#urlmap表明使用了paste包中的urlmap这个中间件,这个中间件的功能就是把根据不同URL前缀将请求映射给不同的WSGI app,所以可以得出以下结论:

  • 把对/的访问交给neutronversions_composite定义的一组应用去处理
  • 把对/v2.0的访问交给neutronapi_v2_0定义的一组应用去处理

use可以使用以下几种形式:

  • egg: 使用一个URI指定的egg包中的对象
  • call: 使用某个模块中的可调用对象
  • config: 使用另外一个配置文件

use指定的对象调用后最终返回的应该是一个WSGI application。

接下来就只分析上述两组应用中的第二组应用:neutronapi_v2_0,另外一个类似的原理。
在neutronapi_v2_0组合应用中:

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

use指定了一个可调用对象:neutron.auth:pipeline_factory,后面指定了两种授权策略:noauth和keystone,每个策略都是一组filter的组合。
先来看一下neutron.auth:pipeline_factory这个可调用对象的代码实现:

def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option."""
    pipeline = local_conf[cfg.CONF.auth_strategy]
    pipeline = pipeline.split()
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    app = loader.get_app(pipeline[-1])
    filters.reverse()
    for filter in filters:
        app = filter(app)
    return app

这个方法中的参数local_conf的内容就是neutronapi_v2_0所有的配置,包括讲到的两种授权策略noauth和keystone。neutron使用哪一种授权策略由配置文件中的配置项auth_strategy决定。
根据pipeline = local_conf[cfg.CONF.auth_strategy]我们可以知道noauth和keystone指定的是一个pipeline,pipeline是一个filters的链表,以一个application结束,这里是neutronapiapp_v2_0代表的app。
在上面的方法中,通过调用loader.get_filter从对应pipeline中依次调用filter的factory方法,并将结果存到列表filters中,然后通过调用loader.get_app调用app的factory方法创建application对象。

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

对应到代码:

@classmethod
    def factory(cls, global_config, **local_config):
        return cls(**local_config)

就是创建了一个APIRouter的对象。

最后按照反序遍历filters,调用自身(可能是函数,可能是callable的对像),并将app作为参数传入,将完成构造的app返回。

再回到开始的_run_wsgi方法:

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    return run_wsgi_app(app)

部署app完成以后,load_paste_app方法返回了该app对象,后面通过调用run_wsgi_app传入构造好的wsgi app:

def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

在这个方法中创建了一个wsgi server对象,并调用了该对象的start方法,最终进入到_launch方法中:

    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        service = WorkerService(self, application, self.disable_ssl, workers)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            # Dump the initial option values
            cfg.CONF.log_opt_values(LOG, logging.DEBUG)
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            api.context_manager.dispose_pool()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(cfg.CONF,
                                                          wait_interval=1.0)
            self._server.launch_service(service,
                                        workers=service.worker_process_count)

通过上面的代码可以知道,传入的app被封装成了WorkerService,并根据传入的worker的大小决定这个service的启动方式:

  • worker < 1,直接调用service的start方法,在当前进程中执行
  • worker >=1,通过ProcessLauncher的launch_service方法来启动

(ProcessLauncher的主要作用是根据workers数量来fork对应数量的子进程,再在每个子进程中启动WorkerService。)
顺便可以看一下WorkerService是如何启动的,对应的代码如下:

    def start(self):
        super(WorkerService, self).start()
        # When api worker is stopped it kills the eventlet wsgi server which
        # internally closes the wsgi server socket object. This server socket
        # object becomes not usable which leads to "Bad file descriptor"
        # errors on service restart.
        # Duplicate a socket object to keep a file descriptor usable.
        dup_sock = self._service._socket.dup()
        if CONF.use_ssl and not self._disable_ssl:
            dup_sock = sslutils.wrap(CONF, dup_sock)
        self._server = self._service.pool.spawn(self._service._run,
                                                self._application,
                                                dup_sock)

可以看到是用了eventlet的greenpool来spawn一个greenthread来调用server对象的_run方法运行app

    def _run(self, application, socket):
        """Start a WSGI server in a new green thread."""
        eventlet.wsgi.server(socket, application,
                             max_size=self.num_threads,
                             log=LOG,
                             keepalive=CONF.wsgi_keep_alive,
                             socket_timeout=self.client_socket_timeout)

实际是调用了eventlet.wsgi.server方法。
到这里,api服务的启动就分析结束。

然后回到eventlet_wsgi_server方法:

def eventlet_wsgi_server():
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

部署完neutron api的wsgi web server后,后面开始启动RPC。

执行start_api_and_rpc_workers方法并传入了上一步创建的WsgiService对象:neutron_api:

def start_api_and_rpc_workers(neutron_api):
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info(_LI("RPC was already started in parent process by "
                     "plugin."))

        neutron_api.wait()

在这个方法中先调用了start_all_workers获取了worker_launcher对象,看一下start_all_workers方法:

def start_all_workers():
    workers = _get_rpc_workers() + _get_plugins_workers()
    return _start_workers(workers)

看方法名是先获取了rpc worker和plugins worker,然后启动了这些worker。

def _get_rpc_workers():
    plugin = directory.get_plugin()
    service_plugins = directory.get_plugins().values()

    if cfg.CONF.rpc_workers < 1:
        cfg.CONF.set_override('rpc_workers', 1)

    # If 0 < rpc_workers then start_rpc_listeners would be called in a
    # subprocess and we cannot simply catch the NotImplementedError.  It is
    # simpler to check this up front by testing whether the plugin supports
    # multiple RPC workers.
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn't implement start_rpc_listeners")
        if 0 < cfg.CONF.rpc_workers:
            LOG.error(_LE("'rpc_workers = %d' ignored because "
                          "start_rpc_listeners is not implemented."),
                      cfg.CONF.rpc_workers)
        raise NotImplementedError()

    # passing service plugins only, because core plugin is among them
    rpc_workers = [RpcWorker(service_plugins,
                             worker_process_count=cfg.CONF.rpc_workers)]

    if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
        rpc_workers.append(
            RpcReportsWorker(
                [plugin],
                worker_process_count=cfg.CONF.rpc_state_report_workers
            )
        )
    return rpc_workers

获取rpc worker的方法中,plugin = directory.get_plugin()是获取的core plugin,然后检查了配置中指定的rpc_worker,最小必须为1。之后检查了core plugin中是否实现了start_rpc_listeners方法,对core plugin来讲,这个方法是必须实现的。这个方法中关键的就是RpcWorker这个类的实例化。这个类和上文中的WorkerService一样继承子neutron_worker.NeutronWorker类,它的启动一样是通过start方法:

    def start(self):
        super(RpcWorker, self).start()
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)

在这里是遍历了所有的plugin(包括core plugin),调用了它们的start_listeners_method方法,如果没有则跳过。start_listeners_method通过方法名,我们也能知道这是插件用来监听消费RPC的实现。具体的实现原理等以后的文章再分析。
这个方法中最后还判断了一下配置中是否配置了rpc_state_report_workers,如果有并且core plugin支持的话,在返回对象rpc_worksers中追加了RpcReportsWorker的对象,这个RpcReportsWorker实际上就是继承自RpcWorker,只不过启动中调用plugin的方法名不一样,变成了start_rpc_state_reports_listener方法。

_get_rpc_workers方法看完了,看_get_plugins_workers方法:

def _get_plugins_workers():
    # NOTE(twilson) get_plugins also returns the core plugin
    plugins = directory.get_unique_plugins()

    # TODO(twilson) Instead of defaulting here, come up with a good way to
    # share a common get_workers default between NeutronPluginBaseV2 and
    # ServicePluginBase
    return [
        plugin_worker
        for plugin in plugins if hasattr(plugin, 'get_workers')
        for plugin_worker in plugin.get_workers()
    ]

这个方法中是检查了每一种plugin(不是每一个)中是否存在实现了get_workser方法,并将该方法返回的所有workers收集后返回。这里的get_workesr方法是插件用于实现自己的一些特殊需求的一个途径,以NeutronWorker类似的形式交由neutron创建子进程。

现在回到start_all_workers方法中

def start_all_workers():
    workers = _get_rpc_workers() + _get_plugins_workers()
    return _start_workers(workers)

两个收集workers的方法执行完了以后,下面就是启动的过程。

def _start_workers(workers):
    process_workers = [
        plugin_worker for plugin_worker in workers
        if plugin_worker.worker_process_count > 0
    ]

    try:
        if process_workers:
            worker_launcher = common_service.ProcessLauncher(
                cfg.CONF, wait_interval=1.0
            )

            # add extra process worker and spawn there all workers with
            # worker_process_count == 0
            thread_workers = [
                plugin_worker for plugin_worker in workers
                if plugin_worker.worker_process_count < 1
            ]
            if thread_workers:
                process_workers.append(
                    AllServicesNeutronWorker(thread_workers)
                )

            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            session.context_manager.dispose_pool()

            for worker in process_workers:
                worker_launcher.launch_service(worker,
                                               worker.worker_process_count)
        else:
            worker_launcher = common_service.ServiceLauncher(cfg.CONF)
            for worker in workers:
                worker_launcher.launch_service(worker)
        return worker_launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log for '
                              'details.'))

先从所有的workers中筛选出了worker_process_count大于0的对象,其余的直接通过ServiceLauncher依次启动。
针对筛选出来的对象,则使用ProcessLauncher去启动。
这两者的区别主要是:

  • ServiceLauncher是将任务放到greenthread中运行
  • ProcessLauncher是将任务放到os fork出来的进程中运行。