poller
Startup log
INFO: 2020-05-03 14:23:35,085: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Shinken 2.4.3
INFO: 2020-05-03 14:23:35,085: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Copyright (c) 2009-2014:
INFO: 2020-05-03 14:23:35,085: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Gabes Jean ([email protected])
INFO: 2020-05-03 14:23:35,085: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Gerhard Lausser, [email protected]
INFO: 2020-05-03 14:23:35,085: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Gregory Starck, [email protected]
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main Hartmut Goebel, [email protected]
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/satellite.py [line]:1113 [func]:main License: AGPL
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:828 [func]:change_to_user_group Trying to initialize additional groups for the daemon
WARNING: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:833 [func]:change_to_user_group Cannot call the additional groups setting with initgroups (Operation not permitted)
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:497 [func]:check_parallel_run Stale pidfile exists ([Errno 3] No such process), Reusing it.
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/http_daemon.py [line]:292 [func]:__init__ Opening HTTP socket at http://0.0.0.0:7771
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/http_daemon.py [line]:168 [func]:run Initializing a wsgiref backend with 16 threads
INFO: 2020-05-03 14:23:35,086: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:437 [func]:register_local_log Using the local log file './data/log/pollerd.log'
INFO: 2020-05-03 14:23:36,894: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:609 [func]:daemonize Printing stored debug messages prior to our daemonization
INFO: 2020-05-03 14:23:36,894: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:611 [func]:daemonize Successfully changed to workdir: /home/users/wangbin34/test/shinken-2.4.3/data
INFO: 2020-05-03 14:23:36,895: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:611 [func]:daemonize Opening pid file: /home/users/wangbin34/test/shinken-2.4.3/data/pollerd.pid
INFO: 2020-05-03 14:23:36,895: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:611 [func]:daemonize Redirecting stdout and stderr as necessary..
INFO: 2020-05-03 14:23:36,895: 140345027340000 [Shinken] [file]:/home/shinken/daemon.py [line]:611 [func]:daemonize We are now fully daemonized :) pid=8893
INFO: 2020-05-03 14:23:36,898: 1148664160 [Shinken] [file]:/home/shinken/daemon.py [line]:942 [func]:http_daemon_thread Starting HTTP daemon
INFO: 2020-05-03 14:23:36,899: 1148664160 [Shinken] [file]:/home/shinken/http_daemon.py [line]:236 [func]:run Using a 16 http pool size
INFO: 2020-05-03 14:23:36,915: 140345027340000 [Shinken] [file]:/shinken/daemon.py [line]:763 [func]:find_modules_path Modules directory: ./data/lib/modules
INFO: 2020-05-03 14:23:36,916: 140345027340000 [Shinken] [file]:/shinken/daemon.py [line]:763 [func]:find_modules_path Modules directory: ./data/lib/modules
INFO: 2020-05-03 14:23:36,916: 140345027340000 [Shinken] [file]:/shinken/daemon.py [line]:1030 [func]:wait_for_initial_conf Waiting for initial configuration
If the poller is started while the scheduler is not running, it sits waiting for its initial configuration:
# Used to wait for a conf from the arbiter.
# The arbiter sends us the conf through our http_daemon and sets the
# have_conf prop when it sends us something
# (it can also just ping us)
def wait_for_initial_conf(self, timeout=1.0):
    logger.info("Waiting for initial configuration")
    cur_timeout = timeout
    # The arbiter did not set our have_conf param yet
    while not self.new_conf and not self.interrupted:
        elapsed, _, _ = self.handleRequests(cur_timeout)
        if elapsed:
            cur_timeout -= elapsed
            if cur_timeout > 0:
                continue
            cur_timeout = timeout
        sys.stdout.write(".")
        sys.stdout.flush()
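While the daemon sits in this loop, it already answers HTTP requests on port 7771 (see the startup log above), so you can probe it from outside. A minimal sketch, my own code rather than anything shipped with Shinken, assuming the ping and have_conf endpoints from the api listing further down this page:

# probe_poller.py - ask a waiting poller whether it has a conf yet
import json
import urllib.request

BASE = "http://127.0.0.1:7771"  # poller HTTP port from the startup log

def get(path):
    # the daemon json-encodes its answers, per the api listing below
    with urllib.request.urlopen(BASE + path, timeout=3) as resp:
        return json.loads(resp.read().decode())

print(get("/ping"))       # expected: "pong"
print(get("/have_conf"))  # expected: false while still waiting for the arbiter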
main (shinken/satellite.py:main)
def main(self):
    try:
        for line in self.get_header():
            logger.info(line)

        self.load_config_file()

        # Setting log level
        logger.setLevel(self.log_level)
        # Force the debug level if the daemon is said to start with such level
        if self.debug:
            logger.setLevel('DEBUG')

        # Look if we are enabled or not. If ok, start the daemon mode
        self.look_for_early_exit()
        self.do_daemon_init_and_start()
        self.do_post_daemon_init()
        self.load_modules_manager()

        # We wait for the initial conf
        self.wait_for_initial_conf()
        if not self.new_conf:  # either we hit a big problem or we were asked to shut down
            return
        self.setup_new_conf()

        # We can load our modules now
        self.modules_manager.set_modules(self.modules_manager.modules)
        self.do_load_modules()
        # And even start external ones
        self.modules_manager.start_external_instances()

        # Allocate Mortal Threads
        for _ in xrange(1, self.min_workers):
            to_del = []
            for mod in self.q_by_mod:
                try:
                    self.create_and_launch_worker(module_name=mod)
                # Maybe this module is not a true worker one.
                # If so, just delete it from q_by_mod
                except NotWorkerMod:
                    to_del.append(mod)

            for mod in to_del:
                logger.debug("The module %s is not a worker one, "
                             "I remove it from the worker list", mod)
                del self.q_by_mod[mod]

        # Now the main loop
        self.do_mainloop()
    except Exception:
        self.print_unrecoverable(traceback.format_exc())
        raise
Main loop
# Main loop for nearly all daemons
# (the scheduler is not managed by it :'( )
def do_mainloop(self):
    while True:
        self.do_loop_turn()
        # If we were asked to dump memory, do it
        if self.need_dump_memory:
            self.dump_memory()
            self.need_dump_memory = False

        if self.need_objects_dump:
            logger.debug('Dumping objects')
            self.need_objects_dump = False

        # Maybe we were asked to die; if so, do it :)
        if self.interrupted:
            break
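The loop itself computes nothing: do_loop_turn does the work, and the surrounding while True only inspects flags (need_dump_memory, need_objects_dump, interrupted) that signal handlers set asynchronously. A standalone sketch of that pattern, my own simplification: it uses SIGALRM so the example can interrupt itself, where the real daemon installs handlers for SIGTERM/SIGINT:

import signal
import time

class Daemon(object):
    def __init__(self):
        self.interrupted = False
        # stand-in for the SIGTERM handler the real daemon installs
        signal.signal(signal.SIGALRM, self._set_exit)

    def _set_exit(self, signum, frame):
        self.interrupted = True   # only set a flag; the loop reacts later

    def do_mainloop(self):
        while True:
            time.sleep(0.1)       # do_loop_turn() stand-in
            if self.interrupted:  # "Maybe we were asked to die; if so, do it"
                break

d = Daemon()
signal.alarm(1)   # deliver SIGALRM to ourselves after 1 second (Unix only)
d.do_mainloop()
print("clean exit")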
do_loop_turn (shinken/satellite.py:do_loop_turn)
def do_loop_turn(self):
    logger.debug("Loop turn")
    # Maybe the arbiter asked us to wait for a new conf.
    # If so, we must restart everything...
    if self.cur_conf is None:
        # Clean useless objects left over from the previous run
        # and close the modules
        self.clean_previous_run()

        self.wait_for_initial_conf()  # ============================> wait for a configuration
        # we may have been interrupted or so; then
        # just return from this loop turn
        if not self.new_conf:
            return
        self.setup_new_conf()

    # Now we check whether the arbiter spoke to us through the http daemon.
    # If so, we listen to it.
    # When it pushes a conf, we reinit the connections.
    # Sleep while waiting for a new conf :)
    # TODO: manage the diff again.
    while self.timeout > 0:
        begin = time.time()
        self.watch_for_new_conf(self.timeout)
        end = time.time()
        if self.new_conf:
            if self.graceful_enabled and self.switch_process() is True:
                # Child successfully spawned, we're exiting
                return
            self.setup_new_conf()
        self.timeout = self.timeout - (end - begin)

    logger.debug(" ======================== ")

    self.timeout = self.polling_interval

    # Check whether zombie workers are among us :)
    # If so: KILL THEM ALL!!!
    self.check_and_del_zombie_workers()

    # But also modules
    self.check_and_del_zombie_modules()

    # Print stats for debug
    for sched_id in self.schedulers:
        sched = self.schedulers[sched_id]
        for mod in self.q_by_mod:
            # In workers we've got actions sent to queue - queue size
            for (i, q) in self.q_by_mod[mod].items():
                logger.debug("[%d][%s][%s] Stats: Workers:%d (Queued:%d TotalReturnWait:%d)",
                             sched_id, sched['name'], mod,
                             i, q.qsize(), self.get_returns_queue_len())

    # Before returning or getting new actions, look at how we managed
    # the old ones: are they still in the queue(s)? If so, we
    # must wait longer, or at least have more workers
    _type = self.__class__.my_type
    wait_ratio = self.wait_ratio.get_load()
    total_q = 0
    for mod in self.q_by_mod:
        for q in self.q_by_mod[mod].values():
            total_q += q.qsize()
    if total_q != 0 and wait_ratio < 2 * self.polling_interval:
        logger.debug("I decide to up wait ratio")
        self.wait_ratio.update_load(wait_ratio * 2)
        # self.wait_ratio.update_load(self.polling_interval)
    else:
        # Go back toward self.polling_interval on a normal run; if wait_ratio
        # was > 2 * self.polling_interval,
        # this brings it back near 2, because below 2 it starts rising again :)
        self.wait_ratio.update_load(self.polling_interval)
    wait_ratio = self.wait_ratio.get_load()
    logger.debug("Wait ratio: %f", wait_ratio)
    statsmgr.gauge('core.%s.wait-ratio' % _type, wait_ratio, 'queue')

    # We can wait more than 1s if needed,
    # no more than 5s, but no less than 1s
    timeout = self.timeout * wait_ratio
    timeout = max(self.polling_interval, timeout)
    self.timeout = min(5 * self.polling_interval, timeout)
    statsmgr.gauge('core.%s.timeout' % _type, self.timeout, 'queue')

    # Maybe we do not have enough workers; check for that
    # and launch new ones if needed
    self.adjust_worker_number_by_load()

    # Manage all messages we've got in the last timeout
    # for queue in self.return_messages:
    self.get_workers_results()

    # If we are passive, we do not initiate the check fetching
    # and returning
    if not self.passive:
        # Now we can get new actions from the schedulers
        self.get_new_actions()

        # We send back all finished checks
        # REF: doc/shinken-action-queues.png (6)
        self.manage_returns()

    # Get objects from our modules that are not worker-based
    self.get_objects_from_from_queues()

    # Tell the modules it's a new tick :)
    self.hook_point('tick')

    # Check that memory consumption did not exceed the allowed threshold
    self.check_memory_usage()
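The wait-ratio block above is the poller's back-pressure mechanism: while checks are still queued, it doubles the wait ratio (capped around 2 * polling_interval), which stretches the loop timeout up to 5 * polling_interval so the poller hits the schedulers less aggressively. A standalone sketch of the arithmetic, my own simplification: a plain float replaces the smoothed Load object Shinken actually uses, so the exact numbers differ from a real run:

def next_timeout(total_queued, wait_ratio, polling_interval=1.0):
    # each loop turn starts over from the base polling interval
    timeout = polling_interval
    if total_queued != 0 and wait_ratio < 2 * polling_interval:
        wait_ratio *= 2                    # work is piling up: slow the loop down
    else:
        wait_ratio = polling_interval      # queues drained: back toward normal
    timeout *= wait_ratio
    timeout = max(polling_interval, timeout)         # never below 1 interval
    return wait_ratio, min(5 * polling_interval, timeout)  # never above 5

wr = 1.0
for queued in (10, 10, 10, 0, 0):
    wr, t = next_timeout(queued, wr)
    print("queued=%2d wait_ratio=%.1f timeout=%.1f" % (queued, wr, t))

Without the Load smoothing the ratio oscillates between 1 and 2 here; in the real daemon the exponentially smoothed load makes the transition gradual.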
Active mode ('passive': False): the poller itself reports data to the scheduler. The arbiter pushes this configuration to the poller:

POST http://127.0.0.1:7771/put_conf
{
    'arbiters': {},
    'global': {
        'poller_name': u'poller-master',
        'http_proxy': '',
        'use_timezone': 'NOTSET',
        'statsd_host': u'localhost',
        'max_plugins_output_length': 65536,
        'manage_arbiters': False,
        'statsd_types': None,
        'passive': False,
        'secret': '',
        'satellitemap': {},
        'polling_interval': 1,
        'min_workers': 0,
        'statsd_prefix': u'shinken',
        'poller_tags': ['None'],
        'max_workers': 0,
        'statsd_interval': 5,
        'api_key': '',
        'statsd_enabled': False,
        'statsd_port': 8125,
        'modules': [],
        'statsd_pattern': None,
        'processes_by_worker': 256
    },
    'schedulers': {
        0: {
            'data_timeout': 120,
            'name': u'scheduler-master',
            'hard_ssl_name_check': False,
            'instance_id': 0,
            'timeout': 3,
            'address': u'localhost',
            'active': True,
            'use_ssl': False,
            'push_flavor': 559167,
            'port': 7768
        }
    }
}
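What setup_new_conf does with the 'schedulers' part of this conf is essentially to build one connection per active scheduler entry. A rough sketch of that step, my own paraphrase: the real code also handles timeouts, SSL name checks and connection retries:

conf = {
    'schedulers': {
        0: {'name': 'scheduler-master', 'address': 'localhost',
            'port': 7768, 'use_ssl': False, 'active': True},
    },
}

for sched_id, s in conf['schedulers'].items():
    if not s['active']:
        continue  # inactive schedulers get no connection
    proto = 'https' if s['use_ssl'] else 'http'
    uri = '%s://%s:%d/' % (proto, s['address'], s['port'])
    print("scheduler %d (%s) -> %s" % (sched_id, s['name'], uri))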
Passive mode ('passive': true): the scheduler actively contacts the poller to push actions and fetch results:
{
    "conf": {
        "arbiters": {},
        "global": {
            "poller_name": "poller-master",
            "statsd_prefix": "shinken",
            "use_timezone": "NOTSET",
            "statsd_host": "localhost",
            "max_plugins_output_length": 65536,
            "manage_arbiters": false,
            "statsd_types": "system,queue,object,perf",
            "harakiri_threshold": null,
            "passive": true, --------------------# whether this poller is passive
            "secret": "",
            "satellitemap": {},
            "polling_interval": 1,
            "results_batch": 0,
            "min_workers": 0,
            "http_proxy": "",
            "poller_tags": ["None"],
            "max_workers": 0,
            "statsd_interval": 10,
            "q_factor": 0,
            "api_key": "",
            "statsd_enabled": false,
            "statsd_port": 8125,
            "modules": [],
            "max_q_size": 0,
            "statsd_pattern": "shinken.{name}.{metric}",
            "processes_by_worker": 256
        },
        "schedulers": {
            "0": {
                "data_timeout": 120,
                "name": "scheduler-master",
                "hard_ssl_name_check": false,
                "instance_id": 0,
                "timeout": 3,
                "address": "localhost",
                "active": true,
                "use_ssl": false,
                "push_flavor": 925172,
                "port": 7768
            }
        }
    }
}
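With 'passive': true the direction of every exchange flips: instead of the poller calling the scheduler, the scheduler calls the poller's ISchedulers endpoints (push_actions / get_returns, listed below). A rough sketch of the scheduler side of that conversation, purely illustrative: the sched_id query-parameter shape is my assumption, and the real scheduler goes through Shinken's own HTTP client and serialization rather than plain urllib:

import json
import urllib.request

POLLER = "http://127.0.0.1:7771"

def get(path):
    with urllib.request.urlopen(POLLER + path, timeout=3) as resp:
        return json.loads(resp.read().decode())

# the scheduler periodically collects finished actions from the passive poller
print(get("/get_returns?sched_id=0"))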
self.http_daemon.register(self.interface)
self.http_daemon.register(self.brok_interface)
self.http_daemon.register(self.scheduler_interface)
self.http_daemon.register(self.istats)
self.interface (7 methods)
self.interface = IForArbiter(self)
remove_from_conf(self, sched_id)
what_i_managed(self)
wait_new_conf(self)
push_broks(self, broks)
get_external_commands(self)
got_conf(self)
push_host_names(self, sched_id, hnames)
self.brok_interface (1 method)
self.brok_interface = IBroks(self)
get_broks(self, bname)
self.scheduler_interface (2 methods)
self.scheduler_interface = ISchedulers(self)
push_actions(self, actions, sched_id)
get_returns(self, sched_id)
self.istats (1 method)
self.istats = IStats(self)
get_raw_stats(self)
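The four register() calls above are what produce the HTTP API listed below: every public method of a registered interface object becomes one endpoint. A toy illustration of that pattern, my own code rather than Shinken's http_daemon, which additionally extracts docstrings, HTTP methods and locking requirements:

import inspect

class IStats(object):
    """Interface for stats requests (toy version)."""
    def __init__(self, app):
        self.app = app

    def get_raw_stats(self):
        return self.app.get('stats', {})

def register(routes, iface):
    # expose every public method of the interface as an HTTP route
    for name, meth in inspect.getmembers(iface, inspect.ismethod):
        if not name.startswith('_'):
            routes['/' + name] = meth
    return routes

routes = register({}, IStats({'stats': {'0': []}}))
print(sorted(routes))               # ['/get_raw_stats']
print(routes['/get_raw_stats']())   # {'0': []}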
curl http://127.0.0.1:7771/api_full | json_pp
{
   "push-actions" : { ===============================================> shinken/satellite.py:ISchedulers
      "proto" : "push-actions(actions, sched_id)",
      "need_lock" : true,
      "method" : "POST",
      "doc" : "Push new actions to the scheduler (internal)",
      "encode" : "json"
   },
   "remove-from-conf" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "remove-from-conf(sched_id)",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Remove a scheduler connection (internal)",
      "encode" : "json"
   },
   "get-log-level" : {
      "proto" : "get-log-level()",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Get the current log level in [NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL, UNKNOWN]",
      "encode" : "json"
   },
   "api-full" : {
      "proto" : "api-full()",
      "need_lock" : true,
      "method" : "GET",
      "encode" : "json"
   },
   "api" : {
      "proto" : "api()",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "List the api methods and their parameters",
      "encode" : "json"
   },
   "get-returns" : { ===============================================> shinken/satellite.py:ISchedulers
      "proto" : "get-returns(sched_id)",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Get the returns of the actions (internal)",
      "encode" : "json"
   },
   "got-conf" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "got-conf()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Does the daemon got configuration (receiver)",
      "encode" : "json"
   },
   "push-broks" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "push-broks(broks)",
      "need_lock" : false,
      "method" : "POST",
      "doc" : "Push broks objects to the daemon (internal)",
      "encode" : "json"
   },
   "push-host-names" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "push-host-names(sched_id, hnames)",
      "need_lock" : true,
      "method" : "POST",
      "doc" : "Push hostname/scheduler links (receiver in direct routing)",
      "encode" : "json"
   },
   "set-log-level" : {
      "proto" : "set-log-level(loglevel)",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Set the current log level in [NOTSET, DEBUG, INFO, WARNING, ERROR, CRITICAL, UNKNOWN]",
      "encode" : "json"
   },
   "get-running-id" : {
      "proto" : "get-running-id()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Get the current running id of the daemon (scheduler)",
      "encode" : "json"
   },
   "get-start-time" : {
      "proto" : "get-start-time()",
      "need_lock" : true,
      "method" : "GET",
      "encode" : "json"
   },
   "get-external-commands" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "get-external-commands()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Get the external commands from the daemon (internal)",
      "encode" : "json"
   },
   "what-i-managed" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "what-i-managed()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Return the managed configuration ids (internal)",
      "encode" : "json"
   },
   "have-conf" : {
      "proto" : "have-conf()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Does the daemon got an active configuration",
      "encode" : "json"
   },
   "ping" : {
      "proto" : "ping()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Test the connection to the daemon. Returns: pong",
      "encode" : "json"
   },
   "profiling-data" : {
      "proto" : "profiling-data()",
      "need_lock" : false,
      "method" : "GET",
      "doc" : "Profiling data",
      "encode" : "json"
   },
   "wait-new-conf" : { ============================================> shinken/satellite.py:IForArbiter
      "proto" : "wait-new-conf()",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Ask the daemon to drop its configuration and wait for a new one",
      "encode" : "json"
   },
   "get-raw-stats" : { ============================================> shinken/satellite.py:IStats
      "proto" : "get-raw-stats()",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Get raw stats from the daemon",
      "encode" : "json"
   },
   "put-conf" : {
      "proto" : "put-conf(conf)",
      "need_lock" : true,
      "method" : "POST",
      "doc" : "Send a new configuration to the daemon (internal)",
      "encode" : "json"
   },
   "get-broks" : { ============================================> shinken/satellite.py:IBroks
      "proto" : "get-broks(bname)",
      "need_lock" : true,
      "method" : "GET",
      "doc" : "Get broks from the daemon",
      "encode" : "json"
   }
}
curl http://127.0.0.1:7771/get_raw_stats | json_pp
{
   "0" : [
      {
         "queue_number" : 0,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 1,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 2,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 3,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 4,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 5,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 6,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 7,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 8,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 9,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 10,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 11,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 12,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 13,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 14,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      },
      {
         "queue_number" : 15,
         "return_queue_len" : 0,
         "queue_size" : 0,
         "module" : "fork",
         "scheduler_name" : "scheduler-master"
      }
   ]
}
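The dump shows the 16 fork worker queues (one per worker) for scheduler 0, all idle. A small helper, my own and assuming only the JSON shape shown above, that condenses such a dump into one load figure per scheduler:

import json

def queue_load(raw_stats):
    # raw_stats: {sched_id: [worker queue dicts]} as returned by /get_raw_stats
    for sched_id, workers in sorted(raw_stats.items()):
        queued = sum(w['queue_size'] for w in workers)
        waiting = sum(w['return_queue_len'] for w in workers)
        name = workers[0]['scheduler_name'] if workers else '?'
        print("%s (%s): %d workers, %d queued, %d returns waiting"
              % (name, sched_id, len(workers), queued, waiting))

sample = '''{"0": [{"queue_number": 0, "return_queue_len": 0, "queue_size": 0,
                    "module": "fork", "scheduler_name": "scheduler-master"}]}'''
queue_load(json.loads(sample))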
ERROR: 2020-05-02 22:46:51,916: 140719055890144 [Shinken] [file]:/shinken-2.4.3/shinken/action.py [line]:309 [func]:execute__ Fail launching command: /usr/lib/nagios/plugins/check_ping -H localhost -w 1000,100% -c 3000,100% -p 1 [Errno 2] No such file or directory False
[call stack]:
<frame 0>:/shinken-2.4.3/shinken/action.py, line:309 func: self.command, exp, force_shell)
<frame 1>:/shinken-2.4.3/shinken/action.py, line:124 func: return self.execute__() # OS specific part
<frame 2>:/shinken-2.4.3/shinken/worker.py, line:175 func: r = chk.execute()
<frame 3>:/shinken-2.4.3/shinken/worker.py, line:279 func: self.launch_new_checks()
<frame 4>:/shinken-2.4.3/shinken/worker.py, line:234 func: self.do_work(s, returns_queue, c)
<frame 5>:/shinken-2.4.3/shinken/worker.py, line:98 func: real_work(*args)
<frame 6>:/home/.jumbo/lib/python2.7/multiprocessing/process.py, line:114 func: self._target(*self._args, **self._kwargs)
<frame 7>:/home/.jumbo/lib/python2.7/multiprocessing/process.py, line:258 func: self.run()
<frame 8>:/home/.jumbo/lib/python2.7/multiprocessing/forking.py, line:125 func: code = process_obj._bootstrap()
<frame 9>:/home/.jumbo/lib/python2.7/multiprocessing/process.py, line:130 func: self._popen = Popen(self)
<frame 10>:/shinken-2.4.3/shinken/worker.py, line:105 func: self._process.start()
<frame 11>:/shinken-2.4.3/shinken/satellite.py, line:516 func: w.start()
<frame 12>:/shinken-2.4.3/shinken/satellite.py, line:623 func: self.create_and_launch_worker(module_name=mod)
<frame 13>:/shinken-2.4.3/shinken/satellite.py, line:869 func: self.adjust_worker_number_by_load()
<frame 14>:/shinken-2.4.3/shinken/daemon.py, line:341 func: self.do_loop_turn()
<frame 15>:/shinken-2.4.3/shinken/satellite.py, line:1160 func: self.do_mainloop()
<frame 16>:/shinken-2.4.3/bin/shinken-poller, line:91 func: daemon.main()
<frame 17>:/shinken-2.4.3/bin/shinken-poller, line:99 func: main()
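The stack above is the classic symptom of a missing plugin: the fork worker tries to exec /usr/lib/nagios/plugins/check_ping, the OS answers [Errno 2] No such file or directory, and action.py logs it as a launch failure. The failure mode is easy to reproduce on its own (illustrative snippet, not Shinken code; it only errors on a machine where the plugin is not installed):

import subprocess

try:
    subprocess.Popen(["/usr/lib/nagios/plugins/check_ping",
                      "-H", "localhost", "-w", "1000,100%", "-c", "3000,100%"])
except OSError as exc:
    # the same errno 2 that action.py reports as "Fail launching command"
    print("Fail launching command:", exc)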
Worker launch flow

satellite.py ====================> self.q_by_mod = {}
adjust_worker_number_by_load runs inside the do_loop_turn loop:
create worker (satellite.py:623) ----------- self.create_and_launch_worker(module_name=mod)
start worker  (satellite.py:516) ----------- w.start()
        |
        |
        V
worker.py:105   self._process.start()
        |
        |
        V
worker.py:98    real_work() - a child process is started to run real_work();
                this real_work was passed in as an argument when the worker was created
        |
        |
        V
worker.py:234   do_work() - when the worker was created, if target was None,
                it defaults to work()
        |
        |
        V
worker.py:251   do_work()
        |
        |
        V
worker.py:279   self.launch_new_checks()
        |
        | <--------------------- worker.py  self.checks.append(msg.get_data())
        V
action.py       execute__
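Condensed to its essentials, the chain above is the classic multiprocessing worker pattern: the satellite puts actions on a per-worker queue, a child process loops over that queue, executes each command, and pushes the result onto a shared returns queue. A self-contained sketch of that shape, my own code: plain command strings and multiprocessing.Queue stand in for Shinken's check objects and slave queues:

import multiprocessing
import subprocess

def real_work(to_do, returns):
    # worker.py-style loop: take a command, run it, send the result back
    while True:
        cmd = to_do.get()
        if cmd is None:              # poison pill -> worker exits
            return
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, _ = proc.communicate()
        returns.put((cmd, proc.returncode, out.strip()))

if __name__ == '__main__':
    to_do, returns = multiprocessing.Queue(), multiprocessing.Queue()
    worker = multiprocessing.Process(target=real_work, args=(to_do, returns))
    worker.start()                   # same shape as worker.py:105 self._process.start()
    to_do.put("echo pong")           # the satellite pushing one action
    print(returns.get())             # ('echo pong', 0, b'pong')
    to_do.put(None)
    worker.join()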