programming erlang ch19 ch20 - andstudy/forge GitHub Wiki

19์žฅ ๋ฉ€ํ‹ฐ์ฝ”์–ด ์„œ๊ณก

๋ณ‘ํ–‰์„ฑ์˜ ๋‘๊ฐ€์ง€ ๋ชจ๋ธ

  • shared state concurrency

    • ๋™์ผํ•œ ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ๊ณต์œ ํ•˜๋ฉด์„œ ์ฒ˜๋ฆฌ
    • ๋ณ‘ํ–‰์›”๋“œ๋กœ ๋„˜์–ด์˜ค๋ฉด์„œ ์žฌ์•™๋ฐœ์ƒ
      • ์ž ๊ธˆ ๋งค์ปค๋‹ˆ์ฆ˜ ํ•„์š”. ๋ณต์žกํ•ด์ง
  • message passing concurrency

    • ๊ฐ€๋ณ€ ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ๊ฐ€ ์—†์Œ. (์™„์ „ํžˆ ์ฐธ์€ ์•„๋‹ˆ์ง€๋งŒ ์ถฉ๋ถ„ํžˆ ์ฐธ์ด๋ผ๊ณ  ๊ฐ•์กฐ)
    • ์ž ๊ธˆ ๋งค์ปค๋‹ˆ์ฆ˜์ด ํ•„์š”์—†์Œ. ์‰ฝ๊ณ  ๊ฐ„๋‹จ.

20์žฅ ๋ฉ€ํ‹ฐ์ฝ”์–ดCPUํ”„๋กœ๊ทธ๋ž˜๋ฐ

๋ฉ€ํ‹ฐ์ฝ”์–ด CPU ์—์„œ ํšจ์œจ์ ์œผ๋กœ ์‹คํ–‰๋˜๋Š” ํ”„๋กœ๊ทธ๋žจ ๋งŒ๋“ค๊ธฐ

  1. ๋งŽ์€ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‚ฌ์šฉํ•˜์„ธ์š”
  • CPU ๋Š” ํ•ญ์ƒ ๋นก์„ธ๊ฒŒ ๋Œ๋ ค์•ผ ํšจ์œจ์ .
  1. side effect ๋ฅผ ํ”ผํ•ด์ฃผ์„ธ์š”
  • ๋™์‹œ์— ๊ฐ™์€ ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ์“ฐ๋ฉด ์žฌ์•™๋ฐœ์ƒ.
  • erlang ์€ ๋ฉ”๋ชจ๋ฆฌ ๊ณต์œ ๋ฅผ ์•ˆํ•ด์š”
    • -> ์‚ฌ์‹ค 2๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค๋ˆˆ....
  1. ์ˆœ์ฐจ์ ๋ณ‘๋ชฉ (Sequential bottleneck) ์„ ํ”ผํ•ด์ฃผ์„ธ์š”
  • ์šฐ๋ฆฌ๋Š” ๋‚˜์„œ -> ์‚ด๊ณ  -> ์ฃฝ๋Š”๋‹ค. ์ด๊ฑธ ๋ณ‘๋ ฌ๋กœ ํ• ์ˆ˜๋Š” ์—†์–ด์š”

