Shared data structures between parallel workers in racket?

We are working on parallelizing a long computation in racket so that it can take advantage of multiple cores while sharing data structures, and are hoping for some advice from the community.

The computation consists of many work units, memoizing intermediate steps in each work unit to avoid duplicating effort when completing other work units. The cache has a big impact, sometimes reducing runtime by a factor of 20 in serial operation.

To parallelize the computation we have created a controller function, which spawns a place for each parallel worker. The controller sends each idle worker a work unit through the appropriate channel, and then receives results through the channel once the worker has completed the work. Intermediate steps are also sent back to the controller through place channels, which then sends them to the other workers so they can update their caches.
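
For concreteness, here is a heavily simplified sketch of the shape of this setup (stand-in names and stand-in work, and without the cache-delta broadcasting described above):

#lang racket
(require racket/place racket/future)

;; Stand-in for the real ~100 ms computation on one work unit.
(define (do-work-unit u)
  (* u u))

;; A worker place: receives a list of work units, sends back the results.
(define (start-worker)
  (place ch
    (define units (place-channel-get ch))
    (place-channel-put ch (map do-work-unit units))))

(module+ main
  (define n (processor-count))
  (define all-units (range 100))
  ;; split the units into one chunk per worker
  (define chunks
    (for/list ([i (in-range n)])
      (for/list ([u all-units] #:when (= (modulo u n) i)) u)))
  (define workers (for/list ([_ chunks]) (start-worker)))
  (for ([w workers] [c chunks]) (place-channel-put w c))
  (define results (append-map place-channel-get workers))
  (printf "~a results\n" (length results)))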

Sending data structures for cache updates over the channels takes so long that in many cases the parallel implementation runs slower than the serial implementation, and we gain no advantage from parallelizing the computations in this way.

Is there a good way to share a data structure between parallel workers that would allow us to take advantage of multiple cores?


It is really difficult to say something general, so if you can describe more about the data and the dependencies between different units of work, it is more likely that somebody can give you hints on potential tweaks.

Do you have a worker per core or a worker per task?
The former is what I would expect; the latter could waste a lot of resources if you have, say, 20,000 tasks but only 8 / 16 / 64 cores (whatever it is).
If you can tell us some numbers (e.g. core counts and amount of tasks, how much is shared between tasks) and how the tasks could be described with a graph that would make the whole discussion more useful.

If you have high core counts you should take a look at loci as a replacement for places that works better in that scenario.

I also made a topic about something related that might be of interest:

My biggest question however is: does every task need to access one single data structure?
If so, then that seems like a problem that isn't very parallelizable.
In a better scenario, you would be able to cluster tightly coupled tasks into groups, have each group finish its part, and then send its results to groups further down the "pipeline".

Also, this topic kind of screams for a shout-out to Amdahl's law - Wikipedia


I tried to parallelize long computations myself in Racket using places (and futures too), so far with no success. That is, the "parallel" version ran slower than the "single" CPU thread version, or, in the case of futures, it resulted in code that was very complex and fragile (having to use unsafe operations).

I am curious if anyone has actual practical experience with these, outside the core Racket team, of course :slight_smile:

My only successful use of places was when I moved a CPU intensive operation to a separate place so the application GUI thread remains responsive. The end result is almost as fast as the single CPU thread version, but the GUI is now more responsive, so I kept it.

I think places work best when the "units of computation" are very large in terms of time, but very small in terms of data communication - the size and number of messages that flow through the place channels. You can use make-shared-fxvector and make-shared-flvector to create shared data structures that are not serialized through the place channels, but there is no way that I know of to synchronize access to the data, so they should only be used to pass data around and avoid serializing it through the place channel.
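
For illustration, here is a rough sketch (made-up names) of handing a shared flvector to a place, so only the reference - not the data - goes through the channel:

#lang racket
(require racket/place racket/flonum)

;; A place that sums a shared flvector it receives over its channel.
(define (start-summer)
  (place ch
    (define vec (place-channel-get ch))   ; the shared flvector itself, not a copy
    (place-channel-put
     ch
     (for/sum ([i (in-range (flvector-length vec))])
       (flvector-ref vec i)))))

(module+ main
  ;; shared flvectors are allocated in shared memory, so sending one over a
  ;; place channel does not serialize its contents
  (define data (make-shared-flvector 1000 1.0))
  (define w (start-summer))
  (place-channel-put w data)
  (displayln (place-channel-get w)))  ; => 1000.0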

Not sure how much this helps,
Alex.


raco setup uses places by default.

One way I've used places is to run a server listening on port 80 and redirecting HTTP to HTTPS, while the main place listened on port 443.

In principle, futures ought to be a better fit than places when you need shared data structures. In practice, the folklore has been that futures are mostly useful for numeric computations and that you will likely need to use the future visualizer to find surprising unintended synchronization points (like + as opposed to fl+, IIRC). That's from the BC era, though. Racket CS brought vastly improved support for futures running in parallel, because many more Chez Scheme data structures and functions are thread-safe at the OS level. I'd be very interested to hear about benchmarks or experience with futures on Racket CS.
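
For anyone who wants to experiment, here is a minimal sketch (assuming the future-visualizer package from the main distribution) that runs a flonum-only kernel in one future per core and opens the visualizer to look for blocking or synchronization events:

#lang racket
(require racket/future racket/flonum future-visualizer)

;; A numeric kernel that sticks to flonum operations (fl+ rather than generic +).
(define (sum-up-to n)
  (for/fold ([acc 0.0]) ([i (in-range n)])
    (fl+ acc (exact->inexact i))))

;; Spawn one future per core, touch them all, and visualize the run.
(visualize-futures
 (let ([fs (for/list ([_ (in-range (processor-count))])
             (future (lambda () (sum-up-to 1000000))))])
   (map touch fs)))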


I would definitely suggest using futures here. Sharing r/w structures (like a cache) between futures is a bit tricky, but definitely feasible. With CS it got even easier.
Generally speaking, you should divide the work between the same number of futures as there are cores on the CPU you run on, and for the caching make sure you use some futures-aware locking - probably just CAS operations. Do not try using fsemaphores as mutexes like I did some time ago - I now agree that they are not well suited for that (although I managed to use them reliably with a few tricks here and there).
For the work distribution, I usually use a very simple syntax macro[1] or similar.
For a shared mutable data structure between futures - in my case it was a work queue - I used a custom queue created with the help of others on the racket-users@ mailing list. You can see it here[2]
The busy-wait loop looks horrific, but in reality it is not - due to futures' semantics. I can imagine the cache can be handled similarly.

[1] rcge/futures.rkt · master · Dominik Joe Pantůček / rcge-tut25 · GitLab
[2] rcge/mfqueue.rkt · master · Dominik Joe Pantůček / rcge-tut25 · GitLab
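
If it helps to see the shape without following the links, here is a rough, self-contained sketch (not the actual macro from [1]) of splitting the work across as many futures as there are cores:

#lang racket
(require racket/future)

;; Stand-in for one unit of work.
(define (do-work-unit u) (* u u))

;; Split the units across (processor-count) futures, touch them all,
;; and flatten the per-future result lists.
(define (run-in-parallel units)
  (define n (processor-count))
  (define futs
    (for/list ([i (in-range n)])
      (future
       (lambda ()
         (for/list ([u units] [j (in-naturals)]
                    #:when (= (modulo j n) i))
           (do-work-unit u))))))
  (append* (map touch futs)))

;; (run-in-parallel (range 1000))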


Hi, thanks for your reply. I saw your multi-process racket post and it was very useful in getting a picture of different options out there.

You seem to want more details; to make my point, I will simplify the numbers so they are round and easy to work with.

Let's say we have a computational task with 1,000 work units, each taking 100 ms. Statistically, half the tasks can be resolved without computation if we have a cache, because they share intermediate results with the other half. Now assume that the intermediate results necessary to resolve a task may be distributed through the results from other tasks, so we can only use them if we can see the cached results of everything that came before us.

I'm hoping the assumptions I am making here are sufficiently straightforward that you can fill in details. If you have any questions, or if I have made any mistakes, let me know and I will clarify.

Theoretical fastest timing bounds for our hypothetical computation:

A. Serial no cache: 100,000 ms

B. Serial cache: 50,000 ms

C. Parallel 2 workers no cache: 50,000 ms

D. Parallel 1,000 workers no cache: 100 ms (Amdahl's limit)

E. Parallel 2 workers cache: 25,000 ms (target)

Combining the cache with parallelization gives us a significant advantage over either alone.

So I implemented this with places, using the message-passing model to pass the cache updates to the workers. I discovered that cache update deltas take 1,000 ms to transfer from the controller to the workers. If half the work units involve a cache update, this adds 50,000 ms and the last case becomes:

E. Parallel 2 workers Racket cache: 75,000 ms

In practice, case E for me is often slower than the cached case or the parallel case alone. It would be feasible to approach the target speed for E if I had an efficient mechanism for sharing the cache in Racket.

One possibility that is available in many language environments is concurrent access to data structures in shared memory (e.g. https://www.cs.tau.ac.il/~shanir/concurrent-data-structures.pdf). You have to do updates to the structure carefully, locking the parts you are updating, but in many environments it is possible to combine a shared cache with parallel workers with a latency that doesn't destroy the advantages of parallelization.

I would like to hear from Racket developers or anyone who is interested in this problem: what is the best way to handle this in the current Racket environment? If a good solution doesn't exist, what would be the best way to develop one?

Thank you for your reply. Based on our somewhat similar experiences, I suspect there is a large class of algorithms that normally benefit from parallelization in other languages but, because of their timing, won't benefit from parallelization in Racket, possibly because of the implementation of message passing in places. I am hoping there is a good solution here, and if there isn't, that the Racket development community can create one soon. For my particular application this is nearly a deal-breaker, and I'd hate to have to port my solution to another language environment because of it.

Thank you @dominik.pantucek and @LiberalArtist for the suggestions to use futures. I avoided them because the documentation said they mostly work with numerics and there is a lot of non-numeric data structure manipulation here, but I will revisit them and if I have success I will post my results here.


I'd be interested in your results, regardless of whether futures work for you or don't. :slight_smile:


Are these 100 ms work units really all 100 ms (or is that some kind of average), and are they algorithmically of the same kind, or are they just similar and treated in a generalized way?

If these tasks are heterogeneous but treated in a generic way, then I still think there might be possibilities to treat each kind in a specific way to get speedups.
(I mostly write this because sometimes people write way too general code and then wonder why it is slow; I'm not saying that you in particular do this.)

No matter whether this is the case or not, here is another project that might be of interest:

Sadly, sham does not currently seem to have a package that can be easily installed to start using it.
But I haven't actually spent time trying to use sham, so I don't know whether it is easy or difficult to get started with.
Also, I can mostly only give hints about what you could try; I haven't had a practical need to do high-performance/throughput computations, so my practical experience is limited, although I find the topic interesting (someday I will find the time or have a practical need).


I am not sure whether these abstract numbers are that useful or can be connected to some real implementation. Why? I can't see which part is potentially cacheable and which part is actually doing the computation.

I am also not sure how caching is actually that helpful if all tasks could be run completely independently in 100 ms in parallel; adding caching to that would likely increase latency.
Is your goal lower latency?

Also, what can be shared between the tasks, and what is the distribution of the shareable parts? Can it be predicted or grouped? Is there some heuristic that could group tasks that are likely to share some sub-computation?

If these numbers were real, I could say:
D implies that every task can be run completely independently.
E wants 25,000 ms with caching.
Then my question would be: why not just use 4 workers to get roughly 25,000 ms without caching?
Or, if the tasks are that independent, maybe even use some kind of compute shader to implement D (half serious / half joke).

I think numbers like these may not be that helpful as a basis for actually giving you advice, because we all have different ideas about what is calculated, etc.
I like that you provided them, because they gave me some sense of how the problem looks, but it could still be a wide variety of different problems.


If neither futures, places, sham, shared vectors, nor memory-mapped files/pages solve your issue, then my next approach would probably be to look at whether some low-level language (like C, Rust, Zig, etc.) can be used to build a shared library that does the actual work, and use that via Racket's FFI.
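
As a sketch of that last idea (the library and function names here are made up), the Racket side could be as small as:

#lang racket
(require ffi/unsafe ffi/unsafe/define)

;; Hypothetical shared library that owns the cache and does the heavy work.
(define-ffi-definer define-worker (ffi-lib "libworker"))

;; Assumes a C entry point with the signature: double do_work_unit(int unit_id);
(define-worker do_work_unit (_fun _int -> _double))

;; (do_work_unit 42) would then run entirely in the C library, outside Racket's runtime.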

With really low-level code you want to ask yourself what the memory access patterns are; if cache misses can be avoided with tightly packed sequential arrays, it may turn out that just recomputing a bunch of things is faster than trying to cache them via non-local/scattered memory access patterns.

Also "Data Oriented Programming/Design" describes some ways how to write lowlevel code so that it fits the hardware, this page seems to have a bunch of links to further resources about its ideas: Mike Acton | Data Oriented Programming


I hope some of this is useful for you. I asked a bunch of questions, maybe some of those apply to your problem.

The early cutoff based on intermediate results reminds me of some of the designs from the paper "Build Systems à la Carte" (which has admittedly been on my mind a lot recently).

If the cache is truly an optimization, it might work out to tolerate occasionally duplicating work to reduce synchronization around cache lookups and stores.

I definitely think futures are the thing to try. The extensive caveats all date from the BC era: I just don't think there's been enough experience with futures on Racket CS to know what to write in place of the hard, often discouraging, numbers from BC. My understanding is that even mutable hash tables can be used from futures on CS: some operations on equal?-based tables may require synchronization, but they shouldn't "block" (in the sense of preventing the future from continuing until it is touched). There are also options like box-cas! and vector-cas!.
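
As a sketch of what I mean (untested at any scale), a shared memo cache could be an immutable hash stored in a box and updated with box-cas!; a failed CAS can simply retry, or even give up, since the cache is only an optimization:

#lang racket
(require racket/future)

;; Shared cache: an immutable hash in a box, readable from any future.
(define cache (box (hash)))

(define (cache-ref key)
  (hash-ref (unbox cache) key #f))

;; Atomically add an entry; retry if another future updated the box first.
(define (cache-add! key val)
  (let loop ()
    (define old (unbox cache))
    (unless (box-cas! cache old (hash-set old key val))
      (loop))))

;; Memoized computation: look up first, otherwise compute and store.
(define (compute/memo key thunk)
  (or (cache-ref key)
      (let ([v (thunk)])
        (cache-add! key v)
        v)))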

One difference between futures and places is that each place always creates a new OS thread and essentially spawns a new instance of the Racket VM, so startup costs are high, and you always want to organize your work into no more than (processor-count) worker places. Futures, on the other hand, are scheduled by Racket on a pool of OS threads, so you may be able to express your units of work more directly, and let Racket take care of the scheduling.

For completeness, the module ffi/unsafe/os-thread exposes Chez Scheme's ability to launch OS threads, with concomitant restrictions on what code can be safely run in them. Outside of an FFI context (where it can be useful for dealing with limitations in foreign libraries, as in sqlite3-connect), I'd consider that a low-level escape hatch: if there's something ffi/unsafe/os-thread lets you do that doesn't work with futures, we should figure out how to make it work safely.


A few years ago I parallelized a few of the programs in The Computer Language Benchmarks Game; for example, this one uses places: regex-redux Racket #2 program (Benchmarks Game)

They don't share info. Each place does an isolated task and the result is just the concatenation of the results.

Places have a very long startup time, IIRC around half a second. That was on an old machine with an old version of Racket, and I don't have current numbers, but try to avoid creating too many. The rule of thumb is one per core, but in my example I used 2 plus the main process because it was faster, even though the test ran on a 4-core machine.

If you have 1,000 work units of 100 ms each, try to group the work into a few places instead of creating 1,000 places.

Another option is to use futures. They have a shorter startup time and can share more info between them, but in my experience they are only nice for number crunching. Once you try to use hashes or allocate or do any other nice stuff, they will block unless you are very careful.