Commit initial version
[guile-orca] / examples / distributed-reduce.scm
1 #!/usr/bin/env -S guile -s
2 !#
3
4 (add-to-load-path "..")
5
6 (use-modules (orca))
7 (use-modules (srfi srfi-1))
8 (use-modules (ice-9 regex))
9 (use-modules (ice-9 ftw))
10 (use-modules (ice-9 textual-ports))
11
12 (define proc max)
13 (define init-value 0)
14
15 (define (read-db-numbers db-lst)
16   (define db-numbers '())
17
18  (for-each (lambda (db)
19             (let ((port (open-file (format #f "./numbers-db/~a" db) "r")))
20              (let lp ((line (get-line port)))
21               (if (not (eof-object? line))
22                (begin
23                 (set! db-numbers (cons (string->number line) db-numbers))
24                 (lp (get-line port)))
25                (close-port port)))))
26            db-lst) db-numbers)
27
28 (define (apply-distributed-proc db-lst)
29  (reduce proc init-value (read-db-numbers db-lst)))
30
31 (rpc-start)
32
33 (define (partition-db-lists)
34   (define db-lst (scandir "./numbers-db" (lambda (f) (string-match "^.*\\.txt$" f))))
35   (define db-per-worker (euclidean-quotient (length db-lst) (rpc-worker-process-size)))
36   (define dbs '())
37
38   (when (= db-per-worker 0)
39     (error "insufficient db size"))
40
41   (let lp ((i (1- (rpc-worker-process-size))))
42    (if (>= i 1)
43      (begin
44        (set! dbs (cons (take db-lst db-per-worker) dbs))
45        (set! db-lst (drop db-lst db-per-worker))
46        (lp (1- i)))
47      (begin
48        (set! dbs (cons db-lst dbs))
49        (set! dbs (reverse dbs)))))
50   (map (lambda (k) (cons 'list k)) dbs))
51
52 (format #t "result is ~d~%"
53   (reduce proc init-value
54     (rpc-apply-scatter apply-distributed-proc (partition-db-lists))))
55
56 (rpc-finalize)