Asynchronous Streams in C# 8

Async/await was introduced in C# 5 to improve user interface responsiveness and web access to resources. In other words, asynchronous methods let developers perform operations that do not block threads and that return a single scalar result. After several earlier attempts by Microsoft to simplify asynchronous programming, the async/await pattern earned a good reputation among developers for its simplicity.

Existing asynchronous methods have one significant limitation: they can return only a single value. Consider a typical method with the signature async Task<int> DoAnythingAsync(). Its result is one value. Because of this limitation, you cannot use the yield keyword in such a method, nor can you return an asynchronous IEnumerable<int> (that is, the result of an asynchronous enumeration).

If async/await could be combined with yield, we would get a powerful programming model known as asynchronous data pull, pull-based asynchronous enumeration, or async sequences, as it is called in F#.

The new async streams feature in C# 8 removes the single-result limitation and allows an asynchronous method to return multiple values. This makes the async pattern more flexible: you can fetch data from a source (for example, a database) as a lazy asynchronous sequence, or receive data from an asynchronous sequence in chunks as they become available.


    await foreach (var streamChunk in asyncStreams)
    {
        Console.WriteLine($"Received data count = {streamChunk.Count}");
    }

Another approach to the problems of asynchronous programming is to use Reactive Extensions (Rx). Rx is gaining popularity among developers and is available for many programming languages, for example Java (RxJava) and JavaScript (RxJS).

Rx is based on a push model (the "Tell, Don't Ask" principle), also known as reactive programming. That is, unlike IEnumerable, where the consumer requests the next element, in the Rx model the data provider signals the consumer that a new element has appeared in the sequence. Data is pushed into the queue asynchronously, and the consumer handles it as it arrives.

In this article, I compare the push-based model (such as Rx) with the pull-based model (such as IEnumerable) and show which scenarios suit each one. The concept and its benefits are explained with examples and demo code. At the end, I show a practical application and demonstrate it with a code example.

Comparing the push-based and pull-based models

Fig. -1- Comparison of a model based on data pulling with a model based on data pushing

These examples are based on the relationship between a data provider and a consumer, as shown in Fig. -1-. The pull-based model is easy to understand: the consumer requests data from the provider and receives it. The alternative is the push-based model: the provider publishes data to a queue, and the consumer must subscribe to receive it.

The pull-based model suits cases where the provider generates data faster than the consumer uses it. The consumer receives only the data it needs, which avoids overflow problems. If the consumer uses data faster than the provider produces it, the push-based model is a better fit. In that case, the provider can send more data to the consumer so that there are no unnecessary delays.

Rx and Akka Streams (a stream-based programming model) use back pressure to control the flow. To solve the provider and consumer problems described above, back pressure combines pushing and pulling data.

In the example below, a slow consumer pulls data from a faster provider. After the consumer has processed the current element, it asks the provider for the next one, and so on until the end of the sequence.
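The difference between the two models can be sketched in a few lines of C#. This is only an illustration: NumberPublisher, CollectingObserver and PushPullDemo are hypothetical names invented for this sketch, and the hand-written observable stands in for what a library such as Rx would normally provide. The pull side uses IEnumerable<int>; the push side uses the IObservable<int> and IObserver<int> interfaces from the System namespace.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: the provider decides when values are delivered.
sealed class NumberPublisher : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // Pushes 1..count to every subscriber, then signals completion.
    public void Publish(int count)
    {
        for (var i = 1; i <= count; i++)
            foreach (var observer in _observers.ToArray())
                observer.OnNext(i);

        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _unsubscribe;
        public Unsubscriber(Action unsubscribe) => _unsubscribe = unsubscribe;
        public void Dispose() => _unsubscribe();
    }
}

// Hypothetical observer that simply records what it is given.
sealed class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public bool Completed { get; private set; }

    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}

static class PushPullDemo
{
    // Pull-based source: nothing is produced until the consumer asks for it.
    public static IEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
            yield return i;
    }

    static void Main()
    {
        // Pull: the consumer drives the loop by requesting each element.
        foreach (var n in Numbers(3))
            Console.WriteLine($"Pulled {n}");

        // Push: the consumer subscribes, then the provider drives delivery.
        var publisher = new NumberPublisher();
        var observer = new CollectingObserver();
        using (publisher.Subscribe(observer))
        {
            publisher.Publish(3);
        }
        Console.WriteLine($"Pushed {string.Join(", ", observer.Received)}");
    }
}
```

In the pull half the foreach loop sets the pace; in the push half the consumer has no say in when OnNext is called, which is why a slow consumer needs some form of flow control.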

Motivation and background

To understand why asynchronous streams are needed, consider the following code.

    // Sums the integers from 0 to count and returns a single result.
    static int SumFromOneToCount(int count)
    {
        ConsoleExt.WriteLine("SumFromOneToCount called!");
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum = sum + i;
        }
        return sum;
    }

    // Method call:
    const int count = 5;
    ConsoleExt.WriteLine($"Starting the application with count: {count}!");
    ConsoleExt.WriteLine("Classic sum starting.");
    ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
    ConsoleExt.WriteLine("Classic sum completed.");
    ConsoleExt.WriteLine("################################################");
    ConsoleExt.WriteLine(Environment.NewLine);


We can make the method deferred using the yield statement, as shown below.

    static IEnumerable<int> SumFromOneToCountYield(int count)
    {
        ConsoleExt.WriteLine("SumFromOneToCountYield called!");
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum = sum + i;
            yield return sum;
        }
    }

Method call

    const int count = 5;
    ConsoleExt.WriteLine("Sum with yield starting.");
    foreach (var i in SumFromOneToCountYield(count))
    {
        ConsoleExt.WriteLine($"Yield sum: {i}");
    }
    ConsoleExt.WriteLine("Sum with yield completed.");
    ConsoleExt.WriteLine("################################################");
    ConsoleExt.WriteLine(Environment.NewLine);


As the output window above shows, the result is returned in parts rather than as a single value. This behavior is known as deferred (lazy) enumeration. However, the problem is still not solved: the summing methods block the code. If you look at the threads, you can see that everything runs on the main thread.

Let's apply the magic word async to the first SumFromOneToCount method (without yield).

    static async Task<int> SumFromOneToCountAsync(int count)
    {
        ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
        var result = await Task.Run(() =>
        {
            var sum = 0;
            for (var i = 0; i <= count; i++)
            {
                sum = sum + i;
            }
            return sum;
        });
        return result;
    }

Method call

    const int count = 5;
    ConsoleExt.WriteLine("async example starting.");
    // The sum is computed asynchronously, but it still arrives as one value.
    var result = await SumFromOneToCountAsync(count);
    ConsoleExt.WriteLine("async Result: " + result);
    ConsoleExt.WriteLine("async completed.");
    ConsoleExt.WriteLine("################################################");
    ConsoleExt.WriteLine(Environment.NewLine);


Fine. The calculation now runs on a different thread, but the problem with the result remains: the method still returns a single value.
Imagine that we could combine deferred enumeration (the yield statement) and asynchronous methods in an imperative programming style. This combination is called asynchronous streams, and it is a new feature in C# 8. It is well suited to the pull-based programming model, for example downloading data from a site or reading records from a file or database.

Let's try to do this in the current version of C#. I will add the async keyword to the SumFromOneToCountYield method as follows:

Fig. -2- Error when using the yield and async keywords at the same time.

When we try to add async to SumFromOneToCountYield, an error occurs, as shown above.
Let's try a different approach. We can remove the yield keyword and wrap IEnumerable in a Task, as shown below:

    static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
    {
        ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
        var collection = new Collection<int>();
        var result = await Task.Run(() =>
        {
            var sum = 0;
            for (var i = 0; i <= count; i++)
            {
                sum = sum + i;
                collection.Add(sum);
            }
            return collection;
        });
        return result;
    }

Method call

    const int count = 5;
    ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
    var scs = await SumFromOneToCountTaskIEnumerable(count);
    ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");
    foreach (var sc in scs)
    {
        // Not what we want: all results arrive together, after the whole computation has finished.
        ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
    }
    ConsoleExt.WriteLine("################################################");
    ConsoleExt.WriteLine(Environment.NewLine);


As the example shows, everything is calculated asynchronously, but the problem remains: all the results are gathered in a collection and returned as a single block. This is not what we need. Remember, our goal was to combine asynchronous computation with deferred (lazy) enumeration.

To do this, you need either an external library, for example Ix (part of Rx), or the asynchronous streams introduced in C# 8.

Let's get back to our code. To demonstrate the asynchronous behavior, I used an external library.

    static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
    {
        ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
        await sequence.ForEachAsync(value =>
        {
            ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
            // Simulate a slow consumer.
            Task.Delay(TimeSpan.FromSeconds(1)).Wait();
        });
    }

    static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
    {
        ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum = sum + i;
            // Simulate a producer that needs half a second per value.
            Task.Delay(TimeSpan.FromSeconds(0.5)).Wait();
            yield return sum;
        }
    }

Method call

    const int count = 5;
    ConsoleExt.WriteLine("Starting Async Streams Demo!");
    // Wrap the lazy sequence as an asynchronous one; nothing is produced yet.
    IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();
    ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
    // Start a separate task that consumes the asynchronous sequence.
    var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
    // For the demo only: block until the consumer has finished.
    consumingTask.Wait();
    ConsoleExt.WriteLineAsync("Async Streams Demo Done!");


Finally, we see the desired behavior: the enumeration loop runs asynchronously.
See the source code here.

Asynchronous data pulling using client-server architecture as an example

Let's look at this concept with a more realistic example. All the benefits of this feature are best seen in the context of the client-server architecture.

Synchronous call in case of client-server architecture

When sending a request to the server, the client is forced to wait (i.e., it is blocked) until a response arrives, as shown in Fig. -3-.

Fig. -3- Synchronous data pulling, during which the client waits until the request processing is completed

Asynchronous data pulling

In this case, the client requests data and moves on to other tasks. Once the data is received, the client will continue to do the work.

Fig. -4- Asynchronous data pulling during which the client can perform other tasks while data is being requested

Pulling data as an asynchronous sequence

In this case, the client requests a chunk of data and continues with other tasks. After receiving the chunk, the client processes it and requests the next one, and so on until all the data has been received. The idea of asynchronous streams grew out of this scenario. Fig. -5- shows how the client can process the received data or perform other tasks.

Fig. -5- Pulling data as an asynchronous sequence (asynchronous streams). The client is not blocked.

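Asynchronous streams

By analogy with IEnumerable<T> and IEnumerator<T>, there are two new interfaces, IAsyncEnumerable<T> and IAsyncEnumerator<T>. At the time of writing they were defined as shown below. In the released .NET versions, MoveNextAsync returns ValueTask<bool>, DisposeAsync returns ValueTask, and GetAsyncEnumerator accepts an optional CancellationToken.

    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        Task<bool> MoveNextAsync();
        T Current { get; }
    }

    // Lets the enumerator release its resources asynchronously.
    public interface IAsyncDisposable
    {
        Task DisposeAsync();
    }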

Jonathan Allen covered this topic well on InfoQ. I will not go into detail here, so I recommend reading his article.

The key point is the return value of MoveNextAsync(): it is Task<bool> rather than the bool returned by IEnumerator.MoveNext(). Thanks to this, both the computation and the iteration happen asynchronously. The consumer decides when to get the next value, so although the model is asynchronous, it is still pull-based. For asynchronous cleanup of resources, you can use the IAsyncDisposable interface. More information about asynchronous streams can be found here.
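To make the role of MoveNextAsync() concrete, here is a minimal sketch of consuming an asynchronous sequence by hand, which is roughly what the compiler generates for an asynchronous foreach loop. It assumes the released C# 8 / .NET Core 3.0 types (where MoveNextAsync returns ValueTask<bool>); ManualEnumerationDemo, RunningSums and DrainAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ManualEnumerationDemo
{
    // Hypothetical async source: yields the running sums 0, 0+1, 0+1+2, ...
    public static async IAsyncEnumerable<int> RunningSums(int count)
    {
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            await Task.Delay(10); // stands in for real asynchronous work
            sum += i;
            yield return sum;
        }
    }

    // Pulls every element by calling MoveNextAsync() explicitly.
    public static async Task<List<int>> DrainAsync(IAsyncEnumerable<int> source)
    {
        var results = new List<int>();
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            // The consumer decides when to ask for the next value.
            while (await enumerator.MoveNextAsync())
            {
                results.Add(enumerator.Current);
            }
        }
        finally
        {
            // Asynchronous cleanup via IAsyncDisposable.
            await enumerator.DisposeAsync();
        }
        return results;
    }

    static async Task Main()
    {
        var values = await DrainAsync(RunningSums(5));
        Console.WriteLine(string.Join(", ", values)); // prints "0, 1, 3, 6, 10, 15"
    }
}
```

Each await on MoveNextAsync() frees the calling thread while the producer works, yet no value is produced until the consumer asks for it.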


In C# 8 as released, the syntax looks like this:

    await foreach (var dataChunk in asyncStreams)
    {
        // Process the chunk returned by yield, or do other work.
    }

The example above makes it clear that, instead of computing a single value, we can in principle compute many values one after another while awaiting other asynchronous operations.
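With the released C# 8 compiler, the summing example from the earlier sections can be written as an async iterator, combining async and yield return in one method. The following is a minimal sketch rather than code from the original demo: AsyncStreamSumDemo and SumFromOneToCountAsyncYield are names chosen for this illustration, Console.WriteLine replaces the ConsoleExt helper, and the delay only simulates latency.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class AsyncStreamSumDemo
{
    // Async iterator: async and yield return in the same method.
    public static async IAsyncEnumerable<int> SumFromOneToCountAsyncYield(int count)
    {
        var sum = 0;
        for (var i = 0; i <= count; i++)
        {
            sum += i;
            // Simulated latency; the calling thread is not blocked while waiting.
            await Task.Delay(TimeSpan.FromMilliseconds(100));
            yield return sum;
        }
    }

    static async Task Main()
    {
        // Each partial sum is consumed as soon as it has been produced.
        await foreach (var partial in SumFromOneToCountAsyncYield(5))
        {
            Console.WriteLine($"Async yield sum: {partial}");
        }
    }
}
```

Unlike the Task<IEnumerable<int>> version, the consumer here sees every partial sum as it becomes available instead of receiving the whole collection at the end.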

Redesigned Microsoft Example

I rewrote Microsoft's demo code. It can be downloaded in full from my GitHub repository.

The example creates a large stream in memory (an array of 20,000 bytes) and pulls elements from it sequentially in asynchronous mode. Each iteration pulls 8 KB from the array.

In step (1), a large data array is created and filled with dummy values. In step (2), a variable called checksum is defined; it holds the checksum used to verify that the computed sum is correct. The array and the checksum are created in memory and returned as a sequence of elements in step (3).

Step (4) applies the AsEnumerable extension method (AsAsyncEnumerable would be a more fitting name), which simulates an asynchronous stream of 8 KB chunks (BufferSize = 8000 elements, step (6)).

Normally there is no need to inherit from IAsyncEnumerable, but the example does so in step (5) to keep the demo code simple.

Step (7) uses the foreach keyword to pull 8 KB chunks of data from the asynchronous in-memory stream. Pulling happens sequentially: when the consumer (the code containing foreach) is ready for the next chunk, it pulls it from the provider (the array held in the in-memory stream). Finally, when the loop completes, the program compares the value of 'c' with the checksum and, if they match, prints "Checksums match!" in step (8).
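The steps above can be approximated with the released C# 8 syntax. The sketch below is not the Microsoft demo code or the code from the repository; it is an independent illustration of the same idea, and ChunkedStreamDemo, ReadChunksAsync and RunAsync are hypothetical names. It fills a 20,000-byte array, exposes it as an asynchronous sequence of chunks of up to 8000 bytes, and compares checksums at the end.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

static class ChunkedStreamDemo
{
    private const int BufferSize = 8000;

    // Hypothetical helper: exposes a Stream as an async sequence of chunks.
    public static async IAsyncEnumerable<byte[]> ReadChunksAsync(Stream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        int read;
        while ((read = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Copy only the bytes that were actually read in this iteration.
            var chunk = new byte[read];
            Array.Copy(buffer, chunk, read);
            yield return chunk;
        }
    }

    public static async Task<(int expected, int actual, int chunks)> RunAsync()
    {
        // Fill the array with dummy values and compute the reference checksum.
        var data = new byte[20000];
        var expected = 0;
        for (var i = 0; i < data.Length; i++)
        {
            data[i] = (byte)(i % 256);
            expected += data[i];
        }

        // Pull the data back chunk by chunk and recompute the checksum.
        var actual = 0;
        var chunks = 0;
        using (var stream = new MemoryStream(data))
        {
            await foreach (var chunk in ReadChunksAsync(stream, BufferSize))
            {
                chunks++;
                foreach (var b in chunk)
                {
                    actual += b;
                }
            }
        }
        return (expected, actual, chunks);
    }

    static async Task Main()
    {
        var (expected, actual, chunks) = await RunAsync();
        Console.WriteLine($"Read {chunks} chunks.");
        Console.WriteLine(expected == actual ? "Checksums match!" : "Checksums differ!");
    }
}
```

The consumer loop asks for the next chunk only after it has finished with the current one, which is the pull behavior described in step (7).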

Microsoft demo output window:


We looked at asynchronous streams, which are well suited to pulling data asynchronously and to writing code that produces multiple values asynchronously.
With this model, you request the next data element in a sequence and receive a response. This differs from the IObservable<T> push model, in which values are generated regardless of the state of the consumer. Asynchronous streams are a good way to represent asynchronous data sources that are controlled by the consumer, which decides when it is ready to accept the next piece of data. Examples include web applications and reading records from a database.

I demonstrated how to create an asynchronous enumeration and consume it using an external library for asynchronous sequences. I also showed the advantages this feature offers when downloading content from the Internet. Finally, we looked at the new syntax for asynchronous streams and at a complete example of its use, based on the Microsoft Build demo code (May 7–9, 2018, Seattle, WA).

