Programming/Rust

쪼잔한 Rust 16. 두렵지 않은 Concurrency

동건 2018. 9. 11. 20:39

Chapter 16. 두렵지 않은 Concurrency

Concurrent 프로그래밍을 안전하고 효율적으로 다루는 것이 Rust가 삼는 주된 목표 중 하나이다. 프로그램의 여러 부분을 독립적으로 실행시키는 concurrent 프로그래밍과 동시에 실행시키는 parallel 프로그래밍은 많은 컴퓨터들이 여러 프로세서를 이용함에 따라 더욱이 중요하게 여겨지고 있다. 역사적으로 볼 때, 이러한 영역의 프로그래밍은 어렵고 오류가 나기 쉬운 것으로 생각되어왔다. Rust는 이러한 인식을 바꾸고 싶다.

Rust 팀은 애초에 메모리의 안전을 보장하는 것과 concurrency 문제를 예방하는 것은 완전히 별개의 문제라고 생각했었다. 시간이 지나면서, Rust 팀은 소유권과 타입 시스템이 이 두 가지 문제를 해결하는데 강력한 도구가 될 수 있음을 알게 되었다! 소유권과 타입 검사를 잘 이용하면, 많은 concurrency 오류들이 Rust에서는 런타임 에러가 아니라 컴파일 시점에서의 오류가 될 수 있었다. 즉, 런타임에서 concurrency로 인한 버그를 재상산하기 위해 많은 시간을 할애할 필요 없이, 잘못된 코드가 컴파일이 되지 않고 문제를 알려주는 에러를 내놓게 될 것이다. 그리하여 당신은 코드가 프로덕션에 배포된 후의 시점이 아니라 코딩을 하는 시점에서 잘못된 것을 고칠 수 있게 된다. 이러한 면을 우리는 Rust의 두렵지 않은(fearless) concurrency라고 이름지었다. 두렵지 않은 concurrency는 미묘한 버그가 없고, 새로운 버그를 유발하지 않고 리팩터를 할 수 있도록 코드를 짤 수 있게 해준다.

참고: 이 글이 편하게 읽히기 위해서, concurrent와 parallel을 엄밀하게 구분하지 않고 뭉뚱그려서 concurrent라고 이야기할 것이다. 이번 챕터에서 concurrent라는 사용할 때마다 부디 알아서 concurrent 또는 parallel로 상황에 맞게 이해해주길 바란다.

많은 언어들은 concurrency 문제를 다루는 해법에 대한 각자의 신조가 있다. 예를 들자면, Erlang은 우아한 message-passing concurrency를 가지고 있는 반면에 thread 간의 state를 공유하는 것에 있어서는 이해하기 어려운 방법을 제공한다. 하이 레벨 언어는 추상화를 얻기 위해 일정 로우 레벨의 제어를 포기하기 때문에, concurrency 문제를 해결할 수 있는 방법들 중의 일부분만을 지원하는 것도 이해가 간다. 반면에 로우 레벨 언어는 어떤 상황에서도 최고의 성능을 내는 해법을 지원해야 하고, 하드웨어의 추상화가 훨씬 적어야 한다. 따라서 Rust는 어떤 상황에서도 그에 맞게 concurrency 문제를 풀어나가는 다양한 도구들을 제공한다.

아래는 이번 챕터에서 다루게 될 주제들이다.

  • 여러 thread를 만들어서 여러 코드를 동시에 실행하는 법
  • Channel이 thread 간에 메세지를 보내는 message-passing concurrency
  • 여러 thread가 일정 데이터에 같이 접근할 수 있는 shared-state concurrency
  • Rust의 concurrency가 기본 라이브러리 뿐만 아니라 사용자가 정의한 타입에도 적용할 수 있도록 하는 SyncSend trait


Thread를 사용해서 코드를 동시에 실행하기

현재 대부분의 운영 체제에서 프로그램 코드는 프로세스에서 실행되고, 운영 체제는 여러 프로세스를 동시에 관리한다. 그 프로그램 안에서 또한 독립적인 부분들을 동시에 실행시킬 수 있고, 그 실행 단위를 thread라고 부른다.

프로그램의 계산을 여러 thread로 나눠서 하는 것은 성능을 개선할 수 있는 반면에 프로그램을 더 복잡하게 만든다. 그 동시에 실행되는 thread들이 실행되는 순서가 일정하게 정해지지 않기 때문이다. 이는 아래와 같은 문제를 야기한다.

  • 여러 thread들이 비정상적인 순서로 데이터에 접근하는 race condition
  • 두 thread가 서로 끝나길 기다리기 때문에 양 쪽 모두 작업을 멈추게 되는 deadlock
  • 굉장히 특정한 상황에서만 버그가 일어나고, 그 버그를 똑같이 재생산하기가 어려우며, 확실하게 버그를 고치기 어려운 경우

Rust는 thread를 사용할 때의 안 좋은 효과들을 줄이려 노력할 것이다. 하지만 여전히 multithreaded 상황에서의 프로그래밍은 조심스럽게 생각해야하며, 단일 thread에서의 프로그래밍을 할 때와는 다른 코드 구조를 필요로 함을 인지하고 있어야 한다.

프로그래밍 언어들은 thread 개념을 각기 다른 방법으로 구현한다. 많은 운영 체제들은 새로운 thread를 생성하기 위한 API를 제공한다. 이를 이용해서 프로그래밍 언어가 운영 체제의 API를 호출해서 thread를 생성하는 것을 때로는 1:1 모델이라고도 불리는데, 이는 하나의 운영 체제 thread가 한 언어에서의 thread이기 때문이다.

또 다른 많은 언어들은 또한 그들만의 특별한 thread 구현체를 가지고 있기도 하다. 이렇게 언어가 제공하는 thread를 green thread라고 부르며, 이 green thread는 운영 체제 thread의 개수와는 다른 개념으로 실행된다. 그렇기 때문에 green thread를 사용하는 모델은 M:N 모델이라고 불리기도 한다: N개의 운영 체제 thread에 따라 M개의 green thread가 생성되기 때문이다. 여기서 MN은 굳이 같은 수일 필요는 없다.

이러한 모델들은 각자의 장점과 단점이 있는데, Rust에게 가장 중요한 단점은 바로 런타임에서 지원을 해줘야 한다는 점이다. 런타임이라는 것은 어떤 상황이냐에 따라 다른 의미를 가질 수 있는 헷갈리는 용어이다.