ํ”„๋กœ๊ทธ๋žจ ๋ณ‘๋ ฌํ™” ์‹œํ‚ค๊ธฐ

  • ๋ฉ”์„ธ์ง€๋กœ data ์™€ ์ฒ˜๋ฆฌ๊ฒฐ๊ณผ๋ฅผ ์ฃผ๊ณ ๋ฐ›์Œ์œผ๋กœ์จ ๋ณ‘๋ ฌํ™”

  • map

      map(_, []) -> [];
      map(F, [H|T]) -> [F(H) | map(F,T)].
      }}}
       * pmap ( Parallel map ) : List ์˜ ์ธ์ˆ˜ ๋”ฑ 1๊ฐœ์”ฉ๋งŒ ๋ณ‘๋ ฌ๋กœ ํ‰๊ฐ€ํ•˜์—ฌ ์ทจํ•ฉ (์ทจํ•ฉ์ˆœ์„œ๊ณ ๋ คํ•จ)
      {{{#!gcode
      pmap(F, L) -> 
          S = self(),
          %% make_ref() returns a unique reference
          %%   we'll match on this later
          Ref = erlang:make_ref(), 
          Pids = map(fun(I) -> 
      		       spawn(fun() -> do_f(S, Ref, F, I) end)
      	       end, L),
          %% gather the results
          gather(Pids, Ref).
      
      do_f(Parent, Ref, F, I) ->					    
          Parent ! {self(), Ref, (catch F(I))}.
      
      gather([Pid|T], Ref) ->
          receive
      	{Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
          end;
      gather([], _) ->
          [].
    
  • pmap1 : ์ทจํ•ฉ์ˆœ์„œ๊ณ ๋ คํ•˜์ง€ ์•Š๋Š” ๋ฒ„์ „

      pmap1(F, L) -> 
          S = self(),
          Ref = erlang:make_ref(),
          foreach(fun(I) -> 
      		    spawn(fun() -> do_f1(S, Ref, F, I) end)
      	    end, L),
          %% gather the results
          gather1(length(L), Ref, []).
      
      do_f1(Parent, Ref, F, I) ->					    
          Parent ! {Ref, (catch F(I))}.
      
      gather1(0, _, L) -> L;
      gather1(N, Ref, L) ->
          receive
      	{Ref, Ret} -> gather1(N-1, Ref, [Ret|L])
          end.
    
  • ์–ธ์ œ pmap ์„ ์‚ฌ์šฉํ• ์ˆ˜ ์žˆ๋‚˜์š”?

    1. ์ฒ˜๋ฆฌ๋˜๋Š” ์ž‘์—…๋Ÿ‰์ด ์ ์œผ๋ฉด pmap ์‚ฌ์šฉํ•˜์ง€ ๋งˆ์„ธ์š”
    2. ํ”„๋กœ์„ธ์Šค๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์ด ์ƒ์„ฑ๋ ๊ฒฝ์šฐ๋ฅผ ํ”ผํ•ด์ฃผ์„ธ์š”
    3. ์กฐ๊ธˆ ๋” ์ถ”์ƒํ™”ํ•ด์„œ ์‚ฌ์šฉํ•˜์„ธ์š”. ์ทจํ•ฉ์ˆœ์„œ ๊ณ ๋ ค

๋ฉ”์„ธ์ง€๋Š” ์ž‘๊ฒŒ, ๊ณ„์‚ฐ์€ ํฌ๊ฒŒ

  • SMP Erlang (Symmetric Multi Processing)

    1. erl -smp +S ์Šค์ผ€์ฅด๋Ÿฌ๊ฐฏ์ˆ˜N

    2. N์„ ๋”ฐ๋กœ ์„ค์ •ํ•˜์ง€ ์•Š์œผ๋ฉด ๋จธ์‹ ์˜ CPU ๊ฐฏ์ˆ˜๊ฐ€ ๊ธฐ๋ณธ๊ฐ’์œผ๋กœ ์„ค์ •๋จ.

      ||Erlang (BEAM) emulator version 5.6.3 {{{#red [smp:2]}}} [async-threads:0]|| ||Eshell V5.6.3 (abort with ^G)|| ||1>||

  • -smp ์˜ต์…˜์„ ์ด์šฉํ•˜์—ฌ ๊ฐ€์ƒ ๋ฉ€ํ‹ฐ์ฝ”์–ดCPU ๋ฅผ emulate ํ•˜๊ธฐ์œ„ํ•œ ์Šคํฌ๋ฆฝํŠธ

    • runtests.pl

      for( $i = 1; $i < 32; $i++ ){
      	print "$i \n";
          # CPU ๊ฐฏ์ˆ˜๋ฅผ 1๋ถ€ํ„ฐ 32๊ฐœ๊นŒ์ง€ ๋Š˜๋ ค๊ฐ€๋ฉด์„œ ptests:tests(CPU๊ฐฏ์ˆ˜) ์ˆ˜ํ–‰
      	system("erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results.txt");
      	}
      
      set i = 1
      
      FOR /L %%i IN (1,1,5) DO erl -boot start_clean -noshell -smp +S %%i -s ptests tests %%i >> results.txt
      
  • ํ…Œ์ŠคํŠธ ํ”„๋กœ๊ทธ๋žจ

      -module(ptests).
      -export([tests/1, fib/1]).
      -import(lists, [map/2]).
      -import(lib_misc, [pmap/2]).
      
      tests([N]) ->
          Nsched = list_to_integer(atom_to_list(N)),
          run_tests(1, Nsched).
      
      run_tests(N, Nsched) ->
          case test(N) of
      	stop ->
      	    init:stop();
      	Val ->
      	    io:format("~p.~n",[{Nsched, Val}]),
      	    run_tests(N+1, Nsched)
          end.
      
      test(1) ->
          %% Make 100 lists 
          %%   Each list contains 1000 random integers
          seed(),
          S = lists:seq(1,100),
          L = map(fun(_) -> mkList(1000) end, S),
          {Time1, S1} = timer:tc(lists,    map,  [fun lists:sort/1, L]),
          {Time2, S2} = timer:tc(lib_misc, pmap, [fun lists:sort/1, L]),
          {sort, Time1, Time2, equal(S1, S2)};
      
      test(2) ->
          %% L = [27,27,27,..] 100 times
          L = lists:duplicate(100, 27), 
          {Time1, S1} = timer:tc(lists,    map,  [fun ptests:fib/1, L]),
          {Time2, S2} = timer:tc(lib_misc, pmap, [fun ptests:fib/1, L]),
          {fib, Time1, Time2, equal(S1, S2)};
      
      test(3) ->
          stop.
      
      %% Equal is used to test that map and pmap compute the same thing
      equal(S,S)   -> true;
      equal(S1,S2) ->  {differ, S1, S2}.
       
      %% recursive (inefficent) fibonacci
      fib(0) -> 1;
      fib(1) -> 1;
      fib(N) -> fib(N-1) + fib(N-2).
      
      %% Reset the random number generator. This is so we
      %% get the same sequence of random numbers each time we run
      %% the program
      seed() -> random:seed(44,55,66).
      
      %% Make a list of K random numbers
      %%    Each random number in the range 1..1000000
      mkList(K) -> mkList(K, []).
      
      mkList(0, L) -> L;
      mkList(N, L) -> mkList(N-1, [random:uniform(1000000)|L]).
    

๋ณ‘๋ ฌํ™” ์˜ˆ์ œ 1. mapreduce

  • map + reduce

    • map : mapping ํ”„๋กœ์„ธ์Šค๊ฐ€ {Key,Value} ์Œ์˜ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑํ•ด์„œ reduce ํ”„๋กœ์„ธ์Šค๋กœ ๋ณด๋‚ด๊ธฐ

    • reduce : reduce ํ”„๋กœ์„ธ์Šค๋Š” ๊ฐ™์€ ํ‚ค๋ฅผ ์ง€๋‹Œ ์Œ๋ผ๋ฆฌ ๊ฒฐํ•ฉํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ๋ณ‘ํ•ฉ

      -module(phofs).
      -export([mapreduce/4]).
      
      -import(lists, [foreach/2]).
      
      %% F1(Pid, X) -> sends {Key,Val} messages to Pid
      %% F2(Key, [Val], AccIn) -> AccOut
      
      mapreduce(F1, F2, Acc0, L) ->
          S = self(),
          Pid = spawn(fun() -> reduce(S, F1, F2, Acc0, L) end),
          receive
      	{Pid, Result} ->
      	    Result
          end.
      
      reduce(Parent, F1, F2, Acc0, L) ->
          process_flag(trap_exit, true),
          ReducePid = self(),
          %% Create the Map processes
          %%   One for each element X in L
          foreach(fun(X) -> 
      		    spawn_link(fun() -> do_job(ReducePid, F1, X) end)
      	    end, L),
          N = length(L),
          %% make a dictionary to store the Keys
          Dict0 = dict:new(),
          %% Wait for N Map processes to terminate
          Dict1 = collect_replies(N, Dict0),
          Acc = dict:fold(F2, Acc0, Dict1),
          Parent ! {self(), Acc}.
      
      %% collect_replies(N, Dict)
      %%     collect and merge {Key, Value} messages from N processes.
      %%     When N processes have terminated return a dictionary
      %%     of {Key, [Value]} pairs
      
      collect_replies(0, Dict) ->
          Dict;
      collect_replies(N, Dict) ->
          receive
      	{Key, Val} ->
      	    case dict:is_key(Key, Dict) of
      		true ->
      		    Dict1 = dict:append(Key, Val, Dict),
      		    collect_replies(N, Dict1);
      		false ->
      		    Dict1 = dict:store(Key,[Val], Dict),
      		    collect_replies(N, Dict1)
      	    end;
      	{'EXIT', _,  Why} ->
      	    collect_replies(N-1, Dict)
          end.
      
      %% Call F(Pid, X)
      %%   F must send {Key, Value} messsages to Pid
      %%     and then terminate
      
      do_job(ReducePid, F, X) ->
          F(ReducePid, X).
      
  • mapreduce ์˜ˆ์ œ

    • map : generate_words -> ํŒŒ์ผ์—์„œ word ๋ฅผ ์ฐพ๊ธฐ

    • reduce : count_words -> word ์˜ ์‚ฌ์šฉ ๋นˆ๋„์ˆ˜

      -module(test_mapreduce).
      -compile(export_all).
      -import(lists, [reverse/1, sort/1]).
      
      test() ->
          wc_dir(".").
      
      wc_dir(Dir) ->
          F1 = fun generate_words/2,
          F2 = fun count_words/3,
          Files = lib_find:files(Dir, "*.erl", false),
          L1 = phofs:mapreduce(F1, F2, [], Files),
          reverse(sort(L1)).
      
      generate_words(Pid, File) ->
          F = fun(Word) -> Pid ! {Word, 1} end,
          lib_misc:foreachWordInFile(File, F).
      
      count_words(Key, Vals, A) ->
          [{length(Vals), Key}|A].
      

๋ณ‘๋ ฌํ™” ์˜ˆ์ œ 2. ๋””์Šคํฌ ์ƒ‰์ธ

  • "์ด๊ฒƒ์€ ๋ณต์žกํ•œ ํ”„๋กœ๊ทธ๋žจ์ž…๋‹ˆ๋‹ค.(์ด ์ฑ…์—์„œ ๊ฐ€์žฅ ๋ณต์žกํ•˜๋‹ค.)"
  • "๋ชจ๋‘ ํ•ฉํ•ด ์ฝ”๋“œ๊ฐ€ ์•ฝ 1200์ค„์ด๋‚˜ ๋˜๋Š” ๊ด€๊ณ„๋กœ ๋‚˜๋Š” ์—ฌ๊ธฐ ๊ทธ ์ฝ”๋“œ๋ฅผ ์ „๋ถ€ ํฌํ•จ์‹œ์ผœ ์ž๋ผ๋‚˜๋Š” ์ƒˆ์‹น์„ ๋ฐŸ๊ณ  ์‹ถ์ง€๋Š” ์•Š๋‹ค" by ์กฐ ์•”์ŠคํŠธ๋กฑ

๋ฏธ๋ž˜๋กœ ์„ฑ์žฅํ•˜๊ธฐ

  • ์–ผ๋žญ์œผ๋กœ ์ƒˆ๋กœ์šด ์Šคํƒ€์ผ์˜ ํ”„๋กœ๊ทธ๋žจ ๋งŒ๋“ค๊ธฐ๋ฅผ ์ฆ๊ฒจ๋ณด์ž