庄周梦蝶

Life, Programs, Future

Notice: No article on this blog may be reproduced without permission. Thank you.

Pipeline based on Clojure core.async

Show me the code:

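(require '[clojure.core.async :refer [go <!!]])

;; Take one value from the upstream stage; the first stage has no source.
(defn input [source]
  (when source
    (<!! (source))))

;; `go` returns a channel that yields x, so the next stage can take it.
(defn output [x]
  (go x))

;; By default a stage passes its input through unchanged.
(defn default-handler [x]
  (output x))

;; By default a stage reads one value from upstream and hands it to the handler.
(defn default-processor [handler source]
  (when-let [x (input source)]
    (handler x)))

;; Build a stage: given a source, return a thunk that processes one item per call.
(defn pipeline-element [& opts]
  (let [{:keys [handler processor]
         :or {handler default-handler
              processor default-processor}} opts]
    (fn [source]
      (fn []
        (processor handler source)))))

;; Chain stages like a shell pipe: (| a b c) expands to (-> a b c).
(defmacro | [& fns]
  `(-> ~@fns))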
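
To see how the stages get wired together, it may help to look at what the `|` macro expands to. The following is a minimal sketch that needs no core.async at all; `stage-a`, `stage-b` and `stage-c` are hypothetical placeholder names, not part of the code above.

```clojure
;; The macro from above, repeated so this snippet runs on its own.
(defmacro | [& fns]
  `(-> ~@fns))

;; stage-a, stage-b and stage-c are placeholder symbols used only for illustration.
(macroexpand-1 '(| stage-a stage-b stage-c))
;; => (clojure.core/-> stage-a stage-b stage-c)

;; Because `->` threads the first form into the following ones,
;; this is the same as (inc (inc 1)).
(| 1 inc inc)
;; => 3
```

In other words, the first form is the source, and each following form is called with the result of everything before it, which is why every stage after the first must be a function that accepts a source.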

Then create a pipeline that prefixes each line read from stdin with its line number:

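;; First stage: no upstream source, so its processor reads a line from stdin.
(def producer
  ((pipeline-element :processor (fn [handler _]
                                  (handler (read-line))))
   nil))

;; Middle stage: prefix each line with a running line number.
(def line-filter
  (let [line (atom 1)]
    (pipeline-element :handler (fn [x]
                                 (let [x (format "%5d %s" @line x)]
                                   (swap! line inc)
                                   (output x))))))

;; Last stage: print the numbered line.
(def consumer
  (pipeline-element :handler (fn [x]
                               (println x))))

;; x is a thunk; each call moves one line through the whole pipeline.
(def x (| producer line-filter consumer))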

Output:

user=> (x)
  #_=> hello world
    1 hello world
nil
user=> (x)
  #_=> again
    2 again
nil
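
The same pipeline can also be exercised without typing at the REPL. The sketch below is one possible way to do that, assuming core.async is on the classpath: it rebinds stdin with `with-in-str` and captures the printed result with `with-out-str`. The names `number-lines` and `step` are hypothetical helpers introduced only for this illustration; the remaining definitions are repeated from above so the snippet runs on its own.

```clojure
(require '[clojure.core.async :refer [go <!!]]
         '[clojure.string :as str])

;; Definitions repeated from the post.
(defn input [source] (when source (<!! (source))))
(defn output [x] (go x))
(defn default-handler [x] (output x))
(defn default-processor [handler source]
  (when-let [x (input source)]
    (handler x)))
(defn pipeline-element [& opts]
  (let [{:keys [handler processor]
         :or {handler default-handler
              processor default-processor}} opts]
    (fn [source]
      (fn []
        (processor handler source)))))
(defmacro | [& fns]
  `(-> ~@fns))

;; Hypothetical helper: run every line of `text` through a fresh pipeline
;; and return whatever the consumer printed, as one string.
(defn number-lines [text]
  (let [line        (atom 1)
        producer    ((pipeline-element
                      :processor (fn [handler _] (handler (read-line))))
                     nil)
        line-filter (pipeline-element
                     :handler (fn [x]
                                (let [x (format "%5d %s" @line x)]
                                  (swap! line inc)
                                  (output x))))
        consumer    (pipeline-element :handler println)
        step        (| producer line-filter consumer)]
    (with-out-str
      (with-in-str text
        ;; One call to `step` pushes exactly one line through all stages.
        (dotimes [_ (count (str/split-lines text))]
          (step))))))

(print (number-lines "hello world\nagain"))
;; prints:
;;     1 hello world
;;     2 again
```

This works because every stage runs on the calling thread: `read-line` and `println` therefore see the rebound `*in*` and `*out*`, and the `go` blocks only carry values that have already been computed.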

core.async is really awesome!

Clojure, Concurrency