Commit initial version
[guile-orca] / orca.scm
1 ;;; Copyright (C) 2020  Ahmet Artu Yildirim
2 ;;;
3 ;;; orca is free software: you can redistribute it and/or modify
4 ;;; it under the terms of the GNU Lesser General Public License as
5 ;;; published by the Free Software Foundation, either version 3 of
6 ;;; the License, or (at your option) any later version.
7 ;;;
8 ;;; orca is distributed in the hope that it will be useful,
9 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
10 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 ;;; GNU Lesser General Public License for more details.
12 ;;;
13 ;;; You should have received a copy of the GNU Lesser General Public License
14 ;;; along with orca. If not, see <https://www.gnu.org/licenses/>.
15
16 (define-module (orca)
17   #:use-module (system foreign)
18   #:use-module (rnrs bytevectors)
19   #:use-module (ice-9 q)
20   #:use-module (ice-9 format)
21   #:use-module (srfi srfi-9)
22   #:use-module (srfi srfi-1)
23   #:use-module (srfi srfi-11)
24   #:use-module (srfi srfi-13)
25   #:use-module (srfi srfi-4 gnu)
26   #:use-module (ice-9 eval-string)
27   #:use-module (ice-9 streams)
28   #:use-module (orca config)
29   #:use-module (orca internal))
30
31 ;;------------------------------------------------------------------------------
32
33 ;;; Internal data structures
34
35 ;;------------------------------------------------------------------------------
36
37 (define DEFAULT_TAG 0)
38 (define MASTER_P_ID 0)
39 (define DATA_TAG 0)
40 (define MESSAGE_TAG 1)
41
42 (define stream-table (make-hash-table))
43
44 ;;------------------------------------------------------------------------------
45
46 ;;; Internal Messaging Functions
47
48 ;;------------------------------------------------------------------------------
49
50 (define (%finalize-and-exit)
51   (mpi-finalize)
52   (exit 0))
53
54 (export %finalize-and-exit)
55
56 ;;------------------------------------------------------------------------------
57
58 (define* (%accept-rpc-call source #:key (tag DEFAULT_TAG))
59   (let ((message (mpi-recv-string source tag)))
60     (mpi-send-string source (eval-message message) tag)))
61
62 (export %accept-rpc-call)
63
64 ;;------------------------------------------------------------------------------
65
66 ;;; Helper Functions
67
68 ;;------------------------------------------------------------------------------
69
70 (define (any-process-completed? s-result)
71   (cond
72    ((null? s-result) #f)
73    ((car s-result) (any-process-completed? (cdr s-result)))
74    (else #t)))
75
76 ;;------------------------------------------------------------------------------
77
78 (define (all-process-completed? s-result)
79   (cond
80    ((null? s-result) #t)
81    ((car s-result) #f)
82    (else (all-process-completed? (cdr s-result)))))
83
84 ;;------------------------------------------------------------------------------
85
86 (define (initialize-mpi)
87   (unless (mpi-initialized)
88     (mpi-init)))
89
90 ;;------------------------------------------------------------------------------
91
92 (define (quote-params params)
93   (map (lambda (param)
94          (cond
95           ((null? param) '(quote ()))
96           ((and
97             (pair? param)
98             (not (eq? 'list (car param))))
99            (append '(quote) (list param)))
100           (else param))) params))
101
102 (define (quote-params-for-exp exp)
103   `(,(car exp) ,@(quote-params (cdr exp))))
104
105 (define (quote-params-for-multiple-exp exps)
106   (map (lambda (exp)
107          (quote-params-for-exp exp))
108        exps))
109
110 ;;------------------------------------------------------------------------------
111
112 (define (broadcast-rpc-message datum)
113   (for-each
114    (lambda (p-id)
115      (mpi-isend-string p-id (exp->string datum) MESSAGE_TAG))
116    (iota (1- (%process-size)) 1)))
117
118 ;;------------------------------------------------------------------------------
119
120 (define (rpc-barrier)
121   (mpi-barrier))
122
123 ;;------------------------------------------------------------------------------
124
125 (define* (make-rpc-call dest s-exp #:key (tag DEFAULT_TAG))
126   (let ((message (exp->string s-exp)))
127     (mpi-send-string dest message tag)
128     (string->exp (mpi-recv-string dest tag))))
129
130 ;;------------------------------------------------------------------------------
131
132 (define* (make-rpc-async-call dest s-exp #:key (tag DEFAULT_TAG))
133   (let ((message (exp->string s-exp)))
134     (mpi-send-string dest message tag)
135     (mpi-irecv-string dest tag)))
136
137 ;;------------------------------------------------------------------------------
138
139 (define (wait-rpc-async request)
140   (let ((message (mpi-wait request)))
141     (string->exp message)))
142
143 ;;------------------------------------------------------------------------------
144
145 (define (set-streams! ns streams)
146   (let ((s-lst (hash-ref stream-table ns)))
147     (if s-lst
148         s-lst
149         (hash-set! stream-table ns streams))))
150
151 (define (has-empty-stream? streams)
152   (fold (lambda (current prev)
153           (if prev
154               prev
155               (stream-null? current))) #f streams))
156
157 (define (collect-streams ns streams)
158   (let ((result (map (lambda (s) (stream-car s)) streams)))
159     (hash-set! stream-table ns (map (lambda (s) (stream-cdr s)) streams))
160     result))
161
162
163 (define (%set-and-collect-streams ns streams)
164   (let ((reg-stream (set-streams! ns streams)))
165     (if (has-empty-stream? reg-stream)
166         #f
167         (collect-streams ns reg-stream))))
168
169 (export %set-and-collect-streams)
170
171 (define (%process-id)
172   "Return the id of the calling process."
173   (mpi-rank))
174
175 (define (%process-size)
176   "Return the size of the process group."
177   (mpi-size))
178
179 ;;------------------------------------------------------------------------------
180
181 ;;; Public Functions
182
183 ;;------------------------------------------------------------------------------
184
185 (define (rpc-worker-process-id)
186   "Return the id of the worker process."
187   (mpi-rank))
188
189 (export rpc-worker-process-id)
190
191 ;;------------------------------------------------------------------------------
192
193 (define (rpc-worker-process-size)
194   "Return the number of the worker processes."
195   (mpi-size))
196
197 (export rpc-worker-process-size)
198
199 ;;------------------------------------------------------------------------------
200
201 (define (rpc-is-master-process)
202   "Return #t if the process is master, #f otherwise"
203   (= MASTER_P_ID (%process-id)))
204
205 (export rpc-is-master-process)
206
207 ;;------------------------------------------------------------------------------
208
209 (define (rpc-make datum)
210   (broadcast-rpc-message `(%accept-rpc-call ,MASTER_P_ID))
211
212   (let ((my-message (exp->string datum)))
213     (for-each (lambda (request) (mpi-wait request))
214               (map
215                (lambda (p-id)
216                  (mpi-isend-string p-id my-message DATA_TAG))
217                (iota (1- (%process-size)) 1)))
218     (cons (string->exp (eval-message my-message))
219           (map (lambda (p-id)
220                  (string->exp (mpi-recv-string p-id DATA_TAG)))
221                (iota (1- (%process-size)) 1)))))
222
223 (export rpc-make)
224
225 ;;------------------------------------------------------------------------------
226
227 (define (rpc-scatter lst-datum)
228   (set! lst-datum (quote-params-for-multiple-exp lst-datum))
229
230   (broadcast-rpc-message `(%accept-rpc-call ,MASTER_P_ID))
231
232   (let lp ((cdr-datum (cdr lst-datum)) (p-id 1))
233     (unless (nil? cdr-datum)
234       (mpi-isend-string p-id (exp->string (car cdr-datum)) DATA_TAG)
235       (lp (cdr cdr-datum) (1+ p-id))))
236   (cons (string->exp (eval-message (exp->string (car lst-datum))))
237         (map (lambda (p-id)
238                (string->exp (mpi-recv-string p-id DATA_TAG)))
239              (iota (1- (%process-size)) 1))))
240
241 (export rpc-scatter)
242
243 ;;------------------------------------------------------------------------------
244
245 (define-syntax rpc-apply-bcast
246   (syntax-rules ()
247     ((rpc-apply-bcast proc exp ...)
248      (rpc-make `(proc ,@(quote-params (list exp ...)))))))
249
250 (export rpc-apply-bcast)
251
252 ;;------------------------------------------------------------------------------
253
254 (define-syntax rpc-apply-scatter
255   (syntax-rules ()
256     ((rpc-apply-scatter proc exp ...)
257      (rpc-scatter
258       (map (lambda (params) `(proc ,@params)) (zip exp ...))))))
259
260 (export rpc-apply-scatter)
261
262 ;;------------------------------------------------------------------------------
263
264 (define-syntax rpc-stream-map
265   (syntax-rules ()
266     ((rpc-stream-map proc stream1 ...)
267      (call/cc
268       (lambda (k)
269         (let* ((ns (symbol->string (gensym "orca")))
270                (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
271                (result '())
272                (s-result '()))
273           (while #t
274             (set! s-result (rpc-make datum))
275             (if (any-process-completed? s-result)
276                 (break)
277                 (set! result
278                       (cons
279                        (apply proc
280                               (cons (lambda* (#:optional (ret #f))
281                                       (if ret
282                                           (k ret)
283                                           (k (reverse result))))
284                                     (apply zip s-result))) result))))
285           (reverse result)))))))
286
287 (export rpc-stream-map)
288
289 ;;------------------------------------------------------------------------------
290
291 (define-syntax rpc-stream-for-each
292   (syntax-rules ()
293     ((rpc-stream-for-each proc stream1 ...)
294      (call/cc
295       (lambda (k)
296         (let* ((ns (symbol->string (gensym "orca")))
297                (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
298                (s-result '()))
299           (while #t
300             (set! s-result (rpc-make datum))
301             (if (any-process-completed? s-result)
302                 (break)
303                 (apply proc (cons k (apply zip s-result)))))
304           #t))))))
305
306 (export rpc-stream-for-each)
307
308 ;;------------------------------------------------------------------------------
309
310 (define-syntax rpc-stream-fold
311   (syntax-rules ()
312     ((rpc-stream-fold proc init stream1 ...)
313      (call/cc
314       (lambda (k)
315         (let* ((ns (symbol->string (gensym "orca")))
316                (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
317                (result '())
318                (prev init)
319                (s-result '()))
320           (while #t
321             (set! s-result (rpc-make datum))
322             (if (any-process-completed? s-result)
323                 (break)
324                 (begin
325                   (set! result
326                         (apply proc
327                                (cons (lambda* (#:optional (ret #f))
328                                        (if ret
329                                            (k ret)
330                                            (k prev)))
331                                      (cons prev
332                                            (apply zip s-result)))))
333                   (set! prev result))))
334           result))))))
335
336 (export rpc-stream-fold)
337
338 ;;------------------------------------------------------------------------------
339
340 (define (rpc-finalize)
341   (broadcast-rpc-message '(%finalize-and-exit))
342   (mpi-finalize))
343
344 (export rpc-finalize)
345
346 ;;------------------------------------------------------------------------------
347
348 (define (rpc-start)
349   (unless (rpc-is-master-process)
350     (while #t
351       (let ((message (mpi-recv-string MASTER_P_ID MESSAGE_TAG)))
352         (eval (string->exp message) (interaction-environment))))))
353
354 (export rpc-start)
355
356 ;;------------------------------------------------------------------------------
357
358 (initialize-mpi)