记一次 libcgroup 配置失败

cgroup 配置失败解决方案

看过之前博客的同学应该知道,我一直使用的 libcgroup 来进行 cgroup 配置,简单方便。
最近遇到了一个报错,很坑,记录一下。

报错

接到反馈说有个环境在产品升级之后, cgconfig.service 无法启动,当时的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
[root@yiran-test 21:31:59 ~]$cat /etc/cgconfig.conf
# yiran cgroups configuration

group . {
cpuset {
cpuset.memory_pressure_enabled = "1";
}
}

group yiran {
cpuset {
cpuset.cpus = "0,1,2,3,4,5";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "1";
cpuset.mem_hardwall = "1";
}
}

group yiran/bb-main {
cpuset {
cpuset.cpus = "0";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "1";
cpuset.mem_hardwall = "1";
}
}

group yiran/bb-io {
cpuset {
cpuset.cpus = "1";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "1";
cpuset.mem_hardwall = "1";
}
}

group yiran/aa-main {
cpuset {
cpuset.cpus = "2";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "1";
cpuset.mem_hardwall = "1";
}
}

group yiran/others {
cpuset {
cpuset.cpus = "3";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "1";
cpuset.mem_hardwall = "1";
}
}


group yiran/app {
cpuset {
cpuset.cpus = "4,5";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "0";
cpuset.mem_hardwall = "1";
}
}

group qemu {
cpuset {
cpuset.cpus = "6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "0";
cpuset.mem_hardwall = "1";
}
}

咋一看,配置文件看上去是正确的,除了最后一个组的 cpuset.cpus 配置略长,但是也没错,按道理应该服务正常启动才对,尝试重启服务查看服务报错信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@yiran-test 21:33:55 ~]$systemctl restart cgred
[root@yiran-test 21:33:59 ~]$systemctl restart cgconfig
Job for cgconfig.service failed because the control process exited with error code. See "systemctl status cgconfig.service" and "journalctl -xe" for details.
[root@yiran-test 21:34:05 ~]$systemctl status cgconfig
● cgconfig.service - Control Group configuration service
Loaded: loaded (/usr/lib/systemd/system/cgconfig.service; enabled; vendor preset: disabled)
Active: failed (Result: exit-code) since 四 2019-04-11 21:34:05 CST; 4s ago
Process: 6744 ExecStop=/usr/sbin/cgclear -l /etc/cgconfig.conf -L /etc/cgconfig.d -e (code=exited, status=3)
Process: 11465 ExecStart=/usr/sbin/cgconfigparser -l /etc/cgconfig.conf -L /etc/cgconfig.d -s 1664 (code=exited, status=109)
Main PID: 11465 (code=exited, status=109)

4月 11 21:34:05 yiran-test systemd[1]: Starting Control Group configuration service...
4月 11 21:34:05 yiran-test cgconfigparser[11465]: /usr/sbin/cgconfigparser; error loading /etc/cgconfig.conf: Failed to remove a non-empty group
4月 11 21:34:05 yiran-test cgconfigparser[11465]: Error: failed to set /sys/fs/cgroup/cpuset/qemu/cpuset.cpus: Invalid argument
4月 11 21:34:05 yiran-test systemd[1]: cgconfig.service: main process exited, code=exited, status=109/n/a
4月 11 21:34:05 yiran-test systemd[1]: Failed to start Control Group configuration service.
4月 11 21:34:05 yiran-test systemd[1]: Unit cgconfig.service entered failed state.
4月 11 21:34:05 yiran-test systemd[1]: cgconfig.service failed.

发现在重启 cgconfig 时报错,报错信息如下:

1
2
4月 11 21:34:05 yiran-test cgconfigparser[11465]: /usr/sbin/cgconfigparser; error loading /etc/cgconfig.conf: Failed to remove a non-empty group
4月 11 21:34:05 yiran-test cgconfigparser[11465]: Error: failed to set /sys/fs/cgroup/cpuset/qemu/cpuset.cpus: Invalid argument

第一行说移除一个非空的 cgroup 失败,下一条提示 /sys/fs/cgroup/cpuset/qemu/cpuset.cpus 也就是我们觉得略微异常的 cgroup 参数无效,可是参数明明是正确配置的,为啥就无效了呢?

调查

尝试修改 cpuset.cpus ,将其调整为:

1
2
3
4
5
6
7
8
9
10
[root@yiran-test 21:36:32 ~]$tail /etc/cgconfig.conf

group qemu {
cpuset {
cpuset.cpus = "6,7,8,9,10";
cpuset.mems = "0-1";
cpuset.cpu_exclusive = "0";
cpuset.mem_hardwall = "1";
}
}

重启 cgconfig 服务:

1
2
3
4
5
6
7
8
9
10
11
12
[root@yiran-test 21:36:36 ~]$systemctl restart cgred
[root@yiran-test 21:36:55 ~]$systemctl restart cgconfig
[root@yiran-test 21:37:00 ~]$systemctl status cgconfig
● cgconfig.service - Control Group configuration service
Loaded: loaded (/usr/lib/systemd/system/cgconfig.service; enabled; vendor preset: disabled)
Active: active (exited) since 四 2019-04-11 21:37:00 CST; 3s ago
Process: 6744 ExecStop=/usr/sbin/cgclear -l /etc/cgconfig.conf -L /etc/cgconfig.d -e (code=exited, status=3)
Process: 17491 ExecStart=/usr/sbin/cgconfigparser -l /etc/cgconfig.conf -L /etc/cgconfig.d -s 1664 (code=exited, status=0/SUCCESS)
Main PID: 17491 (code=exited, status=0/SUCCESS)

4月 11 21:37:00 yiran-test systemd[1]: Starting Control Group configuration service...
4月 11 21:37:00 yiran-test systemd[1]: Started Control Group configuration service.

服务正常运行了,那么我推测可能是跟 cpuset.cpus 长度有关,这时候只能求助于 Google 啦。

很容易,我们找到了这个答案,RedHat 官方 KB 中的介绍:

Root Cause
Previously, the internal representation of a value of any cgroup subsystem parameter was limited to have the length of 100 characters at maximum. Consequently, the libcgroup library truncated the values longer than 100 characters before writing them to a file representing matching cgroup subsystem parameter in the kernel.

