Thread-safe streams

Hi,

the following triggers a "Stream: reentrant or broken delay" error:

#lang racket/base
(require racket/stream)

;; K yields 1 forever: its first element is 1, and every later element is
;; read back from K itself (via in-stream K) after a 1-second sleep, so each
;; element after the first takes 1 second to produce.
(define K (stream-cons 1
                       (for/stream ([a (in-stream K)])
                         (let () (sleep 1) a))))

;; Start two threads that force K up to index 10 at the same time; this
;; concurrent access is what triggers "stream: reentrant or broken delay".
;; NOTE(review): the threads are not waited on, so outside a REPL the program
;; may exit before they finish -- confirm how this was run.
(build-list 2 (λ (_) (thread (λ () (stream-ref K 10)))))

I was expecting that other threads either wait or redo the calculation if the result is not ready yet.

My questions:

  • Is this the expected behavior?
  • If so, would it be possible to make this thread-safe and
  • does anybody know of a library that maybe already does this?

Kr,
b

For some more context, because this piqued my interest:

If you define the stream-access as @bdeket has, it fails--from what I can tell--because of a break in the expected sequence in which the elements of the stream are realized.

#lang racket/base

(require racket/stream)

;; Simulated slow computation: sleeps for 1 second, then returns 1.
(define (zzZ) (sleep 1) 1)

;; Cyclic stream: its rest is K itself, so every index refers to this same
;; stream cell. Its first element is the delayed expression (zzZ), evaluated
;; when that element is first forced.
(define K (stream-cons (zzZ) K))

;; Two threads that both ask for element 10 of K at the same time.
;; `#:keep 'results` retains each thread's return value so that
;; `thread-wait` can hand it back once the thread is done.
(define thds
  (for/list ([_ (in-range 2)])
    (thread #:keep 'results
            (λ () (stream-ref K 10)))))

;; Wait for each thread in creation order and collect what it returned.
(map thread-wait thds)
stream: reentrant or broken delay
'(#<void> 1)

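The stream object has mutable state, which is guarded by the reentrant-error: while a cell's first element is being computed, `stream-force-first` (below) temporarily stores `reentrant-error` in that cell. So if the current first element is accessed at the wrong time — for example, by a second thread before the value has been stored — that access calls `reentrant-error` and raises the "stream: reentrant or broken delay" error instead of waiting.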

;; stream-cons.rkt

;; Excerpt from Racket's stream-cons implementation; the helpers it uses
;; (reentrant-error, thunk->multivalue, unpack-multivalue and the
;; eagerly-created-stream accessors) are defined elsewhere in that file.
;; Returns the first element of stream cell p, forcing it and caching the
;; result the first time.
(define (stream-force-first p)
  (cond
    ;; Already forced: unpack and return the cached value(s).
    [(eagerly-created-stream-first-forced? p)
     (unpack-multivalue (eagerly-created-stream-first p))]
    [else
     ;; Not forced yet: the field holds the delayed thunk (or reentrant-error
     ;; while another force of this cell is still in progress).
     (define thunk (eagerly-created-stream-first p))
     ;; Placeholder while the thunk runs. Any other access to this cell before
     ;; the value is stored below -- re-entry from the thunk itself, or another
     ;; thread racing with this one -- still sees forced? = #f, takes
     ;; reentrant-error as "the thunk" and calls it (presumably raising
     ;; "stream: reentrant or broken delay") instead of waiting. Nothing here
     ;; locks the cell against other threads.
     (set-eagerly-created-stream-first! p reentrant-error)
     (define v (thunk->multivalue thunk))
     ;; Cache the computed value(s) and mark the cell as forced.
     (set-eagerly-created-stream-first! p v)
     (set-eagerly-created-stream-first-forced?! p #t)
     (unpack-multivalue v)]))

You can kind of get around this by catching out-of-sequence accesses and retrying until it succeeds, but it seems less than optimal:

#lang racket/base

(require racket/stream)

;; Simulated slow computation: sleeps for 1 second, then returns 1.
(define (zzZ) (sleep 1) 1)

;; Cyclic stream: its rest is K itself, so every index refers to this same
;; stream cell, whose first element is the delayed expression (zzZ).
(define K (stream-cons (zzZ) K))

;; Two threads each fetch element 10 of K. A thread whose access fails (for
;; example with the reentrant-delay error while the other thread is still
;; forcing K's first element) simply retries until the access succeeds.
;; NOTE(review): the retry loop has no delay and no bound. It spins while the
;; other thread is still inside zzZ, and because it catches every exn:fail?
;; (not only the reentrant-delay error), a persistent error would make it
;; loop forever.
(define thds
  (build-list
   2
   (λ (_)
     (thread
      #:keep 'results
      (λ ()
        (let loop ()
          (with-handlers ([exn:fail? (lambda (exn) (loop))])
            (stream-ref K 10))))))))

;; Wait for both threads and collect their results.
(for/list ([thd (in-list thds)])
  (thread-wait thd))
'(1 1)

Edit: just so that the construction doesn't seem misleading, the same error occurs if the stream is constructed without self-reference:

;; Same experiment without self-reference: an infinite stream with one
;; element per natural number (the number itself is ignored), each element
;; produced lazily by its own call to zzZ.
(define K
  (for/stream ([_ (in-naturals)])
    (zzZ)))

You would think that (1) recognizing the stateful nature of stream cells and (2) serializing access to the stream via concurrency control might solve the problem:

#lang racket/base
(require racket/stream)

;; Same self-referential stream as in the original question: 1, 1, 1, ...
;; where every element after the first takes 1 second to produce.
(define K (stream-cons 1
                       (for/stream ([a (in-stream K)])
                         (let () (sleep 1) a))))

;; One channel, shared by requests (client -> access thread) and responses
;; (access thread -> client).
(define ch (make-channel))
;; Access thread: serves one request at a time. It receives a
;; (stream . index) pair, computes stream-ref on it and sends the result back
;; on the same channel, so accesses made through stream-ref-concurrent are
;; serialized. The local K shadows the module-level K.
(define access
  (thread
   (λ ()
     (let loop ()
       (define request (channel-get ch))
       (define K (car request))
       (define index (cdr request))
       (define result  (stream-ref K index))
       (channel-put ch result)
       (loop)))))

;; Client side: send (st . i) to the access thread, then wait for a reply.
;; NOTE(review): because requests and replies share `ch`, a client blocked in
;; channel-get can rendezvous with another client's channel-put and receive
;; that client's request pair instead of a stream element.
(define (stream-ref-concurrent st i)
  (channel-put ch (cons st i))
  (channel-get ch))

;; Start two client threads that each fetch element 10 of K through the
;; access thread and print what they received to stderr. The list of threads
;; is bound to `_` only because it is not used afterwards.
(define _
  (build-list 2
              (λ (i)
                (thread
                 (λ ()
                   (define result (stream-ref-concurrent K 10))
                   (eprintf "debugging thread ~a ~a\n" i result))))))

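But when you run this, you realize that something fails to do its job: because requests and responses share the single channel `ch`, a client waiting in `channel-get` can rendezvous with the other client's `channel-put`. That client then receives the other client's request — the stream and the index — instead of a result from the access thread, so it looks as if the "access thread" merely returns the given index.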

Oh, that makes sense, right.

If you "guard" the access thread's output, you can go all the way:

#lang racket/base
(require racket/stream)

;; Simulated slow computation: sleeps for 1 second, then returns 1.
(define (zzZ) (sleep 1) 1)

;; Cyclic stream whose rest is K itself (every index is the same cell).
(define K (stream-cons (zzZ) K))

;; Single channel shared by requests and responses.
(define ch (make-channel))
;; Unique tag marking a message on `ch` as a response from the access thread,
;; so a client can tell a response apart from another client's request.
(define actual (gensym))
;; Access thread: serves one (stream . index) request at a time and replies
;; with (actual . result).
(define access
  (thread
   (λ ()
     (let loop ()
       (define request (channel-get ch))
       (define K (car request))
       (define index (cdr request))
       (define result (stream-ref K index))
       (channel-put ch (cons actual result))
       (loop)))))

;; Client side: send a request, then read one message from `ch`. If it is not
;; tagged with `actual` (i.e. it is another client's request), discard it and
;; resend this client's own request.
;; NOTE(review): the tag only shows that a message is a response, not whose
;; request it answers. That is harmless here because every client asks for
;; the same index, but with different indices a client could receive another
;; client's result. A request received by another client is discarded rather
;; than forwarded to the access thread.
(define (stream-ref-concurrent st i)
  (channel-put ch (cons st i))
  (define out (channel-get ch))
  (if (eq? actual (car out))
      (cdr out)
      (stream-ref-concurrent st i)))

;; Two client threads, each fetching element 10 of K through the access
;; thread; `#:keep 'results` lets thread-wait return each client's value.
(define thds
  (for/list ([client (in-range 2)])
    (thread #:keep 'results
            (λ () (stream-ref-concurrent K 10)))))

;; Collect both clients' results, in creation order.
(for/list ([t (in-list thds)])
  (thread-wait t))
;=> '(1 1)

Edit: I realize that using a separate channel for requests vs. responses may be better:

#lang racket/base
(require racket/stream)

;; Simulated slow computation: sleeps for 1 second, then returns n squared.
(define (zzZ n) (sleep 1) (* n n))

;; Infinite stream of squares 0, 1, 4, 9, ..., computed lazily via zzZ
;; (1 second per element).
(define K
  (for/stream ([n (in-naturals)])
    (zzZ n)))

;; Separate channels: `ask` carries requests to the access thread and `ans`
;; carries its responses back to the clients.
(define ask (make-channel))
(define ans (make-channel))

;; Access thread: serves one (stream . index) request at a time and replies
;; on `ans` with (index . result).
(define access
  (thread
   (λ ()
     (let loop ()
       (define request (channel-get ask))
       (define K (car request))
       (define index (cdr request))
       (define result (stream-ref K index))
       (channel-put ans (cons index result))
       (loop)))))

;; Client side: send (st . i) on `ask`, then take one response from `ans`.
;; A response goes to whichever client happens to be waiting, so a client
;; that receives a response for a different index discards it and resends
;; its own request.
;; NOTE(review): responses are matched by index only, so two clients asking
;; for the same index cannot tell whose response they got (the value is the
;; same either way). A discarded response is not re-delivered; the client it
;; was meant for must be served by a later response.
(define (stream-ref-concurrent st i)
  (channel-put ask (cons st i))
  (define out (channel-get ans))
  (cond
    [(= i (car out)) (cdr out)]
    [else
     (stream-ref-concurrent st i)]))

;; Three client threads; each picks a random index from 1 to 10 and fetches
;; that element of K through the access thread.
(define thds
  (build-list 3
              (λ (i)
                (thread
                 #:keep 'results
                 (λ ()
                   (define index (random 1 11))
                   (stream-ref-concurrent K index))))))

;; Wait for the clients and collect their results (they vary from run to run
;; because the indices are random).
(map thread-wait thds)
;=> '(1 49 49)

Using only a single channel seems to block indefinitely at times (with this way of testing the index of the result you return).

I don't know if this is losing sight of the original point, however.

Thank you,

Instead of a guard, a second channel also removes the problem mentioned above (here the second channel is the access thread's mailbox, used via `thread-send`/`thread-receive`):

#lang racket/base
(require racket/stream
         racket/match)

;; Stream 0, 1, 2, ...: each element after the first is the preceding element
;; of K plus 1, produced after a 1-second sleep.
(define K (stream-cons 0
                       (for/stream ([a (in-stream K)])
                         (sleep 1) (+ a 1))))

;; Response channel: the access thread puts its results here. Requests travel
;; through the access thread's own mailbox instead (thread-send /
;; thread-receive), so clients never receive each other's requests.
(define ch (make-channel))
;; Access thread: takes (stream . index) requests from its mailbox one at a
;; time and puts each result on `ch`.
(define access
  (thread
   (λ ()
     (let loop ()
       (match-define (cons K index) (thread-receive))
       (define result  (stream-ref K index))
       (channel-put ch result)
       (loop)))))

;; Client side: post the request to the access thread's mailbox, then wait
;; for a result on `ch`.
;; NOTE(review): results are not tagged, so when several clients are waiting
;; a client may receive the result of another client's request.
(define (stream-ref-concurrent st i)
  (thread-send access (cons st i))
  (channel-get ch))

But this would mean redefining all stream functions so that they can be used concurrently.
I'm still interested in knowing whether this should be in the standard racket/stream library, or why not.

If I had to take a guess, it's because this is something that would be largely dependent on your needs and wouldn't necessarily have a "best" or even "good enough" generalization across different workloads/flows to be obvious.

Something that I think might be important here is that, because the stream itself needs to be stateful (otherwise you could just make a fresh stream each time), there needs to be a guard against out-of-sequence access of the stream values.

To paper over this, you have to address both the caching of the results (which the stream does) and the bookkeeping of the sources of the requests for access. Otherwise, you might get your wires crossed when retrieving results when you have a single access channel.

On the other hand, if you can ensure that only a single source ever receives the result of its request, the bookkeeping becomes implicit.

Ten-to-one, no one has thought to specialize for thread-safe access because of these points.

Someone like @sorawee might have a more authoritative answer--whom I mention because I remember reading about their work on constant-space streams, among other things.

Here is one way I can think of to do the "implicit bookkeeping":

#lang racket/base

(require
  fancy-app
  racket/match
  racket/stream)

;; Curried: given the server thread `stm^`, returns a procedure that sends
;; `op` to that server and returns the reply.
;; Each call spawns a fresh helper thread whose mailbox is used only for the
;; reply to this one request. The reply callback sent along with `op` is
;; (thread-send from _), i.e. fancy-app's shorthand for a one-argument
;; procedure that sends its argument to the helper thread. `#:keep 'results`
;; lets thread-wait return the helper's final value, which is the reply it
;; received.
(define ((call-with-concurrent-stream-thread stm^) op)
  (thread-wait
   (thread
    #:keep 'results
    (lambda ()
      (define from (current-thread))
      (thread-send stm^ (cons op (thread-send from _)))
      (thread-receive)))))

;; Wraps stream `stm` in a server thread that is the only place the request
;; procedures are applied to it, one request at a time, and returns the
;; caller-side procedure produced by call-with-concurrent-stream-thread.
;; NOTE(review): if `op` raises, the exception escapes this loop and ends the
;; server thread, so later requests are no longer served -- TODO decide
;; whether errors should be sent back to the caller instead.
(define (make-concurrent-stream-access stm)
  (call-with-concurrent-stream-thread
   (thread
    (lambda ()
      (let loop ()
        (match-define (cons op recv)
          (thread-receive))
        (recv (op stm))
        (loop))))))

;; stm! applies a one-argument procedure to the stream 0, 1, 2, ... (each
;; element produced lazily after a 1-second sleep) inside that stream's
;; server thread, and returns the procedure's result.
(define stm!
  (make-concurrent-stream-access
   (for/stream ([n (in-naturals)])
     (sleep 1) n)))

;; Ten client threads; client i prints the element at a random index between
;; 0 and i inclusive. `(stream-ref _ ...)` uses fancy-app's `_` to build the
;; one-argument procedure that stm! applies to the stream.
(define (thds)
  (build-list
   10
   (lambda (i)
     (thread
      (lambda ()
        (displayln (stm! (stream-ref _ (random 0 (+ i 1))))))))))

;; Run all clients, wait for them, and report the elapsed time.
(time (for-each thread-wait (thds)))
0
0
2
2
3
5
5
5
5
4
cpu time: 656 real time: 5001 gc time: 15

I haven't been able to find any libraries which address this specifically, but I made only a cursory search through the online manuals.

Another, probably more helpful, observation is that it may be difficult to provide a thread-safe stream abstraction that fits the general stream interface of empty?, first, and rest. The key is rest, because applying rest shares the underlying stream.

I was curious whether you could wrap the above in a struct to keep the API the same as normal streams, but it turns out that applying rest lets the stream escape, essentially, and then you are left with the same problem as before.

Maybe there is an obvious way to get around this, but I thought it was interesting.

#lang racket/base

(require
  fancy-app
  racket/match
  racket/stream)

;; Curried: given the server thread `stm^`, returns a procedure that sends
;; `op` to that server and returns the reply. Each call spawns a helper
;; thread whose mailbox is used only for this reply; (thread-send from _) is
;; fancy-app's shorthand for a one-argument procedure that sends its argument
;; to the helper. `#:keep 'results` lets thread-wait return the helper's
;; final value, which is the reply it received.
(define ((call-with-concurrent-stream-thread stm^) op)
  (thread-wait
   (thread
    #:keep 'results
    (lambda ()
      (define from (current-thread))
      (thread-send stm^ (cons op (thread-send from _)))
      (thread-receive)))))

;; Wraps `stm` in a server thread that answers 'empty?, 'first and 'rest
;; requests one at a time, and returns a concurrent-stream around the
;; caller-side procedure. A 'rest request answers with a new
;; concurrent-stream -- and therefore a new server thread -- around
;; (stream-rest stm).
;; NOTE(review): every 'rest request creates another server thread for the
;; same underlying tail, so different clients can end up forcing that tail
;; through different server threads at the same time. This is the sharing
;; problem behind the "reentrant or broken delay" errors in the output that
;; follows. Any other op symbol falls through `case` and replies with void.
(define (make-concurrent-stream stm)
  (concurrent-stream
   (call-with-concurrent-stream-thread
    (thread
     (lambda ()
       (let loop ()
         (match-define (cons op recv)
           (thread-receive))
         (recv
          (case op
            [(empty?)
             (stream-empty? stm)]
            [(first)
             (stream-first stm)]
            [(rest)
             (make-concurrent-stream
              (stream-rest stm))]))
         (loop)))))))

;; gen:stream implementation that forwards each operation, as the symbol
;; 'empty?, 'first or 'rest, to the procedure stored in the stm! field.
(struct concurrent-stream (stm!)
  #:methods gen:stream
  [(define (stream-empty? stm)
     ((concurrent-stream-stm! stm) 'empty?))
   (define (stream-first stm)
     ((concurrent-stream-stm! stm) 'first))
   (define (stream-rest stm)
     ((concurrent-stream-stm! stm) 'rest))])

;; A concurrent-stream wrapping the stream 0, 1, 2, ... (each element
;; produced lazily after a 1-second sleep).
(define stm!
  (make-concurrent-stream
   (for/stream ([n (in-naturals)])
     (sleep 1) n)))

;; Ten client threads; client i prints the element at a random index between
;; 0 and i inclusive, using plain stream-ref on the wrapper.
(define (thds)
  (build-list
   10
   (lambda (i)
     (thread
      (lambda ()
        (displayln
         (stream-ref stm! (random 0 (+ i 1)))))))))

;; Run all clients, wait for them, and report the elapsed time.
(time (for-each thread-wait (thds)))
0
0
. . . . . . stream: reentrant or broken delay
stream: reentrant or broken delay
stream: reentrant or broken delay
. . stream: reentrant or broken delay
1
2
3
7

If you keep a counter for the current index in the concurrent-stream struct, you can avoid the sharing problem. That seems obvious in hindsight, although it might be rather inefficient.