本文是对 Victor Lavernko 在 Youtobe 上课程 EM algorithm: how it works 的总结和代码实践。
作者: @NaNg
首先, EM 算法是什么(what)呢?直接上一段 wiki 的介绍:
In statistics, an expectation–maximization (EM) algorithm is an iterative method to find maximum likelihood or maximum a posteriori (MAP) estimates of parameters in statistical models, where the model depends on unobserved latent variables.
从这段文字,我们可以看出来 EM 算法:
1. 用途是估计含有隐变量(latent variables)的统计模型参数。
2. 是一种基于迭代的方法,E 和 M 分别指 expectation 和 maximization。
让我们结合一个具体的例子来看一看它是怎么工作的(how):
假如我们有一系列观测值 $x_1 ... x_n$ ,它们来自于两个不同的且参数未知正态分布。我们的目标是利用这些观测值来对这两个分布的参数($\mu$, $\sigma$)进行估计。
我们可以在 Python 中对观测值进行模拟:
import numpy as np
# 假设真实的参数是这样的:
mu1, sigma1, n1 = 10, 10, 500
mu2, sigma2, n2 = 30, 5, 500
# 从分布1与分布2 产生观测值
X1 = np.random.normal(mu1, sigma1, n1)
X2 = np.random.normal(mu2, sigma2, n2)
# 把分布混合起来
X = np.hstack([X1, X2])
import matplotlib.pyplot as plt
import seaborn as sns
sns.distplot(X1, kde=True, rug=True)
sns.distplot(X2, kde=True, rug=True)
现在问题来了,假设我们并不知道$X$中的每个观测值来自于哪个分布,那么我们要怎样把这两个分布的参数估计出来呢?
不妨换一个方向来思考,假如说我们知道每个观测值来自于哪个分布,那么参数估计就会非常简单,算均值和方差嘛,稍微学过一点数学的都会:
$$ \mu_b = \frac{x_1 + x_2 +...+ x_n}{n_b} $$$$ \sigma^2_b = \frac{ (x_1-\mu_1)^2+...+(x_2-\mu_2)^2 }{ n_b } $$实际计算一下也很简单:
print("Estimated:")
print(f"mu1 = {X1.mean()}, sigma1 = {X1.std()}")
print(f"mu2 = {X2.mean()}, sigma2 = {X2.std()}")
print("Real:")
print(f"mu1 = {mu1}, sigma1 = {sigma1}")
print(f"mu2 = {mu2}, sigma2 = {sigma2}")
可以看到在已知观测来源的情况下估计的非常准确。但是如果我们不知道来源呢?
我们可以再反过来思考,假如我知道分布的参数($\mu$,$\sigma$),那么我们能否猜测出每个观测来自于哪个分布?
这就变成了一个概率计算的问题,比如 $x_i$ 来自于分布 $b$ 的概率 $P(b|x_i)$,我们可以直接套贝叶斯公式和高斯分布的概率密度函数来计算:
$$ P(b|x_i) = \frac{ P(x_i|b) P(b) } { P(x_i|b)P(b) + P(x_i|a)P(a) } $$$$ P(x_i|b) = \frac{1}{\sqrt{2\pi\sigma_b^2}} exp(-\frac{(x_i-\mu_b)^2}{2\sigma_b^2}) $$我们来实践一下:
from scipy.stats import norm
# 我们直接使用 scipy 中内置的概率目的计算函数
p_xb = norm.pdf(X, loc=mu2, scale=sigma2)
p_xa = norm.pdf(X, loc=mu1, scale=sigma1)
p_bx = p_xb / (p_xa + p_xb)
我们把每个 X 对应位置属于 分布 b 的概率画出来:
plt.scatter(x=X, y=p_bx, color='orange', alpha=.4)
plt.xlabel("x")
plt.ylabel("$P(b|x)$")
很好,除了交界处的一些值,基本上分的比较开。也就是说如果我们知道了分布的参数,每个观测属于来自于哪个分布是比较容易被计算出来的。
到这里我们似乎遇到了一种 "鸡生蛋,蛋生鸡" 的情形,估计分布参数需要知道每个观测的来源,而要对观测的来源进行计算又要依赖于分布的参数。要解决这个问题,我们可以采取一种迭代的策略,一开始先从一个不是很好的猜测开始,一步步地结合上面提到的两个方法对它进行优化,这就是 EM 算法的做法。
EM 算法:
我们来按照这个思路实现一下:
def Estep(X, mu1, sigma1, mu2, sigma2):
"""Expection: 计算概率,将观测分配给两个分布"""
p_xb = norm.pdf(X, loc=mu2, scale=sigma2)
p_xa = norm.pdf(X, loc=mu1, scale=sigma1)
p_bx = p_xb / (p_xa + p_xb)
X1 = X[p_bx < 0.5]
X2 = X[p_bx >= 0.5]
return X1, X2
def Mstep(X1, X2):
"""Maximization: 重新估计模型参数"""
mu1 = X1.mean()
sigma1 = X1.std()
mu2 = X2.mean()
sigma2 = X2.std()
return (mu1, sigma1), (mu2, sigma2)
def init_params(r_mu=(0, 50), r_sigma=(1, 20)):
"""指定范围,初始化估计"""
mu = np.random.randint(*r_mu)
sigma = np.random.randint(*r_sigma)
return mu, sigma
g1 = init_params()
g2 = init_params()
records_mu = [g1[0]]
records_sigma = [g1[1]]
max_iteration = 100
convergence_thresh = 1e-4
for _ in range(max_iteration):
x1, x2 = Estep(X, *g1, *g2)
g1, g2 = Mstep(x1, x2)
records_mu.append(g1[0])
records_sigma.append(g1[1])
if abs(g1[0] - records_mu[-2]) < convergence_thresh:
# 收敛,停止迭代
break
g1, g2 = sorted([g1, g2], key=lambda t:t[0])
print(f"mu1 = {g1[0]:.3f}, sigma1 = {g1[1]:.3f}")
print(f"mu2 = {g2[0]:.3f}, sigma2 = {g2[1]:.3f}")
可以看到,收敛以后得到的估计值和真实值还是比较接近的,当然由于我们的参数初时值是随机产生的所以如果重新运行得到的结果会有有所不同。如果把中间过程中的 $\mu$ 和 $\sigma$ 画出来:
fig, axes = plt.subplots(figsize=(12, 4), ncols=2)
axes[0].plot(records_mu)
axes[0].set_title("$\mu$")
axes[1].plot(records_sigma)
axes[1].set_title("$\sigma$")
我们可以看到随着迭代的进行,参数不断向真实值靠近直到收敛的过程。 通过这个非常简单的一维混合分布模型我们能够了解 EM 算法是如何工作的,总的来说思路大概是这样的:当我们要使用 EM 算法根据观测数据对一个包含隐变量的模型进行参数估计,首先产生一个随机或近似的估计值,然后利用这个近似的参数产生出当前状态下的似然(likelihood)函数,这一步称为 E-step。然后根据 E-step 的结果重新估计出使得当前似然函数达到最大(Maximization)的参数。这一步称之为 M-step。如此重复 E-step 和 M-step 直到估计参数达到稳定。
from graphviz import Digraph
dot = Digraph(comment='EM algorithm')
dot.graph_attr['rankdir'] = 'LR'
dot.node('P', "Parameters")
dot.node('L', "Likelihood")
dot.edge('P', 'L', label="Expectation")
dot.edge('L', 'P', label="Maximization")
dot