class Channel

Thread-safe queue for sending values from producers to consumers
class Channel {}

A Channel is a thread-safe queue for sending a series of objects from one or more producers to one or more consumers. Each object arrives at exactly one consumer, selected by the scheduler. If there is only one producer and one consumer, the order of objects is guaranteed to be preserved. Sending on a Channel is non-blocking.

my $c = Channel.new;
await (^10).map: {
    start {
        my $r = rand;
        sleep $r;
        $c.send($r);
    }
}
$c.close;
say $c.list;

Further examples can be found on the concurrency page.
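The ordering guarantee for a single producer and a single consumer can be illustrated with a minimal sketch. The variable names $producer and @received are chosen here for illustration only.

```raku
my $c = Channel.new;

# Single producer: sends 1..5 in order, then closes the Channel.
my $producer = start {
    $c.send($_) for 1..5;
    $c.close;
}

# Single consumer: .list blocks until the Channel is closed and drained.
my @received = $c.list;
await $producer;

say @received; # OUTPUT: «[1 2 3 4 5]␤»
```

With several producers or several consumers, the relative order of items from different threads would depend on scheduling.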

Methods

method send

method send(Channel:D: \item)

Enqueues an item into the Channel. Throws an exception of type X::Channel::SendOnClosed if the Channel has been closed already. This call will not block waiting for a consumer to take the object. There is no set limit on the number of items that may be queued, so care should be taken to prevent runaway queueing.

my $c = Channel.new;
$c.send(1);
$c.send([2, 3, 4, 5]);
$c.close;
say $c.list; # OUTPUT: «(1 [2 3 4 5])␤»

method receive

method receive(Channel:D:)

Receives and removes an item from the Channel. It blocks if no item is present, waiting for a send from another thread.

Throws an exception of type X::Channel::ReceiveOnClosed if the Channel has been closed and the last item has already been removed, or if close is called while receive is waiting for an item to arrive.

If the Channel has been marked as erratic with method fail and the last item has been removed, throws the argument that was given to fail as an exception.

See method poll for a non-blocking version that won't throw exceptions.

my $c = Channel.new;
$c.send(1);
say $c.receive; # OUTPUT: «1␤»
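The behavior on a closed Channel can be sketched as follows: the remaining item is still delivered, and only the next receive throws. The use of try and $! is one possible way to observe the exception, not the only one.

```raku
my $c = Channel.new;
$c.send('last item');
$c.close;

# Items sent before close can still be drained.
say $c.receive; # OUTPUT: «last item␤»

# The queue is now empty and the Channel is closed, so this receive throws.
try $c.receive;
say $!.^name; # OUTPUT: «X::Channel::ReceiveOnClosed␤»
```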

method poll

method poll(Channel:D:)

Receives and removes an item from the Channel. If no item is present, returns Nil instead of waiting.

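my $c = Channel.new;
Promise.in(2).then: { $c.close; }
^10 .map({ $c.send($_); });
loop {
    # `with` tests definedness, so the value 0 is printed too;
    # poll returns Nil when no item is present
    with $c.poll -> $item { $item.say };
    if $c.closed  { last };
    sleep 0.1;
}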

See method receive for a blocking version that properly responds to Channel closing and failure.

method close

method close(Channel:D:)

Closes the Channel normally. This makes subsequent send calls die with X::Channel::SendOnClosed. Subsequent calls of .receive may still drain any remaining items that were previously sent, but once the queue is empty they throw an X::Channel::ReceiveOnClosed exception. You can produce a Seq from a Channel by contextualizing it to an array with @() or by calling the .list method; these will not terminate until the Channel has been closed. A whenever block will also terminate properly on a closed Channel.

my $c = Channel.new;
$c.close;
$c.send(1);
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤»
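As a sketch of the whenever case mentioned above, a react block that listens on a Channel finishes once the Channel is closed and drained. The array @seen is a name introduced here for illustration.

```raku
my $c = Channel.new;
my @seen;

# Producer: sends three items, then closes the Channel.
start {
    $c.send($_) for 1..3;
    $c.close;
}

# The react block exits once the closed Channel has been drained.
react {
    whenever $c -> $item {
        @seen.push: $item;
    }
}
say @seen; # OUTPUT: «[1 2 3]␤»
```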

Please note that any exception thrown may prevent .close from being called, which may hang the receiving thread. Use a LEAVE phaser to enforce the .close call in this case.
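A minimal sketch of that pattern, assuming a producer that dies partway through its work (the die call and the name $producer are illustrative):

```raku
my $c = Channel.new;

my $producer = start {
    # LEAVE runs when this block exits, whether normally or by exception.
    LEAVE $c.close;
    $c.send(1);
    die 'producer failed';
    $c.send(2); # never reached
}

# Terminates because the Channel was closed despite the exception.
say $c.list; # OUTPUT: «(1)␤»

# The producer's exception is still reported through its Promise.
try await $producer;
say $!.message if $!;
```

Without the LEAVE phaser, the call to .list here would wait indefinitely, since nothing would ever close the Channel.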

method list

method list(Channel:D:)

Returns a list based on the Seq which will iterate items in the queue and remove each item from it as it iterates. This can only terminate once the close method has been called.

my $c = Channel.new; $c.send(1); $c.send(2);
$c.close;
say $c.list; # OUTPUT: «(1 2)␤»

method closed

method closed(Channel:D: --> Promise:D)

Returns a promise that will be kept once the Channel is closed by a call to method close.

my $c = Channel.new;
$c.closed.then({ say "It's closed!" });
$c.close;
sleep 1;

method fail

method fail(Channel:D: $error)

Closes the Channel (that is, makes subsequent send calls die), and enqueues the error to be thrown as the final element in the Channel. Method receive will throw that error as an exception. Does nothing if the Channel has already been closed or .fail has already been called on it.

my $c = Channel.new;
$c.fail("Bad error happens!");
$c.receive;
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::AdHoc: Bad error happens!␤»
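Because the error is enqueued as the final element, items sent before the call to fail are still delivered first. The following sketch illustrates this; the strings used are arbitrary.

```raku
my $c = Channel.new;
$c.send('still delivered');
$c.fail('upstream died');

# The item queued before fail is received normally.
say $c.receive; # OUTPUT: «still delivered␤»

# The next receive reaches the enqueued error and throws it.
try $c.receive;
say $!.message; # OUTPUT: «upstream died␤»
```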

method Capture

method Capture(Channel:D: --> Capture:D)

Equivalent to calling .List.Capture on the invocant.

method Supply

method Supply(Channel:D:)

This returns an on-demand Supply that emits a value for every value received on the Channel. done will be called on the Supply when the Channel is closed.

my $c = Channel.new;
my Supply $s1 = $c.Supply;
my Supply $s2 = $c.Supply;
$s1.tap(-> $v { say "First $v" });
$s2.tap(-> $v { say "Second $v" });
^10 .map({ $c.send($_) });
sleep 1;

Multiple calls to this method produce multiple instances of Supply, which compete over the values from the Channel.

sub await

multi await(Channel:D)
multi await(*@)

Waits until each of one or more Channels has a value available, and returns those values (it calls .receive on each Channel). Also works with Promises.

my $c = Channel.new;
Promise.in(1).then({$c.send(1)});
say await $c;

Since 6.d, it no longer blocks a thread while waiting.
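Awaiting several Channels at once might look like the following sketch, which assumes the values are returned in the order the Channels were passed. The names $c1, $c2, $x and $y are illustrative.

```raku
my $c1 = Channel.new;
my $c2 = Channel.new;

# $c1 gets its value after a short delay; $c2 already has one queued.
Promise.in(0.5).then({ $c1.send('a') });
$c2.send('b');

# await returns once both Channels have delivered a value.
my ($x, $y) = await $c1, $c2;
say "$x $y"; # OUTPUT: «a b␤»
```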

The Camelia image is copyright 2009 by Larry Wall. "Raku" is trademark of the Yet Another Society. All rights reserved.