22 Ekim 2017 Pazar

BlockingCollection Sınıfı

Giriş
BlockingCollection klasik bir thread-safe collection. Pipeline veya Producer/Consumer kuyruk için kullanılıyor.

Kendisi içerde elemanları saklamak için default olarak ConcurrentQueue kullanıyor. Teknik olarak ConcurrentStack veya ConcurrentBag de kullanılabilir ama bu kullanım şekli için bir sebep
düşünemiyorum.

Bu sınıf sayesinde mutex ve condition'ları bilmeye gerek yok. Java'daki karşılığı sanırım ArrayBlockingQueue olabilir.

Arayüzler
.Net 4.5 ile şöyledir.
public class BlockingCollection<T> : IEnumerable<T>, IEnumerable, ICollection, 
IDisposable, IReadOnlyCollection<T>
{ ... }
Dolayısıyla şu tür kodlar çalışır.
IEnumerable<string> items = new BlockingCollection<string>();
System.Collections.ICollection items = new BlockingCollection<string>();
IReadOnlyCollection<string> items = new BlockingCollection<string>();
.Net 4 ve öncesinde ise IReadOnlyCollection arayüzünü gerçekleştirmiyordu. Yani şöyleydi.
public class BlockingCollection<T> : IEnumerable<T>, IEnumerable, ICollection, 
IDisposable
{ ... }
Constructor
Üst sınırı olmayan bir kuyruk yaratmak için şöyle yaparız
BlockingCollection<string> queue = new BlockingCollection<string>();
Add Metodu
Eğer BlockingColletion üst sınır ile yaratılmışsa yer açılıncaya kadar bloke olabilir. Üst sınır olmadan yaratılmışsa bloke olmayacaktır. Bu metod void döner.
queue.Add(item);

CompleteAdding Metodu - Bu metod niçin var ?
BlockingCollection sınıfı genellikle producer-consumer işlerinde kullanılıyor. Consumer Parallel.ForEach (...) ile, collection.GetConsumingEnumerable() metodunu kullanarak, hızlandırma (optimizasyon) amaçlı belli büyüklükteki bir tomar (chunk) okumaya çalışıyor. Ancak producer tomardan daha az sayıda eleman üretirse, consumer sonsuza kadar bekler. İşte bu durumun olmaması için, üreticiye artık daha fazla eleman gelmeyeceği, elindekileri işlemesi gerektiğini bildirmek için bu metod var. Açıklaması şöyle.
Parallel::ForEach will use a default Partitioner<T> implementation which, for an IEnumerable<T> which has no known length, will use a chunk partitioning strategy. What this means is each worker thread which Parallel::ForEach is going to use to work on the data set will read some number of elements from the IEnumerable<T> which will then only be processed by that thread (ignoring work stealing for now). It does this to save the expense of constantly having to go back to the source and allocate some new work and schedule it for another worker thread. So, usually, this is a good thing.
Bu metod çağırılınca artık kuyruğa yeni bir eleman eklenemiyor.
void ReadInFile(string filePath, BlockingCollection<string> collection)
{
  foreach(var line in File.ReadLines(filePath))
  {
    collection.Add(line);
  }

  //This lets the consumer know that we will not be adding any more items
  collection.CompleteAdding();
}
GetConsumingEnumerable Metodu
Bu metod hem iterator olarak kullanılır hem de  kuyruktaki elemanları tüketir. Döngü içinde çağrılır. Şöyle yaparız.
var queue = new BlockingCollection<Action>();

int numWorkers = 5;

for (int i = 0; i < numWorkers; i++)
{
  Thread t = new Thread(() =>
  {
    foreach (var action in queue.GetConsumingEnumerable())
    {
      action();
    }
  });
  t.Start();
}
Take metodu
Şöyle yaparız
BlockingCollection<object> queue = new BlockingCollection<object>();
...
object item = queue.Take();





Hiç yorum yok:

Yorum Gönder