이 책의 문맥에서는 런타임을 언어에 의해서 모든 binary 실행 파일에 기본적으로 포함되는 코드를 의미한다. 이 코드는 언어에 따라 클 수도 작을 수도 있겠지만, 어셈블리 언어가 아닌 이상 모든 프로그래밍 언어는 런타임 코드를 가질 수 밖에 없다. 이러한 의미에서, 종종 사람들이 "런타임이 없다"고 말할 때는 사실 "런타임이 적다"라는 의미로 이야기하는 것이라고 생각하면 된다. 런타임 코드가 적을 수록 적은 기능을 가지지만, 반대로 binary의 크기가 작아져서 이 언어를 더 많은 상황에서 여타 언어와 함께 엮을 수 있다는 장점이 있다. 많은 언어들이 런타임의 크기를 늘려서 더 많은 기능들을 추가하는 것에 문제가 없다고 하지만, Rust는 런타임 코드가 거의 없다시피하게 적을 것을 요구하고, 성능을 유지하기 위해 C 언어를 호출할 수 있어야 한다는 것은 타협할 수 없는 지점이다.

Green thread의 M:N 모델은 언어가 thread를 관리하기 위해서 더 큰 런타임을 필요로 하게 된다. 따라서 Rust의 기본 라이브러리는 1:1 thread 모델만을 구현하여 제공한다. Rust는 꽤나 로우 레벨의 언어이기 때문에, 당신이 어느 thread를 언제 실행시킬 것이냐와 같은 제어를 더 원한다거나, context switching의 비용을 줄이고 싶다면, 이를 위한 M:N threading 모델을 구현한 crate이 있으니 조사해보기 바란다.

지금까지 Rust에서 thread가 무엇인지 정의했다. 이제 Rust의 기본 라이브러리에서 제공하는 thread API를 어떻게 사용하는지 알아보자.


spawn으로 새로운 Thread 생성하기

새로운 thread를 하나 생성하기 위해서는 thread::spawn 함수를 호출하고 거기에 생성될 thread가 실행할 코드를 가진 closure 하나 (closure는 챕터 13에서 다루었다)를 건네주면 된다. 예제 16-1은 메인 thread에서 텍스트를 출력하고, 새로운 thread에서도 다른 텍스트를 출력하는 코드이다.

예제 16-1: 메인 thread가 출력하는 동안 또 다른 출력을 하는 새로운 thread 생성하기

// Filename: src/main.rs
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

여기서 새로운 thread는 그 일이 끝나든 아니든, 메인 thread가 종료될 경우 같이 멈춘다는 것을 알아두자. 이 프로그램의 출력 결과물은 매번 실행할 때마다 조금씩 다를 수 있는데, 어쨌든 아래와 비슷하게 출력될 것이다.

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

thread::sleep은 짧은 시간 동안 thread의 실행을 멈추도록 강제하는데, 이와 동시에 다른 thread가 실행될 수 있도록 해준다. 그리하여 다른 thread들이 실행될 차례를 가질 수도 있고 아닐 수도 있다. 왜냐하면 thread들이 실행되는 차례를 스케쥴링하는 것은 순전히 운영 체제의 몫이기 때문이다. 위 실행 결과에서는 새롭게 생성된 thread의 출력 명력이 먼저 있었음에도 불구하고 메인 thread의 텍스트가 가장 먼저 출력되었다. 게다가 우리는 i가 9가 될 때까지 생성된 thread가 출력하도록 했지만, 메인 thread가 그 전에 멈춰버렸기 때문에 생성된 thread는 5까지만 출력할 수 있었다.

만약 위 코드를 실행했는데 오직 메인 thread의 출력물만이 보인다거나, 두 thread 간의 출력 순서가 뒤섞이지 않는다면, 반복 범위의 숫자를 늘려서 운영 체제가 thread 사이를 옮겨다니는 기회를 더욱 많이 들려서 확인해보자.


join Handle을 이용해서 모든 Thread가 끝나길 기다리기

위 예제 16-1의 코드는 대부분의 경우 메인 thread가 먼저 끝나기 때문에 생성된 thread가 일을 다 마치기 전에 같이 끝나게 된다. 게다가 생성된 thread가 실행될 것이라는 확신조차 할 수 없는 상황이 된다. 왜냐하면 thread가 실행될 순서가 확실하지 않기 때문이다!

We can fix the problem of the spawned thread not getting to run, or not getting to run completely, by saving the return value of thread::spawn in a variable. The return type of thread::spawn is JoinHandle. A JoinHandle is an owned value that, when we call the join method on it, will wait for its thread to finish. Listing 16-2 shows how to use the JoinHandle of the thread we created in Listing 16-1 and call join to make sure the spawned thread finishes before main exits: 우리는 이 문제를 thread::spawn의 결과값을 변수로 담아서 해결할 수 있다. 그 thread::spawn의 반환 타입은 JoinHandle이다. JoinHandle은 소유권을 가진 값으로, 여기에 구현된 join method를 호출하면 해당 thread가 끝날 때까지 기다릴 것이다. 아래 예제 16-2에서는 이전 예제에서 만든 thread의 JoinHandle로 어떻게 join을 호출해서 메인 thread가 끝을 내기 전에 생성된 thread를 기다리는지 볼 수 있다.

예제 16-2: Thread가 완료될 때까지 실행하기 위해서 thread::spawnJoinHandle을 저장하기

// Filename: src/main.rs
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Handle의 join을 호출한 thread가 끝날 때 까지 현재 실행 중인 thread를 멈추게 한다. Thread를 멈추게 한다(blocking)는 것은 이 thread가 어떤 작업을 진행하거나, 끝내는 것을 막고 있다는 것을 의미한다. 위 예제에서 메인 thread의 for 반복문 이후에 join을 호출했기 때문에, 그 실행 결과는 아래와 비슷하게 나올 것이다.

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

두 thread가 서로 얽혀서 출력하다가, 메인 thread가 handle.join()을 실행하는 시점에서는 생성된 thread의 작업이 끝날 때까지 기다리고 있게 된다.

반대로 handle.join()을 메인 thread의 for 반복문 앞에 두면 어떻게 되는지 보자.

// Filename: src/main.rs
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

메인 thread는 생성된 thread가 끝날 때까지 기다린 다음에서야 자신의 for를 실행하므로, 출력물은 더 이상 순서가 섞여서 나오지 않게 된다.

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

이렇게 어디에서 join이 실행되느냐와 같은 작은 차이로 인해 thread가 동시에 실행될 것인지 아닌지에 영향을 미친다.


move Closures를 Thread에서 이용하기

move closure는 한 쪽 thread의 데이터를 다른 thread로 옮기게 할 수 있어서 thread::spawn과 함께 종종 쓰인다.

챕터 13에서 move 키워드를 closure의 parameter 리스트 앞에 붙여서 closure가 가진 환경 안에서 쓰는 값들의 ownership을 가져갈 수 있다고 언급했었다. 이 테크닉이 특히나 유용할 수 있는 경우는, 이렇게 새로운 thread를 만드는 경우에 thread 간에 사용하는 값들의 ownership을 옮기고 싶을 때이다.