Resolution
The maximal length of values of cgroup subsystem parameters in libcgroup has been extended to 4096 characters. As a result, libcgroup now handles values of cgroup subsystem parameters with any length correctly. (BZ#1549175)
RHEL 6 https://access.redhat.com/downloads/content/rhel---6/x86_64/169/libcgroup/0.40.rc1-26.el6/src/fd431d51/package (or newer) via RHBA-2018:1861
RHEL 7 libcgroup-0.41-20.el7 (or newer) via RHBA-2018:3058

官方给出的解决方式是通过升级 libcgroup 来解决,但是我不想这么做。

为什么?要知道在生产环境中,我们想要进行第三方软件包的升级是要经过层层测试的,等到测试完成不知道什么时候了,所以我们需要一个快速折中方案。

解决方案

我们知道,libcgroup 只是作为一个配置 cgroup 软件的一种,最终操作的都是 cgroup 实际挂载点下的配置文件,比如 CentOS 默认的 /sys/fs/cgroup/

那么我们来看下正常配置下 qemu 组中的 cpuset.cpus 配置长什么样:

1
2
[root@yiran-test 21:37:04 ~]$cat /sys/fs/cgroup/cpuset/qemu/cpuset.cpus
6-10

Ok,我们明明在 libcgroup 配置文件中写的是 6,7,8,9,10 ,在 cgroup 配置文件中就转换成了 6-10, 那么一切都简单了。

当我们配置 cgroup 如果 cpu processor id 是连续的,那么我们就可以通过 - 来连接起始和终止 id 就可以了,问题解决。

总结

libcgroup 作为 RedHat 官方指定的 cgroup 配置工具,没想到会出现这种问题,如果 cpuset.cpus 我们要设置非连续的 cpu processor id 的话,只能通过升级方式解决了。

apscheduler 源码阅读

简介

apscheduler 全称 Advanced Python Scheduler,调度器,主要功能如下:

  • 动态添加、删除任务
  • 暂停、恢复任务
  • 周期性调度:cron,date,interval

那么接下来我们根据官方示例,看看 apscheduler 是如何进行处理任务的。

示例版本为 2.1,因为在 2.1 版本包含目前 master 分支上的主要功能,简单易懂。

代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
yiran@zhouyirandeMacBook-Pro:~/Documents/git-repo/apscheduler
2.1 ✔ $ tree apscheduler
apscheduler
├── __init__.py
├── events.py
├── job.py
├── jobstores
│   ├── __init__.py
│   ├── base.py
│   ├── mongodb_store.py
│   ├── ram_store.py
│   ├── redis_store.py
│   ├── shelve_store.py
│   └── sqlalchemy_store.py
├── scheduler.py
├── threadpool.py
├── triggers
│   ├── __init__.py
│   ├── cron
│   │   ├── __init__.py
│   │   ├── expressions.py
│   │   └── fields.py
│   ├── interval.py
│   └── simple.py
└── util.py

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from datetime import datetime, timedelta

from apscheduler.scheduler import Scheduler
from apscheduler.jobstores.shelve_store import ShelveJobStore


def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)


if __name__ == '__main__':
scheduler = Scheduler(standalone=True)
scheduler.add_jobstore(ShelveJobStore('example.db'), 'shelve')
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_date_job(alarm, alarm_time, name='alarm',
jobstore='shelve', args=[datetime.now()])
print('To clear the alarms, delete the example.db file.')
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass

Scheduler

上述示例很容易理解,首先对 Scheduler 实例化,然后添加 jobstore,定义一个名为 alarm 的 job,并指定其运行时间为当前时间 + 10s,将该 job 添加到 scheduler 中,添加 job 类型为 date_job,然后启动 scheduler。

可以看到我们所有的操作都是通过 scheduler 方法实现的,那么我们来看下 Scheduler 类具体实现了哪些功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Scheduler(object):
"""
This class is responsible for scheduling jobs and triggering
their execution.
"""

_stopped = True
_thread = None

def __init__(self, gconfig={}, **options):
self._wakeup = Event()
self._jobstores = {}
self._jobstores_lock = Lock()
self._listeners = []
self._listeners_lock = Lock()
self._pending_jobs = []
self.configure(gconfig, **options)

Scheduler 构造函数中,对一些变量进行初始化,这里要注意 self._wakeup ,后续的一些主要功能都是通过它来实现的。接下来看看 add_jobstore 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def add_jobstore(self, jobstore, alias, quiet=False):
self._jobstores_lock.acquire() # 请求锁
try:
if alias in self._jobstores:
raise KeyError('Alias "%s" is already in use' % alias)
self._jobstores[alias] = jobstore # 将 jobstore 别名作为 key,添加到 self._jobstores 中
jobstore.load_jobs() # 加载 jobstore 中所有 job
finally:
self._jobstores_lock.release() # 释放锁

# Notify listeners that a new job store has been added
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) # 事件通知

# Notify the scheduler so it can scan the new job store for jobs
if not quiet:
self._wakeup.set() # 将 Event 置为 True

add_jobstore 中,将 jobstore 添加到 scheduler 中,并加载当前 jobstore 中的所有任务,接下来将具体的 job 添加到 scheduler 中:

1
2
3
def add_date_job(self, func, date, args=None, kwargs=None, **options):
trigger = SimpleTrigger(date)
return self.add_job(trigger, func, args, kwargs, **options)

这里的 SimpleTrigger 只是多种 Trigger 中的一种,根据 Trigger 类型的不同,最主要的差别在于 get_next_fire_time 计算方式不同。

如果添加的任务是 interval_job,那么对应 Trigger 为 IntervalTrigger ;如果添加的任务是 cron_job,那么对应的 Trigger 为 CronTrigger

可以看到不同的任务只是 Trigger 计算方式不同,最终还是通过 add_job 方法,继续看:

1
2
3
4
5
6
7
8
9
10
11
12
def add_job(self, trigger, func, args, kwargs, jobstore='default',
**options):
job = Job(trigger, func, args or [], kwargs or {},
options.pop('misfire_grace_time', self.misfire_grace_time),
options.pop('coalesce', self.coalesce), **options) # 将 job 实例化
if not self.running: # 如果 scheduler 未启动,那么将其添加到等待队列中
self._pending_jobs.append((job, jobstore))
logger.info('Adding job tentatively -- it will be properly '
'scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, True) # 否则添加 job 到 jobstore 中
return job

继续看 _real_add_job 中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _real_add_job(self, job, jobstore, wakeup):
job.compute_next_run_time(datetime.now()) # 计算job 下次运行时间
if not job.next_run_time:
raise ValueError('Not adding job since it would never be run')

self._jobstores_lock.acquire() # 请求锁
try:
try:
store = self._jobstores[jobstore]
except KeyError:
raise KeyError('No such job store: %s' % jobstore)
store.add_job(job) # 添加 job
finally:
self._jobstores_lock.release()

# Notify listeners that a new job has been added
event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
self._notify_listeners(event) # 事件通知

logger.info('Added job "%s" to job store "%s"', job, jobstore)

# Notify the scheduler about the new job
if wakeup:
self._wakeup.set() # # 将 Event 置为 True

_real_add_job 中我们终于看到 store.add_job(job) ,至于 store.add_job 如何实现我们之后看 JobStore 再说。

现在我们已经给 scheduler 添加了 jobstore 和 job,那么看下 scheduler 是如何运行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def start(self):
...
# Schedule all pending jobs
for job, jobstore in self._pending_jobs: # 将 scheduler 未运行时添加的 job,即在等待队列中的 job 添加到 jobstore 中
self._real_add_job(job, jobstore, False)
del self._pending_jobs[:]

self._stopped = False
if self.standalone:
self._main_loop()
else:
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.setDaemon(self.daemonic)
self._thread.start()

在 scheduler 运行时,会先将所有的 job 加载到 jobstore 中,然后调用 self._main_loop ,如果 Standalone 为 True,则会一直阻塞知道没有 job 需要运行,看看 self._main_loop 做了啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def _main_loop(self):
"""Executes jobs on schedule."""

logger.info('Scheduler started')
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) # 事件通知

