It’s really easy to misuse lazy I/O (e.g., hGetContents) in nontrivial Haskell programs. You can accidentally close a Handle before the computation which reads from it has been forced, and it’s hard to predict exactly when data will be produced or consumed by IO actions. Streaming libraries in Haskell avoid these problems by explicitly interleaving the yielding of data and execution of effects, as well as helping control the memory usage of a program by limiting the amount of data “in flight”.
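As a concrete illustration, here is a minimal sketch of the classic pitfall (assuming a local file named input.txt): withFile closes the handle as soon as its callback returns, but the lazily-read string hasn’t been forced yet, so by the time it is, the data is gone.

import System.IO

main :: IO ()
main = do
  -- hGetContents reads the file lazily, so nothing is consumed here...
  contents <- withFile "input.txt" ReadMode hGetContents
  -- ...and by the time we force the string, withFile has already closed
  -- the handle, so we silently get an empty result.
  putStrLn contents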
A number of veteran Haskellers have built streaming libraries, and off the top of my head I’m aware of conduit, io-streams, iteratee, machines, pipes, streaming, and streamly.
Of those, I think conduit, pipes, streaming, and streamly are the most commonly used ones today. It can be hard to know which library to choose when there are so many options, so here is my heuristic: if the libraries you depend on push you towards a particular streaming library, stick with that (probably conduit); otherwise, use streaming. I’ll explain why after the jump.
From what I can tell, pipes and conduit are the most common industrial streaming libraries. pipes is beautiful and elegant, but despite its very extensive tutorial (complete with beautiful diagrams and composition laws), I have never been able to successfully write a nontrivial program using it. I think it is too complicated, and its author regrets the complexity of pipes’s core type:
I firmly subscribe to the principle of least power which says that you should use the simplest type or abstraction available that gets the job done instead of trying to shoehorn everything into the same “god type” or “god abstraction”. I learned this the hard way when I tried to shoehorn everything into my pipes package and realized that it was a huge mistake, so it’s not like I’m innocent in this regard. Don’t make the same mistake I did.
There are many attractive tools in the pipes ecosystem, but they all seem to use pretty high-powered type signatures that even now I struggle to follow. Example: to re-chunk a Producer using pipes-group, you use the chunksOf function, which has this type signature:
chunksOf ::
  Monad m =>
  -- | Chunk size
  Int ->
  Lens
    (Producer a' m x)
    (Producer a m x)
    (FreeT (Producer a' m) m x)
    (FreeT (Producer a m) m x)
I’ve been doing Haskell professionally for several years, and I still find that signature a little intimidating. I can see why it’s a Lens (it bundles the chunking/unchunking operations together), but if we’re doing optics, why isn’t it an Iso? I also get the vibe of why it’s using FreeT, but have zero muscle memory when it comes to working with that type.
conduit, on the other hand, feels much more rough-and-ready, with additional concepts like “leftovers” adding incidental complexity to the design. Many libraries for common tasks like HTTP request/response use conduit, so it has a strong gravitational pull. For simple applications, it’s often easier to stick with conduit and not worry about converting to/from some “better” streaming library.
But every time I try to do something non-trivial with conduit, I feel like the API can do everything except solve my problem. I think this is partially due to the design of its core type: the ConduitT monad transformer, which lets you build computations which are part of a streaming pipeline:
data ConduitT i o m r
--            | | | |
--            | | | `- Final result type
--            | | `--- Monad in which it can perform actions
--            | `----- Output type (yielded downstream)
--            `------- Input type (awaited from upstream)
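To make those parameters concrete, here is a toy pipeline (a sketch assuming the Conduit module from conduit >= 1.3): the source yields Ints downstream, the middle stages await and yield Ints, and the final stage produces the pipeline’s overall result r.

import Conduit

-- Sum the first five even numbers from an infinite source.  Each stage
-- is a ConduitT, connected with (.|); runConduit runs the assembled
-- pipeline and returns the final result.
sumFirstEvens :: IO Int
sumFirstEvens =
  runConduit $
       yieldMany [1 :: Int ..]  -- yields Ints downstream
    .| filterC even             -- awaits Ints, yields Ints
    .| takeC 5                  -- passes through the first five
    .| sumC                     -- consumes Ints, returns their sum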
This design decision (which I think is inspired by a similar type in pipes) leads to (IMHO) an unfortunate splitting of the interface: I can never figure out whether a function should be implemented as its own conduit (and connected as a subsequent stage in the pipeline), or as a function which transforms streams.
A challenge for conduit wizards: write a function that can rechunk a ConduitT i ByteString m r into yielding fixed-length ByteString substreams, without buffering. I want to stream the data as I receive it, and only split a ByteString if it spans a chunk boundary. The amount of data streamed out for each chunk except the final one should be equal to a chunk size argument, but the final chunk may of necessity be smaller. I imagine that the type I’d want is a function to turn the conduit into a conduit-that-yields-conduits, which can be consumed piecemeal:
-- Possible type signature?
rechunk ::
  ConduitT i ByteString m r ->
  ConduitT i (ConduitT () ByteString m ()) m r
But I really have no idea how to do this with conduit or pipes, while streaming-bytestring can easily do it by repeatedly applying splitAt.
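For flavour, here is roughly what that looks like (a sketch, not production code, assuming the Streaming.ByteString module from streaming-bytestring >= 0.2 and its nextChunk, splitAt, and chunk functions):

import           Streaming (Stream, effect, wrap)
import qualified Streaming.ByteString as Q

-- Repeatedly apply splitAt to peel off fixed-size substreams.  Each
-- inner ByteStream must be drained before the next one appears, so no
-- chunk is ever buffered.
rechunk :: Monad m => Int -> Q.ByteStream m r -> Stream (Q.ByteStream m) m r
rechunk n bs = effect $ do
  step <- Q.nextChunk bs
  pure $ case step of
    -- Input exhausted: finish with the underlying stream's result.
    Left r -> pure r
    -- Otherwise push the inspected chunk back, carve off n bytes as a
    -- substream, and recurse on whatever splitAt leaves over.
    Right (c, rest) ->
      wrap (fmap (rechunk n) (Q.splitAt (fromIntegral n) (Q.chunk c >> rest)))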
streamly is a relatively new library in this space, and a very ambitious project.
It makes really bold performance claims and uses aggressive
under-the-hood concurrency to try and achieve them. It also provides a
broad, unified toolbox of data streaming/concurrency/folding
abstractions. I haven’t used it since a change in its file-watching
functions broke a work program during a minor version update, leading us
to remove it. That makes me shy away from it, but that was also a few
years ago. It’s had a fresh release this year and is probably worth
keeping an eye on.
The streaming package has a radically simpler design than any of the others we’ve discussed so far. It uses two core types, Stream and Of:
-- f: The functor which contains the stream elements
-- m: The monad in which actions occur
-- r: The type of the final result
data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r

data Of a b = !a :> b deriving Functor
The most common instantiation you see for a Stream is f ~ Of a, giving Stream (Of a) m r. A simple example might be a stream that yields the integers 1, 2, 3 in order:
-- This is spelled out for clarity.
-- There are more concise ways to write this stream.
oneTwoThree :: Stream (Of Int) m ()
oneTwoThree = Step (1 :> Step (2 :> Step (3 :> Return ())))
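For comparison, here is the concise spelling using combinators from Streaming.Prelude (S.each and S.print), along with one way to consume the stream:

import           Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- The same stream built with a library helper...
oneTwoThree' :: Monad m => Stream (Of Int) m ()
oneTwoThree' = S.each [1, 2, 3]

-- ...and a consumer that prints each element as it is yielded.
main :: IO ()
main = S.print oneTwoThree'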
The strictness annotations on the Step constructor and the left side of the Of constructor ensure that stream elements will be forced to WHNF as soon as they’re yielded. The payoff from making the f parameter to Stream an arbitrary Functor is that you can use more complicated structures as “stream elements” to achieve things like chunking with perfect streaming. Compare streaming’s chunksOf to the one from pipes-group above:
chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r
So if f ~ Of a, chunksOf 3 would turn a Stream (Of a) m r into a Stream (Stream (Of a) m) m r — a stream of streams, which is exactly the behaviour we want.
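As a small usage sketch (printInChunks is my own made-up example; mapsM_ and chunksOf come from the Streaming module), here is one way to consume such a stream of streams, handling each inner stream as it arrives:

import           Streaming (Of, Stream, chunksOf, mapsM_)
import qualified Streaming.Prelude as S

-- Print a separator before each chunk of n elements, consuming the
-- inner streams one at a time as they are produced.
printInChunks :: Int -> Stream (Of Int) IO r -> IO r
printInChunks n =
  mapsM_ (\chunk -> putStrLn "-- chunk --" >> S.print chunk) . chunksOf n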
Because streaming doesn’t have the pipes/conduit concept of “horizontal composition”, it can unify “connecting a consumer” with “transforming the stream” — both are standard function application.
streaming has one major wart, and I think it’s a forgivable one: It has a separate type for streaming ByteStrings. The Streaming.ByteString.ByteStream type from streaming-bytestring is morally Stream (Of ByteString) m r, but with the ByteStrings {-# UNPACK #-}-ed directly into the data structure. This is such a common special case that I think it’s worth the specialisation.
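To give a feel for that type, here is a minimal sketch (copyFileStreaming is my own example) that copies a file in constant memory using the handle-based functions from Streaming.ByteString:

import qualified Streaming.ByteString as Q
import           System.IO (IOMode (..), withFile)

-- Stream the bytes from one handle to another; only one chunk of the
-- underlying ByteStream is in memory at a time.
copyFileStreaming :: FilePath -> FilePath -> IO ()
copyFileStreaming src dst =
  withFile src ReadMode $ \hIn ->
    withFile dst WriteMode $ \hOut ->
      Q.hPut hOut (Q.hGetContents hIn)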
The Functor Parameter

I’ve praised simplicity a lot in this post, so now it’s time to really justify that f parameter to Stream. It would be much easier to bake the item directly into the Step constructor, but the additional f adds a lot of power. We’ve seen it used in chunksOf already, but it’s possible to do even more. I’ve been working on a decompressor for an old computer game, and streaming allowed me to say some pretty useful things:
-- | Attempt to parse the header records from the start of a byte
-- stream. On success, return the remainder of the stream.
decodeHeader ::
  Monad m =>
  ByteStream m r ->
  m (Either String (Header, ByteStream m r))
-- | Break up and decompress the input stream by zipping it with the
-- list of records from the header. You can process each byte stream
-- by using @Streaming.'Streaming.mapsM_'@.
decompressAll ::
  MonadIO m =>
  Header ->
  ByteStream m r ->
  Stream (Compose (Of Record) (ByteStream m)) m r
Let’s look into that Stream (Compose (Of Record) (ByteStream m)) m r type, and consider what we’ll actually have in the Step constructor of the result Stream:

- Step !(f (Stream f m r)) is how the constructor is declared. Let’s call the argument to f RestOfStream to keep the noise down;
- f is (Compose (Of Record) (ByteStream m)), so we’re holding a (Compose (Of Record) (ByteStream m)) RestOfStream;
- Compose is from Data.Functor.Compose:

  newtype Compose f g a = Compose { getCompose :: f (g a) }

- Expanding the newtype shows that we’re holding something representationally equal to Of Record (ByteStream m RestOfStream);
- Of is the left-strict pair, so we get the Record from the archive (strictly) alongside a (lazy) ByteStream corresponding to its data. At the end of that ByteStream, we get the next record from the archive, and so on until we stream out the entire file.
Let me repeat that last part: as the stream iterates through the records, it yields the Record it’s decompressing, as well as the corresponding uncompressed ByteStream, and you must reach the end of the ByteStream before you can start on the next Record. This makes it very hard to accidentally buffer one record before starting on the next, and since I can’t even figure out how to do something as simple as a perfect rechunking of a ConduitT, I have no idea how you’d do this in pipes or conduit. Maybe I’m just not smart enough.
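To sketch what consuming that structure looks like (the Record definition and recordFileName accessor below are hypothetical stand-ins for my decompressor’s own types), Streaming.mapsM_ handles one Compose layer at a time, draining each record’s bytes before the next Record even exists:

import           Data.Functor.Compose (Compose (..))
import           Streaming (Of (..), Stream, mapsM_)
import qualified Streaming.ByteString as Q
import           System.IO (IOMode (..), withFile)

-- Stand-in for the decompressor's own record type (hypothetical).
data Record = Record { recordFileName :: FilePath }

-- Write each record's uncompressed bytes to its own file; the next
-- Record is only produced once the previous ByteStream is drained.
extractAll :: Stream (Compose (Of Record) (Q.ByteStream IO)) IO r -> IO r
extractAll = mapsM_ (\(Compose (record :> bytes)) ->
  withFile (recordFileName record) WriteMode $ \h ->
    Q.hPut h bytes)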
streaming seems to make the easy jobs easy and the hard jobs possible, which is why it’s the one I reach for by default.