Scaling Connections with BlockingCollection<T>()


























I have a server that communicates with 50 or more devices over TCP on a LAN. Each socket has its own message-reading loop, started with Task.Run.

I buffer each message that is read into a blocking queue, and each blocking queue has its own Task.Run that consumes it with BlockingCollection<T>.Take().



So something like (semi-pseudocode):



Socket Reading Task



    Task.Run(() =>
    {
        while (notCancelled)
        {
            element = ReadXml();
            switch (element)
            {
                case messageheader:
                    // Hand the deserialized message to this socket's queue.
                    MessageQueue.Add(deserialize<messageType>());
                    ...
            }
        }
    });


Message Buffer Task



    Task.Run(() =>
    {
        while (notCancelled)
        {
            Process(MessageQueue.Take());
        }
    });


So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.



I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.



Is this an inefficient way to handle it? What would be a better way?
































  • What do you do with these messages
    – TheGeneral
    Nov 20 '18 at 9:30










  • @TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
    – FinalFortune
    Nov 20 '18 at 16:06
















c# multithreading performance networking parallel-processing
















asked Nov 20 '18 at 8:43









FinalFortune

3 Answers














You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provide asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.



Your read loop would become:



    while (notCancelled)
    {
        var next = await queue.Reader.ReadAsync(optionalCancellationToken);
        Process(next);
    }


and the producer:



    switch (element)
    {
        case messageheader:
            // TryWrite returns false if the channel is bounded and full, or already completed.
            queue.Writer.TryWrite(deserialize<messageType>());
            ...
    }


so: minimal changes
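To make the two fragments above concrete, here is a minimal self-contained sketch, assuming string messages and an unbounded channel with one reader and one writer per socket. The names ChannelSketch, ConsumeAsync and RunAsync, and the ToUpperInvariant call standing in for Process, are illustrative assumptions and not part of the original code.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Consumer: awaits new items instead of blocking a thread in Take().
    public static async Task<List<string>> ConsumeAsync(
        ChannelReader<string> reader, CancellationToken token = default)
    {
        var processed = new List<string>();

        // WaitToReadAsync yields false once the writer has completed
        // and every buffered item has been read.
        while (await reader.WaitToReadAsync(token))
        {
            while (reader.TryRead(out string message))
            {
                // Stand-in for Process(message) (assumption for the example).
                processed.Add(message.ToUpperInvariant());
            }
        }

        return processed;
    }

    // Producer side plus wiring: one channel per connection.
    public static async Task<List<string>> RunAsync(IEnumerable<string> incoming)
    {
        Channel<string> channel = Channel.CreateUnbounded<string>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

        Task<List<string>> consumer = ConsumeAsync(channel.Reader);

        foreach (string message in incoming)
        {
            // On an unbounded channel TryWrite only fails after completion.
            channel.Writer.TryWrite(message);
        }

        channel.Writer.Complete(); // lets the consumer loop finish
        return await consumer;
    }
}
```

Calling `await ChannelSketch.RunAsync(new[] { "a", "b" })` from an async method returns the processed messages in arrival order. While the channel is empty, the consumer holds no thread, which is the main difference from a loop parked in BlockingCollection<T>.Take().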





Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
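As a rough illustration of "partially consuming incoming data streams as you detect frame boundaries", the sketch below reads newline-delimited frames from a Stream through a PipeReader. The newline framing, the PipelineSketch name and the choice of PipeReader.Create over a plain Stream are assumptions made for the example (the protocol in the question is XML-based), and it assumes the System.IO.Pipelines package is referenced.

```csharp
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Reads newline-delimited frames; in the TCP case the stream
    // would be a NetworkStream.
    public static async Task<List<string>> ReadFramesAsync(Stream stream)
    {
        var frames = new List<string>();
        PipeReader reader = PipeReader.Create(stream);

        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // Extract every complete frame currently buffered.
            SequencePosition? newline;
            while ((newline = buffer.PositionOf((byte)'\n')) != null)
            {
                ReadOnlySequence<byte> frame = buffer.Slice(0, newline.Value);
                frames.Add(Encoding.UTF8.GetString(frame.ToArray()));

                // Skip past the delimiter.
                buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
            }

            // Complete frames are consumed; a trailing partial frame is
            // marked as examined and stays buffered for the next read.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break; // end of stream; an unterminated tail is dropped in this sketch
            }
        }

        await reader.CompleteAsync();
        return frames;
    }
}
```

The point of the AdvanceTo call is that the reader never has to copy a half-received message into its own buffer: the pipe keeps it until more bytes arrive.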





























  • Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
    – FinalFortune
    Nov 22 '18 at 6:18

































I actually do something similar in another project. Here is what I learned and what I would do differently:





  1. First of all, it is better to use dedicated threads for the reading/writing loops (created with new Thread(ParameterizedThreadStart)), because Task.Run uses a pool thread, and since you run a (nearly) endless loop on it, the thread is practically never returned to the pool.



    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);



  2. Your Process can be an event that you invoke asynchronously, so your reader loop can return immediately and handle newly arrived messages as fast as possible:



    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }



Please note that if a delegate has multiple targets, its asynchronous invocation is not trivial. I created this extension method for invoking a delegate on pool threads:



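    public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
    {
        // Completes the asynchronous call and routes any handler exception to HandleError.
        void Callback(IAsyncResult ar)
        {
            var method = (EventHandler<TEventArgs>)ar.AsyncState;
            try
            {
                method.EndInvoke(ar);
            }
            catch (Exception e)
            {
                HandleError(e, method);
            }
        }

        // Note: delegate BeginInvoke/EndInvoke work only on .NET Framework;
        // on .NET Core and later they throw PlatformNotSupportedException.
        foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
            handler.BeginInvoke(sender, args, Callback, handler);
    }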


So the OnMessageReceived implementation can be:



protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);




  3. Finally, a big lesson was that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits for longer and longer periods when no data arrives for a long time. This is a tricky issue, because even if you log every single step of the processing, you will not notice that everything starts with a delay unless you can also mock the server side. Here you can find a fast BlockingCollection implementation that uses an AutoResetEvent to signal incoming data. I added a Take(CancellationToken) overload to it as follows:



    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
    /// </summary>
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        }

        return item;
    }
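For context, the Take overload above only shows half of the picture. A minimal sketch of how such a collection could be put together, assuming a ConcurrentQueue<T> plus an AutoResetEvent and a single consumer per queue, might look like this. FastBlockingQueueSketch is a hypothetical name and this is not the linked implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical reconstruction for illustration only.
public sealed class FastBlockingQueueSketch<T> : IDisposable
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);
    private readonly TimeSpan cancellationCheckTimeout;

    public FastBlockingQueueSketch(TimeSpan cancellationCheckTimeout)
    {
        this.cancellationCheckTimeout = cancellationCheckTimeout;
    }

    // Producer side: enqueue, then signal a waiting consumer.
    public void Add(T item)
    {
        queue.Enqueue(item);
        waitHandle.Set();
    }

    // Consumer side: waits on the event rather than spinning, and
    // re-checks the token each time the wait returns or times out.
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout);
            token.ThrowIfCancellationRequested();
        }

        return item;
    }

    public void Dispose()
    {
        waitHandle.Dispose();
    }
}
```

With several consumers on one queue, a single Set can be absorbed by one waiter while another keeps waiting; the timeout bounds that delay, which is why the sketch assumes one consumer per queue as in the question.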



Basically, that's it. Maybe not everything is applicable in your case; e.g., if a nearly immediate response is not crucial, the regular BlockingCollection will also do.
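The InvokeAsync extension above relies on delegate BeginInvoke, which is not supported on .NET Core and later. A comparable sketch that runs each subscriber through Task.Run is shown here. The InvokeOnPoolAsync name and the onError callback (standing in for HandleError) are assumptions for the example, and unlike the original it returns a Task that the caller may ignore or await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EventHandlerSketchExtensions
{
    // Invokes every subscriber on the thread pool. The returned task
    // completes when all subscribers have finished.
    public static Task InvokeOnPoolAsync<TEventArgs>(
        this EventHandler<TEventArgs> eventHandler,
        object sender,
        TEventArgs args,
        Action<Exception> onError)
    {
        if (eventHandler == null)
        {
            return Task.CompletedTask; // no subscribers
        }

        var pending = new List<Task>();
        foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        {
            pending.Add(Task.Run(() =>
            {
                try
                {
                    handler(sender, args);
                }
                catch (Exception e)
                {
                    // One failing subscriber must not affect the others.
                    onError(e);
                }
            }));
        }

        return Task.WhenAll(pending);
    }
}
```

A reader loop can call it fire-and-forget, which keeps the behaviour close to the BeginInvoke version: the loop goes straight back to Take while the handlers run on pool threads.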




















    Yes, this is a bit inefficient, because you block ThreadPool threads.
    I already discussed this problem in "Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern".



    You can also look at examples that test a producer-consumer pattern here:
    https://github.com/BBGONE/TestThreadAffinity



    You can use await Task.Yield() in the loop to give other tasks access to this thread.
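A minimal sketch of that idea, assuming the loop polls a ConcurrentQueue<int> instead of blocking in Take() (YieldSketch and the summing stand-in for the real processing are hypothetical):

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class YieldSketch
{
    // Processes queued items but gives up the thread after each one,
    // so other work queued to the pool gets a turn.
    public static async Task<int> DrainCooperativelyAsync(ConcurrentQueue<int> queue)
    {
        int sum = 0;
        while (queue.TryDequeue(out int item))
        {
            sum += item; // stand-in for the real processing

            // Without a synchronization context, the rest of the loop
            // is rescheduled as a new work item on the thread pool.
            await Task.Yield();
        }

        return sum;
    }
}
```

Note that this only helps when the loop body itself does not block: a thread parked inside a blocking Take() never reaches the yield.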



    You can also solve it by using dedicated threads or, better, a custom TaskScheduler that uses its own thread pool. But it is inefficient to create 50+ plain threads. It is better to adjust the task so that it is more cooperative.
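If you do go the dedicated-thread route, one way to sketch it without managing Thread objects directly is TaskCreationOptions.LongRunning, which hints to the scheduler that the work should not occupy a regular pool thread (the default scheduler typically starts a separate thread for it). The DedicatedConsumerSketch name and the summing stand-in for Process are assumptions for illustration.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class DedicatedConsumerSketch
{
    // Starts a blocking consumer loop outside the regular pool threads.
    public static Task<int> StartConsumer(BlockingCollection<int> queue, CancellationToken token)
    {
        return Task.Factory.StartNew(() =>
        {
            int sum = 0;

            // Blocks while the queue is empty; the loop ends after
            // CompleteAdding() once the remaining items are consumed.
            foreach (int item in queue.GetConsumingEnumerable(token))
            {
                sum += item; // stand-in for Process(item)
            }

            return sum;
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
}
```

This keeps the blocking design from the question but stops it from consuming pool threads; it still costs one thread per queue, which is the overhead the paragraph above warns about.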



    If you use a BlockingCollection, it can block the thread for a long time while waiting to write (if bounded) or while waiting to read when there are no items. In that case it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md



    They don't block the thread while waiting for the collection to become available for writing or reading. There's an example of how they are used: https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
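To show the non-blocking wait on both sides, here is a small sketch with a bounded channel, assuming the released System.Threading.Channels API rather than the earlier corefxlab prototype linked above. BoundedChannelSketch, the int payload and the summing consumer are illustrative assumptions.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelSketch
{
    public static async Task<int> RunAsync(int itemCount, int capacity)
    {
        // A full channel makes writers wait asynchronously (backpressure).
        Channel<int> channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= itemCount; i++)
            {
                // Awaits without holding a thread while the channel is full.
                await channel.Writer.WriteAsync(i);
            }

            channel.Writer.Complete();
        });

        int sum = 0;

        // Awaits without holding a thread while the channel is empty.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out int item))
            {
                sum += item;
            }
        }

        await producer;
        return sum;
    }
}
```

With a small capacity the producer is throttled to the consumer's pace, the same effect as a bounded BlockingCollection but without a thread parked in Add or Take.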































      First answer (System.Threading.Channels): answered Nov 20 '18 at 10:44 by Marc Gravell, edited Nov 20 '18 at 11:03

      777k19221262539




      777k19221262539












      • Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
        – FinalFortune
        Nov 22 '18 at 6:18


















      • Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
        – FinalFortune
        Nov 22 '18 at 6:18
















      Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
      – FinalFortune
      Nov 22 '18 at 6:18




      Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
      – FinalFortune
      Nov 22 '18 at 6:18













      1














      I actually do something similar in another project. What I learned or would do differently are the following:





      1. First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)) because Task.Run uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.



        var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
        thread.Start(cancellationToken);



      2. Your Process can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:



        private void ReaderLoop(object state)
        {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
        try
        {
        var message = MessageQueue.Take(token);
        OnMessageReceived(new MessageReceivedEventArgs(message));
        }
        catch (OperationCanceledException)
        {
        if (!disposed && IsRunning)
        Stop();
        break;
        }
        }
        }



      Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:



      public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
      {
      void Callback(IAsyncResult ar)
      {
      var method = (EventHandler<TEventArgs>)ar.AsyncState;
      try
      {
      method.EndInvoke(ar);
      }
      catch (Exception e)
      {
      HandleError(e, method);
      }
      }

      foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
      handler.BeginInvoke(sender, args, Callback, handler);
      }


      So the OnMessageReceived implementation can be:



      protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
      => messageReceivedHandler.InvokeAsync(this, e);




      1. Finally it was a big lesson that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection implementation using an AutoResetEvent for triggering incoming data. I added a Take(CancellationToken) overload to it as follows:



        /// <summary>
        /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
        /// </summary>
        public T Take(CancellationToken token)
        {
        T item;
        while (!queue.TryDequeue(out item))
        {
        waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
        token.ThrowIfCancellationRequested();
        }

        return item;
        }



      Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection also will do it.






      share|improve this answer




























        1














        I actually do something similar in another project. What I learned or would do differently are the following:





        1. First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)) because Task.Run uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.



          var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
          thread.Start(cancellationToken);



        2. Your Process can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:



          private void ReaderLoop(object state)
          {
          var token = (CancellationToken)state;
          while (!token.IsCancellationRequested)
          {
          try
          {
          var message = MessageQueue.Take(token);
          OnMessageReceived(new MessageReceivedEventArgs(message));
          }
          catch (OperationCanceledException)
          {
          if (!disposed && IsRunning)
          Stop();
          break;
          }
          }
          }



        Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:



        public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
        {
        void Callback(IAsyncResult ar)
        {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
        method.EndInvoke(ar);
        }
        catch (Exception e)
        {
        HandleError(e, method);
        }
        }

        foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
        }


        So the OnMessageReceived implementation can be:



        protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
        => messageReceivedHandler.InvokeAsync(this, e);




        1. Finally it was a big lesson that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection implementation using an AutoResetEvent for triggering incoming data. I added a Take(CancellationToken) overload to it as follows:



          /// <summary>
          /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
          /// </summary>
          public T Take(CancellationToken token)
          {
          T item;
          while (!queue.TryDequeue(out item))
          {
          waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
          token.ThrowIfCancellationRequested();
          }

          return item;
          }



        Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection also will do it.






        share|improve this answer


























          1












          1








          1






          I actually do something similar in another project. What I learned or would do differently are the following:





          1. First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)) because Task.Run uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.



            var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
            thread.Start(cancellationToken);



          2. Your Process can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:



            private void ReaderLoop(object state)
            {
            var token = (CancellationToken)state;
            while (!token.IsCancellationRequested)
            {
            try
            {
            var message = MessageQueue.Take(token);
            OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
            if (!disposed && IsRunning)
            Stop();
            break;
            }
            }
            }



          Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:



          public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
          {
          void Callback(IAsyncResult ar)
          {
          var method = (EventHandler<TEventArgs>)ar.AsyncState;
          try
          {
          method.EndInvoke(ar);
          }
          catch (Exception e)
          {
          HandleError(e, method);
          }
          }

          foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
          handler.BeginInvoke(sender, args, Callback, handler);
          }


          So the OnMessageReceived implementation can be:



          protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
          => messageReceivedHandler.InvokeAsync(this, e);




          1. Finally it was a big lesson that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection implementation using an AutoResetEvent for triggering incoming data. I added a Take(CancellationToken) overload to it as follows:



            /// <summary>
            /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
            /// </summary>
            public T Take(CancellationToken token)
            {
                T item;
                while (!queue.TryDequeue(out item))
                {
                    waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
                    token.ThrowIfCancellationRequested();
                }

                return item;
            }
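
For context, the surrounding class might look roughly like the following minimal sketch. It is not the implementation referred to above: the name SignaledQueue and the members other than Take are assumptions, and it presumes a single consumer thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for the FastBlockingCollection<T> mentioned above.
public sealed class SignaledQueue<T> : IDisposable
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);
    private readonly TimeSpan cancellationCheckTimeout;

    public SignaledQueue(TimeSpan cancellationCheckTimeout)
    {
        this.cancellationCheckTimeout = cancellationCheckTimeout;
    }

    // Producer side: enqueue, then wake the waiting consumer.
    public void Add(T item)
    {
        queue.Enqueue(item);
        waitHandle.Set();
    }

    // Consumer side: dequeue immediately if possible; otherwise sleep until
    // signaled or until the timeout elapses, then check for cancellation.
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout);
            token.ThrowIfCancellationRequested();
        }

        return item;
    }

    public void Dispose() => waitHandle.Dispose();
}
```

Because the queue is re-checked before every wait, several Add calls that collapse into one signal do not lose items. With more than one consumer, a missed signal would only be noticed after the timeout, which is why the sketch assumes a single reader.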



          Basically, that's it. Not everything may apply to your case; e.g., if a nearly immediate response is not crucial, the regular BlockingCollection will also do.














          answered Nov 20 '18 at 9:53, edited Nov 20 '18 at 9:59
          taffer





































              Yes, this is a bit inefficient, because you block ThreadPool threads.
              I already discussed this problem in "Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern".



              You can also look at examples that test a producer-consumer pattern here:
              https://github.com/BBGONE/TestThreadAffinity



              You can use await Task.Yield() in the loop to give other tasks access to this thread.
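
A minimal sketch of such a cooperative loop, assuming a ConcurrentQueue<T> as the buffer instead of a blocking Take. The class name CooperativeConsumer and the 10 ms idle delay are arbitrary choices for illustration, not taken from the linked discussion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeConsumer
{
    public static async Task RunAsync<T>(
        ConcurrentQueue<T> queue, Action<T> process, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (queue.TryDequeue(out T item))
            {
                process(item);
                // Reschedule the rest of the loop so that other queued
                // work items get a chance to run on this pool thread.
                await Task.Yield();
            }
            else
            {
                // Nothing to do: wait without holding a thread.
                // The delay value is arbitrary (an assumption).
                try { await Task.Delay(10, token); }
                catch (OperationCanceledException) { break; }
            }
        }
    }
}
```

Without the idle branch, an empty queue would turn the loop into a busy spin, so yielding alone is only helpful while there is work to process.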



              You can also solve it by using dedicated threads or, better, a custom TaskScheduler that uses its own thread pool. But it is inefficient to create 50+ plain threads. It is better to adjust the task so that it is more cooperative.



              If you use a BlockingCollection, which can block the thread for a long time while waiting to write (if bounded) or to read (if there are no items), it is better to use System.Threading.Tasks.Channels: https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md



              They don't block the thread while waiting for the collection to become available for writing or reading. There's an example of how it is used: https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
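
As a rough sketch of the idea, the following uses the System.Threading.Channels package (the released successor of the prototype linked above; using it here is an assumption, the answer refers to the corefxlab version). The names ChannelPipeline, ConsumeAsync and DemoAsync are hypothetical.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelPipeline
{
    // Reads until the writer is completed and the channel is drained.
    // No thread is blocked while the channel is empty.
    public static async Task<int> ConsumeAsync(
        ChannelReader<string> reader, Action<string> process)
    {
        int count = 0;
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out string message))
            {
                process(message);
                count++;
            }
        }

        return count;
    }

    // One channel per device connection, mirroring one queue per socket.
    public static async Task<int> DemoAsync()
    {
        var channel = Channel.CreateUnbounded<string>(
            new UnboundedChannelOptions { SingleReader = true });

        Task<int> consumer = ConsumeAsync(channel.Reader, m => Console.WriteLine(m));

        // The socket reading loop would call TryWrite for every message.
        channel.Writer.TryWrite("first");
        channel.Writer.TryWrite("second");
        channel.Writer.Complete(); // signals that no more messages will come

        return await consumer;
    }
}
```

With 50+ devices this gives 50+ consumer tasks that occupy a pool thread only while a message is actually being processed.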














                  answered Nov 20 '18 at 10:10, edited Nov 20 '18 at 11:00
                  Maxim T





























