Recently I had occasion to experiment with streaming content from a Rails controller. I decided to try out Server Side Events just to see where the support was for them in Rails 4 and if they’d work well for my use case.

Rails doesn’t have an event loop you can easily plug in to, and Ruby has the GIL. So handling SSE requires some adjustments to make concurrency possible. Our options then are needing another server to manage the streaming requests (so, using another process) or using a multithreaded server like puma or passenger. I went with puma for simplicity’s sake. First though, what are the basic pieces we’re looking at?

Here’s the high-level view:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
	+client (js)--------+   +------+server (puma)+--+
	| +---------------+ +-> |                       |
	| |               | |   | +-------------------+ |
	| |  source = new | | <-+ |                   | |
	| |  EventSource  | |   | | main thread       | |
	| |               | |   | |                   | |
	| +---------------+ |   | +---------+---------+ |
	|                   |   |           |           |
	| +---------------+ |   |           |           |
	| |               | |   |  +--------v--------+  |
	| | source.       | +-> |  |                 |  |
	| | close()       | |   |  | stream thread   |  |
	| |               | | <-+  |                 |  |
	| +---------------+ |   |  +-----------------+  |
	+-------------------+   +-----------------------+

The sequence of execution is this: a view page is rendered by the main thread, on the view page a new EventSource is created which will read off the stream, meanwhile the stream thread is activated by the main thread when a request to its action comes through (this is triggered by the EventSource creation). Let’s talk about how a stream is closed (or not closed) a little further down. First let’s look at our controller.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ExampleController < ApplicationController
  include ActionController::Live
  LIVE_STREAM_UPDATE_RATE = 2

  def index
    # here we grab what existed at render time
    render locals: { models: Model.all }
  end

  def live_stream
    response.headers['Content-Type'] = 'text/event-stream'
    # here we start our time slicing
    previous_time = Time.now

    loop do
      # lets only grab new models since original render
      models = Model.models_since(previous_time)

      logger.info('streaming models')

      response.stream.write 'data: ' \
        "#{models.to_json}\n\n"

      # we increment our timer for next slice
      previous_time = Time.now

      # we reenter loop after a slight wait
      sleep LIVE_STREAM_UPDATE_RATE
    end
  rescue ActionController::Live::ClientDisconnected
    logger.info('client disconnected')
  ensure
    # as long as Ruby knows to stop #live_stream
    # method we will close the stream
    logger.info('closed stream')
    response.stream.close
  end
end

And here’s the matching piece of the puzzle on the client side (this is rendered in the index view):

1
2
3
4
5
6
7
8
9
var source = new EventSource('models/live_stream');

source.onmessage = function(e) {
  e = JSON.parse(e['data']);
  console.log(e);
  if(e.length == 0){
    return
  }
}

Right now these two pieces will just stream our content to the console. Within the js piece we could be outputting to the DOM instead. When I implemented that, I did so by rendering a template in the index view and then copying and inserting into that template in my js.

Now how do we stop the stream? In my case, I’m using turbolinks so the usual window events I’d rely on are obscured by that library’s own events. So I listen to page:fetch instead of onunload:

1
2
3
4
5
6
document.addEventListener("page:fetch", function() {
  if (typeof source !== 'undefined') {
    source.close();
    console.log('source closed');
  }
});

Here’s where the real weakness of this approach becomes clear. What if I interact with the site such that I don’t trigger page:fetch and close the stream? Say I want to performance test using ab or hit the page using curl? Well, streams will be created and never get cleaned up.

Even if we weren’t worried about these streams getting cleaned up this suggests another problem: what if we exhaust the thread pool given to us by puma? Now that we know it’s possible to create streams and not clean them up this should really worry us, because it becomes easily possible to create a bunch of streams no one is listening to that prevent us from creating new streams that would have an audience.

Even if that weren’t true, you can see the threads get exhausted very easily. If you use the above code so far and open the streaming page in 9 tabs you’ll see yourself waiting for a socket. As soon as another listener drops off, you’ll connect. We need some way to “cross the streams” as the Ghostbusters would say.

For my use case of truly live, non-ActiveRecord data, I was stuck: 1. because I needed to parameterize my streams in such a way that they could not be merged; 2. because my data source was such that I couldn’t tag each event in the originating stream such that it could be fanned out into distinct downstream channels. In other words there was no way to get rid of my loop or even just make it a single loop.

As far as I can tell based on existing articles/tutorials on this feature, the key to using this in practice is mainly to hook into the ActiveRecord lifecycle. Why? You have to get rid of the loop do somehow. From what I can see, those that do use SSE in Rails mainly use it via observers on models. Basically we’re giving up actual continuous streaming for discrete push events based on lifecycle events.

For the discrete pushes use case, I actually think Rails SSE is pretty nice–nice as in simple, easy. For that you can use sse-rails or rails-sse-engine. Alternatively, for the same use case but with a different implementation (you have to use faye or pusher with this) you could use render_sync.

Nowadays though I think the community is probably just moving on to ActionCable in Rails 5. Or if not that, websockets-rails or the options detailed here. That said, for my own use case, I think it probably would’ve been fine to just pull from the client side on a 2 second timer rather than push from the server on a 2 second timer. The nice thing about that is with concurrent users I can mostly hit a cache if my live data is updating less often than it is being requested.