Thread-safe streams

Hi,

the following triggers a "stream: reentrant or broken delay" error:

#lang racket/base
(require racket/stream)

(define K (stream-cons 1
                       (for/stream ([a (in-stream K)])
                         (let () (sleep 1) a))))

(build-list 2 (λ (_) (thread (λ () (stream-ref K 10)))))

I was expecting that other threads either wait or redo the calculation if the result is not ready yet.

My questions:

  • Is this the expected behavior?
  • If so, would it be possible to make this thread-safe and
  • does anybody know of a library that maybe already does this?

Kr,
b

For some more context, because this piqued my interest:

If you define the stream-access as @bdeket has, it fails--from what I can tell--because of a break in the expected sequence in which the elements of the stream are realized.

#lang racket/base

(require racket/stream)

(define (zzZ) (sleep 1) 1)

(define K (stream-cons (zzZ) K))

(define thds
  (build-list
   2
   (λ (_)
     (thread
      #:keep 'results
      (λ ()
        (stream-ref K 10))))))

(for/list ([thd (in-list thds)])
  (thread-wait thd))
stream: reentrant or broken delay
'(#<void> 1)

The stream object has mutable state, which is guarded by reentrant-error: if the first element is accessed while another access is still forcing it, an arguments error is raised.

;; stream-cons.rkt

(define (stream-force-first p)
  (cond
    [(eagerly-created-stream-first-forced? p)
     (unpack-multivalue (eagerly-created-stream-first p))]
    [else
     (define thunk (eagerly-created-stream-first p))
     (set-eagerly-created-stream-first! p reentrant-error)
     (define v (thunk->multivalue thunk))
     (set-eagerly-created-stream-first! p v)
     (set-eagerly-created-stream-first-forced?! p #t)
     (unpack-multivalue v)]))
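
To make the failure mode concrete, here is a minimal sketch that imitates the same guard pattern outside the stream library. The names cell, in-progress, and cell-force! are made up for illustration and are not part of racket/stream. The thunk sleeps, so the first thread yields while the guard is installed, and the second thread finds the guard where it expected the thunk:

```racket
#lang racket/base

;; Hypothetical, simplified model of the guard (not the library's code).
(struct cell (value forced?) #:mutable)

;; Stand-in for reentrant-error: installed while a force is in progress.
(define (in-progress)
  (error 'cell "reentrant or broken delay"))

(define (cell-force! c)
  (cond
    [(cell-forced? c) (cell-value c)]
    [else
     (define thunk (cell-value c))
     (set-cell-value! c in-progress) ; guard is now visible to every thread
     (define v (thunk))              ; a sleeping thunk lets other threads run
     (set-cell-value! c v)
     (set-cell-forced?! c #t)
     v]))

(define c (cell (λ () (sleep 0.2) 1) #f))

;; Each thread records either the forced value or the symbol 'error.
(define outcomes (make-vector 2 #f))

(define thds
  (for/list ([i (in-range 2)])
    (thread
     (λ ()
       (vector-set! outcomes i
                    (with-handlers ([exn:fail? (λ (e) 'error)])
                      (cell-force! c)))))))

(for-each thread-wait thds)
outcomes
```

Whichever thread gets there first should end up with 1, and the other typically records 'error, because it calls the guard as if it were the thunk.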

You can kind of get around this by catching out-of-sequence accesses and retrying until it succeeds, but it seems less than optimal:

#lang racket/base

(require racket/stream)

(define (zzZ) (sleep 1) 1)

(define K (stream-cons (zzZ) K))

(define thds
  (build-list
   2
   (λ (_)
     (thread
      #:keep 'results
      (λ ()
        (let loop ()
          (with-handlers ([exn:fail? (lambda (exn) (loop))])
            (stream-ref K 10))))))))

(for/list ([thd (in-list thds)])
  (thread-wait thd))
'(1 1)

Edit: just so that the construction doesn't seem misleading, the same error occurs if the stream is constructed without self-reference:

(define K
  (for/stream ([_ (in-naturals)])
    (zzZ)))

You would think that (1) recognizing the stateful nature of stream cells and (2) serializing access to the stream via concurrency control might solve the problem:

#lang racket/base
(require racket/stream)

(define K (stream-cons 1
                       (for/stream ([a (in-stream K)])
                         (let () (sleep 1) a))))

(define ch (make-channel))
(define access
  (thread
   (λ ()
     (let loop ()
       (define request (channel-get ch))
       (define K (car request))
       (define index (cdr request))
       (define result  (stream-ref K index))
       (channel-put ch result)
       (loop)))))

(define (stream-ref-concurrent st i)
  (channel-put ch (cons st i))
  (channel-get ch))

(define _
  (build-list 2
              (λ (i)
                (thread
                 (λ ()
                   (define result (stream-ref-concurrent K 10))
                   (eprintf "debugging thread ~a ~a\n" i result))))))

But when you run this, you realize that something fails to do its job when the stream doesn't really exist. The "access thread" merely returns the given index.
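
One thing that can go wrong in the snippet above, independent of the stream itself: requests and replies share a single channel, so a client blocked in channel-get can rendezvous with another client's channel-put and receive that request pair instead of the access thread's answer. A minimal sketch of a variant where each request carries its own reply channel follows. The request struct, the requests channel, and the results vector are names introduced here for illustration, and the stream is the non-self-referential one from the edit above with a shorter sleep:

```racket
#lang racket/base
(require racket/stream)

(define (zzZ) (sleep 0.05) 1)

(define K
  (for/stream ([n (in-naturals)])
    (zzZ)))

;; Hypothetical request record: which stream, which index, where to reply.
(struct request (stream index reply))

(define requests (make-channel))

;; The only thread that ever touches the stream.
(define access
  (thread
   (λ ()
     (let loop ()
       (define req (channel-get requests))
       (define result (stream-ref (request-stream req) (request-index req)))
       ;; Answer on the private channel that came with this request.
       (channel-put (request-reply req) result)
       (loop)))))

(define (stream-ref-concurrent st i)
  (define reply (make-channel))
  (channel-put requests (request st i reply))
  (channel-get reply))

(define results (make-vector 2 #f))

(define thds
  (for/list ([i (in-range 2)])
    (thread
     (λ ()
       (vector-set! results i (stream-ref-concurrent K 10))))))

(for-each thread-wait thds)
results
```

This only helps if every access goes through stream-ref-concurrent. If stream-ref raises inside the access thread, that thread dies and waiting clients block forever, so a real version would want to send the exception back to the caller as well.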