self._wakeup.clear()
while not self._stopped:
logger.debug('Looking for jobs to run')
now = datetime.now()
next_wakeup_time = self._process_jobs(now) # 计算下次唤醒时间

if next_wakeup_time is not None:
wait_seconds = time_difference(next_wakeup_time, now)
logger.debug('Next wakeup is due at %s (in %f seconds)',
next_wakeup_time, wait_seconds)
try:
self._wakeup.wait(wait_seconds) # 等待 Event flag
except IOError: # Catch errno 514 on some Linux kernels
pass
self._wakeup.clear()
elif self.standalone:
logger.debug('No jobs left; shutting down scheduler')
self.shutdown() # 若 scheduler standalone 为 True 且 jobs 为空,则停止 scheduler
break
else:
logger.debug('No jobs; waiting until a job is added')
try:
self._wakeup.wait() # 等待 Event flag
except IOError: # Catch errno 514 on some Linux kernels
pass
self._wakeup.clear()

还记得上面提到的 Scheduler 构造函数中的 self._wakeup 么,它实际上是 threading.Event ,它的 wait 方法会一直 block 直到 Event flag 为 True,也就是我们上面看到的 self._wakeup.set() ,那么我们可以知道在 Scheduler 中有几种场景会置为 True:

  1. Scheduler.shutdown
  2. Scheduler.add_jobstore
  3. Scheduler._real_add_job

如果没有触发上述场景,则 _main_loop 会根据 jobs 的执行时间一直循环等待。

JobStore

在 apscheduler 中,JobStore 只是单纯的实现了 Job 相关的方法:

1
2
3
4
5
6
class JobStore(object):
def add_job(self, job):
def update_job(self, job):
def remove_job(self, job):
def load_jobs(self):
def close(self):

其中,对 job 的操作会根据 JobStore 类型的不同,而采用不同的序列化方式,比如在 MongoDBJobStore 中采用的是 bson.binary,而在其他 JobStore 比如 RedisJobStore 中采用的都是 pickle

Events

Scheduler 中,我们已经看到通过 threading.Event 来实现事件通知的,那么我们通知的 Event 都是在 apscheduler.events 中定义好的,比如:

1
2
3
4
5
6
7
8
9
class JobEvent(SchedulerEvent):
def __init__(self, code, job, scheduled_run_time, retval=None,
exception=None, traceback=None):
SchedulerEvent.__init__(self, code)
self.job = job
self.scheduled_run_time = scheduled_run_time
self.retval = retval
self.exception = exception
self.traceback = traceback

JobEvent 中,我们能看到 job 的执行时间,返回值,异常捕获等信息。如果看过之前关于 huey 博客 的同学应该知道,在 huey 中是可以直接通过 task id 获取 task 执行结果的,但是在 apscheduler 中,我们并没有直接获取该结果的方法,而是通过在 Scheduler 中的 add_listener 添加监听者,监控指定成功的 Job 获取该 Job 的返回值,感觉这里不太友好。

总结

到这里我们基本上已经将 apscheduler 的流程走了一遍,具体的 Trigger 计算时间的方法之后有机会单独写一下关于 cron,interval,date 的计算方法。

与 huey 相比,apscheduler 使用上要简单,但是简单也意味着功能的不足,比如获取 job 执行结果、job retry 机制等等。当然也有比较好的地方,apscheduler 在跟 web 框架比如 Flask,Django 集成的时候有一些第三方插件可以直接使用,不用像 Huey 一样要单独启动一个 consumer 进程,比较方便。

记一次 Python 编码踩坑

背景

一直知道 Python 容易踩编码的坑,尤其是 Python2,昨天第一次遇到,记一下。

起因

产品中有一个账号关联的功能,需要的参数大概有 host,port,user,passwd 这么几个参数,昨天发现一个环境中账号关联失败,看请求应该还没到账号认证那里就失败了,查看 rest-server 日志,并没有发现错误异常,api 也是正常返回的,通过其他方式验证账号是有效的,当时觉得很奇怪,没什么想法。

调查

既然 rest-server 中日志没有报错,那么看看服务是否有什么异常。
这里特意说名下,如果服务使用的是 gunicorn 或者 celery 等第三方库作为守护进程,有一些系统报错是不会记录到你的服务中的,而是会直接打印到系统中(messages or systemd)。

发现 systemctl statusjournal -u 有报错,报错内容如下:

1
2
3
4
5
4月 02 06:20:00 SCVM70 gunicorn[31225]: Traceback (most recent call last):
4月 02 06:20:00 SCVM70 gunicorn[31225]: File "/usr/lib64/python2.7/site-packages/gevent/threadpool.py", line 207, in _worker
4月 02 06:20:00 SCVM70 gunicorn[31225]: value = func(*args, **kwargs)
4月 02 06:20:00 SCVM70 gunicorn[31225]: error: getaddrinfo() argument 2 must be integer or string
4月 02 06:20:00 SCVM70 gunicorn[31225]: (<ThreadPool at 0x120ae50 0/5/10>, <built-in function getaddrinfo>) failed with error

字面意思是传递参数的类型不对,必须为 int 或者 string。

在 Chrome 中查看当时 API 请求参数,所有参数均为 string,我也实际用 int 或者 str 类型进行验证,账号验证都是可以成功的,感觉进入了死胡同。

原因

最后发现是编码的问题,传递的 port 参数是 unicode 编码,而 getaddrinfo() 需要的是 int 或者 string。

在 python2中,str 其实是 bytes,而不是 unicode,在代码中声明了编码方式为 utf-8,并将该参数存入到了 DB 中,导致下次请求传递的还是 DB 中的 utf-8 类型的 port,而不是 int 或者 string。

解决

这里解决的方式很简单,直接将 port 强制转换为 int 或者 string 就好。根本解决方式应该是在之后的代码编写过程中,规范变量类型,比如 port 就使用 int,而不是 str(unicode),防止出现未知错误。

huey 源码阅读

背景

最近读完了 《Redis 实战》对 Redis 有了一些了解,但是没有在实际项目中应用过,就想找一个使用 Redis 的项目来看看,找到 Huey 是因为之前使用过,趁机了解下具体实现。

简介

Huey 的定位是一个轻量级的任务队列,仅依赖于 Redis 作为任务相关信息存储,支持的功能有:

  • 多种 worker 执行方式:thread,process,greenlet
  • 支持多种任务类型:特定时间运行,周期性运行
  • 包含重试机制,可以指定重试次数及重试间隔
  • 支持任务锁

我们根据官方的示例,来看看 Huey 是如何处理任务的,目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
master ✔ $ pwd
/Users/yiran/Documents/git-repo/huey/examples/simple
yiran@zhouyirandeMacBook-Pro:~/Documents/git-repo/huey/examples/simple
master ✔ $ tree .
.
├── README
├── __init__.py
├── config.py
├── cons.sh
├── main.py
└── tasks.py

注意,这个目录结构是 Huey 官方建议的,具体原因为:

Behind-the-scenes when you decorate a function with task() or periodic_task(), the function registers itself with a centralized in-memory registry. When that function is called, a reference is put into the queue (along with the arguments the function was called with, etc), and when that message is consumed, the function is then looked-up in the consumer’s registry. Because of the way this works, it is strongly recommended that all decorated functions be imported when the consumer starts up.

Task

Huey 支持通过 @task 装饰器的方式创建任务,示例如下:

1
2
3
4
5
6
7
8
from huey import RedisHuey

huey = RedisHuey('simple.test', blocking=True)

@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num

从 huey.RedisHuey 创建了 huey 实例,我们看下 RedisHuey 是什么:

1
2
3
4
5
6
7
8
9
10
11
class RedisHuey(Huey):
def get_storage(self, read_timeout=1, max_errors=1000,
connection_pool=None, url=None, **connection_params):
return RedisStorage(
name=self.name,
blocking=self.blocking,
read_timeout=read_timeout,
max_errors=max_errors,
connection_pool=connection_pool,
url=url,
**connection_params)

在 huey.storage 中定义了 RedisHuey,继承自 Huey 类,重新实现了 get_storage 方法,我们看下 RedisStorage 是做什么用途的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class RedisStorage(BaseStorage):
redis_client = Redis

def __init__(self, name='huey', blocking=False, read_timeout=1,
max_errors=1000, connection_pool=None, url=None,
client_name=None, **connection_params):

if Redis is None:
raise ImportError('"redis" python module not found, cannot use '
'Redis storage backend. Run "pip install redis" '
'to install.')

if sum(1 for p in (url, connection_pool, connection_params) if p) > 1:
raise ValueError(
'The connection configuration is over-determined. '
'Please specify only one of the the following: '
'"url", "connection_pool", or "connection_params"')

if url:
connection_pool = ConnectionPool.from_url(
url, decode_components=True)
elif connection_pool is None:
connection_pool = ConnectionPool(**connection_params)
...

def clean_name(self, name):
def convert_ts(self, ts):
def enqueue(self, data):
def dequeue(self):
...
def get_errors(self, limit=None, offset=0):
def flush_errors(self):
def emit(self, message):
def listener(self):
def __iter__(self):

RedisStorage 继承自 Storage,主要用于所有任务相关信息的写入及读取,目前 Huey 中只实现了 Storage 介质,如果想使用其他方式(如 MongoDB)就需要自己实现了。

知道了 RedisHuey 跟 Huey 的主要区别只是在于 get_storage 方法不同,那么看看 task 方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def task(self, retries=0, retry_delay=0, retries_as_argument=False,
include_task=False, name=None, **task_settings):
def decorator(func):
"""
Decorator to execute a function out-of-band via the consumer.
"""
return TaskWrapper(
self,
func.func if isinstance(func, TaskWrapper) else func,
retries=retries,
retry_delay=retry_delay,
retries_as_argument=retries_as_argument,
include_task=include_task,
name=name,
**task_settings)
return decorator

我们看下 TaskWrapper 做了啥,一些变量复制忽略掉,关键的是这几行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TaskWrapper(object):
def __init__(self, huey, func, retries=0, retry_delay=0,
retries_as_argument=False, include_task=False, name=None,
task_base=None, **task_settings):
...
self.task_class = create_task(
QueueTask if task_base is None else task_base,
func,
retries_as_argument,
name,
include_task,
**task_settings)
self.huey.registry.register(self.task_class)

def is_revoked(self, dt=None, peek=True):
return self.huey.is_revoked(self.task_class, dt, peek)

def revoke(self, revoke_until=None, revoke_once=False):
self.huey.revoke_all(self.task_class, revoke_until, revoke_once)

将函数 func 创建为一个任务,并将任务注册到 Huey 中。同时实现了一些方法,这些方法最终调用的都是在 Huey 类中实现的。

Consumer

1
2
3
4
5
6
7
8
9
10
root@yiran30250:~/project/huey/examples/simple
master ✔ $ cat cons.sh
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
export WORKER_CLASS=${1:-thread}
python ../../huey/bin/huey_consumer.py main.huey --workers=2 -v -s 10 -k $WORKER_CLASS -C

接下来我们看下 Consumer 是如何实现的,在进程启动脚本里,先忽略其他参数,我们指定了 huey 实例,并指定了 worker 数量为 2,我们看下是如何执行的:

huey_consumer.py

1
2
3
4
5
6
7
8
def consumer_main():
...
huey_instance = load_huey(args[0])
config.setup_logger()
consumer = huey_instance.create_consumer(**config.values)
consumer.run()
if __name__ == '__main__':
consumer_main()

加载 huey 实例,创建相应 Consumer 实例并运行,我们看下 Consumer 做了什么:

1
2
3
4
5
class Consumer(object):
"""
Consumer sets up and coordinates the execution of the workers and scheduler
and registers signal handlers.
"""

根据注释我们可以知道 Consumer 主要是启动了 worker 和 scheduler,并截获相应的信息进行处理。
在 huey/consumer.py 中定义了 Worker,Scheduler,Environment 类,其中 Environment 根据所指定的 worker 类型的不同,分为 thread,process,greenlet。

我们看下 Worker 是如何执行对应的任务的:

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def loop(self, now=None):
task = None
exc_raised = True
try:
task = self.huey.dequeue() # 从 hue 中获取任务,也就是从 redis 中获取任务
...
if task:
self.delay = self.default_delay
self.handle_task(task, now or self.get_now()) # 如果获取到了任务,对任务进行处理
elif exc_raised or not self.huey.blocking:
self.sleep()

def handle_task(self, task, ts):
if not self.huey.ready_to_run(task, ts):
self.add_schedule(task) # 若任务没有到达执行时间,则添加到 schedule 中
elif not self.is_revoked(task, ts):
self.process_task(task, ts) # 若任务没有被取消,则执行任务
else:
self.huey.emit_task(
EVENT_REVOKED,
task,
timestamp=ts)
self._logger.debug('Task %s was revoked, not running', task) # 任务被取消,通知 event

def process_task(self, task, ts):
...
self.run_pre_execute_hooks(task) # 执行 pre hook 动作
...
task_value = self.huey.execute(task) # 具体执行 task
...
self.run_post_execute_hooks(task, task_value, exception) # 执行 post hook 动作
...

看到 self.huey.execute(task) 我们发现最终还是通过 Huey 中的方法 execute 来执行 task,那么我们看下是如何实现的:

1
2
3
4
5
6
def execute(self, task):
if not isinstance(task, QueueTask):
raise TypeError('Unknown object: %s' % task)

try:
result = task.execute()

发现这里调用的是 task.execute,这里的 task 是谁呢,是我们最开始提到的 TaskWrapper 么,在 TaskWrapper 中没有实现 execute 方法,这里其实是上面提到的 create_task 返回的类,我们看下 create_task 的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def create_task(task_class, func, retries_as_argument=False, task_name=None,
include_task=False, **kwargs):
def execute(self):
args, kwargs = self.data or ((), {})
if retries_as_argument:
kwargs['retries'] = self.retries
if include_task:
kwargs['task'] = self
return func(*args, **kwargs)

attrs = {
'execute': execute,
'__module__': func.__module__,
'__doc__': func.__doc__}
attrs.update(kwargs)

if not task_name:
task_name = func.__name__

return type(task_name, (task_class,), attrs)

可以看到 execute 函数中最终执行的就是 func 自己, func(*args, **kwargs) ,通过 type 返回真正的 task 类,指定父类为 task_class (默认是 QueueTask)。

到这里我们就完成了整个流程:任务注册,任务调度,任务执行。那么我们再来看看 Huey 中的锁是怎么实现的。

Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class TaskLock(object):
"""
Utilize the Storage key/value APIs to implement simple locking. For more
details see :py:meth:`Huey.lock_task`.
"""
def __init__(self, huey, name):
self._huey = huey
self._name = name
self._key = '%s.lock.%s' % (self._huey.name, self._name)
self._huey._locks.add(self._key)

def __call__(self, fn):
@wraps(fn)
def inner(*args, **kwargs):
with self:
return fn(*args, **kwargs)
return inner

def __enter__(self):
if not self._huey._put_if_empty(self._key, '1'):
raise TaskLockedException('unable to set lock: %s' % self._name)

def __exit__(self, exc_type, exc_val, exc_tb):
self._huey._get_data(self._key)
1
2
def put_if_empty(self, key, value):
return self.conn.hsetnx(self.result_key, key, value)

在 Huey 中,任务锁的实现非常简单,单纯的利用了下 Redis 中的 HSETNX 机制:

Sets field in the hash stored at key to value, only if field does not yet exist. If key does not exist, a new key holding a hash is created. If field already exists, this operation has no effect.

总结

Huey 作为一个任务调度应用,整体代码量不多,但是有很多值得学习的地方,也许过段时间再看会有新的收获。

LVM 使用总结

全称 Logical Volume Manager,逻辑卷管理,简单的说就是能将物理磁盘统一管理,在现在这个满大街都在谈论分布式存储的年代,已经很少有人关注和使用它了,毕竟如果买个公有云上的虚拟机,挂载磁盘都已经得到 TP999+ 的稳定性加持,扩容什么的也只是一句话的事,完全没必要使用 LVM 了。
那么我今天为啥要写 LVM 呢,首先肯定是最近要使用它(清理它),其次,现在 RedHat 系列发行版仍将 LVM 作为系统安装的默认磁盘处理方式,对于我这种安装系统家常便饭的人,还是要了解下的。

本文不会包含 LVM 命令的使用。

概念

LVM 中有 3 个最重要的概念,分别是 PV,VG,LV,下面我分别来说一下:

PV

Physical Volumes,物理卷,属于 LVM 中最底层的单元,通常介质为磁盘,或者磁盘上的某个分区,多个 PV 可以组成 VG(卷组)。

VG

Volume Group,卷组,也是我们通常说的“池化”具体表现形式,VG 可以存在多个,我们可以根据具体的用途来划分 VG,来提供相应的性能保证,比如我们可以对 IO 密集型应用使用全 SSD 组成的 VG。

LV

Logical Volume,逻辑卷,也就是 LVM 中的 LV,LVM 暴露出来供我们使用的逻辑(虚拟)卷。我们可以将其当做一个普通的磁盘使用,可以在其上进行分区,格式化,磁盘读写等操作。

优缺点

简单的介绍了 LVM 的概念,那么我们来说下 LVM 的优缺点:

优点:

  • 随意扩(缩)容
  • 在线迁移 LV(平时用处不多,主要在 P2V 场景下使用,也就是通常文章中提到的“企业上云”必要的操作)
  • 快照,RAID 等高级功能

缺点

管理复杂,通常服务器上磁盘数量多,且磁盘类型未必能够统一(考虑大量公司中的利旧场景),那么我们管理起来就比较麻烦了,要针对所有磁盘创建 PV,然后加入到对应的 VG 中,最后再对外提供 LV,往往我们需要维护相应的功能来提供简单的借口供上层应用使用。

LVM RAID vs mdadm

在上述的优点中,我们提到了 RAID 功能。相信大部分同学都了解 RAID 的概念,通常我们提到的 RAID 都是由相应硬件(存储控制器)来提供,如果没有硬件怎么办呢?我们可以通过 mdadm 来实现软 RAID, 可以达到相应的效果,只是根据 OS 版本不同,软 RAID 的稳定性堪忧(Ubuntu 18.04 存在 Bug)。

那么同样都提供了软 RAID 功能,LVM 与 mdadm 相比,有什么不同呢?

首先,LVM 实现了完整的 RAID 功能,且 RedHat 也提供了完整的文档来描述各个功能的使用,是可以确定获得了官方支持的,那么社区里是怎么评论的呢?

从使用上来说,LVM 作为一款逻辑卷管理软件,虽然支持完成的软 RAID 功能,但是社区支持并不好,大家还是习惯于(信任)mdadm,毕竟 Do One Thing and Do It Well 准没错。mdadm 虽然我在使用上觉得很糟糕,但是在线上环境的表现,还是很完美的,目前 RAID1 还没出过问题,稳。

而且从性能上说,有人对 LVM 和 mdadm 进行过性能对比,总体来说 mdadm 综合表现还是要比 LVM 好一些的。

(最近在对 nvme 磁盘进行分区 raid1 时经常遇到失败的情况,现象是mdadm 进程自己退出,再次执行又 ok,还没仔细调研。)

总结

虽然现在分布式存储大火,各家云产品都在使用,但是单机磁盘管理上,LVM 还是很好用的,RedHat 也一直在单机磁盘管理上有所投入,比如在 RedHat7.5 上推出的 VDO 功能,还是要持续关注了解的。

参考链接

Wireguard 体验

从春节假期之后,因为一些原因,就没有再使用 SS 作为访问互联网的工具了,而是使用了 V2ray,使用 2个月下来,感受还是很好的,无论是速度还是稳定性都要比 SS 好很多。最近看到好多新闻说 Wireguard 客户端适配多了起来,比如 Win,Mac,Android,今天尝试一下。

Wireguard

WireGuard is a free and open-source software application and protocol that implements virtual private network (VPN) techniques to create secure point-to-point connections in routed or bridged configurations. It is run as a module inside the Linux kernel and aims for better performance than the IPsec and OpenVPN tunneling protocols.[2]

Linus 的评价: Maybe the code isn't perfect, but I've skimmed it, and compared to the horrors that are OpenVPN and IPSec, it's a work of art.

服务端配置

kernel 配置

环境配置为:CentOS Linux release 7.6.1810 (Core)

Kernel 版本: 4.20.3-1.el7.elrepo.x86_64

安装 kernel-devel kernel-headers,注意,如果使用的是 elrepo 源,则需要安装对应版本的 kernel-ml-devel kernel-ml-headers , 安装后状态:

1
2
3
4
# rpm -qa |grep kernel  |grep "4.20"
kernel-ml-4.20.3-1.el7.elrepo.x86_64
kernel-ml-devel-4.20.3-1.el7.elrepo.x86_64
kernel-ml-headers-4.20.3-1.el7.elrepo.x86_64

安装 wireguard

yum install wireguard-dkms wireguard-tools

加载 Kernel module

modprobe wireguard && lsmod |grep wire

