One queue/channel, several consumers (futures, places, ...)

I read Parallelism with places in the Racket Guide. As I understand the section, it isn't possible to send data over a single channel to multiple places, so that the places can act as "workers" and consume tasks from the channel when a place has finished previous work. Is this understanding correct?

If my understanding is correct, what would be an alternative way to achieve this consumption from the same queue/channel if I want actual parallelism, not just concurrency, as with threads?

If my understanding isn't correct, i.e. several places can get data from the same channel, what would that look like in code?

I also read about futures, but that sounds as if it's likely that the tasks in the futures won't actually run in parallel.

It is possible. You can represent a cross-place asynchronous queue by creating a place channel pair, writing to one end, and sending the other end (the "read end") to the worker places. (Or vice versa, for gathering responses.) Here's an example:

#lang racket

(module worker racket
  (provide make-worker-place)

  ;; make-worker-place : Any PlaceChannel PlaceChannel -> Place
  (define (make-worker-place id task-in done-out)
    (define worker
      (place self
             (match (place-channel-get self)
               [(list id task-in done-out) (work self id task-in done-out)])))
    (place-channel-put worker (list id task-in done-out))
    worker)

  ;; work : Place Any PlaceChannel PlaceChannel -> Void
  (define (work self id task-in done-out)
    (let loop ()
      (sync (handle-evt task-in
                        (lambda (task)
                          (do-task id task)
                          (place-channel-put done-out task)
                          (loop)))
            (handle-evt self
                        (lambda (shutdown)
                          (printf "place ~s shutting down\n" id)
                          (place-channel-put self 'ok))))))

  ;; do-task : Any Any -> Void
  (define (do-task id task)
    (printf "place ~s got task: ~e\n" id task)
    (sleep (+ 1 (random))))
  )

(require 'worker)

;; Main place sends tasks out using task-out.
;; Workers receive tasks by reading from task-in.
(define-values (task-out task-in) (place-channel))

;; Workers acknowledge completed tasks by sending on done-out.
;; Main place counts completed tasks by reading from done-in.
(define-values (done-out done-in) (place-channel))

;; Create workers.
(define workers
  (for/list ([id (in-range 10)])
    (make-worker-place id task-in done-out)))

;; Put all tasks in the shared task channel.
(define TASKS #e1e2)
(for ([n (in-range TASKS)])
  (place-channel-put task-out n))
(printf "finished queueing tasks\n")

;; Wait until all tasks are done before exiting.
(for ([n (in-range TASKS)])
  (void (place-channel-get done-in)))

;; Shut down each worker.
(for ([worker (in-list workers)])
  (place-channel-put worker 'shutdown)
  (void (place-channel-get worker)))

It depends on the type of workload you want to run in parallel. For calculation-intensive workloads, futures are usually the best thing since sliced bread. If there is any I/O, forget about them, though.
If you recall my 3D raycaster, it uses futures to cast 11520 rays in 8 futures (1440 on each "core" of my i7) in parallel per frame, and if I remove the artificial frame-rate limiting, it keeps all 8 "cores" (hyperthreads, actually) ticking at 100% virtually all the time. It is not 8x faster, though; empirically it is more like 5-6x faster. Hyperthreads are not real cores, and then there are cache coherence problems and loads of other parallelism-related phenomena that you have to think about.
Also, with Racket CS the "future-safe" set of operations is much larger, as allocations and GCs do not force everything back into RTT.
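For comparison with the places example, here is a minimal sketch of the futures approach for a purely numeric workload. It is not the raycaster code; the names sum-squares, N and CHUNKS are made up for illustration, and the workload is deliberately tiny so that every intermediate sum stays exactly representable as a flonum. Whether the chunks actually run in parallel depends on the Racket variant and the machine.

```racket
#lang racket/base
(require racket/future racket/flonum)

;; sum-squares : Nat Nat -> Flonum
;; Sums i*i for lo <= i < hi using flonum-specific arithmetic,
;; which is the kind of work futures handle well (no I/O).
;; (Hypothetical helper, for illustration only.)
(define (sum-squares lo hi)
  (let loop ([i lo] [acc 0.0])
    (if (< i hi)
        (loop (add1 i) (fl+ acc (fl* (->fl i) (->fl i))))
        acc)))

(define N 200000)  ; total range, an arbitrary illustrative size
(define CHUNKS 8)  ; one future per chunk, mirroring the 8 futures above
(define size (quotient N CHUNKS))

;; Start one future per chunk; each computes its part independently.
(define fs
  (for/list ([c (in-range CHUNKS)])
    (future (lambda ()
              (sum-squares (* c size) (* (add1 c) size))))))

;; touch blocks until a future's result is available.
(define total
  (for/fold ([acc 0.0]) ([f (in-list fs)])
    (fl+ acc (touch f))))

(printf "total: ~a\n" total)
```

If a future hits an operation that is not future-safe, it is suspended until it is touched, so the program still produces the right answer, just with less parallelism.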


Thank you!

I guess place-channel was the piece I needed. I focused so much on understanding the intro in the Racket Guide that I forgot to check the Racket Reference for more APIs. :smiley:

I haven't understood everything in your code (yet), but I hope I will when I find the time to play with the code and am more focused. :slight_smile:
