编程小记: bug、clojure 状态和 paxos
一个 Bug
前段时间观察我们 API 系统的 hystrix 监控,一直发现一个函数 cache/add
的调用特别的高,在整个集群范围内高峰的时候接近 3 到 4 万的 QPS,跟其他指标比起来非常的碍眼,极不正常。
抽了点时间专门调查了下,原来是不小心掉进去了 hystrix request cache 的坑里。
Hystrix Request Cache 的原理很简单,在同一个 RequestContext 里,对某个 command 调用同样的参数,第一次调用的结果将被缓存,后续的对同样参数的请求将直接返回第一次的结果,通过内存换效率,类似 clojure 的 memoize。
简单例子:
(require '[com.netflix.hystrix.core :refer [defcommand with-request-context]]))
(def call-times (atom 0))
(defcommand myinc
{:hystrix/cache-key-fn (fn [i] (str i))}
[i]
(swap! call-times inc) ;;统计调用次数
(+ 1 i))
(with-request-context
;;调用了两次 myinc
(myinc 1)
(myinc 1))
(println @call-times) ;; call-times 只统计了一次调用。
业务代码里有一段逻辑大概是这样:
(def get-or-create [k nv]
(if-let [v (get-value k)]
v
(if-not (add k nv)
(recur k nv)
nv)))
其中 get-value
是一个 hystrix command 设置了 cache-fn
启用了请求缓存。这段代码是尝试先从缓存里加载 k 对应的值,如果没有,就将 nv 存储到 k 键上,如果 add 存储成功,返回 nv,如果 add 失败,循环重试(表示有其他人 add 成功,我们可以重试 get-value)。
问题就出在 recur
循环上,因为 get-value
启用了请求缓存,那么循环调用 get-or-create
的时候因为仍然在同一个 RequestContext 里,导致 (get-value k)
一直为空,但是接下来的 add
也继续失败,不停地 recur
循环。后果就是 get-value
和 add
都被无限调用,并且耗费了大量 CPU。
解决起来也简单,在 recur 之前清空请求缓存即可:
(defn invalidate-get-cache [k]
(..
HystrixRequestCache
(getInstance (get-command-key)
(HystrixConcurrencyStrategyDefault/getInstance))
(clear k)))
(def get-or-create [k nv]
(if-let [v (get-value k)]
v
(if-not (add k nv)
(do
;;清空 get-value 请求缓存
(invalidate-get-cache k)
(recur k nv))
nv)))
volatile! 和 local.var
在 Clojure 1.7 之前,为了保存一个可变的状态,你的大部分选择是 atom,除非为了 STM 协作事务才使用 ref。但是 atom 严格的原子性导致它的效率在简单的场景里就不是特别合适,比如我只是保存一个局部的可变状态,它只是在局部范围内可变,收集或者统计一些状态,不会发布到外面,完全没有必要保证严格的原子性。还有配置型的全局状态,接近只读。
因此 Clojure 1.7 为了改善 transducer 的实现效率引入了新的可变状态保存器—— volatile,它的语义与 Java 里的 volatile 完全一样,仅保证可见性,不保证原子性:
(def val (volatile! 0))
@val
;;=> 0
(vswap! val inc)
;;=> 1
(vreset! val "nothing")
;;=> "nothing"
@val
;;=> "nothing"
不保证原子性的意思就是 (vswap! val inc)
这个递增调用在多线程环境下会产生不同步的结果。
在一些不需要原子操作的场合就非常适合替代 atom 了,比如全局状态、局部状态等。
但是,其实呢,这还不够, volatile 本身仍然有可见性的严格要求,每次读取都强制从 main memory 读取最新的值,如果我只是局部变量在用,或者完全不需要同步的场合里,一个更轻量级的状态保存器是有必要的。因此我写了个 local.var。它就更简单了,只是一个 Object 里保存了一个 value 值,没有任何同步的语义:
(require '[local.var :refer [transient-var! ltv-reset! ltv-swap! transient-var?]])
(let [sent (transient-var! false)]
;;send emails to client
;;......
(ltv-reset! sent true)
(if @sent
(println "Sent email successfully!")
(println "Sent email failed.")))
(def x (transient-var! 1))
@x ;; => 1
(deref x) ;; => 1
(ltv-reset! x 99) ;; => 99
@x ;; => 99
(ltv-swap! x inc) ;; => 100
(transient-var? x) ;; => true
@(future (ltv-reset! x 100)) ;; => IllegalAccessError Local transient var used by
;; non-owner thread local.var/ltv-reset!
并且类似 transient 集合那样,加了 Thread Owner 的保护,避免被多线程修改。
Paxos
最近连读了几篇一致性算法的论文。 Paxos 琢磨的最多,毕竟它没有像 RAFT 那样有清晰明确的算法步骤,围绕它的解释也有一大堆论文,made simple, made live, made pratical,乃至于要 made crazy 了。
这里稍微总结下我的理解。
第一, Paxos 解决什么问题? 简单地说就是在多个参与者的情况下确定一个值,并且这个值是唯一的。少数服从多数,超过一半的参与者确定的值,就可以代表整个群体的的确认值。中央决定了,就你来当领导。
第二,为了达到这个目标应该怎么做?我们分解下步骤:
- 每个人提出一些提议,以供大家选择,这称为 proposal 阶段。
- 每个人收到他人的提议,决定要不要接受,产生一个选择集。
- 在选择集合中确定唯一的一个,并让所有人知道。
对应到 paxos 过程就是其中的 proposal、accept 和 learn 三阶段。Proposal 阶段产生提议,结合 accept 阶段来确定唯一的值,最终 learn 阶段通知这个确定结果给所有参与者。
为了让选择收敛唯一,又引入了一个 MaxVote 机制,每一轮投票选择的值都是上一轮确定的最高提议编号的值,如果没有,则任意选择一个新值。哪怕有冲突导致多轮投票,确定后的值却不会变。
Paxos instance 是确定一个值,那么 multi paxos 就是确定多个值的过程。为了避免冲突频繁提升提议编号,加速达成一致的效率, multi paxos 自然而然地要求产生一个 leader proposer ,它来产生一系列提议并赋予编号,还可以缩减 prepare 阶段。
更进一步,我们在谈论值,那么值到底是什么? 结合实际的工程项目,需要跟 Replication State Machine 结合起来,简单来讲,值就是日志,paxos 的过程就是要决定一系列日志的顺序在所有参与机器之间保持一致,那么一致顺序的日志回放加上状态机变迁,我们可以让所有参与者里的状态机状态保持一致,也就是达到了在机器之间复制状态机的目的,这就是我们工程上想要的一致性。