编辑 wireguard 配置文件

通过 ip ad 查看本机网卡名称,以 eth0 为例:

1
2
3
4
5
6
7
8
9
10
11
12
#mkdir /etc/wireguard && cd /etc/wireguard
# wg genkey # 生成 Private Key
#cat > /etc/wireguard/wg0.conf << EOF
[Interface]
PrivateKey = <Private Key>
Address = 10.0.0.1/24 # 服务端 IP 地址
ListenPort = 11111 # 连接端口
PostUp = iptables -A FORWARD -i wg0 -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE; ip6tables -A FORWARD -i wg0 -j ACCEPT; ip6tables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i wg0 -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE; ip6tables -D FORWARD -i wg0 -j ACCEPT; ip6tables -t nat -D POSTROUTING -o eth0 -j MASQUERADE
SaveConfig = true

EOF

启动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#wg-quick up wg0
#wg
interface: wg0
public key: YNyypVL5wmYA/aaaaaa/VOz4c7BGALHgo= # 后续配置客户端用到
private key: (hidden)
listening port: 11111

# ip ad
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500
3: wg0: <POINTOPOINT,NOARP,UP,LOWER_UP> mtu 1420 qdisc noqueue state UNKNOWN group default qlen 1000
link/none
inet 10.0.0.1/24 scope global wg0
valid_lft forever preferred_lft forever

客户端配置

配置文件

1
2
3
4
5
6
7
8
9
10
# wg genkey # 生成客户端 PublicKey
[Interface]
PublicKey = <PublicKey>
Address = 10.0.0.2/24 # 客户端 IP 地址
DNS = 8.8.8.8 # 指定 DNS Server

[Peer]
PublicKey = YNyypVL5wmYA/aaaaaa/VOz4c7BGALHgo= # 服务端公钥
Endpoint = <Server ip>:11111 # 服务端公网 IP,端口为服务端配置端口
AllowedIPs = 0.0.0.0/0 # 允许 IP 段

服务端添加客户端信息

1
2
3
4
5
6
7
8
9
10
11
12
#wg set wg0 peer <客户端公钥> allowed-ips 10.0.0.2/24
## wg
interface: wg0
public key: YNyypVL5wmYA/aaaaaa/VOz4c7BGALHgo=
private key: (hidden)
listening port: 11111

peer: QRcE0sLvJib8MhWxxxxxxxxx61L0IdZis=
endpoint: xxxxxxx:yyy
allowed ips: 10.0.0.0/24
latest handshake: 1 minute, 9 seconds ago
transfer: 3.61 MiB received, 92.67 MiB sent

检查

在服务端查看主机路由信息:

1
2
3
# ip route show
default via 111.111.111.1 dev eth0
10.0.0.0/24 dev wg0 proto kernel scope link src 10.0.0.1

可以看到已经添加的 wireguard 网络默认路由是通过 10.0.0.1,所以我们需要开启地址转发功能:

1
2
3
#cat /etc/sysctl.conf
net.ipv4.ip_forward = 1
# sysctl -p

此时在服务端 ping 客户端 IP,也就是 10.0.0.2 ,检查状态:

1
2
3
4
# ping 10.0.0.2
PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
64 bytes from 10.0.0.2: icmp_seq=1 ttl=64 time=334 ms
64 bytes from 10.0.0.2: icmp_seq=2 ttl=64 time=335 ms

已经看到可以联通了。

整体上来说 Wireguard 配置上很简单,没有过多的配置文件,只是目前非 Linux 客户端都处于开发阶段,不推荐使用,尝鲜还是可以的。

配置文件示例

wg0

1
2
3
4
5
6
7
8
9
10
11
12
[Interface]
PrivateKey = 6G2jd1VVCRofQzkLxxxxxxxxxxxxx/3GJFTjzwxz62k=
Address = 10.0.0.1/24
PostUp = echo 1 > /proc/sys/net/ipv4/ip_forward; iptables -A FORWARD -i wg0 -j ACCEPT; iptables -A FORWARD -o wg0 -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i wg0 -j ACCEPT; iptables -D FORWARD -o wg0 -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE
ListenPort = 11111
DNS = 8.8.8.8
MTU = 1420

[Peer]
PublicKey = nMUQDNdDMtMLslqmLpe/j9qqGNdJMxxxxxxxxxu5NjA=
AllowedIPs = 10.0.0.3/32

client.conf

1
2
3
4
5
6
7
8
9
10
11
[Interface]
PrivateKey = 8LnMNxNlvxZnnnBkMpd/hFl1xxxxxxxxxxW+5D4qUk=
Address = 10.0.0.3/24
DNS = 8.8.8.8
MTU = 1420

[Peer]
PublicKey = QWhJChuDG7HdNnSF4ximOoxxxxxxxxxxitl1mzIjo=
Endpoint = 114.187.221.217:11111
AllowedIPs = 0.0.0.0/0, ::0/0
PersistentKeepalive = 25

关于 On-Call

On-Call

要说具体的 On-Call,早在 2016年还在负责运维工作的时候,就是 On-Call 的状态,那时候虽然没有明确的规定,但是做得事情就是 On-Call。在年会之后,公司正式宣布执行 On-Call,并且我在上周又重新体会了下 On-Call,感觉涉及到的东西可太多了。

售后服务

在一家做 2B 市场的公司,销售所售卖的,或者说客户所购买的,远远不仅是你的软件。如果单论软件,那么有无数的公司产品比你牛,销售比你多,研发比你强,又怎么争得过呢。我理解 2B 公司最大的卖点是服务:软件安装了,产品运行了,业务上线了,之后呢?无穷尽的服务,小到一项报警项,大到节点故障,都是靠着服务堆起来的。

怎么才能服务好呢? 有一个良好的售后团队,或者说一个良好的售后体系支撑。因为我没有经历过太多的公司,只能就自己接触到的做 2B 公司的目前情况来说,没有见到那种让人眼前一亮的售后服务。

接触到的小公司售后团队(甚至一些国产2B 大厂)大多都是这么做的:

  1. 安装实施
  2. 定期巡检
  3. 故障处理

能做到上述 3 点尤其是第 2 点的并不多。我理解的 3 个阶段:
安装实施:了解客户环境,及时记录并沟通确定环境中不稳定的点,做好 PlanB,最终实施完成也要形成实施报告,无论是交付客户,还是公司内部之后的持续跟踪,都是只有好处没有坏处。

定期巡检:周期性与客户沟通进行巡检,很多客户在使用你的产品,其实他们是没有了解过多的使用上的内容的,毕竟产品说明书(使用文档)几百上千页,应该没有哪些真实用户会一点点的研究了解,大多只是出于使用上。那么这时就需要售后同学定期去进行巡检,帮助客户去发现问题,同时也是在教育用户去了解更多的产品细节。

故障处理:这里就涉及到今天聊得主题,当线上环境出现问题,怎么做?如何做?我理解的故障处理,无论 Bug 多难复现,无论操作多苛刻,都要第一时间恢复线上业务,如果业务不能恢复,其他一切免谈。

为什么 On-Call?

在我上述提到的 3 点中,1、2通常是由售后同学来完成,最重要的第 3 点通常是由研发同学修复。

