class Channel
class Channel {}
A Channel is a thread-safe queue that helps you to send a series of objects
from one or more producers to one or more consumers. Each object will arrive at
only one such consumer, selected by the scheduler. If there is only one
consumer and one producer, the order of objects is guaranteed to be preserved.
Sending on a Channel is non-blocking.
my $c = Channel.new;
await (^10).map: {
    start {
        my $r = rand;
        sleep $r;
        $c.send($r);
    }
}
$c.close;
say $c.list;
Further examples can be found in the concurrency page.
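The ordering guarantee for a single producer and a single consumer can be illustrated with a minimal sketch. The variable names ($producer, $consumer, @received) are chosen here for illustration only.

```raku
my $c = Channel.new;

# Single producer: sends 1..5 in order, then closes the channel.
my $producer = start {
    $c.send($_) for 1..5;
    $c.close;
}

# Single consumer: .list only finishes once the channel is closed and drained.
my $consumer = start { $c.list.eager }

await $producer;
my @received = await $consumer;
say @received;   # OUTPUT: «[1 2 3 4 5]»
```

With more than one producer or consumer, the scheduler decides the interleaving, so no such ordering should be assumed.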
Methods
method send
method send(Channel:D: \item)
Enqueues an item into the Channel. Throws an exception of type
X::Channel::SendOnClosed if the Channel has already been closed. This call
will not block waiting for a consumer to take the object.
There is no set limit on the number of items that may be queued, so
care should be taken to prevent runaway queueing.
my $c = Channel.new;
$c.send(1);
$c.send([2, 3, 4, 5]);
$c.close;
say $c.list; # OUTPUT: «(1 [2 3 4 5])»
method receive
method receive(Channel:D:)
Receives and removes an item from the Channel. It blocks if no item is
present, waiting for a send from another thread.
Throws an exception of type X::Channel::ReceiveOnClosed if the Channel
has been closed and the last item has already been removed, or if close
is called while receive is waiting for an item to arrive.
If the Channel has been marked as erratic with method fail, and the last
item has been removed, throws the argument that was given to fail as an
exception.
See method poll for a non-blocking version that won't throw exceptions.
my $c = Channel.new;
$c.send(1);
say $c.receive; # OUTPUT: «1»
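As a rough sketch of the closing behavior described above, a consumer can drain a closed Channel with receive and stop once X::Channel::ReceiveOnClosed is thrown. The @seen array and the loop structure are illustrative choices, not part of the Channel API.

```raku
my $c = Channel.new;
$c.send($_) for 1..3;
$c.close;

my @seen;
loop {
    # Queued items are returned first; once the closed channel is
    # empty, receive throws and try stores the exception in $!.
    my $item = try $c.receive;
    if $!.defined {
        last if $! ~~ X::Channel::ReceiveOnClosed;
        $!.rethrow;   # anything else is a real error
    }
    @seen.push: $item;
}
say @seen;   # OUTPUT: «[1 2 3]»
```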
method poll
method poll(Channel:D:)
Receives and removes an item from the Channel. If no item is present, returns
Nil instead of waiting.
my $c = Channel.new;
Promise.in(2).then: { $c.close; }
^10 .map({ $c.send($_); });
loop {
    # `with` tests definedness, so a false value such as 0 is still printed
    with $c.poll -> $item { $item.say };
    if $c.closed { last };
    sleep 0.1;
}
See method receive for a blocking version that properly responds to Channel
closing and failure.
method close
method close(Channel:D:)
Closes the Channel normally. This makes subsequent send calls die with
X::Channel::SendOnClosed. Subsequent calls of .receive may still drain any
remaining items that were previously sent, but once the queue is empty they
will throw an X::Channel::ReceiveOnClosed exception. A Seq can be produced
from a Channel by contextualizing it to an array with @() or by calling the
.list method; neither will terminate until the Channel has been closed.
A whenever-block will also terminate properly on a closed Channel.
my $c = Channel.new;
$c.close;
$c.send(1);
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel»
Please note that an exception thrown before .close is reached may prevent it
from being called, which may hang the receiving thread. Use a LEAVE phaser to
enforce the .close call in this case.
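One way to apply this advice is sketched below. The failing producer and its error message are hypothetical; the point is only that LEAVE runs when the block exits through an exception, so the consumer is not left waiting.

```raku
my $c = Channel.new;

my $producer = start {
    # LEAVE runs however the block exits, including via an exception.
    LEAVE $c.close;
    $c.send(1);
    die "producer failed";   # hypothetical failure after the first send
    $c.send(2);              # never reached
}

# Without the LEAVE phaser, .list would wait forever for a close.
my @items = $c.list;
say @items;                  # OUTPUT: «[1]»

# The producer's exception is still recorded on its Promise.
try await $producer;
say $producer.status;        # OUTPUT: «Broken»
```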
method list
method list(Channel:D:)
Returns a list based on the Seq which will iterate items in the queue and
remove each item from it as it iterates. This can only terminate once the
close method has been called.
my $c = Channel.new; $c.send(1); $c.send(2);
$c.close;
say $c.list; # OUTPUT: «(1 2)»
method closed
method closed(Channel:D: --> Promise:D)
Returns a promise that will be kept once the Channel is closed by a call to
method close.
my $c = Channel.new;
$c.closed.then({ say "It's closed!" });
$c.close;
sleep 1;
method fail
method fail(Channel:D: $error)
Closes the Channel (that is, makes subsequent send calls die), and enqueues
the error to be thrown as the final element in the Channel. Method receive
will throw that error as an exception. Does nothing if the Channel has already
been closed or .fail has already been called on it.
my $c = Channel.new;
$c.fail("Bad error happens!");
$c.receive;
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::AdHoc: Bad error happens!»
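Because the error is enqueued as the final element, items sent before the call to fail should still be delivered first. The following sketch assumes that an Exception object (here an X::AdHoc built by hand) can be passed to fail in place of a string; the payload text is made up for illustration.

```raku
my $c = Channel.new;
$c.send(1);
$c.fail(X::AdHoc.new(payload => "producer gave up"));

my $first = $c.receive;       # queued items are still delivered first
say $first;                   # OUTPUT: «1»

try $c.receive;               # the error arrives as the final element
my $message = $!.message;
say $message;                 # OUTPUT: «producer gave up»
```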
method Capture
method Capture(Channel:D: --> Capture:D)
Equivalent to calling .List.Capture on the invocant.
method Supply
method Supply(Channel:D:)
This returns an on-demand Supply that emits a value for every value
received on the Channel. done will be called on the Supply when the Channel
is closed.
my $c = Channel.new;
my Supply $s1 = $c.Supply;
my Supply $s2 = $c.Supply;
$s1.tap(-> $v { say "First $v" });
$s2.tap(-> $v { say "Second $v" });
^10 .map({ $c.send($_) });
sleep 1;
Multiple calls to this method produce multiple instances of Supply, which compete
over the values from the Channel.
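A Channel can also be consumed from a react block, where whenever taps it as a Supply and the block finishes once the Channel is closed. This is a minimal sketch with a single consumer; the @got array is an illustrative name.

```raku
my $c = Channel.new;

start {
    $c.send($_) for 1..3;
    $c.close;
}

my @got;
react {
    # whenever treats the Channel as a Supply; react exits on done.
    whenever $c {
        @got.push: $_;
    }
}
say @got;   # OUTPUT: «[1 2 3]»
```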
sub await
multi await(Channel:D)
multi await(*@)
Waits until each of one or more Channels has a value available, and returns
those values (it calls .receive on each Channel). Also works with Promises.
my $c = Channel.new;
Promise.in(1).then({$c.send(1)});
say await $c;
Since 6.d, it no longer blocks a thread while waiting.
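The multi-argument form can be sketched as follows, assuming the values come back in the same order as the Channels were passed. The variable names and message strings are illustrative.

```raku
my $first  = Channel.new;
my $second = Channel.new;

start { $first.send('from first') }
start { $second.send('from second') }

# One value is received from each channel.
my ($x, $y) = await $first, $second;
say "$x, $y";   # OUTPUT: «from first, from second»
```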