26 Aralık 2017 Salı

Parallel.ForEach Metodu

Giriş
ForEach metodu kendi içinde bir Partitioner<T> kullanır. Parallel sınıfı bu partitioner'ı kontrol etmemize imkan tanır. Bu partitioner akıllı bir şekilde veriyi küçük küçük bölümlendirir. Eğer (veriyapısının uzunluğu / çekirdek sayısı) şeklinde bir algoritma kullansaydı, yavaş işlenen bir eleman yüzünden bir çekirdek işini bitiremediği halde diğer çekirdekler işlerini bitirip boşa çıkacaklardı. Bu da kaynak israfına sebep olacaktı. Çok büyük olmayan veri paketleri kullanılarak tüm işlemcilerin maksimum kullanımı sağlanıyor.

ForEach metodunun o kadar çok overload edilmiş hali var ki, kafa karıştırıyor.

ForEach - source + action
Bir Enumerable üzerinde dönmemizi sağlar.
Örnek
Şöyle yaparız.
Parallel.ForEach<Item>(items, item => DoSomething(item));
Mesela bir URL dizisini indirmek için kullanılabilir.

ForEach - source + parallelOptions
Döngüyü durdurmak mümkün. Şöyle yaparız.
List<string> list = ...;

Parallel.ForEach(list, (string str, ParallelLoopState loopState) =>{
  if (someCondition)
    loopState.Stop();    ...
});
Örnek
İki tane liste olsun. İlk listenin bir elemanı için diğer listedeki her eleman ile işlem yapmak isteyelim. Şöyle yaparız.
Parallel.ForEach(list1, new ParallelOptions { MaxDegreeOfParallelism = 4 }, e1 =>
{
  Parallel.ForEach(list2., new ParallelOptions { MaxDegreeOfParallelism = 4 }, e2 =>
  {               
    ...
  });
});
ForEach - source + localInit + body + localFinally
ForEach ThreadLocal değişkenler ile kullanılabilir.

Örnek
Şöyle yaparız. ForEach()'e verilen ilk generic parametre source dizideki eleman tipidir. İkinci parametre thread local değişkenin tipidir. body metodu her zaman thread local değişkeni dönmek zorundadır.
/ First type parameter is the type of the source elements
// Second type parameter is the type of the thread-local variable (partition subtotal)
Parallel.ForEach<int, long>(nums, // source collection
    () => 0, // method to initialize the local variable
    (j, loop, subtotal) => // method invoked by the loop on each iteration
    {
      sum1 += nums[j]; //modify local variable
      return subtotal; // value to be passed to next iteration
    },
    // Method to be executed when each partition has completed.
    // finalResult is the final value of subtotal for a particular partition.
    (finalResult) => Interlocked.Add(ref total, finalResult)
);
Console.WriteLine("The total from Parallel.ForEach is {0:N0}", total);
Örnek
Şöyle yaparız. Her bir satır için paralel şekilde veritabanıda işlem yapılıyor.
Parallel.ForEach(dt.Rows,
  () =>
  {
    var con = new SqlConnection();
    var cmd = con.CreateCommand();
    cmd.CommandText = sql_proc;
    cmd.CommandType = CommandType.StoredProcedure;
    con.Open();

    cmd.Parameters.Add(new SqlParameter("@a", SqlDbType.Int));
    // NB : Size sensitive parameters must have size
    cmd.Parameters.Add(new SqlParameter("@b", SqlDbType.VarChar, 100));
    cmd.Parameters.Add(new SqlParameter("@c", SqlDbType.Bit));
    // Prepare won't help with SPROCs but can improve plan caching for adhoc sql
    // cmd.Prepare();
    return new {Conn = con, Cmd = cmd};
  },
  (dr, pls, localInit) =>
  {
    localInit.Cmd.Parameters["@a"] = dr["a"];
    localInit.Cmd.Parameters["@b"] = dr["b"];
    localInit.Cmd.Parameters["@c"] = dr["c"];
    localInit.Cmd.ExecuteNonQuery();
    return localInit;
  },
  (localInit) =>
  {
    localInit.Cmd.Dispose();
    localInit.Conn.Dispose();
});





Hiç yorum yok:

Yorum Gönder