예제 16-1에서의 closure는 argument로 받는 것이 없었다. 새로 생성한 thread가 실행할 코드가 메인 thread에서 어느 데이터도 필요로 하지 않았기 때문이다. 그렇게 메인 thread의 데이터를 사용하기 위해서는, 생성된 thread의 closure에서 필요한 값을 필히 가져가야 한다. 예제 16-3은 메인 thread에서 만든 vector를 생성된 thread에서 사용하는 내용이다. 하지만 이는 제대로 작동하지 않는 것을 곧 보게 될 것이다.

예제 16-3: 메인 thread의 vector를 다른 thread에서 사용하려고 시도

// Filename: src/main.rs
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Closure가 v를 사용하고자 하므로, closure의 환경에 가져가려고 할 것이다. thread::spawn 함수가 이 closure를 새로운 thread에서 실행시키기 때문에, 그 새로운 thread가 v에 접근할 수 있어야 한다. 하지만 이 예제를 컴파일하려고 하면 아래와 같은 오류를 받게 된다.

error[E0373]: closure may outlive the current function, but it borrows `v`,
which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

Rust는 어떻게 v를 가져갈 것인지를 추측한다. 위 코드에서 println!v의 reference만을 필요로 하므로, closure는 v를 빌려가려고 시도하게 된다. 하지만 여기서 문제가 있다: Rust는 새롭게 생성된 thread가 얼마나 오랫동안 실행될 것인지 모르기 때문에 v의 reference가 언제까지 살아있을 것인지도 모른다는 점이다.

아래 예제 16-4는 그렇게 v의 reference가 죽어서 문제가 될 수 있는 시나리오를 보여준다.

예제 16-4: 메인 thread가 v를 drop한 뒤에 그 reference를 가져가려는 closure

// Filename: src/main.rs
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

만약 이 코드가 실행될 수 있다면, 그 생성된 thread는 즉시 백그라운드에 묻혀서 전혀 실행되지 않고 있을 가능성이 있게 된다. 생성된 thread는 v의 reference를 가지고 있는데, 메인 thread가 즉시 v를 드랍시켜버린다 (drop 함수는 챕터 15에서 다뤘다). 그러고 나서야 생성된 thread가 실행되기 시작할 것인데, v는 더 이상 살아있지 않으므로 그 reference 역시 마찬가지이다. 그라믄 안돼!

예제 16-3의 컴파일 에러를 고치기 위해 그 에러 메세지가 주는 조언을 들어보자.

help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

By adding the move keyword before the closure, we force the closure to take ownership of the values it’s using rather than allowing Rust to infer that it should borrow the values. The modification to Listing 16-3 shown in Listing 16-5 will compile and run as we intend: Closure 앞에 move 키워드를 붙여줌으로써 closure가 사용할 값들을 보고 빌려가야 하는지 마는지를 추측하기보다는 아예 그 값들의 ownership을 취하도록 하게 한다. 그렇게 고친 예제 16-5는 컴파일이 되고 우리가 원하는 대로 실행될 것이다.

예제 16-5: Closure가 사용할 값의 ownership을 취하도록 강제하는 move 키워드

// Filename: src/main.rs
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

그렇다면 예제 16-4에서 봤던 것처럼 move closure를 사용하는 상황에서도 메인 thread가 vdrop하면 어떻게 될까? move가 이럴 때도 문제를 해결해줄까? 유감스럽게도 그렇지 않다. 그리고 또 다른 오류 메세지를 받게 될 것이다. move closure를 사용한다면 v는 closure의 환경 안으로 이동하게 되는데, 그렇다면 메인 thread가 더 이상 vdrop할 수 없게 되는 것이다. 그리하여 아래와 같은 컴파일 오류를 얻게 된다.

error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
10 |     drop(v); // oh no!
   |          ^ value used here after move
   |
   = note: move occurs because `v` has type `std::vec::Vec<i32>`, which does
   not implement the `Copy` trait

Rust의 ownership 규칙이 우리를 다시 한 번 살려줬다! 예제 16-3에서 에러가 났던 이유는 보수적인 판단을 하는 Rust가 thread가 사용할 v를 빌리가려고만 했고, 이는 메인 thread가 이론적으로 생성된 thread의 reference를 죽여버릴 가능성을 야기하기 때문이다. Rust에게 v의 ownership을 생성된 thread로 옮기라고 하면, 이제 더 이상 메인 thread에서는 v를 사용할 수가 없음이 보장된다. 만약 예제 16-4 역시 같은 식으로 바꾼다면, 메인 thread에서 v 사용하려고 할 때 ownership 규칙을 어기게 되는 것이다. move 키워드는 Rust가 기본적으로 생각하는 보수적인 대여 방식을 무시하기는 하지만, ownership 규칙을 위반하게 하지는 않는다.

지금까지의 thread와 thread API에 대한 기초적인 이해를 바탕으로 우리가 어떤 일을 더 할 수 있는지 알아보자.



Thread 간의 데이터가 오가기 위해 Message Passing 사용하기

안전한 concurrency를 보장하기 위한 방법 중에 유명해지고 있는 것이 message passing으로, 이는 thread나 actor가 데이터를 가진 메세지를 서로에게 전달하면서 소통하는 방식이다. Go 언어의 문서에 쓰여진 슬로건에서 그 아이디어를 찾아볼 수 있다: "Do not communicate by sharing memory; instead, share memory by communicating."

Rust에서 message-sending concurrency를 다루기 위한 주요 도구 중 하나로 channel이 있는데, 이것은 Rust의 기본 라이브러리에서 구현한 프로그래밍 개념이다. 프로그래밍에서의 channel을 계곡이나 강 같은 물 줄기로 생각해보자. 그 물 줄기 위로 고무 오리나 보트를 띄우면, 그 물길을 따라 아래 쪽으로 흘러갈 것이다.

프로그래밍에서의 channel은 transmitter와 receiver 두 부분으로 나뉜다. Transmitter는 강에서 고무 오리를 띄우는 상류 지점이고, receiver는 흘러간 고무 오리가 멈추게 되는 하류 지점이다. 한 쪽 코드에서는 전송하고자 하는 데이터를 transmitter의 method에 담아 호출하고, 다른 한 편에서는 도착한 메세지가 있는지를 receiving 지점에서 확인하는 것이다. Transmitter나 receiver 둘 중 한 쪽이라도 중단됐다면, 그 channel은 닫혔다(closed)고 한다.

