For some more context, because this piqued my interest:
If you define the stream-access as @bdeket has, it fails--from what I can tell--because of a break in the expected sequence in which the elements of the stream are realized.
The stream-object has mutable state, which is guarded by the reentrant-error so that if the current first element is accessed at the wrong time, it will raise an arguments error.
;; stream-cons.rkt
(define (stream-force-first p)
(cond
[(eagerly-created-stream-first-forced? p)
(unpack-multivalue (eagerly-created-stream-first p))]
[else
(define thunk (eagerly-created-stream-first p))
(set-eagerly-created-stream-first! p reentrant-error)
(define v (thunk->multivalue thunk))
(set-eagerly-created-stream-first! p v)
(set-eagerly-created-stream-first-forced?! p #t)
(unpack-multivalue v)]))
You can kind of get around this by catching out-of-sequence accesses and retrying until it succeeds, but it seems less than optimal:
You would think that (1) recognizing the state-ful nature of stream cells and (2) sequentializing access to the stream via concurrency control might solve the problem:
#lang racket/base
(require racket/stream)
(define K (stream-cons 1
(for/stream ([a (in-stream K)])
(let () (sleep 1) a))))
(define ch (make-channel))
(define access
(thread
(λ ()
(let loop ()
(define request (channel-get ch))
(define K (car request))
(define index (cdr request))
(define result (stream-ref K index))
(channel-put ch result)
(loop)))))
(define (stream-ref-concurrent st i)
(channel-put ch (cons st i))
(channel-get ch))
(define _
(build-list 2
(λ (i)
(thread
(λ ()
(define result (stream-ref-concurrent K 10))
(eprintf "debugging thread ~a ~a\n" i result))))))
But when you run this you realize that something fails to do its job when the stream doesn't really exist . The "access thread" merely returns the given index.
instead of a guard, a second channel also removes the problem mentioned (here via the thread channel):
#lang racket/base
(require racket/stream
racket/match)
(define K (stream-cons 0
(for/stream ([a (in-stream K)])
(sleep 1) (+ a 1))))
(define ch (make-channel))
(define access
(thread
(λ ()
(let loop ()
(match-define (cons K index) (thread-receive))
(define result (stream-ref K index))
(channel-put ch result)
(loop)))))
(define (stream-ref-concurrent st i)
(thread-send access (cons st i))
(channel-get ch))
But this would mean redefining all stream functions to be able to be concurrent.
I'm still interested in knowing if, or why not this should be in the standard racket/stream library.
If I had to take a guess, it's because this is something that would be largely dependent on your needs and wouldn't necessarily have a "best" or even "good enough" generalization across different workloads/flows to be obvious.
Something that I think might be important here, is that because the stream itself needs to be stateful (otherwise you could just make a fresh stream each time), there needs to be a guard against out-of-sequence access of the stream values.
To paper over this, you have to address both the caching of the results (which the stream does) and the bookkeeping of the sources of the requests for access. Otherwise, you might get your wires crossed when retrieving results when you have a single access channel.
On the other hand, if you can ensure that only a single source ever receives the result of its request, the bookkeeping becomes implicit.
Ten-to-one, no one has thought to specialize for thread-safe access because of these points.
Someone like @sorawee might have a more authoritative answer--whom I mention because I remember reading about their work on constant-space streams, among other things.
This is one way you could do the "implicit bookkeeping", that I can think of:
Another, probably more helpful observation, is that it may be difficult to provide a thread-safe stream-abstraction which fits the general stream-interface of empty?, first, and rest. The key being rest because you share the stream when you apply the rest procedure.
I was curious of whether you could wrap the above in a struct to keep the API the same as normal streams, but it turns out that applying rest lets the stream escape, essentially, and then you sit with the same problem as before.
Maybe there is an obvious way to get around this, but I thought it was interesting.
If you keep a counter for the current index with the concurrent-stream struct, you can avoid the sharing problem, which seems obvious in hindsight, although it seems like it might be rather inefficient.