在售后同学,遇到了线上故障:

  • 如何去了解这个故障的影响范围?
  • 如何准确全面的去提供这个故障相关信息?
  • 如何第一时间找到功能模块负责的相应研发同事?

上述问题在工作中经常遇到,在产品没有完全成熟前,需要售后同学对产品了解不仅限于使用,而需要更多的去了解产品内容,产品通过何种方式提供这个功能。

当然,在一个小公司中,我们无法要求更多,我们需要做的是,第一时间修复问题,恢复业务,于是有了 On-Call。

如何 On-Call?

轮值表:On-Call 的要求其实是一个售后岗位的基本职责,只是对于研发同学来说可能略微难过,7 24 小时 365 响应。具体落实下来就可能是以 7 * 24 为周期的轮值。这期间可能有很多要求,比如多久要接通电话,多久要接入远程等等。

故障时间线:可能一次故障设计到了多个功能模块的同学,那么我们要根据这个事故,编写文档,记录详细的时间线(当然我们现在做的并不好),至少可以让我们再进行任务交接时,了解到这个任务目前的状态,不至于一脸懵逼。永远保持这个事情处于跟踪状态。永远。哪怕是定了闹钟去提醒自己。

事故报告:之前没觉得事故报告的编写需要花费很多的时间,这周明显当头一棒,要想写好一份报告,大部分情况下需要花费的时间精力远远要比这次事故的解决方式多得多。这份报告不仅交付给客户,还能让我们针对后续的产品开发设计上避免踩坑。

后续工作:在恢复线上业务之后,我们针对该问题进行修复,无论是代码修复,还是文档修复,都需要多次验证,保证自己的代码是健全的(当然我往往考虑不周。。。)。

个人感受

在上周 On-Call 过程中,占用的时间远远超出预期,整整 2人天的时间。整个人精神紧绷,因为 On-Call 的事情永远是优先级最高的事情,你随时都面临着工作内容被打断的状态,也可能因为太久没有经历这种状态,导致我在 On-Call 期间本职工作的工作进度,工作效率都不能让自己满意,很多时候需要远离工位,进行彻底的放松,才能继续下去。
同时也让自己更多的注意代码内容,用更多的时间保证自己代码的稳定性是值得的,无论是对自己,还是对于其他同事都是一种负责的体现。

人,才是最大的 Bug。

参考链接

Python 调用 systemd watchdog 方法

systemd

在之前的博客中介绍过 systemd 的基本使用及通过 timer 来替换 crontab 的方法,今天来说一下如何调用 watchdog。

在 systemd 中,提供 watchdog 来检测服务状态状态,官方文档中描述这个功能为 “keep-alive ping”,我们可以在服务的启动配置中,添加 WatchdogSec 来指定 timeout 时间,在服务程序中通过发送 WATCHDOG=1 来不断的通知 systemd,服务处于正常状态,当超过 timeout 时间未收到 WATCHDOG=1 信号后,systemd 会根据 Restart 配置,决定是否自动重启服务。

示例

服务程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
root@yiran-30-250:/usr/lib/systemd/system
$ cat /root/project/watchdog/test.py
#!/usr/bin/python
# coding:utf-8
import os
import time
import socket
import logging

print("Test starting up...")
time.sleep(1) # 模拟执行真实业务
print("Test startup finished")

try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
addr = os.getenv("NOTIFY_SOCKET") # systemd default addr = "/run/systemd/notify"
if addr and addr[0] == "@":
addr = "\0" + addr[1:]
except Exception:
logging.error("Failed to get notify socket addr")

if sock and addr:
sock.connect(addr)
sock.sendall("READY=1") # 通知 systemd 服务启动完成

count = 0

while True:
print("Running...")
time.sleep(2)
if count <= 10:
sock.sendall("WATCHDOG=1") # 通知 systemd 服务正常运行
logging.info("Notify socket addr is:%s", addr)
logging.info("test.service watchdog timestamp update succeeded")
count += 1

服务 systemd 配置:

1
2
3
4
5
6
7
8
9
10
11
root@yiran-30-250:/usr/lib/systemd/system
$ cat test.service
[Unit]
Description=A test service written in Python

[Service]
Environment=PYTHONUNBUFFERED=true
ExecStart=/usr/bin/python /root/project/watchdog/test.py
Type=notify
Restart=always
WatchdogSec=5

我们启动服务观察下运行状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
root@yiran-30-250:/usr/lib/systemd/system
$ systemctl daemon-reload
root@yiran-30-250:/usr/lib/systemd/system
$ systemctl start test.service
root@yiran-30-250:/usr/lib/systemd/system
$ systemctl status test
● test.service - A test service written in Python
Loaded: loaded (/usr/lib/systemd/system/test.service; static; vendor preset: disabled)
Active: active (running) since 日 2019-03-10 10:16:39 CST; 21s ago
Main PID: 12202 (python)
Memory: 4.0M
CGroup: /system.slice/test.service
└─12202 /usr/bin/python /root/project/watchdog/test.py
# 在 Python 程序中,如果没有指定输出位置,会默认打到系统日志中
3月 10 10:16:41 yiran-30-250 python[12202]: Running...
3月 10 10:16:43 yiran-30-250 python[12202]: Running...
3月 10 10:16:45 yiran-30-250 python[12202]: Running...
3月 10 10:16:47 yiran-30-250 python[12202]: Running...
3月 10 10:16:49 yiran-30-250 python[12202]: Running...
3月 10 10:16:51 yiran-30-250 python[12202]: Running...
3月 10 10:16:53 yiran-30-250 python[12202]: Running...
3月 10 10:16:55 yiran-30-250 python[12202]: Running...
3月 10 10:16:57 yiran-30-250 python[12202]: Running...
3月 10 10:16:59 yiran-30-250 python[12202]: Running...

timeout 导致服务重启:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
root@yiran-30-250:/usr/lib/systemd/system
$ systemctl status test
● test.service - A test service written in Python
Loaded: loaded (/usr/lib/systemd/system/test.service; static; vendor preset: disabled)
Active: deactivating (stop-sigabrt) (Result: watchdog) since 日 2019-03-10 10:17:06 CST; 2ms ago
Main PID: 12202 (python)
Memory: 3.9M
CGroup: /system.slice/test.service
└─12202 /usr/bin/python /root/project/watchdog/test.py

3月 10 10:16:49 yiran-30-250 python[12202]: Running...
3月 10 10:16:51 yiran-30-250 python[12202]: Running...
3月 10 10:16:53 yiran-30-250 python[12202]: Running...
3月 10 10:16:55 yiran-30-250 python[12202]: Running...
3月 10 10:16:57 yiran-30-250 python[12202]: Running...
3月 10 10:16:59 yiran-30-250 python[12202]: Running...
3月 10 10:17:01 yiran-30-250 python[12202]: Running...
3月 10 10:17:03 yiran-30-250 python[12202]: Running...
3月 10 10:17:05 yiran-30-250 python[12202]: Running...
3月 10 10:17:06 yiran-30-250 systemd[1]: test.service watchdog timeout (limit 5s)!

