Skip to content

Added output configurations. Related to #3170 #3543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[io] object child_process {

var env: js.UndefOr[js.Dictionary[String]] = js.undefined

var stdio: js.UndefOr[js.Any] = js.undefined
}

@js.native
Expand Down
57 changes: 47 additions & 10 deletions io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,42 @@ private[process] trait ProcessesCompanionPlatform {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource {
F.async_[(Process[F], F[Unit])] { cb =>
val spawnOptions = new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}

val stdinOpt: js.Any = process.outputConfig.stdin match {
case StreamRedirect.Inherit => "inherit"
case StreamRedirect.Discard => "ignore"
case StreamRedirect.File(path) => "pipe"
case StreamRedirect.Pipe =>
}

val stdoutOpt: js.Any = process.outputConfig.stdout match {
case StreamRedirect.Inherit => "inherit"
case StreamRedirect.Discard => "ignore"
case StreamRedirect.File(path) => "pipe"
case StreamRedirect.Pipe =>
}

val stderrOpt: js.Any = process.outputConfig.stderr match {
case StreamRedirect.Inherit => "inherit"
case StreamRedirect.Discard => "ignore"
case StreamRedirect.File(path) => "pipe"
case StreamRedirect.Pipe =>
}

spawnOptions.stdio = js.Array(stdinOpt, stdoutOpt, stderrOpt)

val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
spawnOptions
)

val fs2Process = new UnsealedProcess[F] {
Expand All @@ -72,9 +97,21 @@ private[process] trait ProcessesCompanionPlatform {

def stdin = writeWritable(F.delay(childProcess.stdin))

def stdout = unsafeReadReadable(childProcess.stdout)
def stdout =
if (process.outputConfig.stdout == StreamRedirect.Pipe)
unsafeReadReadable(childProcess.stdout)
else
Stream.empty

def stderr =
if (process.outputConfig.stderr == StreamRedirect.Pipe)
unsafeReadReadable(childProcess.stderr)
else
Stream.empty

def mergedOutput: Stream[F, Byte] =
stdout.merge(stderr)

def stderr = unsafeReadReadable(childProcess.stderr)
}

val finalize = F.asyncCheckAttempt[Unit] { cb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import cats.effect.kernel.Resource
import cats.syntax.all.*
import fs2.io.CollectionCompat.*

import java.lang
import java.lang.ProcessBuilder.Redirect

private[process] trait ProcessesCompanionPlatform {
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
Expand All @@ -37,7 +37,7 @@ private[process] trait ProcessesCompanionPlatform {
Resource
.make {
F.blocking {
val builder = new lang.ProcessBuilder((process.command :: process.args).asJava)
val builder = new java.lang.ProcessBuilder((process.command :: process.args).asJava)

process.workingDirectory.foreach { path =>
builder.directory(path.toNioPath.toFile)
Expand All @@ -49,6 +49,30 @@ private[process] trait ProcessesCompanionPlatform {
env.put(k, v)
}

process.outputConfig.stdin match {
case StreamRedirect.Inherit => builder.redirectInput(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectInput(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectInput(Redirect.from(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

process.outputConfig.stdout match {
case StreamRedirect.Inherit => builder.redirectOutput(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectOutput(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectOutput(Redirect.to(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

process.outputConfig.stderr match {
case StreamRedirect.Inherit => builder.redirectError(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectError(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectError(Redirect.to(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

builder.start()
}
} { process =>
Expand Down Expand Up @@ -89,7 +113,6 @@ private[process] trait ProcessesCompanionPlatform {
F.blocking(process.destroy()),
8192
)

}
}
}
Expand Down
41 changes: 38 additions & 3 deletions io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ sealed abstract class ProcessBuilder private {
*/
def workingDirectory: Option[Path]

/** Configures how stdout and stderr should be handled. */
def outputConfig: ProcessOutputConfig

/** @see [[command]] */
def withCommand(command: String): ProcessBuilder

Expand All @@ -67,17 +70,45 @@ sealed abstract class ProcessBuilder private {
/** @see [[workingDirectory]] */
def withCurrentWorkingDirectory: ProcessBuilder

/** @see [[outputMode]] */
def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder
Comment on lines +73 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think separating it out into three config methods for stdin/stdout/stderr (like the JDK process builder) will make the API easier to use.


/* @param mode The mode for handling stdin
*/
def redirectInput(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stdin = mode))

def redirectOutput(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stdout = mode))

def redirectError(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stderr = mode))

/** Starts the process and returns a handle for interacting with it.
* Closing the resource will kill the process if it has not already terminated.
*/
final def spawn[F[_]: Processes]: Resource[F, Process[F]] =
Processes[F].spawn(this)
}

sealed abstract class StreamRedirect
object StreamRedirect {
case object Pipe extends StreamRedirect
case object Inherit extends StreamRedirect
case object Discard extends StreamRedirect
final case class File(path: Path) extends StreamRedirect
}

final case class ProcessOutputConfig(
stdin: StreamRedirect = StreamRedirect.Pipe,
stdout: StreamRedirect = StreamRedirect.Pipe,
stderr: StreamRedirect = StreamRedirect.Pipe
)

object ProcessBuilder {

def apply(command: String, args: List[String]): ProcessBuilder =
ProcessBuilderImpl(command, args, true, Map.empty, None)
ProcessBuilderImpl(command, args, true, Map.empty, None, ProcessOutputConfig())

def apply(command: String, args: String*): ProcessBuilder =
apply(command, args.toList)
Expand All @@ -87,7 +118,8 @@ object ProcessBuilder {
args: List[String],
inheritEnv: Boolean,
extraEnv: Map[String, String],
workingDirectory: Option[Path]
workingDirectory: Option[Path],
outputConfig: ProcessOutputConfig
) extends ProcessBuilder {

def withCommand(command: String): ProcessBuilder = copy(command = command)
Expand All @@ -100,7 +132,10 @@ object ProcessBuilder {

def withWorkingDirectory(workingDirectory: Path): ProcessBuilder =
copy(workingDirectory = Some(workingDirectory))

def withCurrentWorkingDirectory: ProcessBuilder = copy(workingDirectory = None)
}

def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder =
copy(outputConfig = outputConfig)
}
}
1 change: 0 additions & 1 deletion io/shared/src/main/scala/fs2/io/process/Processes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import cats.effect.IO
import cats.effect.LiftIO
import cats.effect.kernel.Async
import cats.effect.kernel.Resource

sealed trait Processes[F[_]] {

def spawn(process: ProcessBuilder): Resource[F, Process[F]]
Expand Down
55 changes: 54 additions & 1 deletion io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class ProcessSuite extends Fs2IoSuite {
"node",
"-e",
"console.log('good day stdout'); console.error('how do you do stderr')"
).spawn[IO]
)
.withOutputConfig(ProcessOutputConfig())
.spawn[IO]
.use { p =>
val testOut = p.stdout
.through(fs2.text.utf8.decode)
Expand All @@ -72,6 +74,57 @@ class ProcessSuite extends Fs2IoSuite {
}
}

test("merged stdout and stderr") {
ProcessBuilder("node", "-e", "console.log('merged stdout'); console.error('merged stderr')")
.withOutputConfig(
ProcessOutputConfig(stdout = StreamRedirect.Pipe, stderr = StreamRedirect.Pipe)
)
.spawn[IO]
.use { p =>
p.stdout
.through(fs2.text.utf8.decode)
.compile
.string
.assert(s => s.contains("merged stdout") && s.contains("merged stderr"))
}
}

test("file output") {
Files[IO].tempFile.use { path =>
ProcessBuilder("echo", "file output test")
.withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.File(path)))
.spawn[IO]
.use(_.exitValue)
.assertEquals(0) *>
Files[IO].readUtf8(path).compile.string.assertEquals("file output test\n")
}
}

test("ignored output") {
ProcessBuilder("echo", "ignored output")
.withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.Discard))
.spawn[IO]
.use(_.exitValue)
.assertEquals(0)
}

test("stdin piping") {
ProcessBuilder("cat")
.withOutputConfig(ProcessOutputConfig(stdin = StreamRedirect.Pipe))
.spawn[IO]
.use { p =>
val input = Stream
.emit("piped input test")
.through(fs2.text.utf8.encode)
.through(p.stdin)
.compile
.drain
val output =
p.stdout.through(fs2.text.utf8.decode).compile.string.assertEquals("piped input test")
input *> output
}
}

if (!isNative)
test("cat") {
ProcessBuilder("cat").spawn[IO].use { p =>
Expand Down
Loading