庄周梦蝶

生活、程序、未来

Pipeline based on clojure core.async

| Comments

Show me the code:

(require ' [ clojure.core.async :as async :refer :all ])
(defn input [ source ]
  (when source
    (<!! (source))))

(defn output [x]
  (go x))

(defn default-handler [x]
  (output x))

(defn default-processor [handler source]
  (when-let [x (input source)]
    (handler x)))

(defn pipeline-element [& opts]
  (let [{:keys [handler processor]
         :or {handler default-handler
              processor default-processor}} opts]
    (fn [ source ]
      (fn []
        (processor handler source)))))

(defmacro | [& fns]
  `(-> ~@fns))

Then create a pipeline to add line number for lines read from stdin:

(def producer ((pipeline-element :processor (fn [handler _]
                                                  (handler (read-line)))) nil))
(def line-filter (let [line (atom 1)]
                  (pipeline-element :handler (fn [x]
                                               (let [x (format "%5d %s" @line x)]
                                                 (swap! line inc)
                                                 (output x))))))

(def consumer (pipeline-element :handler (fn [x]
                                               (println x))))

(def x (| producer line-filter consumer))

Output:

user=> (x)
  #_=> hello world
    1 hello world
nil
user=> (x)
  #_=> again
    2 again
nil

core.async is really awesome!

A middleware to merge routes in compojure

| Comments

有时候,我们可能定义很多个不同的route,有的可能有context,有的没有,有的是动态请求,有的是静态请求,那么就有组合route的需求,利用compojure的routing函数即可做到:

 (use 'compojure.core)
 (defn merge-routes [& handlers]
     (fn [req]
         (apply routing req handlers)))

使用:

(defroutes api-routes
    (context "/v1" [] our-routes))
(defroutes static-routes
    (route/resources "/console"))
(def app
    (handler/site (merge-routes api-routes static-routes) ......))

发布MetaQ 1.4.4

| Comments

沉寂很久,终于可以发布MetaQ 1.4.4,要感谢很多朋友,在此不一一列举,非常感谢支持。

MetaQ 1.4.4更新日志

1.配置变更:

  • 添加新选项stat,可设置全局是否开启实时统计,默认为false

  • 添加新选项updateConsumerOffsets,当消费者的offset不在Broker的数据范围内,是否强制更新消费者的offset为当前最大offset。默认为false。测试开发环境建议开启此选项,生产环境不建议。

  • 添加新选项loadMessageStoresInParallel,是否在启动的时候并行加载数据,提高启动速度,默认为false。

2.协议变更:

  • put协议添加checksum,服务端将根据checksum校验producer发送的消息数据是否完整。

  • sync协议新增checksum,类似put

  • stats协议新增reset和help命令,分别用于重设实时统计数据和打印帮助信息。

3.易用和稳定性改进:

  • 新增local模式启动服务器,它将启动一个内置的zookeeper服务器并启动metaq服务器,方便用户测试开发。生产环境仍然建议用单独的zookeeper集群。./metaServer.sh start local

  • 添加服务端启动时的配置参数校验,帮助检查配置错误。

  • metaServer.sh新增slave-status命令,用于查看异步复制状态。

  • 更优雅地关闭服务器,避免数据损坏。

  • 当服务器端列表变动的时候,尽量避免producer发送消息异常。

  • 当同步复制的slave服务端无法服务时,master服务器将尽快返回失败异常给producer。

  • 改进consumer的负载均衡实现,更快速和稳定地实现负载均衡。

  • 修复producer可能出现的连接内存泄漏。

4.性能改进:

  • 改进group commit实现

5.新功能,主要改进在Java客户端的API上:

  • Java客户端新增RejectConsumptionHandler接口,当消费消息尝试多次失败后将调用此接口的rejectConsumption方法处理。

  • Java客户端MessageConsumer接口新增setRejectConsumptionHandler方法用于设置拒绝消费处理器。

  • Message对象新增readOnly属性,可设置消息是否只读。在PartitionSelector的getPartition方法中,消息将强制为只读。

  • MessageListener接口的recieveMessages方法可抛出InterruptedException的受控异常,响应中断。当中断发生时正在消费的消息将被回滚。

6.其他:

  • 升级gecko到1.1.2版本,修复可能存在的并发错误。

7.升级指南:

  • 1.4.3版本的服务端和客户端,可直接升级到1.4.4,保持兼容;1.4.4之前版本的服务端和客户端,需首先升级服务端,等服务端完全升级完毕之后,才可开始升级客户端。

下载和文档

Java客户端升级

使用Maven很简单:

<dependency>
    <groupId>com.taobao.metamorphosis</groupId>
    <artifactId>metamorphosis-client</artifactId>
    <version>1.4.4</version>
</dependency>

新版delicious上线遇到的几个教训

| Comments

重新设计的delicious.com已经上线,好坏评价都有,俺不参与。 这次上线遇到几个问题,我感觉值得小小总结下,给自己提个醒,以后不能犯这样的错误。

首先是,容量规划很重要。我们的后端系统严重依赖zookeeper集群,但是zookeeper集群只有3台。我们都知道zookeeper可用的前提是过半的node是正常work的,那么3台机器只允许当掉一台,5台机器就允许当掉两台,以此类推。因此通常会推荐将zookeeper的集群设置为奇数台,因此如果希望能在当掉N台机器仍然保证集群可用,那就必须部署一个2N+1的集群。Delicious.com这个zk集群可能是临时搭建的,并没有考虑到容灾的情况,不过3台至少还是能保证一定程度的稳定性,更大的问题在于我们的机器配置用的还是AWS的micro机型,512M内存的机器,导致每个zookeeper的JVM不能开更大的内存(后面我们还发现原来连JVM参数都没有设置,用的还是默认的堆大小),在后续服务陆续添加后,导致zookeeper的压力很大,full gc非常频繁,从而导致client connection频繁超时断开,恶性循环下,整个服务几乎不可用。而后台服务不可用的后果,就是整个网站的可用性也没有了。用户怨声载道。 最终,我们还是不得不临时扩容,增加zookeeper集群到5台,并且将机器型号升级到更大内存和硬盘。目前zookeeper 3.4的升级还不支持热部署,只有通过停机维护来实现。整个过程还由于其他一些意外因素拖延了更久。这个过程给我的教训就是,上线前最好对各个系统有个大概的评估,不能简单地信赖别人,如果稍微注意下zookeeper集群的机器型号等信息,也许我们不会犯下这么低级的错误。

其次一个严重的问题是代码上的BUG,现象是访问个人的profile页面,却看到别人的信息。聪明点的开发者肯定能猜到原因是什么,我这里卖个关子。像这样的BUG,我想主要是因为新系统的开发者对某些信息做了先入为主的假设,没有调查遗留代码而是为了简便就做出决定。也给我提个醒,在做老系统的迁移过程中,要特别注意遗留代码的逻辑和模型,而不是想当然。

其他一些问题,不好在这里讨论,不过我想,应该改变自己的心态,抱怨问题是没有用的,还是要行动起来,做一些力所能及的工作。

PS. Delicious.com的Solr升级到Solr 4.0 Cloud,一个月运行下来还是很稳定的。并且Solr Cloud可以容忍zookeeper集群的意外当机,保留上一次稳定的状态,在这次故障中,表现出乎意料的好。

发布xmemcached 1.3.9

| Comments

Xmemcached距离上次发布接近半年,终于可以发布一下1.3.9版本,主要的改进如下:

  1. 添加resetStats方法到统计MBean,用于清零统计数据。
  2. 添加setEnableHealSession方法到XmemcachedClient和Builder,用于启用或者禁止连接修复(当连接意外断开的时候,默认仍然是启用)。
  3. 优化二进制协议的set操作,通过批量合并操作,在测试中set性能有接近36%的提升。
  4. 修复自定KeyProvider无法与incr方法同时工作的bug。
  5. 修复客户端统计不准确。
  6. 减少在高负载环境下的内存消耗,加快垃圾回收。
  7. 添加了一个新的环境变量:xmemcached.selector.pool.size用于设置网络层的reactor数目,默认为CPUsx2
  8. 同时限制文本协议和二进制协议的key大小为250字节。

如果你在使用maven等构建工具,可以通过下列依赖来使用1.3.9版本:

<dependency>
  <groupId>com.googlecode.xmemcached</groupId>
  <artifactId>xmemcached</artifactId>
  <version>1.3.9</version>
</dependency>

或者你可以从googlecode下载: https://code.google.com/p/xmemcached/downloads/list

Compojure的辅助宏(续)

| Comments

前一篇文章提到Compojure的几个辅助宏defhandler,def-signin-handler和check-params,定义起handler function变得方便了不少,但是每次用check-params写参数检查之类的前置条件还是比较繁琐,例如:

(defhandler signin [username password]
  (check-params
    (empty? username) "Username is required."
    (empty? password) "Password is required."
    :else
    (do-validation...)))

每次都要这么写check-params仍然是比较郁闷的事情,其实我们完全可以将这些前置的检查条件作为handler的元信息附加进去,我们都知道Clojure的函数支持契约式编程,通过指定:pre:post就可以设定函数的前置和后置条件,一个简单例子:

(defn square [x]
     {:pre [(number? x)]
      :post [(pos? x)]}
     (* x x))

:pre是前置条件,要求x必须为数字,:post为后置条件,要求返回结果为正数(其实这个条件是错误的,为啥?),测试看看:

user=> (square "a")
AssertionError Assert failed: (number? x)  user/square (NO_SOURCE_FILE:6)
user=> (square 0)
AssertionError Assert failed: (pos? x)  user/square (NO_SOURCE_FILE:12)
user=> (square 3)
9

可以看到前置条件和后置条件都起作用了。内置于语言的契约式编程,让Clojure程序可以做到更健壮。

回到原来的话题,同样,我们也希望defhandler的这些前置条件也可以类似函数的“契约”那样配置,如果不满足前置条件就返回失败,前置条件都满足才执行handler的body部分,这就需要对defhandler宏做一个改造,将check-params内置进来。具体的改造过程不说,我还是直接给结果,这个宏整体的结构是跟defn这个核心宏是一样的,首先我们将原来的defhandler宏改名为defhandler0,新定义一个宏称为defhandler:

(defmacro defhandler0
    [name args & body]
    `(defn ~name [req#]
        (let [{:keys ~args :or {~'req req#}} (:params req#)]
        ~@body)))   
(defn has-pre? [fdecl]
    (and (map? (first fdecl)) (:pre (first fdecl))))
(def defhandler (fn defhandler [&form &env name & fdecl]
                  (let [m (if (string? (first fdecl))
                            {:doc (first fdecl)}
                            {})
                        m (if (map? (first fdecl))
                            (conj m (first fdecl))
                            m)
                        m (conj (if (meta name) (meta name) {}) m)
                        fdecl (if (map? (first fdecl))
                                (next fdecl)
                                fdecl)
                        args (if (vector? (first fdecl))
                               (first fdecl)
                               [])
                        fdecl (if (vector? (first fdecl))
                                (next fdecl)
                                fdecl)
                        pre (when (has-pre? fdecl)
                              (:pre (first fdecl)))
                        fdecl (if (has-pre? fdecl)
                                (next fdecl)
                                fdecl)]
                    (if pre
                      (list `defhandler0 (with-meta name m) args (concat (cons `check-params
                                                                          pre)
                                                                    (cons :else fdecl)))
                      (list* `defhandler0 (with-meta name m) args fdecl)))))
(.setMacro (var defhandler))

macroexpand下,可以看出来原理很简单,就是检测有没有:pre条件,有的话将这些条件放到check-params里,将body放到check-params的:else部分。

那么,现在前面的signin handler简化为:

(defhandler signin [username password]
  {:pre [ (empty? username) "Username is required."
          (empty? password) "Password is required."]}
  (do-validation...))

将参数检查这些前置条件都转移到:pre里面,核心的body只处理正常的逻辑,整体看起来清晰顺眼。defhandler不仅可以定义:pre,其实也可以关联metadata和doc:

(defhandler signin "Signin handler" [username password] ...)
(defhandler signin {:doc "Signin handler" :added "0.1"} [username password] ...)

也就是跟defn功能类似了。眼尖的朋友可能会注意到,defhandler宏其实有个隐患,如果参数vector后面是一个map,并且包含:pre就会被认为是前置条件,那么handler正常的返回结果就不能同时是个map并且包含:pre,这是个小小的限制。

前一篇文章提到的def-signin-handler的改造也与此类似,两者的代码重复怎么消除,就请读者自行尝试吧。

我的编程语言选择

| Comments

最近修改了我的个人简介,说到我最主要的编程语言是Clojure/Java/Ruby。这不是假话,最近我经常用的就是这三门语言。从使用上,我想谈谈他们的区别。

对于Clojure,我们公司已经用它开发了不少产品,比如过去的美味集的后端Web Service就是用Clojure写的,最近才放出来让大家看见的美味爱读和新的美味书签,更是从零开始的Clojure项目。Clojure的优点我就不多费唇舌了,它的高生产力有目共睹;使用好了是一把锋利的瑞士军刀,用不好也伤人伤己。我对它的定位介于Ruby和Java之间,说它轻量级吧,语言非常轻量级,核心小,写出来的代码少,但是却跑在重量级的JVM上,一个启动时间就能让你吐血。说它重量级吧,人家写出来的代码确实精炼优雅,Java跟它比起来太累赘繁琐。我在微博上看到,假设真有一天一行代码一块切糕的话,我估计Java程序员是最早发达的。Clojure语言轻量级,平台重量级,那么作为Java语言的补充甚至替代,完全胜任。特别是在最常见的90%的开发任务上,用Clojure都绝对比Java开发效率更高,同时能维持住性能上的需求。

那么剩下的10%是些什么样的任务呢? 我总结下,主要是这么几类。首先是网络通讯,跟IO打交道的这一块,用Clojure去写一个Netty或者Mina,不是不可能,但是很难做到高性能和简洁之间的平衡。原因在于IO本身是一种副作用,而Clojure将副作用隔离的方式(Ref,Agent,Atom,Binding etc.) 很难——如果不是不能,做到高效处理。而且在表达上,与命令式的IO操作不匹配。IO上的操作,都很适合用命令式语言的方式来表达,包括协议解析这一块用clojure写也相当痛苦,不过这块更多是我想讲的第二点。第二,算法型的应用,因为大多数算法的描述和衡量都是基于冯.诺伊曼模型的,时间复杂度和空间复杂度的表述也都是命令式的,用Clojure实现不是完全不可能,但是仍然是相当困难的,你需要转换思维并仔细衡量不可变模型带来的代价。因此我通常会比较建议将算法的核心用Java写,然后Clojure调用之。第三,部分并发应用。首先是STM模型,虽然它可以让你避免死锁、活锁等并发编程的常见陷阱,但是同时它需要保存历史记录,需要重试,这个代价在事务冲突严重的时候会凸显出来。其次,仍然有一些场景我们仍然需要锁来保证一致性,特别是一些涉及到外部资源访问的时候,这些资源如果不支持重试,那只有利用锁来做同步。最后,java.util.concurrent的包实在太优秀,你不能忽略它。这样的并发应用,也许你可以仍然使用clojure来调用java API,但是这跟Clojure的哲学不完全一致,有时候仍然是用Java写的比较方便。其实上面谈到的,更本质上的应该是函数式编程的问题,而非仅仅是Clojure的问题。

我什么时候用Ruby呢?写脚本的时候,比如干一些数据库导入导出、solr索引更新的时候,我就想到了Ruby。因为写这类程序,基本上是写一段,跑一下,看看是否ok,结果正确就再前进一步,最终来搞定整个事情。在这个过程中,Ruby丰富的类库和友好的API,可以帮助我快速的试验试错,并且Ruby 1.9的启动实在是比JVM好太多了。写这种脚本程序,启动速度是一个很关键的考量,因为你要不断地启动、运行、查看结果。也许你会说Clojure REPL也可以啊,但是REPL还是不适合这种增长式的编程,也不是一个舒适的编程环境。Ruby可以写的非常性感,Rails就是一个证明,不过因为我写的都些一次写就就抛弃的程序,对这块倒没有什么讲究。有一个我很尊敬的同事,他写的Ruby脚本就比我更有DSL的感觉。强烈感觉我应该去更新下我的Ruby技能。

最后是老本行Java了,除了美国那边的项目和我的一些开源项目(如MetaQxmemcached)外,我几乎都不碰Java了。虽然有IDE帮忙,但是Java的整个生态圈都太重量级了,过去号称轻量级的Spring在我看来都太重量级了,还有maven这样怪兽级的构建工具(需要整整一本书来讲解,看看leiningen吧)。回头看,Java整个生态圈过度工程化的倾向太严重了,什么东西都是正儿八经、正襟危坐着,从来没有一种轻松编程的氛围,每个人搞的都是牛B的庞大项目。而其实,我想要的是只是一个CRUD网站。我想这套东西都是因为Java一开始替代的的是COBOL这样的语言,更多地想在企业级应用上发力,导致整个社群的文化都是严肃方正的。

那么未来呢?列了个编程语言学习计划,挑选的出发点都是从开拓不同的思维出发,如果一门语言没有带来新的启迪甚至范式,那是完全不值得学习的。从个人角度讲,我比较看好Go和R,前者语言上中规中矩,但是能吸引传统写C/CPP的系统程序员,以及想向底层发展的Java码农的目光;后者是因为data mining的需求热潮,会有相当长期和稳定的需求,但是需要很牢固的统计和数学基础。而另一个有趣的现象是Node.js社群的活跃程度,前端们通过node.js找到了写后端鄙视后端码农们的黄金机会,创作能量都可以将世界末日推迟。

Compojure的辅助宏

| Comments

Compojure是clojure世界的web MVC框架事实上的标准,它使用很简单,核心的概念就一个routes,比如一个hello world级别的例子:

(ns hello-world
    (:use compojure.core)
    (:require [compojure.route :as route]))

(defroutes app
    (GET "/" [] "<h1>Hello World</h1>")
    (route/not-found "<h1>Page not found</h1>"))

它支持RESTFul风格的route设计,例如:

(GET "/users/:uid" [uid] (get-user uid))
(POST "/users" [name email] (add-user name email))

如果route和add-user,get-user都是同一个namespace的,这一切都还ok,只不过你需要写3次参数列表,两次在route定义的地方,一次在这些函数定义的地方,或者你也可以将这些函数直接写这routes定义的地方,但是这样看起来就不大爽利,更大的问题是不利于模块的清晰划分。当handler function比较多的时候,拆分namespace是很自然的选择,那么你的代码可能是这样:

(ns user)
(defn get-user [uid] ...)
(defn add-user [name email] ...)

(ns handler)
(GET "/users/:uid" [uid] (user/get-user uid))
(POST "/users" [name email] (user/add-user name email))

仍然需要在定义handler function和route的地方将参数列表写上三遍。或者你可以只写一次,这时候handler function默认接受一个参数http request,然后自己解析参数,也许是这样:

(ns user)
(defn get-user [req]
    (let [uid (-> req :params :uid)]
    ...)
(defn add-user [req]
    (let [name (-> req :params :name)
          email (-> req :params :email)]
     ...))

(ns handler)
(GET "/users/:uid"  [] user/get-user)
(POST "/users" [] user/add-user)

这样一来,route定义的地方得到了简化,但是handler function却变的复杂起来,需要解析参数,也许可以利用destructring来自动解构函数,但是仍然不够直观。

接下来,我们就让一个宏来帮我们解决问题吧,希望既能少写代码,又能跟过去一样直观:

(defmacro defhandler
    [name args & body]
    `(defn ~name [req#]
        (let [{:keys ~args :or {~'req req#}} (:params req#)]
        ~@body)))

这个宏很简单,调用defhandler最终会调用defn生成一个handler function,只不过我们利用destructring帮我们多做了点工作:将args的参数列表自动跟(:params req)匹配起来,同时,如果args里有req这个名称的参数,我们将它关联到实际的request对象,这时候定义get-user,add-user变的和以前一样直观:

(ns user)
(defhandler get-user [uid] ...)
(defhandler add-user [name email] ...)    

(ns handler)
(GET "/users/:uid"  [] user/get-user)
(POST "/users" [] user/add-user)

注意到,我们只是用defhandler替换了defn,其他都没有改变,route定义保持简单和直观。

更进一步,有的handler function需要登录才能使用,我们可以定义一个def-signin-handler,要求request必须有cookie(这个宏的原始版本属于我的同事孙宁):

(defn fail [msg] ...render error message)
(defmacro def-signin-handler [name args & body]
    `(defn ~name [req#]
        ;;这里只判断cookie是否为nil,实际应用还需做合法性校验,防止伪造
        (if (not (nil? (-> req# :cookies (get "my_cookie"))))
            (let [{:keys ~args :or {~'req req#}} (:params req#)]
                ~@body)
            (fail "User doesn't sign in."))))

接下来你可以利用def-signin-handler定义需要登录的handler function,它会自动判断用户是否登录(通过cookie),然后决定是否继续执行。

最后,奉上一个基于cond改造的参数校验宏:

(ns my-ns.util)
(defn fail [msg] ...render error message)
(defmacro check-params
  [& clauses]
  (when clauses
    (list 'if (first clauses)
          (if (next clauses)
            (if (next (next clauses))
              (fail (second clauses))
              (second clauses))
            (throw (IllegalArgumentException.
                    "check-params requires an even number of forms")))
          (cons 'my-ns.util/check-params (next (next clauses))))))

使用例子:

(defhandler signup [username password]
   (check-params
     (empty? username) "User name is required."
     (empty? password) "Password is required."
     :else
     (do ...check user name password and return result.)))