Skip to content

Add resource management & exception handlers for folds #3011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/Streamly/Internal/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ module Streamly.Internal.Data.Fold
, module Streamly.Internal.Data.Fold.Combinators
, module Streamly.Internal.Data.Fold.Container
, module Streamly.Internal.Data.Fold.Window
, module Streamly.Internal.Data.Fold.Exception
)
where

import Streamly.Internal.Data.Fold.Combinators
import Streamly.Internal.Data.Fold.Container
import Streamly.Internal.Data.Fold.Exception
import Streamly.Internal.Data.Fold.Tee
import Streamly.Internal.Data.Fold.Type
import Streamly.Internal.Data.Fold.Window
Expand Down
199 changes: 199 additions & 0 deletions core/src/Streamly/Internal/Data/Fold/Exception.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
-- |
-- Module : Streamly.Internal.Data.Fold.Exception
-- Copyright : (c) 2025 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Fold.Exception
(
-- * Resources
afterIO
, before
, bracketIO

-- * Exceptions
, onException
)
where

------------------------------------------------------------------------------
-- Imports
------------------------------------------------------------------------------

import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Catch (MonadCatch)
import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer)

import qualified Control.Monad.Catch as MC

import Streamly.Internal.Data.Fold.Step
import Streamly.Internal.Data.Fold.Type

------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------

{-

-- | Exception handling states of a fold
data HandleExc s f1 f2 = InitDone !s | InitFailed !f1 | StepFailed !f2

-- | @handle initHandler stepHandler fold@ produces a new fold from a given
-- fold. The new fold executes the original @fold@, if an exception occurs
-- when initializing the fold then @initHandler@ is executed and fold resulting
-- from that starts execution. If an exception occurs while executing the
-- @step@ function of a fold then the @stephandler@ is executed and we start
-- executing the fold resulting from that.
--
-- The exception is caught and handled, not rethrown. If the exception handler
-- itself throws an exception that exception is thrown.
--
-- /Internal/
--
{-# INLINE handle #-}
handle :: (MonadCatch m, Exception e)
=> (e -> m (Fold m a b))
-> (e -> Fold m a b -> m (Fold m a b))
-> Fold m a b
-> Fold m a b
handle initH stepH (Fold step1 initial1 extract1) = Fold step initial extract

where

initial = fmap InitDone initial1 `MC.catch` (fmap InitFailed . initH)

step (InitDone s) a =
let f = Fold step1 (return s) extract1
in fmap InitDone (step1 s a)
`MC.catch` (\e -> fmap StepFailed (stepH e f))
step (InitFailed (Fold step2 initial2 extract2)) a = do
s <- initial2
s1 <- step2 s a
return $ InitFailed $ Fold step2 (return s1) extract2
step (StepFailed (Fold step2 initial2 extract2)) a = do
s <- initial2
s1 <- step2 s a
return $ StepFailed $ Fold step2 (return s1) extract2

extract (InitDone s) = extract1 s
extract (InitFailed (Fold _ initial2 extract2)) = initial2 >>= extract2
extract (StepFailed (Fold _ initial2 extract2)) = initial2 >>= extract2

-}

-- | @onException action fold@ runs @action@ whenever the fold throws an
-- exception. The action is executed on any exception whether it is in
-- initial, step or extract action of the fold.
--
-- The exception is not caught, simply rethrown. If the @action@ itself
-- throws an exception that exception is thrown instead of the original
-- exception.
--
-- /Internal/
--
{-# INLINE onException #-}
onException :: MonadCatch m => m x -> Fold m a b -> Fold m a b
onException action (Fold step1 initial1 extract1 final1) =
Fold step initial extract final

where

initial = initial1 `MC.onException` action
step s a = step1 s a `MC.onException` action
extract s = extract1 s `MC.onException` action
final s = final1 s `MC.onException` action

-- | @bracketIO before after between@ runs @before@ and invokes @between@ using
-- its output, then runs the fold generated by @between@. If the fold ends
-- normally, due to an exception or if it is garbage collected prematurely then
-- @after@ is run with the output of @before@ as argument.
--
-- If @before@ or @after@ throw an exception that exception is thrown.
--
{-# INLINE bracketIO #-}
bracketIO :: (MonadIO m, MonadCatch m)
=> IO x -> (x -> IO c) -> (x -> Fold m a b) -> Fold m a b
bracketIO bef aft bet = Fold step initial extract final

where

initial = do
r <- liftIO $ bef
ref <- liftIO $ newIOFinalizer (aft r)
case bet r of
Fold step1 initial1 extract1 final1 -> do
res <- initial1 `MC.onException` liftIO (runIOFinalizer ref)
case res of
Partial s -> do
let fld1 = Fold step1 (pure (Partial s)) extract1 final1
pure $ Partial $ Tuple' ref fld1
Done b -> do
liftIO $ runIOFinalizer ref
pure $ Done b

step (Tuple' ref (Fold step1 initial1 extract1 final1)) a = do
res <- initial1
case res of
Partial s -> do
s1 <- step1 s a `MC.onException` liftIO (runIOFinalizer ref)
let fld1 = Fold step1 (pure s1) extract1 final1
pure $ Partial $ Tuple' ref fld1
Done b -> do
liftIO $ runIOFinalizer ref
pure $ Done b

extract (Tuple' ref (Fold _ initial1 extract1 _)) = do
res <- initial1
case res of
Partial s -> extract1 s `MC.onException` liftIO (runIOFinalizer ref)
Done b -> pure b

final (Tuple' ref (Fold _ initial1 _ final1)) = do
res <- initial1
case res of
Partial s -> do
val <- final1 s `MC.onException` liftIO (runIOFinalizer ref)
runIOFinalizer ref
pure val
Done b -> pure b

-- | Run a side effect whenever the fold stops normally, aborts due to an
-- exception or is garbage collected.
--
{-# INLINE afterIO #-}
afterIO :: (MonadIO m, MonadCatch m) => IO b -> Fold m a b -> Fold m a b
afterIO aft (Fold step1 initial1 extract1 final1) =
Fold step initial extract final

where

initial = do
ref <- liftIO $ newIOFinalizer aft
res <- initial1 `MC.onException` liftIO (runIOFinalizer ref)
pure $ case res of
Done b -> Done b
Partial s -> Partial $ Tuple' ref s

step (Tuple' ref s) a = do
res <- step1 s a `MC.onException` liftIO (runIOFinalizer ref)
pure $ case res of
Done b -> Done b
Partial s1 -> Partial $ Tuple' ref s1

extract (Tuple' ref s) =
extract1 s `MC.onException` liftIO (runIOFinalizer ref)

final (Tuple' ref s) = do
res <- final1 s `MC.onException` liftIO (runIOFinalizer ref)
liftIO $ runIOFinalizer ref
pure res


-- | Run a side effect before the initialization of the fold.
--
{-# INLINE before #-}
before :: Monad m => m x -> Fold m a b -> Fold m a b
before effect (Fold s i e f) = Fold s (effect *> i) e f
1 change: 1 addition & 0 deletions core/streamly-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ library
, Streamly.Internal.Data.Fold.Type
, Streamly.Internal.Data.Fold.Combinators
, Streamly.Internal.Data.Fold.Container
, Streamly.Internal.Data.Fold.Exception
, Streamly.Internal.Data.Fold.Tee
, Streamly.Internal.Data.Fold.Window

Expand Down
Loading