Bensa.ch Blog

1BRC in Elixir in 20 seconds

The other day at work, someone on Slack mentioned the 1BRC (One Billion Row Challenge), where you are supposed to read one billion rows from a single file and run some simple aggregations over them.

After reading a bit I got really interested, because I was seeing languages like Java doing 6 seconds, C doing about 1.2 seconds, and so on. Elixir being a functional language that likes to copy everything for immutability, it got me wondering: what's the limit?

NOTE: I run a Ryzen 7 3700X with 8 cores / 16 threads and 32 GB of RAM, and, because why not, on Windows 10.

Other stuff that I’ve tried

I ended up with 4 different solutions; here is the recap.

At the end of the day, the problem was simply message passing: it copies too much. I tried ETS and the like, but because I still had to send each line to the GenServer, any kind of mutable storage was of no avail.

The GenServer was needed because I was using it to synchronize the reads and writes to its state.
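For context, here is a hypothetical reconstruction of that discarded approach (the module and function names are mine, not from the original code). Every line has to be sent as a message, so the data is copied into the GenServer's heap and every update serializes through a single process:

```elixir
# Hypothetical sketch of the discarded GenServer approach: each measurement
# is cast as a message, copied into this process, and handled one at a time.
defmodule StationStore do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  # Async send: the {name, value} tuple is copied to the server's heap.
  def record(name, value), do: GenServer.cast(__MODULE__, {:record, name, value})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:record, name, value}, state) do
    # State per station: {min, max, sum, count}
    {:noreply,
     Map.update(state, name, {value, value, value, 1}, fn {min, max, sum, n} ->
       {min(min, value), max(max, value), sum + value, n + 1}
     end)}
  end
end
```

However many producer processes you spawn, this single mailbox is the bottleneck.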

Getting complex

The basic idea can be seen like this:

I will be creating two different counters per station.

counter = :counters.new(2, [:atomics])
counter2 = :counters.new(2, [:write_concurrency])

:persistent_term.put(:erlang.phash2(name), {name, counter, counter2, mutex})
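Putting those pieces together, here is a minimal runnable sketch of the per-station setup and lookup (the station name is just an example):

```elixir
name = "Zurich"

# slot 1 tracks the min, slot 2 the max (updates guarded by the mutex)
counter = :counters.new(2, [:atomics])
# slot 1 accumulates the sum, slot 2 the count (lock-free adds)
counter2 = :counters.new(2, [:write_concurrency])
mutex = :counters.new(1, [:atomics])

:persistent_term.put(:erlang.phash2(name), {name, counter, counter2, mutex})

# Any process can now reach the same counters through the hashed key.
{^name, c, _c2, _m} = :persistent_term.get(:erlang.phash2(name))
:counters.add(c, 1, -57)
:counters.get(c, 1)  # → -57
```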

Mutex

The implementation of the mutex is really straightforward

defp acquire(mutex) do
  if :counters.get(mutex, 1) == 0 do
    :counters.add(mutex, 1, 1)
  else
    acquire(mutex)
  end
end

defp release(mutex) do
  :counters.sub(mutex, 1, 1)
end

Yes, I know, it is a bit naive, but it does work. When a process fails to acquire the mutex it simply retries, as many times as the BEAM allows. I decided not to add any sleep or PID-based synchronization because in my tests it made no difference. (You can prove me wrong.)
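For what it's worth, the check-then-add above is not a single atomic step, so two processes could in principle both read 0 and both "acquire" the lock. A tighter variant (a sketch of an alternative, not what this post uses) builds on the :atomics module, whose compare_exchange/4 makes the test-and-set atomic:

```elixir
defmodule SpinLock do
  # One unsigned slot: 0 = free, 1 = held.
  def new, do: :atomics.new(1, signed: false)

  # Atomically flip slot 1 from 0 to 1; busy-wait until it succeeds.
  def acquire(lock) do
    case :atomics.compare_exchange(lock, 1, 0, 1) do
      :ok -> :ok
      _currently_held -> acquire(lock)
    end
  end

  def release(lock), do: :atomics.put(lock, 1, 0)
end
```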

Here is the whole thing

defmodule Brc do
  @moduledoc false

  # 1 000 000 000
  def run do
    "measurements.txt"
    |> File.read!()
    |> String.split("\n", trim: true)
    |> Stream.chunk_every(300_000)
    |> Task.async_stream(fn block ->
      Enum.each(block, &process_line/1)
    end, ordered: false, timeout: :infinity)
    |> Stream.run()

    read_terms_and_build_output()
  end

  defp process_line(line) do
    [name, measurement] = String.split(line, ";")
    value = trunc(String.to_float(measurement) * 100)

    case get_counters(name) do
      {:ok, counter, counter2, _} ->
        # init the counters
        :counters.add(counter, 1, value)
        :counters.add(counter, 2, value)
        :counters.add(counter2, 1, value)
        :counters.add(counter2, 2, 1)

      {counter, counter2, mutex} ->
        :counters.add(counter2, 1, value)
        :counters.add(counter2, 2, 1)

        acquire(mutex)
        min = :counters.get(counter, 1)
        max = :counters.get(counter, 2)

        if value < min do
          :counters.sub(counter, 1, min - value)
        end

        if value > max do
          :counters.add(counter, 2, value - max)
        end

        release(mutex)
    end
  end

  defp get_counters(name) do
    key = :erlang.phash2(name)

    {_, a, b, mutex} = :persistent_term.get(key)
    {a, b, mutex}
  rescue
    _ ->
      mutex = :counters.new(1, [:atomics])
      counter = :counters.new(2, [:atomics])
      counter2 = :counters.new(2, [:write_concurrency])
      {:persistent_term.put(:erlang.phash2(name), {name, counter, counter2, mutex}), counter, counter2, mutex}
  end

  defp read_terms_and_build_output do
    :persistent_term.get()
    |> Enum.filter(fn {id, _} -> is_integer(id) end)
    |> Enum.map(fn {_id, {name, counter, counter2, _}} ->
      Task.async(fn ->
        min = :counters.get(counter, 1)
        max = :counters.get(counter, 2)
        sum = :counters.get(counter2, 1)
        count = :counters.get(counter2, 2)

        {name, Float.round(min / 100, 2), Float.round(max / 100, 2), Float.round(sum / 100), count}
      end)
    end)
    |> Task.await_many(:infinity)
    |> List.flatten()
    |> Enum.sort_by(&elem(&1, 0))
    |> Enum.map(fn {name, min, max, sum, count} -> "#{name};#{min};#{Float.round(sum / count, 2)};#{max}\n" end)
    |> then(&File.write("output.txt", &1))
  end

  defp acquire(mutex) do
    if :counters.get(mutex, 1) == 0 do
      :counters.add(mutex, 1, 1)
    else
      acquire(mutex)
    end
  end

  defp release(mutex) do
    :counters.sub(mutex, 1, 1)
  end
end
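One detail worth calling out: :counters can only hold 64-bit integers, which is why the code above stores each temperature as a fixed-point integer (value × 100) and scales back down when building the output:

```elixir
# "12.3" -> 1230 on the way in, since :counters cannot store floats
value = trunc(String.to_float("12.3") * 100)  # 1230

# ...and back to a float with two decimals when writing the output line
Float.round(value / 100, 2)  # 12.3
```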

It takes about 18-20 seconds to do everything, 4 of which are spent just reading the file. It scales really easily with more cores.

In the end I feel this is really cheating; I mean, you could replace this code with C and the solution would be exactly the same. But the goal is to do it as fast as possible.

For my part, I don't think I will take this any further. I'm glad I finally had a reason to use counters, which I had never touched until now. I don't think this could be improved much more without using macros and generating code tailored to the dataset. You could try to read the file faster, or spawn the tasks differently.

That's all from me.