이제 우리는 한 thread는 값을 생성해서 channel에 내보내고, 다른 thread는 그 값들을 받아서 출력하는 프로그램을 만들어 볼 것이다. 이런 기능을 소개하는 것이 주 목적이므로 이 프로그램은 단순한 값을 thread 간에 보낼 것이다. 이 기술에 어느 정도 익숙해지고 나면, 당신은 channel을 이용해서 채팅 시스템을 만들거나, 여러 thread들이 계산을 나눠서 한 뒤에 다른 하나의 thread에 전송하고 그 thread가 결과들을 모을 수 있는 시스템을 만들 수도 있을 것이다.

일단 아래 예제 16-6에서는 아무 일도 하지 않는 channel을 만들었다. 그리고 우리가 이 channel에 어떤 타입의 값을 보낼 것인지 아직 알려주지 않았기에 Rust는 컴파일을 거부할 것이다.

예제 16-6: Channel을 만들어서 txrx로 할당하기

// Filename: src/main.rs
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

mpsc::channel 함수를 통해 새로운 channel을 만들었는데, mpscmultiple producer, single consumer를 의미한다. 즉, Rust의 기본 라이브러리가 구현해 놓은 channel은 전송할 수 있는 지점이 여러 개이지만, 그 전송된 값을 받는 지점은 오직 하나를 가지고 있다는 뜻이다. 지금 우리는 producer도 하나인 프로그램으로 시작할 것이지만, 나중에는 여러 producer를 추가하게 될 것이다.

mpsc::channel 함수는 tuple을 반환하는데, 그 첫 원소는 발신 지점(transmitting end)이고 두 번째 원소는 수신 지점(receiving end)이다. 변수명 txrx는 각각 transmitter와 receiver를 나타내는 축약어로, 많은 분야에서 전통적으로 쓰이고 있는 것이므로 여기서도 이렇게 이름지었다. 또한 tuple을 분해하는 패턴을 let statement에 사용했는데, 이는 챕터 18에서 자세히 다룰 것이므로 넘어가겠다.

이제 전송하는 지점을 새로 생성한 thread로 옮겨주고 거기서 string 하나를 보내게 해서 메인 thread와 통신하도록 해보겠다. 이는 마치 강의 상류에서 고무 오리를 띄우는 것과 같고, 채팅 메세지를 한 쪽에서 다른 쪽으로 보내는 것과 같다.

예제 16-7: tx를 생성된 thread에 옮겨서 "hi"를 보내기

// Filename: src/main.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

이전에 했던 것처럼 thread::spawn을 사용해서 새로운 thread 하나를 만들었고, move를 사용하여 tx를 closure 안으로 옮겨서 생성된 thread가 tx를 소유하도록 했다. 생성된 thread가 channel을 통해서 메세지를 보내기 위해서는 발신 지점을 소유해야 한다.

발신 지점에서는 우리가 보내고자 하는 값을 취하는 send method가 있다. 이 send method는 Result<T, E> 타입을 반환하므로, 수신 지점이 이미 죽어서 더 이상 값을 보낼 곳이 없을 경우에 send는 오류를 반환할 것이다. 위 예제에서 우리는 unwrap을 사용해서 에러일 경우에는 패닉하도록 해두었다. 하지만 실제로 사용할 때에는 그 에러를 제대로 처리해주어야 할 것이다. 적절한 오류 처리에 대해 복습하고자 한다면 챕터 9을 복기하길 바란다.

아래 예제 16-8에서는 메인 thread에 있는 channel의 수신 지점에서 값을 받게 할 것이다. 이는 강가의 하류 끝으로 떠 내려온 고무 오리를 받는 것과 같고, 채팅 메세지를 수신하는 것과 같다.

예제 16-8: 메인 thread에서 "hi"를 받아서 출력하기

// Filename: src/main.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Channel의 수신 지점에는 두 가지 유용한 method로 recvtry_recv가 있다. 위 예제에서는 receive의 의미를 갖는 recv를 사용했는데, 이 method는 메인 thread의 실행을 멈추고 channel로 어떤 값이 도착할 때까지 기다리게 한다. 그래서 어떤 값이 도착하면, recv는 그 값을 Result<T, E> 타입으로 반환할 것이다. 만약 channel의 수신 지점이 이미 닫혀있다면 recv는 오류를 반환해서 더 이상 값이 들어오지 않을 것이라는 신호를 주게 된다.

try_recv method는 메인 thread의 실행을 멈추지 않고, 바로 Result<T, E>를 반환할 것이다. 만약 수신할 수 있는 값이 있었다면 그 값을 담은 Ok를 반환할 것이고, 수신할 값이 없다면 Err 값을 보내게 된다. try_recv는 메세지를 기다리는 동안 thread가 다른 할 일이 있을 때 유용하게 사용할 수 있다. try_recv를 매 시점에서 호출하여 수신할 값이 있는지 확인하고, 그 값이 있다면 메세지를 처리하고 그렇지 않다면 다시 try_recv로 체크하는 잠깐 사이에 다른 일을 하는 반복문을 쓸 수도 있는 것이다.

여기서는 예제의 단순함을 위해서 recv를 사용했다. 메인 thread가 메세지를 기다리는 것 외에는 아무런 할 일이 없으므로, 메인 thread가 멈춰있는 것이 잘못된 행동은 아니다.

예제 16-8을 실행시키면 메인 thread가 값을 받아서 출력하는 것을 볼 수 있을 것이다.

Got: hi

완벽하다!


Channel과 소유권 이전

Ownership 규칙은 메세지를 보내는 데에 있어서 안전한 concurrent 코딩을 할 수 있도록 도와주는 굉장히 중요한 역할을 한다. Rust 프로그램 전반에 걸쳐 ownership을 생각해야 하기 때문에 Concurrent 프로그래밍에서 오류를 예방할 수 있게 되는 것이다. Channel과 ownership이 함께 엮여서 오류들을 어떻게 예방할 수 있는지 실험을 해보겠다. 이미 추가 thread가 channel에 val 값을 보낸 다음에 그 값을 사용하도록 해보자. 예제 16-9의 코드를 컴파일하려고 해보면, 왜 안되는지 대답을 들을 수 있다.

예제 16-9: Channel에 보낸 다음에 val을 사용하려는 시도

// Filename: src/main.rs
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

tx.send를 통해 channel에 val을 보낸 뒤에, 그 val을 출력하려고 한다. 이를 허락하는 것은 나쁜 생각이다. 어떤 값이 다른 thread로 내보내진 후에는, 그 thread가 값을 받아서 메인 thread가 그 값을 사용하려고 하기 전에 변경하거나 없앨 수 있기 때문이다. 그렇게 되면 thread가 값을 받아서 변경함으로써 데이터가 일관적이지 못하거나 존재하지 않게 되어 오류를 불러일으킬 가능성을 만들게 된다. 하지만 Rust는 예제 16-9의 코드를 컴파일할 때 에러를 되돌려준다.

