Why `streaming` Is My Favourite Haskell Streaming Library

Posted on April 13, 2024 by Jack Kelly
Tags: coding, haskell

It’s really easy to misuse lazy I/O (e.g., hGetContents) in nontrivial Haskell programs. You can accidentally close a Handle before the computation which reads from it has been forced, and it’s hard to predict exactly when data will be produced or consumed by IO actions. Streaming libraries in Haskell avoid these problems by explicitly interleaving the yielding of data and execution of effects, as well as helping control the memory usage of a program by limiting the amount of data “in flight”.

A number of veteran Haskellers have built streaming libraries, and off the top of my head I’m aware of conduit, io-streams, iteratee, machines, pipes, streaming, and streamly. Of those, I think conduit, pipes, streaming, and streamly are the most commonly used ones today. It can be hard to know which library to choose when there’s so many options, so here is my heuristic:

  1. If you’re doing simple streaming (e.g., from a network connection straight into a file), use whatever your library uses (usually conduit); or
  2. If you’re doing anything more complicated, or you’re doing greenfield work, use streaming.

I’ll explain why after the jump.

The Shortlist

Pipes

From what I can tell, pipes and conduit are the most common industrial streaming libraries. pipes is beautiful and elegant, but despite its very extensive tutorial (complete with beautiful diagrams and composition laws), I have never been able to successfully write a nontrivial program using it. I think it is too complicated, and its author regrets the complexity of pipes’s core type:

I firmly subscribe to the principle of least power which says that you should use the simplest type or abstraction available that gets the job done instead of trying to shoehorn everything into the same “god type” or “god abstraction”. I learned this the hard way when I tried to shoehorn everything into my pipes package and realized that it was a huge mistake, so it’s not like I’m innocent in this regard. Don’t make the same mistake I did.

There are many attractive tools in the pipes ecosystem, but they all seem to use pretty high-powered type signatures that even now I struggle to follow. Example: to re-chunk a Producer using pipes-group, you use the chunksOf function, which has this type signature:

