|
1 | 1 | <!DOCTYPE html PUBLIC ""
|
2 | 2 | "">
|
3 |
| -<html><head><meta charset="UTF-8" /><title>clojure.core.async.flow documentation</title><link rel="stylesheet" type="text/css" href="css/default.css" /><link rel="stylesheet" type="text/css" href="css/highlight.css" /><script type="text/javascript" src="js/highlight.min.js"></script><script type="text/javascript" src="js/jquery.min.js"></script><script type="text/javascript" src="js/page_effects.js"></script><script>hljs.initHighlightingOnLoad();</script></head><body><div id="header"><h2>Generated by <a href="https://github.com/weavejester/codox">Codox</a></h2><h1><a href="index.html"><span class="project-title"><span class="project-name"></span> <span class="project-version"></span></span></a></h1></div><div class="sidebar primary"><h3 class="no-link"><span class="inner">Project</span></h3><ul class="index-link"><li class="depth-1 "><a href="index.html"><div class="inner">Index</div></a></li></ul><h3 class="no-link"><span class="inner">Topics</span></h3><ul><li class="depth-1 "><a href="flow.html"><div class="inner"><span>core.async.flow</span></div></a></li></ul><h3 class="no-link"><span class="inner">Namespaces</span></h3><ul><li class="depth-1 "><a href="clojure.core.async.html"><div class="inner"><span>clojure.core.async</span></div></a></li><li class="depth-1 current"><a href="clojure.core.async.flow.html"><div class="inner"><span>clojure.core.async.flow</span></div></a></li><li class="depth-1 "><a href="clojure.core.async.flow.spi.html"><div class="inner"><span>clojure.core.async.flow.spi</span></div></a></li></ul></div><div class="sidebar secondary"><h3><a href="#top"><span class="inner">Public Vars</span></a></h3><ul><li class="depth-1"><a href="clojure.core.async.flow.html#var-create-flow"><div class="inner"><span>create-flow</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-futurize"><div class="inner"><span>futurize</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-inject"><div class="inner"><span>inject</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-lift*-.3Estep"><div class="inner"><span>lift*->step</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-lift1-.3Estep"><div class="inner"><span>lift1->step</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-pause"><div class="inner"><span>pause</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-pause-proc"><div class="inner"><span>pause-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-ping"><div class="inner"><span>ping</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-ping-proc"><div class="inner"><span>ping-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-process"><div class="inner"><span>process</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-resume"><div class="inner"><span>resume</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-resume-proc"><div class="inner"><span>resume-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-start"><div class="inner"><span>start</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-stop"><div class="inner"><span>stop</span></div></a></li></ul></div><div class="namespace-docs" id="content"><h1 class="anchor" id="top">clojure.core.async.flow</h1><div class="doc"><pre class="plaintext"> |
| 3 | +<html><head><meta charset="UTF-8" /><title>clojure.core.async.flow documentation</title><link rel="stylesheet" type="text/css" href="css/default.css" /><link rel="stylesheet" type="text/css" href="css/highlight.css" /><script type="text/javascript" src="js/highlight.min.js"></script><script type="text/javascript" src="js/jquery.min.js"></script><script type="text/javascript" src="js/page_effects.js"></script><script>hljs.initHighlightingOnLoad();</script></head><body><div id="header"><h2>Generated by <a href="https://github.com/weavejester/codox">Codox</a></h2><h1><a href="index.html"><span class="project-title"><span class="project-name"></span> <span class="project-version"></span></span></a></h1></div><div class="sidebar primary"><h3 class="no-link"><span class="inner">Project</span></h3><ul class="index-link"><li class="depth-1 "><a href="index.html"><div class="inner">Index</div></a></li></ul><h3 class="no-link"><span class="inner">Topics</span></h3><ul><li class="depth-1 "><a href="flow.html"><div class="inner"><span>core.async.flow</span></div></a></li></ul><h3 class="no-link"><span class="inner">Namespaces</span></h3><ul><li class="depth-1 "><a href="clojure.core.async.html"><div class="inner"><span>clojure.core.async</span></div></a></li><li class="depth-1 current"><a href="clojure.core.async.flow.html"><div class="inner"><span>clojure.core.async.flow</span></div></a></li><li class="depth-1 "><a href="clojure.core.async.flow.spi.html"><div class="inner"><span>clojure.core.async.flow.spi</span></div></a></li></ul></div><div class="sidebar secondary"><h3><a href="#top"><span class="inner">Public Vars</span></a></h3><ul><li class="depth-1"><a href="clojure.core.async.flow.html#var-create-flow"><div class="inner"><span>create-flow</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-futurize"><div class="inner"><span>futurize</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-inject"><div class="inner"><span>inject</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-lift*-.3Estep"><div class="inner"><span>lift*->step</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-lift1-.3Estep"><div class="inner"><span>lift1->step</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-map-.3Estep"><div class="inner"><span>map->step</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-pause"><div class="inner"><span>pause</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-pause-proc"><div class="inner"><span>pause-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-ping"><div class="inner"><span>ping</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-ping-proc"><div class="inner"><span>ping-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-process"><div class="inner"><span>process</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-resume"><div class="inner"><span>resume</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-resume-proc"><div class="inner"><span>resume-proc</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-start"><div class="inner"><span>start</span></div></a></li><li class="depth-1"><a href="clojure.core.async.flow.html#var-stop"><div class="inner"><span>stop</span></div></a></li></ul></div><div class="namespace-docs" id="content"><h1 class="anchor" id="top">clojure.core.async.flow</h1><div class="doc"><pre class="plaintext"> |
4 | 4 | Note - Alpha, work-in-progress, names and other details are in flux
|
5 | 5 |
|
6 | 6 | A library for building concurrent, event driven data processing
|
|
81 | 81 | complete when done.</pre></div></div><div class="public anchor" id="var-lift*-.3Estep"><h3>lift*->step</h3><div class="usage"><code>(lift*->step f)</code></div><div class="doc"><pre class="plaintext">given a fn f taking one arg and returning a collection of non-nil
|
82 | 82 | values, creates a step fn as needed by process, with one input
|
83 | 83 | and one output (named :in and :out), and no state.</pre></div></div><div class="public anchor" id="var-lift1-.3Estep"><h3>lift1->step</h3><div class="usage"><code>(lift1->step f)</code></div><div class="doc"><pre class="plaintext">like lift*->step except taking a fn returning one value, which when
|
84 |
| -nil will yield no output.</pre></div></div><div class="public anchor" id="var-pause"><h3>pause</h3><div class="usage"><code>(pause g)</code></div><div class="doc"><pre class="plaintext">pauses a running flow |
| 84 | +nil will yield no output.</pre></div></div><div class="public anchor" id="var-map-.3Estep"><h3>map->step</h3><div class="usage"><code>(map->step {:keys [describe init transition transform]})</code></div><div class="doc"><pre class="plaintext">given a map of functions corresponding to step fn arities (see |
| 85 | +'process'), returns a step fn suitable for passing to 'process'. You |
| 86 | +can use this map form to compose the proc logic from disparate |
| 87 | +functions or to leverage the optionality of some of the entry |
| 88 | +points. |
| 89 | + |
| 90 | +The keys in the map are: |
| 91 | +:describe, arity 0 - required |
| 92 | +:init, arity 1 - optional, but should be provided if 'describe' returns :params. |
| 93 | +:transition, arity 2 - optional |
| 94 | +:transform, arity 3 - required</pre></div></div><div class="public anchor" id="var-pause"><h3>pause</h3><div class="usage"><code>(pause g)</code></div><div class="doc"><pre class="plaintext">pauses a running flow |
85 | 95 | </pre></div></div><div class="public anchor" id="var-pause-proc"><h3>pause-proc</h3><div class="usage"><code>(pause-proc g pid)</code></div><div class="doc"><pre class="plaintext">pauses a process
|
86 | 96 | </pre></div></div><div class="public anchor" id="var-ping"><h3>ping</h3><div class="usage"><code>(ping g & {:keys [timeout-ms], :or {timeout-ms 1000}})</code></div><div class="doc"><pre class="plaintext">pings all processes, returning a map of pid -> proc status and
|
87 | 97 | state, for those procs that reply within timeout-ms (default 1000)</pre></div></div><div class="public anchor" id="var-ping-proc"><h3>ping-proc</h3><div class="usage"><code>(ping-proc g pid & {:keys [timeout-ms], :or {timeout-ms 1000}})</code></div><div class="doc"><pre class="plaintext">like ping, but just pings the specified process
|
88 |
| -</pre></div></div><div class="public anchor" id="var-process"><h3>process</h3><div class="usage"><code>(process fn-or-map)</code><code>(process fn-or-map {:keys [workload timeout-ms], :or {timeout-ms 5000}, :as opts})</code></div><div class="doc"><pre class="plaintext">Given a function of four arities (0-3), aka the 'step-fn', or a map |
89 |
| -of functions corresponding thereto (described below), returns a |
90 |
| -launcher that creates a process compliant with the process |
| 98 | +</pre></div></div><div class="public anchor" id="var-process"><h3>process</h3><div class="usage"><code>(process step-fn)</code><code>(process step-fn {:keys [workload compute-timeout-ms], :as opts})</code></div><div class="doc"><pre class="plaintext">Given a function of four arities (0-3), aka the 'step-fn', |
| 99 | +returns a launcher that creates a process compliant with the process |
91 | 100 | protocol (see the spi/ProcLauncher doc).
|
92 | 101 |
|
93 |
| -The possible arities/entries for the step-fn/map are |
| 102 | +The possible arities for the step-fn are |
94 | 103 |
|
95 |
| -0 - :describe, |
96 |
| -1 - :init, |
97 |
| -2 - :transition |
98 |
| -3 - :transform. |
| 104 | +0 - 'describe', () -> description |
| 105 | +1 - 'init', (arg-map) -> initial-state |
| 106 | +2 - 'transition', (state transition) -> state' |
| 107 | +3 - 'transform', (state input msg) -> [state' output-map] |
99 | 108 |
|
100 | 109 | This is the core facility for defining the logic for processes via
|
101 | 110 | ordinary functions. Using a var holding a fn as the 'step-fn' is the
|
102 | 111 | preferred method for defining a proc, as it enables
|
103 | 112 | hot-code-reloading of the proc logic in a flow, and better names in
|
104 |
| -datafy. You can use the map form to compose the proc logic from |
105 |
| -disparate functions or to leverage the optionality of some of the |
106 |
| -entry points. |
| 113 | +datafy. |
107 | 114 |
|
108 |
| -arity 0, or :describe - required, () -> description |
| 115 | +arity 0 - 'describe', () -> description |
109 | 116 | where description is a map with keys :params :ins and :outs, each of which
|
110 | 117 | in turn is a map of keyword to doc string, and :workload with
|
111 | 118 | possible values of :mixed :io :compute. All entries in the describe
|
|
125 | 132 | the proc. It will also be called by the impl in order to discover
|
126 | 133 | what channels are needed.
|
127 | 134 |
|
128 |
| -arity 1, or :init - optional, (arg-map) -> initial-state |
| 135 | +arity 1 - 'init', (arg-map) -> initial-state |
129 | 136 |
|
130 |
| -init will be called once by the process to establish any initial |
| 137 | +The init arity will be called once by the process to establish any initial |
131 | 138 | state. The arg-map will be a map of param->val, as supplied in the
|
132 | 139 | flow def. The key ::flow/pid will be added, mapped to the pid
|
133 | 140 | associated with the process (useful e.g. if the process wants to
|
134 |
| -refer to itself in reply-to coordinates). init must be provided if |
135 |
| -'describe' returns :params. |
| 141 | +refer to itself in reply-to coordinates). |
136 | 142 |
|
137 | 143 | Optionally, a returned init state may contain the
|
138 | 144 | keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
|
|
143 | 149 | outside of it. Use :transition to coordinate the lifecycle of these
|
144 | 150 | external channels.
|
145 | 151 |
|
146 |
| -Optionally, _any_ returned state, whether from :init, :transition |
147 |
| -or :transform, may contain the key ::flow/input-filter, a predicate |
| 152 | +Optionally, _any_ returned state, whether from init, transition |
| 153 | +or transform, may contain the key ::flow/input-filter, a predicate |
148 | 154 | of cid. Only inputs (including in-ports) satisfying the predicate
|
149 | 155 | will be part of the next channel read set. In the absence of this
|
150 | 156 | predicate all inputs are read.
|
151 | 157 |
|
152 |
| -arity 2, or :transition - optional, (state transition) -> state' |
| 158 | +arity 2 - 'transition', (state transition) -> state' |
153 | 159 |
|
154 |
| -transition will be called when the process makes a state transition, |
155 |
| -transition being one of ::flow/resume, ::flow/pause or ::flow/stop |
| 160 | +The transition arity will be called when the process makes a state |
| 161 | +transition, transition being one of ::flow/resume, ::flow/pause |
| 162 | +or ::flow/stop |
156 | 163 |
|
157 |
| -With this fn a process impl can track changes and coordinate |
| 164 | +With this a process impl can track changes and coordinate |
158 | 165 | resources, especially cleaning up any resources on :stop, since the
|
159 | 166 | process will no longer be used following that. See the SPI for
|
160 | 167 | details. state' will be the state supplied to subsequent calls.
|
161 | 168 |
|
162 |
| -arity 3, or :transform - required, (state in-name msg) -> [state' output] |
| 169 | +arity 3 - 'transform', (state in-name msg) -> [state' output] |
163 | 170 | where output is a map of outid->[msgs*]
|
164 | 171 |
|
165 |
| -The transform fn will be called every time a message arrives at any |
| 172 | +The transform arity will be called every time a message arrives at any |
166 | 173 | of the inputs. Output can be sent to none, any or all of the :outs
|
167 | 174 | enumerated, and/or an input named by a [pid inid] tuple (e.g. for
|
168 | 175 | reply-to), and/or to the ::flow/report output. A step need not
|
169 | 176 | output at all (output or msgs can be empyt/nil), however an output _message_
|
170 | 177 | may never be nil (per core.async channels). state' will be the state
|
171 | 178 | supplied to subsequent calls.
|
172 | 179 |
|
173 |
| -process accepts an option map with keys: |
| 180 | +process also accepts an option map with keys: |
174 | 181 | :workload - one of :mixed, :io or :compute
|
175 | 182 | :compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
|
176 | 183 | will be used when getting the return from the future - see below
|
|
185 | 192 |
|
186 | 193 | When :io is specified, transform should not do extensive computation.
|
187 | 194 |
|
188 |
| -When :compute is specified (only allowed for :transform), each call |
189 |
| -to transform will be run in a separate thread. The process loop will |
190 |
| -run in an :io context (since it no longer directly calls transform, |
191 |
| -all it does is I/O) and it will submit transform to the :compute |
192 |
| -executor then await (blocking, for compute-timeout-ms) the |
193 |
| -completion of the future returned by the executor. If the future |
194 |
| -times out it will be reported on ::flow/error. |
| 195 | +When :compute is specified, each call to transform will be run in a |
| 196 | +separate thread. The process loop will run in an :io context (since |
| 197 | +it no longer directly calls transform, all it does is I/O) and it |
| 198 | +will submit transform to the :compute executor then await (blocking, |
| 199 | +for compute-timeout-ms) the completion of the future returned by the |
| 200 | +executor. If the future times out it will be reported |
| 201 | +on ::flow/error. |
195 | 202 |
|
196 | 203 | When :compute is specified transform must not block!</pre></div></div><div class="public anchor" id="var-resume"><h3>resume</h3><div class="usage"><code>(resume g)</code></div><div class="doc"><pre class="plaintext">resumes a paused flow
|
197 | 204 | </pre></div></div><div class="public anchor" id="var-resume-proc"><h3>resume-proc</h3><div class="usage"><code>(resume-proc g pid)</code></div><div class="doc"><pre class="plaintext">resumes a process
|
|
0 commit comments