error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |
   = note: move occurs because `val` has type `std::string::String`, which does
not implement the `Copy` trait

이렇게 코드의 concurrency에 대한 실수는 컴파일 시점에서의 에러로 끝이 났다. send 함수가 그 parameter의 ownership을 가져가버리기 때문에 그 값은 이동하고, receiver는 그 ownership을 전달 받는다. Ownership 시스템이 이런 사항을 모두 체크하기 때문에, 값을 이미 전송한 뒤에 또 사용하는 실수를 막을 수 있다.


여러 값을 보낸 뒤 Receiver가 기다리는 것을 확인하기

예제 16-8의 코드는 컴파일이 되고 실행 가능했지만, channel 위에서 두 개의 개별적인 thread가 서로 이야기하고 있는지를 명확하게 보여주지는 못 했다. 그래서 코드를 약간 개선하여, 아래 예제 16-10에서는 실제로 그 예제가 동시적으로 실행하고 있는지 확인할 수 있도록 했다. 생성된 thread는 여러 메세지를 1초 씩 쉬면서 보낼 것이다.

예제 16-10: 여러 메세지를 조금씩 쉬면서 보내기

// Filename: src/main.rs
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

메인 thread에서는 더 이상 recv 함수를 명시적으로 호출하지는 않는다. 그 대신 우리는 rx를 iterator로 취급하여 사용한다. 매번 어떤 값을 수신할 때마다 그 값을 출력하게 될 것이다. 그리고 channel이 닫힌다면 반복 역시 끝날 것이다.

예제 16-10을 실행시키면 1초 마다 한 줄 씩의 출력을 볼 수 있을 것이다.

Got: hi
Got: from
Got: the
Got: thread

메인 thread에는 일시 정지에 관련된 어떠한 코드도 없으므로, 생성된 thread에서 값을 수신하기 위해 메인 thread가 대기하고 있다는 것을 확실하게 알 수 있다.


Trasmitter를 Clone하여 여러 producer 만들기

mpsc는 multiple producer, single consumer의 약자라고 이야기했었다. 이 개념을 제대로 적용해서 예제 16-10의 코드를 확장해보겠다. 여러 thread를 만들어서 하나의 receiver에 값들을 전송하려고 한다. 이는 channel의 transmitting 부분을 복제해서 수행할 수 있다. 예제 16-11을 보자.

예제 16-11: 여러 producer를 통해서 여러 메세지 보내기

// Filename: src/main.rs
// --중략--

let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {}", received);
}

// --snip--

첫 번째 thread를 만들기 전에, channel의 발신 부분에 있는 clone을 사용했다. 이 작업을 통해서 새로운 발신 handle을 얻어서 이것을 첫 thread에 건넸다. 그리곤 channel의 원래 있었던 발신 지점을 두 번째 thread에 주었다. 이렇게 우리는 두 개의 thread를 만들었고, 각 thread는 channel의 수신 지점에 서로 다른 메세지를 보내게 했다.

이 코드를 실행시키면 아래와 같은 출력물을 받게 될 것이다.

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

메세지가 도착하는 순서는 당신의 운영 체제에 따라 다를 수 있다. 이러한 면이 concurrency가 재밌으면서도 어려운 점이다. thread::sleep으로 여러 thread에 각기 다른 시간을 기다리도록 실험을 해보면, 매 번 실행할 때마다 다른 출력 결과를 얻게 될 것이다.

지금까지 channel이 어떻게 작동하는 지 살펴보았다.이제 Concurrency의 또 다른 방법을 알아보러 가자.



Shared-State Concurrency

Message passing은 concurrency를 다루는 좋은 방법 중 하나이지만, 유일한 방법은 아니다. Go 언어의 문서에서 본 슬로건의 일부를 다시 생각해보자: "Communicate by sharing memory."

메모리를 공유해서 통신하는 것은 어떻게 이루어지는 것일까? 게다가, 왜 message-passing 신봉자들은 그런 방식을 거부하고 그 반대로 하려고 하는 것일까?

프로그래밍 언어에 상관없이 channel은 하나의 소유권과 비슷하다. 왜냐하면 channel에 하나의 값을 전송시키면 그 값은 더 이상 사용할 수 없기 때문이다. 메모리를 공유하는 concurrency는 다중 소유권과 비슷하다. 여러 thread가 하나의 메모리 위치를 동시에 접근할 수 있기 때문이다. 스마트 포인터가 다중 소유권을 가능하게 했던 챕터 15에서 봤듯이, 다중 소유권에 따른 여러 소유주들을 관리해야 하기 때문에 프로그램이 더욱 복잡해지게 되었었다. 물론 Rust의 타입 시스템과 소유권 규칙은 이러한 여러 소유주 관리를 할 수 있도록 많은 도움을 준다. 그 예제를 위해, 공유된 메모리를 위한 concurrency 단위로 굉장히 널리 알려진 것 중 하나인 mutex를 알아보겠다.


한 번에 한 Thread만 데이터에 접근을 허용하는 Mutex

Mutexmutual exclusion의 약어로, 어느 한 시점에서 데이터에 접근할 수 있는 권한을 단 하나의 thread에만 주는 것이다. Mutex 안에서 데이터에 접근하기 위해서는, thread는 필히 mutex의 lock을 가져가겠다는 신호를 보내서 데이터 접근을 원한다고 신호를 보내야 한다. Lock은 mutex의 일부로 존재하는 데이터 구조로, 현재 누가 데이터를 독점하고 있는지를 계속 추적한다. 따라서 mutex는 잠금(locking) 시스템을 통해서 데이터를 보호하는 보디가드로 설명할 수 있겠다.

Mutex는 어렵다는 평판을 가지고 있는데, 아래 두 가지 규칙을 준수해야하기 때문이다.

  • 데이터를 사용하기 전에 꼭 lock을 획득해야 한다.
  • Mutex가 지키는 데이터를 사용해서 용무를 다 봤다면, 반드시 데이터를 unlock해서 다른 thread가 lock을 획득할 수 있도록 해야 한다.

어느 컨퍼런스에서의 패널 토론장에서 마이크가 하나 뿐이라고 생각해보자. 패널 한 사람이 발언을 하기 위해서는 우선 마이크를 달라는 신호를 보내야 한다. 그래서 마이크를 잡고 하고 싶은 말을 다 하고 나면 다음에 발언하고 싶은 패널에게 마이크를 건네줘야 한다. 만약 그 사람이 마이크를 주는 것을 잊어버린다면, 아무도 더 이상 발언을 할 수 없게 된다. 그리고 단 하나의 마이크를 통해 이어지는 토론을 제대로 관리하지 않는다면, 패널 토론은 제대로 이루어지지도 못할 것이다!