chunksOf ::
  Monad m =>
  -- | Chunk size
  Int ->
  Lens
    (Producer a' m x)
    (Producer a m x)
    (FreeT (Producer a' m) m x)
    (FreeT (Producer a m) m x)

I’ve been doing Haskell professionally for several years, and I still find that signature a little intimidating. I can see why it’s a Lens (it bundles the chunking/unchunking operations together), but if we’re doing optics, why isn’t it an Iso? I also get the vibe of why it’s using FreeT, but have zero muscle memory when it comes to working with that type.

Conduit

conduit, on the other hand, feels much more rough-and-ready, with additional concepts like “leftovers” adding incidental complexity to the design. Many libraries for common tasks like HTTP request/response use conduit, so it has a strong gravitational pull. For simple applications, it’s often easier to stick with conduit and not worry about converting to/from some “better” streaming library.

But every time I try to do something non-trivial with conduit, I feel like the API can do everything except solve my problem. I think this is partially due to the design of its core type: the ConduitT monad transformer, which lets you build computations which are part of a streaming pipeline:

data ConduitT i o m r
--            | | | |
--            | | | `- Final result type
--            | | `--- Monad in which it can perform actions
--            | `----- Output type (yielded downstream)
--            `------- Input type (awaited from upstream)

This design decision (which I think is inspired by a similar type in pipes) leads to (IMHO) an unfortunate splitting of the interface: I can never figure out whether a function should be implemented as its own conduit (and connected as a subsequent stage in the pipeline), or as a function which transforms streams.

A challenge for conduit wizards: write a function that can rechunk a ConduitT i ByteString m r into yielding fixed-length ByteString substreams, without buffering. I want to stream the data as I receive it, and only split a ByteString if it spans a chunk boundary. The amount of data streamed out for each chunk except the final one should be equal to a chunk size argument, but the final chunk might necessarily be smaller. I imagine that the type I’d want is a function to turn the conduit into a conduit-that-yields-conduits, which can be consumed piecemeal:

-- Possible type signature?
rechunk ::
  ConduitT i ByteString m r ->
  ConduitT i (ConduitT () ByteString m ()) m r

But I really have no idea how to do this with conduit or pipes, while streaming-bytestring can easily do it by repeatedly applying splitAt.

Streamly

streamly is a relatively new library in this space, and a very ambitious project. It makes really bold performance claims and uses aggressive under-the-hood concurrency to try and achieve them. It also provides a broad, unified toolbox of data streaming/concurrency/folding abstractions. I haven’t used it since a change in its file-watching functions broke a work program during a minor version update, leading us to remove it. That makes me shy away from it, but that was also a few years ago. It’s had a fresh release this year and is probably worth keeping an eye on.

Streaming

The streaming package has a radically simpler design than any of the others we’ve discussed so far. It uses two core types, Stream and Of:

-- f: The functor which contains the stream elements
-- m: The monad in which actions occur
-- r: The type of the final result
data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

data Of a b = !a :> b deriving Functor

The most common instantiation you see for a Stream is f ~ Of a, giving Stream (Of a) m r. A simple example might be a stream that yields the integers 1, 2, 3 in order:

-- This is spelled out for clarity.
-- There are more concise ways to write this stream.
oneTwoThree :: Stream (Of Int) m ()
oneTwoThree =
  Step (1 :> Step (2 :> Step (3 :> Return ())))

The strictness annotations on the Step constructor and the left side of the Of constructor ensure that stream elements will be forced to WHNF as soon as they’re yielded. The payoff from making the f parameter to Stream an arbitrary Functor is that you can use more complicated structures as “stream elements” to achieve things like chunking with perfect streaming. Compare streaming’s chunksOf to the one from pipes-group above:

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r

So if f ~ Of a, chunksOf 3 would turn a Stream (Of a) m r into a Stream (Stream (Of a) m) m r — a stream of streams, which is exactly the behaviour we want.

Because streaming doesn’t have the pipes/conduit concept of “horizontal composition”, it can unify “connecting a consumer” with “transforming the stream” — both are standard function application.

streaming has one major wart, and I think it’s a forgivable one: It has a separate type for streaming ByteStrings. The Streaming.ByteString.ByteStream type from streaming-bytestring is morally Stream (Of ByteString) m r, but with the ByteStrings {-# UNPACK #-}-ed directly into the data structure. This is such a common special case that I think it’s worth the specialisation.

Other Tricks With The Functor Parameter

I’ve praised simplicity a lot in this post, so now it’s time to really justify that f parameter to Stream. It would be much easier to bake the item directly into the Step constructor, but the additional f adds a lot of power. We’ve seen it used in chunksOf already, but it’s possible to do even more. I’ve been working on a decompressor for an old computer game, and streaming allowed me to say some pretty useful things:

-- | Attempt to parse the header records from the start of a byte
-- stream. On success, return the remainder of the stream.
decodeHeader ::
  Monad m =>
  ByteStream m r ->
  m (Either String (Header, ByteStream m r))

-- | Break up and decompress the input stream by zipping it with the
-- list of records from the header. You can process each byte stream
-- by using @Streaming.'Streaming.mapsM_'@.
decompressAll ::
  MonadIO m =>
  Header ->
  ByteStream m r ->
  Stream (Compose (Of Record) (ByteStream m)) m r

Let’s look into that Stream (Compose (Of Record) (ByteStream m)) m r type, and consider what we’ll actually have in the Step constructor of the result Stream:

Let me repeat that last part: as the stream iterates through the records, it yields the Record it’s decompressing, as well as the corresponding uncompressed ByteStream, and you must reach the end of the ByteStream before you can start on the next Record. This makes it very hard to accidentally buffer one record before starting on the next, and since I can’t even figure out how to do something as simple as a perfect rechunking of a ConduitT, I have no idea how you’d do this in pipes or conduit. Maybe I’m not just not smart enough.

streaming seems to make the easy jobs easy and the hard jobs possible, which is why it’s the one I reach for by default.

Previous Post
All Posts | RSS | Atom
Next Post
Copyright © 2024 Jack Kelly
Site generated by Hakyll (source)