journalctl 查看具体重启原因:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
3月 10 10:19:20 yiran-30-250 python[12303]: Running...
3月 10 10:19:22 yiran-30-250 python[12303]: Running...
3月 10 10:19:24 yiran-30-250 python[12303]: Running...
3月 10 10:19:26 yiran-30-250 python[12303]: Running...
3月 10 10:19:28 yiran-30-250 python[12303]: Running...
3月 10 10:19:29 yiran-30-250 systemd[1]: test.service watchdog timeout (limit 5s)!
3月 10 10:19:29 yiran-30-250 systemd[1]: test.service: main process exited, code=dumped, status=6/ABRT
3月 10 10:19:29 yiran-30-250 systemd[1]: Unit test.service entered failed state.
3月 10 10:19:29 yiran-30-250 systemd[1]: test.service failed.
3月 10 10:19:29 yiran-30-250 systemd[1]: test.service holdoff time over, scheduling restart.
3月 10 10:19:29 yiran-30-250 systemd[1]: Starting A test service written in Python...
3月 10 10:19:29 yiran-30-250 python[12324]: Test starting up...
3月 10 10:19:30 yiran-30-250 python[12324]: Test startup finished
3月 10 10:19:30 yiran-30-250 systemd[1]: Started A test service written in Python.

参考链接

Python 生成器使用

背景

在清理 Pocket 列表的时候,发现自己很早之前收藏过 dabeaz 在 2008 年 PyCon 关于生成器的 PPT 讲解,今天读完,有所收获。

在 PPT 中, dabeaz 通过一个具体的文件处理的例子,一步一步的讲解了程序的演进,具体代码可以在 Github 查看。

生成器

使用 yield 关键字的函数就是生成器。生成器在运行时生成值,所以只能迭代一次。生成器可以通过 next 关键字执行,通常我们通过 for 循环来迭代生成器,可以自动处理 StopIteration 情况。

一个简单的生成器例子:

1
2
3
4
5
6
7
8
def countdown(n):
while n > 0:
yield n
n -= 1
>>> for i in countdown(5):
... print(i, end=' ')
...
5 4 3 2 1

当我们调用生成器时,仅返回一个生成器对象,不会执行函数内容,只有当执行 __next__() 时函数才会真正执行。yield 会返回给调用者当前值,同时暂停执行,等待下一次调用 __next__() 继续执行。

协程

在 python 中通过生成器的方式来实现协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
In [7]: def recv_count():
...: try:
...: while True:
...: n = yield
...: print("T-minus", n)
...: except GeneratorExit:
...: print("boom")
...:
In [8]: r = recv_count()
In [9]: r.send(None) # init
In [10]: r.next() # yield 未接收到调用者发送具体值
('T-minus', None)
In [13]: r.send(1)
('T-minus', 1)
In [14]: r.send(2)
('T-minus', 2)
In [15]: r.send(3)
('T-minus', 3)

关于为什么要执行 r.send(None) ,可以看 PEP-342 中具体解释:

Because generator-iterators begin execution at the top of the generator’s function body, there is no yield expression to receive a value when the generator has just been created. Therefore, calling send() with a non-None argument is prohibited when the generator iterator has just started, and a TypeError is raised if this occurs (presumably due to a logic error of some kind). Thus, before you can communicate with a coroutine you must first call next() or send(None) to advance its execution to the first yield expression.

在此基础上进行扩展,我们写一个生产消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
root@yiran-30-250:/tmp
$ cat a.py
#!/usr/bin/python3

def consumer():
r = ''
while True:
n = yield r
print('Consuming %s ...' % n)
r = '200 OK'
def producer(c):
c.send(None)
n = 0
while n < 5:
n += 1
print("Producing %s ..." % n)
r = c.send(n)
print("Consumer return: %s" % r)
c.close()
c = consumer()
producer(c)

root@yiran-30-250:/tmp
$ python3 a.py
Producing 1 ...
Consuming 1 ...
Consumer return: 200 OK
Producing 2 ...
Consuming 2 ...
Consumer return: 200 OK
Producing 3 ...
Consuming 3 ...
Consumer return: 200 OK
Producing 4 ...
Consuming 4 ...
Consumer return: 200 OK
Producing 5 ...
Consuming 5 ...
Consumer return: 200 OK

在 Python3.3 之后,添加了 yield from, asyncio 等关键字,在之后的博客中单独记录。

参考链接

Libvirt CPU 配置

Libvirt CPU 配置参数

我们先来看下 Libvirt 关于虚拟机 CPU 配置项:

  • match
  • check
  • mode
  • model

具体配置解释可以去 Libvirt 官方文档中查看,这里主要说一下 mode 参数,看一下 mode 具体含义及可选配置:

host-passthrough

Libvirt 通知 KVM 对 CPU 不做任何配置项修改,直通给虚拟机。因为虚拟机可以使用与物理主机相同 CPU 指令集,性能最好,相反,在虚拟机热迁移过程中,对目标主机 CPU 要求同型号同代。

host-model

本质上是根据物理主机 CPU 从 cpu_map.xml 文件中选择最匹配的 CPU 型号。由于CPU定义是在启动虚拟机之前复制的,因此可以在不同主机上使用完全相同的XML,同时仍然提供每个主机支持的最佳虚拟机 CPU。属于在功能与性能之间的平衡。

custom

不指定 mode 属性时的默认值。此模式使得无论虚拟机启动哪个主机,虚拟机都将看到相同的硬件,兼容性最好。

最佳选择

在不考虑虚拟机兼容性(热迁移)情况下,优先选择 host-passthrough ,综合考虑选择 host-model

OpenStack 中如果检测到 hypervisor 是 kvm-qemu ,则默认值为 host-model ;在 hypervisor 是其他类型时,默认值为 none,即由 hypervisor 自己选择。

我查看了自己两台 VPS 的 CPU 信息,如下:

Vultr

1
2
3
4
5
6
7
8
9
[root@vultr ~]# lscpu
Architecture: x86_64
厂商 ID: GenuineIntel
CPU 系列: 6
型号: 60
型号名称: Virtual CPU 71438xxxxxxx
超管理器厂商: KVM
虚拟化类型: 完全
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 syscall nx rdtscp lm constant_tsc rep_good nopl xtopology cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm invpcid_single pti fsgsbase bmi1 avx2 smep bmi2 erms invpcid xsaveopt arat

BWH

1
2
3
4
5
6
7
8
9
[root@bwh ~]# lscpu
Architecture: x86_64
厂商 ID: GenuineIntel
CPU 系列: 6
型号: 13
型号名称: QEMU Virtual CPU version (cpu64-rhel6)
超管理器厂商: KVM
虚拟化类型: 完全
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pse36 clflush mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp lm rep_good nopl pni pclmulqdq ssse3 cx16 pcid sse4_1 sse4_2 x2apic popcnt aes xsave avx f16c rdrand hypervisor lahf_lm fsgsbase smep xsaveopt

可以看到 BWH 的 CPU 比 Vultr CPU 指令集少了很多,而且 CPU 型号是 QEMU Virtual CPU version (cpu64-rhel6) 推断物理机 libvirt 配置是 custom ,果然便宜无好货。

参考链接