Mutex를 제대로 관리하는 것은 굉장히 까다로운 일이어서 많은 사람들이 channel을 신봉하는 이유가 된다. 하지만 Rust의 타입 시스템과 소유권 규칙 덕분에 mutex를 잘못 잠그고 풀 일이 생기지 않게 된다.


Mutex<T>의 API

Mutex의 사용법을 위한 예제로, 단일 thread 상황에서 mutex를 쓰는 예제 16-12로 시작해보자.

예제 16-12: 간단한 Mutex<T> API 탐험을 위한 단일 thread 상황

// Filename: src/main.rs
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

여타 다른 타입들처럼, Mutex<T>new 함수로 생성할 수 있다. Mutex 안에 담긴 데이터에 접근하기 위해서는 lock method를 사용해서 lock을 획득해야 한다. 이 함수 호출은 lock을 획득하기 전까지는 아무런 일도 하지 못하도록 현재 thread를 멈추게 한다.

만약 이미 lock을 가지고 있는 다른 thread가 패닉을 일으킨다면, lock 호출은 실패하게 된다. 이럴 경우에는 누구도 lock을 획득할 수 없으므로, 차라리 unwrap을 통해 lock을 호출하는 thread가 이런 문제일 경우에 패닉하도록 선택했다.

Lock을 잘 획득했다면 그 반환값을 이제 다룰 수 있는데, 위 예제에서는 num이 된다. 이 값은 mutex 안의 데이터에 접근할 수 있는 mutable reference이다. 타입 시스템은 mutex m 안에 있는 값을 사용하기 전에 lock을 확실하게 획득했는지를 보장한다. Mutex<i32>i32가 아니므로, i32 값을 사용하기 위해서는 lock을 획득해야만 하는 것이다. 그렇게 하지 않고서는 절대로 mutex 안의 i32 값을 사용하도록 타입 시스템이 놔두지 않기 때문에 우리는 이러한 사실을 절대 잊어버릴 수 없다.

아마 당신도 예상할 수 있었겠지만, Mutex<T>는 스마트 포인터이다. 좀 더 엄밀하게 말하자면, lock을 호출하면 MutexGuard라는 스마트 포인터를 반환한다. 이 스마트 포인터에는 우리의 내부 데이터를 가리키는 Deref가 구현되어 있고 또한 MutexGuard가 scope 밖으로 나갔을 때 자동적으로 lock을 풀도록 하는 Drop 역시 구현되어 있다. 이 Drop 특성은 예제 16-12의 내부 scope에서도 적용되는 것을 알 수 있다. 결론적으로 lock이 자동적으로 풀리는 특성 덕분에, 우리가 lock을 푸는 것을 까먹어서 다른 thread가 mutex에 접근하지 못하게 되는 위험 상황을 배제할 수 있는 것이다.

Lock을 없앤 다음에 우리는 mutex 값을 출력해서 그 안의 값이 6으로 제대로 바뀌었는지 확인할 수 있다.


여러 Thread 간에 Mutex<T> 공유하기

Now, let’s try to share a value between multiple threads using Mutex<T>. We’ll spin up 10 threads and have them each increment a counter value by 1, so the counter goes from 0 to 10. Note that the next few examples will have compiler errors, and we’ll use those errors to learn more about using Mutex<T> and how Rust helps us use it correctly. Listing 16-13 has our starting example: 이제 Mutex<T>를 이용해서 여러 thread 간에 값을 공유시켜보자. counter 값을 1씩 증가시키는 10개의 thread를 돌려서 counter를 최종적으로 0부터 10까지 증가시키려고 한다. 앞으로 보게 될 예제들은 컴파일 오류를 일으킬 것임을 미리 알려둔다. 우리는 이 오류들을 통해서 Mutex<T>를 사용하는 방법에 대해 더 배우고, 어떻게 Rust가 이를 올바르게 사용하도록 돕는지 보려고 한다. 아래 예제 16-13으로 시작해보자.

예제 16-13: Mutex<T>가 보호하는 counter를 증가시키는 10개의 thread

// Filename: src/main.rs
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

i32 값을 가지는 Mutex<T>counter 변수를 만든 다음에, 10개의 thread를 thread::spawn를 사용해서 만들고 모두 같은 closure를 주었다. 이 closure는 Mutex<T>lock method를 호출해서 lock을 획득하고, mutex 내부 값에 1을 더한다. Thread가 closure 실행을 마치고 나면 num은 scope 밖으로 나오게 되므로 lock을 풀어서 다른 thread가 다시 획득할 수 있도록 하게 해준다.

메인 thread에서는 예제 16-2에서 했던 것처럼 join handle을 모두 모은다. 그리고 모든 handle의 join을 호출해서 모든 thread가 일을 마치도록 한다. Thread들의 모든 일이 끝난 시점에서 메인 thread는 lock을 획득하고 결과값을 출력하게 된다.

앞서서 예제가 제대로 컴파일되지 않을 것이라 말했었다. 왜 그럴지 이제 알아보자!

error[E0382]: capture of moved value: `counter`
  --> src/main.rs:10:27
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
10 |             let mut num = counter.lock().unwrap();
   |                           ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:21:29
   |
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved (into closure) here
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors

오류 메세지가 말하길 counter 값이 closure 안으로 이동했고 lock을 호출하면서 잡혀야 한다고 한다. 이 설명이 우리가 원하는 바인 것 같은데, 컴파일러가 허락하지 않는다!

프로그램을 좀 더 단순화시켜서 이 상황이 어떻게 되는 것인지 알아보겠다. for 반복문에 10개의 thread를 만들지 말고, 딱 반복문 없이 2개만 만들어서 무슨 일이 일어나는지 보자.

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    let handle = thread::spawn(move || {
        let mut num = counter.lock().unwrap();

        *num += 1;
    });
    handles.push(handle);

    let handle2 = thread::spawn(move || {
        let mut num2 = counter.lock().unwrap();

        *num2 += 1;
    });
    handles.push(handle2);

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

아래는 단순화된 2개 thread를 사용하는 코드를 컴파일했을 때 나오는 결과이다.

error[E0382]: capture of moved value: `counter`
  --> src/main.rs:16:24
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
16 |         let mut num2 = counter.lock().unwrap();
   |                        ^^^^^^^ value captured here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error[E0382]: use of moved value: `counter`
  --> src/main.rs:26:29
   |
