-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Picos compatible direct style interface to Lwt
- Loading branch information
Showing
10 changed files
with
231 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
(library | ||
(name picos_lwt) | ||
(public_name picos.lwt) | ||
(enabled_if | ||
(>= %{ocaml_version} 5.0.0)) | ||
(libraries | ||
(re_export picos) | ||
(re_export lwt))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
module type Sleep = sig | ||
(** Minimal signature for an implementation of {!sleep} using {!Lwt}. *) | ||
|
||
val sleep : float -> unit Lwt.t | ||
(** [sleep seconds] should return a cancelable promise that resolves after | ||
given number of [seconds] (unless canceled). *) | ||
end | ||
|
||
module type S = sig | ||
(** Direct style {!Picos} compatible interface to {!Lwt}. *) | ||
|
||
val run : forbid:bool -> (unit -> 'a) -> 'a Lwt.t | ||
(** [run ~forbid main] runs the [main] program implemented in {!Picos} as a | ||
promise with {!Lwt} as the scheduler. In other words, the [main] program | ||
will be run as a {!Lwt} promise or fiber. *) | ||
|
||
val await : (unit -> 'a Lwt.t) -> 'a | ||
(** [await thunk] awaits for the promise returned by [thunk ()] to resolve and | ||
returns the result. This should only be called from inside a fiber | ||
running inside {!start}. *) | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
include Intf | ||
|
||
module Make (Sleep : Sleep) : S = struct | ||
open Picos | ||
open Lwt.Infix | ||
|
||
let[@alert "-handler"] rec run : | ||
type a r. | ||
Fiber.t -> | ||
(a, r) Effect.Shallow.continuation -> | ||
(a, Exn_bt.t) Result.t -> | ||
r Lwt.t = | ||
fun fiber k v -> | ||
let effc (type a) : | ||
a Effect.t -> ((a, _) Effect.Shallow.continuation -> _) option = | ||
function | ||
| Fiber.Current -> Some (fun k -> run fiber k (Ok fiber)) | ||
| Fiber.Spawn r -> | ||
Some | ||
(fun k -> | ||
match Fiber.canceled fiber with | ||
| None -> | ||
List.iter | ||
(fun main -> | ||
let fiber = Fiber.create ~forbid:r.forbid r.computation in | ||
Lwt.async @@ fun () -> | ||
run fiber (Effect.Shallow.fiber main) (Ok ())) | ||
r.mains; | ||
run fiber k (Ok ()) | ||
| Some exn_bt -> run fiber k (Error exn_bt)) | ||
| Fiber.Yield -> | ||
Some | ||
(fun k -> | ||
match Fiber.canceled fiber with | ||
| None -> Lwt.pause () >>= fun () -> run fiber k (Ok ()) | ||
| Some exn_bt -> run fiber k (Error exn_bt)) | ||
| Computation.Cancel_after r -> | ||
Some | ||
(fun k -> | ||
match Fiber.canceled fiber with | ||
| None -> | ||
let sleep = | ||
Sleep.sleep r.seconds >>= fun () -> | ||
Computation.cancel r.computation r.exn_bt; | ||
Lwt.return_unit | ||
in | ||
let canceler = | ||
Trigger.from_action sleep () @@ fun _ sleep _ -> | ||
Lwt.cancel sleep | ||
in | ||
if Computation.try_attach r.computation canceler then | ||
Lwt.async @@ fun () -> sleep | ||
else Trigger.signal canceler; | ||
run fiber k (Ok ()) | ||
| Some exn_bt -> run fiber k (Error exn_bt)) | ||
| Trigger.Await trigger -> | ||
Some | ||
(fun k -> | ||
let promise, resolver = Lwt.wait () in | ||
let resume _trigger resolver _ = Lwt.wakeup resolver () in | ||
if Fiber.try_suspend fiber trigger resolver () resume then | ||
promise >>= fun () -> run fiber k (Ok (Fiber.canceled fiber)) | ||
else run fiber k (Ok (Fiber.canceled fiber))) | ||
| _ -> None | ||
in | ||
let handler = Effect.Shallow.{ retc = Lwt.return; exnc = Lwt.fail; effc } in | ||
match v with | ||
| Ok v -> Effect.Shallow.continue_with k v handler | ||
| Error exn_bt -> Exn_bt.discontinue_with k exn_bt handler | ||
|
||
let run ~forbid main = | ||
let computation = Computation.create () in | ||
let fiber = Fiber.create ~forbid computation in | ||
run fiber (Effect.Shallow.fiber main) (Ok ()) | ||
|
||
let await thunk = | ||
let computation = Computation.create () in | ||
let promise = | ||
Lwt.try_bind thunk | ||
(fun value -> | ||
Computation.return computation value; | ||
Lwt.return_unit) | ||
(fun exn -> | ||
Computation.cancel computation (Exn_bt.get_callstack 0 exn); | ||
Lwt.return_unit) | ||
in | ||
Lwt.async (fun () -> promise); | ||
let trigger = Trigger.create () in | ||
if Computation.try_attach computation trigger then begin | ||
match Trigger.await trigger with | ||
| None -> Computation.await computation | ||
| Some exn_bt -> | ||
Lwt.cancel promise; | ||
Exn_bt.raise exn_bt | ||
end | ||
else Computation.await computation | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
(** A functor for building a {!Picos} compatible direct style interface to | ||
{!Lwt} with given implementation of {{!Sleep} sleep}. | ||
This basically gives you an alternative direct style interface to | ||
programming with {!Lwt}. All the scheduling decisions will be made by | ||
{!Lwt}. *) | ||
|
||
include module type of Intf | ||
|
||
(** [Make (Sleep)] creates a {!Picos} compatible interface to {!Lwt} with given | ||
implementation of {{!Sleep} sleep}. | ||
For example, | ||
{[ | ||
module Picos_lwt_unix = Picos_lwt.Make (Lwt_unix) | ||
]} | ||
instantiates this functor using {!Lwt_unix.sleep} as the implemention of | ||
{{!Sleep.sleep} sleep}. *) | ||
module Make : functor (_ : Sleep) -> S |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
open Picos | ||
module Picos_lwt_unix = Picos_lwt.Make (Lwt_unix) | ||
|
||
let basics () = | ||
Lwt_main.run | ||
@@ Picos_lwt_unix.run ~forbid:false | ||
@@ fun () -> | ||
let computation = Computation.create () in | ||
let child = | ||
Computation.capture computation @@ fun () -> | ||
while true do | ||
Picos_lwt_unix.await (fun () -> Lwt_unix.sleep 0.01) | ||
done | ||
in | ||
Fiber.spawn ~forbid:false computation [ child ]; | ||
Computation.cancel_after computation ~seconds:0.05 | ||
(Exn_bt.get_callstack 0 Exit); | ||
match Computation.await computation with | ||
| () -> assert false | ||
| exception Exit -> () | ||
|
||
let () = | ||
[ ("Basics", [ Alcotest.test_case "" `Quick basics ]) ] | ||
|> Alcotest.run "Picos_lwt" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
open Picos | ||
open Picos_stdio | ||
|
||
let () = | ||
let[@alert "-handler"] rec propagate () = | ||
let computation = | ||
Computation.with_action () () @@ fun _ _ _ -> | ||
Lwt_unix.handle_signal Sys.sigchld; | ||
propagate () | ||
in | ||
Picos_select.return_on_sigchld computation () | ||
in | ||
propagate () | ||
|
||
let test_system_unix () = | ||
let sleep = Lwt_unix.system "sleep 2" in | ||
Lwt_main.run @@ Lwt.bind sleep | ||
@@ fun _status -> | ||
Test_scheduler.run @@ fun () -> | ||
assert (Unix.system "exit 101" = Unix.WEXITED 101); | ||
assert (Unix.system "echo Hello world!" = Unix.WEXITED 0); | ||
assert (Unix.system "this-is-not-supposed-to-exist" = Unix.WEXITED 127); | ||
match Unix.wait () with | ||
| _ -> assert false | ||
| exception Unix.Unix_error (ECHILD, _, _) -> | ||
Lwt.bind (Lwt_unix.system "ls -l") @@ fun _ -> Lwt.return () | ||
|
||
let () = | ||
[ | ||
( "Unix", | ||
if Sys.win32 then [] | ||
else [ Alcotest.test_case "system" `Quick test_system_unix ] ); | ||
] | ||
|> Alcotest.run "Picos_stdio_with_lwt" |