8  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
26 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value used here after move
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`,
   which does not implement the `Copy` trait

error: aborting due to 2 previous errors

아하! 첫 번째 오류 메세지가 말하길 counter는 첫 thread에 연결된 closure로 이동했다고 한다. 이 이동으로 인해 두 번째 thread에서 lock을 실행해서 counter를 가져오는 것이 불가능해지는 것이다! 즉 Rust는 counter의 소유권을 여러 thread로 이동시킬 수 없다고 이야기하고 있다. 이전 예제에서는 반복문 안에 thread들이 쓰여져 있었기 때문에 서로 다른 thread임을 에러 메세지에서 지적할 수가 없었다. 이 컴파일 에러를 챕터 15에서 소개했던 다중 소유권 방법을 사용해서 고쳐보도록 하자.


여러 Thread 상황에서의 다중 소유권

챕터 15에서는 reference를 세기 위해서 스마트 포인터 Rc<T>를 사용해 여러 소유주에게 값을 주었었다. 여기서도 이 방법을 사용해보고 어떻게 되는지 알아보자. 예제 16-14에서는 Mutex<T>Rc<T>로 감싸고 thread에 소유권을 넘겨주기 전에 그 Rc<T>를 clone한다. 우리는 이미 이전 예제의 오류 메세지를 파악했기 때문에 다시 for 반복문을 사용하고 move 키워드를 closure에 넣을 것이다.

예제 16-14: 여러 thread가 Mutex<T>를 소유하기 위한 Rc<T> 사용 시도

// Filename: src/main.rs
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

다시 한 번 컴파일을 해보면... 다른 오류를 받는다! 이 컴파일러는 우리에게 많은 것을 가르쳐주고 있다.

error[E0277]: the trait bound `std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send` is not satisfied in `[closure@src/main.rs:11:36:
15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>`
cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:11:36: 15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is
not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
   = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
   = note: required by `std::thread::spawn`

와, 이 에러 메세지는 꽤 말이 많다! 그 중에 몇 가지 집중해서 봐야할 중요한 부분이 있다: 첫 inline 에러에서 std::rc::Rc<std::sync::Mutex<i32>>가 thread 사이에서 안전하게 보내지지 않는다고 말한다. 그 이유는 두 번째 집중해서 봐야할 부분에서 나오는데, 아래 자세하게 설명하는 에러 메세지가 말하길 trait bound Send가 충족되지 않는다고 한다. Send는 concurrent한 상황에서 thread와 함께 사용하는 타입들을 안전하게 보장하는 trait 중 하나로, 다음 섹션에서 설명하겠다.

유감스럽게도 Rc<T>는 thread 사이에서 사용하기에 안전하지 않았다. Rc<T>가 reference 세는 것을 관리한다는 것은 그 reference의 개수를 clone할 때마다 더하고 그 clone이 drop될 때마다 뺌으로써 이루어진다. 하지만 이렇게 reference 개수를 셈하는 방식이 다른 thread에 의해서 방해받지 않도록 보장하는 concurrency를 위한 기본 기능 (concurrency primitives)을 전혀 사용하지 않는다. 따라서 메모리 누수나, 우리가 아직 미처 다 사용하지 않은 값이 먼저 제거되는 등의 미묘한 버그가 일어나 reference의 개수를 잘못 셀 수 있게 된다. 우리는 thread 간에 안전하게 사용할 수 있는 (thread-safe) Rc<T>가 필요하다.


Arc<T>를 통한 Atomic Reference Counting

다행히도 그러한 타입이 바로 Arc<T>이다. 여기서의 Aatomic을 나타내는데, 이는 atomically reference counted 타입임을 의미한다. Atomic은 또 하나의 concurrency 기본 개념 (primitives)으로, 여기서 자세한 설명은 생략한다. 기본 라이브러리 문서 std::sync::atomic를 참조하길 바란다. 지금 이 시점에는 그저 atomic이 thread 간에 안전하게 공유될 수 있는 기초적인 타입이라고 생각하자.

그렇다면 모든 기본 타입들이 atomic하면 되지 않을까, 왜 기본 라이브러리의 타입들은 Arc<T>를 기본적으로 사용하도록 구현하지 않았을까 하는 의문을 가질 수도 있을 것이다. Thread 안전을 보장하는 것은 성능 저하를 동반하기 때문에, 사용자가 정말 원할 때에만 그 기능을 사용할 수 있어야 한다는 것이 그 이유이다. 단순히 단일 thread에서 작업을 하는 경우에는 atomic이 보장하는 것들을 사용할 필요 없다. 그게 더 빠르기 때문이다.

다시 예제로 돌아가보자. Arc<T>Rc<T>는 같은 API를 가지고 있기 때문에, 크게 코드를 변경할 점이 몇 가지 없다. 아래 예제 16-15가 마침내 컴파일이 성공하고 실행이 될 코드이다.

예제 16-15: 여러 thread 간에 소유권을 공유할수 있도록 Mutex<T>Arc<T>로 감싸서 사용하기

// Filename: src/main.rs
use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

이 코드는 이렇게 출력할 것이다.

Result: 10

드디어 해냈다! 0부터 10까지 세는 것이 그다지 인상적이지 않을 수도 있겠으나, 그래도 Mutex<T>와 thread 안전성에 대해서 많은 것을 배울 수 있었는 시간이었다. 이 예제의 구조를 응용해서 숫자를 세는 것보다 더욱 복잡한 작업을 할 수도 있을 것이다. 독립적으로 계산할 수 있는 부분을 분리하여 thread들에게 넘기고, Mutex<T>를 사용해서 최종 결과를 thread들이 업데이트하도록 하는 것이 이 예제에서 볼 수 있는 전략이다.


RefCell<T>/Rc<T> 그리고 Mutex<T>/Arc<T> 간의 유사성

당신은 counter가 immutable 변수였지만, 그 내부의 값은 mutable reference였던 것을 알아차렸을 수도 있다. 이는 Cell family와 마찬가지로 Mutex<T> 자체가 내부적으로 mutability를 제공한다는 것을 의미한다. 챕터 15에서 RefCell<T>를 통해서 Rc<T> 내부 내용을 바꿀 수 있도록 했던 것처럼, Mutex<T>를 통해서 Arc<T> 내부 내용을 바꿀 수 있다.

한 가지 더 당부하고 싶은 바는, Rust가 Mutex<T>를 사용할 때의 모든 논리적인 오류를 잡아낼 수는 없다는 것이다. 챕터 15에서 두 Rc<T> 값들이 서로를 가리키게 되면서 메모리 누수를 유발하는 reference 순환이 만들어질 수 있는 위험성을 보았었다. 마찬가지로 Mutex<T> 역시 deadlock을 만들 위험이 따른다. Deadlock을 맛보고 싶다면, 직접 deadlock이 생기는 Rust 프로그램을 만들어보자. 그리고 언어와 상관없이 mutex를 사용하는데 있어서 deadlock을 해결하는 전략을 찾아보고 이를 Rust에서 한 번 구현해보라. Mutex<T>MutexGuard의 기본 라이브러리 API 문서가 많은 도움이 될 것이다.

마지막으로 SendSync trait을 소개하고, 이를 통해서 커스텀 타입을 어떻게 사용하지는 알아보면서 이 챕터를 마무리하겠다.



SyncSend trait을 활용한 Concurrency 확장

Rust는 그 언어가 가진 concurrency를 위한 기능이 너무도 적다는 것이 재미있는 점이다. 이번 챕터에서 소개했던 거의 모든 concurrency 기능들은 기본 라이브러리에서 제공하는 것이지, 언어 자체가 지원하는 것이 아니다. 그리고 concurrency를 다루는 선택권은 그 언어나 기본 라이브러리에 제한되지도 않는다. 직접 concurrency 기능을 만들어 낼 수도 있고, 다른 사람이 구현한 것을 사용할 수도 있다.

하지만, 언어에 녹여져 있는 두 개의 concurrency 개념이 있다. std::marker에 속한 trait인 SyncSend이다.


Send를 통해서 Thread 간의 소유권이 이동하는 것을 허락하기

Send marker trait을 통해 구현한 타입은 그 소유권이 thread 간에 이동할 수 있도록 만들어준다. Rust 타입의 대부분은 Send이지만, Rc<T>와 같은 예외적인 것도 있다. Rc<T> 값을 복제해서 다른 thread에 소유권을 옮기려고 할 때, 두 thread 모두 동시에 reference 개수를 업데이트할 수도 있기 때문에, Rc<T>Send가 되면 안 된다. 이러한 이유에서 Rc<T>는 별도의 thread 안전을 위한 성능 저하를 감수할 필요가 없는 단일 thread 상황에서 쓰이도록 만들어진 것이다.

따라서 Rust의 타입 시스템과 trait 제한(bound)를 통해서 thread 간에 Rc<T> 값을 위험하게 보내는 것이 실수로라도 용납되지 않는다. 예제 16-14에서 Rc<Mutex<i32>>Send를 구현하지 않았기 때문에 컴파일 에러를 받았던 것을 기억해보자. 그리고 이를 SendArc<T>로 바꿨을 때, 컴파일이 성공했었다.

Any type composed entirely of Send types is automatically marked as Send as well. Almost all primitive types are Send, aside from raw pointers, which we’ll discuss in Chapter 19. 어떤 타입이라도 Send 타입으로만 온전하게 이루어져 있다면 그 타입 역시 Send가 된다. 대부분의 기초 타입들은 Send인데, 챕터 19에서 다룰 raw pointer는 그렇지 않음을 말해둔다.


Sync를 통해서 여러 Thread가 접근할 수 있도록 허용하기

Sync marker trait을 구현한 타입은 여러 thread로부터 참조될 수 있다. 좀 더 설명하자면, 어떤 타입 T의 reference &TSend라면 그 타입은 Sync인데, 이는 그 reference가 다른 thread로 안전하게 보내질 수 있다는 것을 의미한다. Send와 마찬가지로 기본 타입들 모두 Sync이며, Sync 타입들로만 이루어진 타입 역시 Sync이다.

The smart pointer Rc<T> is also not Sync for the same reasons that it’s not Send. The RefCell<T> type (which we talked about in Chapter 15) and the family of related Cell<T> types are not Sync. The implementation of borrow checking that RefCell<T> does at runtime is not thread-safe. The smart pointer Mutex<T> is Sync and can be used to share access with multiple threads as you saw in the “Sharing a Mutex<T> Between Multiple Threads” section. 스마트 포인터 Rc<T>Send가 아니었던 이유로 인해 Sync 또한 아니다. 챕터 15에서 소개했던 RefCell<T> 타입과 그와 관련한 Cell<T> family도 Sync가 아니다. 런타임에서 RefCell<T>의 대여를 검사하는 것은 thread 간에 안전하도록 구현된 것이 아니다. 반면에 스마트 포인터 Mutex<T>Sync이기 때문에 여러 thread가 접근할 수 있었다. "여러 Thread 간에 Mutex<T> 공유하기" 섹션에서 봤듯이 말이다.


손수 SendSync를 구현하는 것은 Unsafe Rust이다

SendSync로 구성된 타입들은 자동적으로 SendSync가 되기 때문에 우리는 이 trait을 직접 구현할 필요는 없다. 이들은 marker trait이기 때문에 더욱이 구현해야될 method조차 없다. 이 trait들은 concurrency와 관련된 일관성을 유지시키기 위해서 사용되는 것이다.

이 trait들을 손수 구현하는 것은 unsafe Rust 코드이다. Unsafe Rust에 대해서는 챕터 19에서 다룰 것이므로 지금은 SendSync로 이루어지지 않는 새로운 concurrent 타입을 구축한다는 것은 그 안전을 보장하기 위해서 굉장히 조심스럽게 생각해야한다는 점을 알아두는 선에서 마무리하겠다. Rustonomicon에서 더 자세한 내용을 찾아볼 수 있다.



정리하는 말

이 챕터가 concurrency를 다루는 마지막 챕터는 아니다. 챕터 20에서 그 개념을 사용해서 보다 현실적인 상황을 다룰 것이다.

앞서 말했듯이, Rust가 concurrency를 다루는 방법의 대부분이 언어에 새겨진 것이 아니기 때문에, 많은 concurrency 해법들이 crate으로 구현되어 있다. Crate들은 기본 라이브러리보다 더 빠르게 발전하므로, 여러 thread를 다루는 상황에서의 최신 crate들을 꼭 찾아보길 바란다.

Rust 기본 라이브러리는 message passing을 위한 channel과 Mutex<T>Arc<T>와 같이 concurrent 상황에서 안전하게 사용할 수 있는 스마트 포인터 타입을 제공한다. Rust의 타입 시스템과 소유권 규칙을 통해 data race나 이미 소멸한 reference와 같은 문제에 직면하지 않는 코딩을 보장한다. 코드가 컴파일에 성공했다는 것은, 여타 언어에서 흔하게 발생할 수 있는 추적하기 어려운 concurrency 버그 걱정 없이 기쁘게 실행해도 된다는 의미이다. Concurrent 프로그래밍은 더이상 두려운 개념이 아니다. 얼른 나가서 당신의 프로그램을 concurrent하게 만들어보자, 두려움 없이 말이다!

다음 챕터에서는 Rust 프로그램의 규모가 커짐에 따라 필요한 문제를 세우고 해법을 구조화하는 정석적인 방법을 소개할 것이다. 그리고 object-oriented 프로그래밍과 관련해서 Rust의 정석이 어떻게 연관되는지도 논의하려고 한다.

반응형