본문 바로가기
C,C++/C++

[C++] std::thread 기본 활용 방법을 알아보자 | mutex, condition_variable, 종료 방법

by woohyeon 2020. 7. 3.
반응형

개인적으로 공부하는 내용이므로 틀린 부분이 있을 수 있습니다. 있다면 알려주세요 :)

1. 공유 자원에 대한 race condition 해결하기 (mutex)


하나의 프로세스 내에 두 개 이상의 스레드가 존재하는 멀티 스레딩 환경에선 한가지 문제점이 존재한다. 바로 하나의 자원에 여러 스레드가 동시에 접근함으로 인해 생기는 문제점이다. 예를 들어 다음 프로그램은 10개의 스레드가 생성되면서 각자 Foo 함수를 실행한다. Foo 함수에선 s_num이라는 전역 변수를 증가시키고 감소시킨다. 따라서 결과 값은 0을 기대하지만 직접 실행해보면 항상 0이 보장되진 않는다. volatile 키워드는 테스트를 위해 임시적으로 컴파일러의 최적화를 차단해두기 위함이다.

volatile int g_num = 0;
void Foo()
{
    for (size_t i = 0; i < 10000; ++i)
    {
        ++g_num;
        --g_num;
    }
}

int main()
{
    thread threads[10];
    for (size_t i = 0; i < 10; ++i)
    {
        threads[i] = thread(Foo);
    }

    for (size_t i = 0; i < 10; ++i)
    {
        threads[i].join();
    }

    cout << g_num << endl; 

    return 0;
}

이와 같이 매번 다른 값이 나오는 이유는 공유 자원인 s_num이라는 변수에 여러 스레드가 동시에 접근하기 때문에 생기는 문제점이다. 예를 들어 첫 번째 스레드가 값을 1 증가시킨 후 1을 감소시키기 전에 스레드간의 문맥교환이 발생하였다. 그래서 두 번째 스레드가 증가된 값을 읽어 1에서 추가로 1을 증가시킨다. 그리고 다시 문맥교환이 발생하여 또 다른 스레드가 1을 증가시켜 3이 된다. 이제 첫 번째 스레드가 값을 감소시킨다. 원래라면 1에서 1이 감소해 0이 되어야 하지만 현재 값은 3이기 때문에 2가 된다. 이런식으로 얽히고 얽혀 정상적인 결과가 나오지 않는 것이 멀티 스레딩의 대표적인 문제점이다.


Mutex

이를 해결하려면 위와 같은 무질서한 상황을 막기 위해 무언가를 서로 맞춰주는 동기화가 필요하다. C++에선 이를 위해 mutex라는 클래스가 존재한다. mutex는 mutual exclusion의 약자로 상호 배제라는 뜻이며, 하나의 스레드만 접근하도록 임계 영역을 잠근다고 해서 lock이라고도 한다. 사용법은 다음과 같다. mutex를 사용하기 위해선 <mutex> 헤더파일을 인클루드해야 한다.

#include <mutex>

int   g_num = 0;
std::mutex g_mutex;

void Foo()
{
    for (size_t i = 0; i < 10000; ++i)
    {
        g_mutex.lock();
        ++g_num;
        --g_num;
        g_mutex.unlock();
    }
}

위와 같이 사용법은 간단하다. std::mutex 타입의 변수를 선언하고 여러 스레드가 동시에 접근될 수 있는 공유 자원을 사용하는 곳을 lock()unlock()을 통해 감싸주면 된다. 그러면 많은 스레드 중에 먼저 루프에 진입하여 lock 함수를 호출하는 스레드가 해당 공간을 점유하게 된다. 만약 unlock() 이전에 다른 스레드가 접근하려고 하면 블락되며 lock이 풀릴 때까지 기다려야 한다.


scoped_lock

lock() 호출 후 unlock()을 호출해주지 않으면 다른 스레드는 영원히 해당 영역에 접근하지 못한다. 위의 코드는 짧지만 만약 코드가 길다면 충분히 unlock()을 까먹을 수 있다. 마치 메모리 해제를 까먹는 것처럼. 이를 위해 RAII 원칙이 적용된 lock 기능을 제공한다. 다른 lock도 있지만 일단 scoped_lock만 보자. scoped_lock은 C++ 17부터 지원하며, 스마트 포인터처럼 자신의 영역을 벗어나면 자동으로 unlock()을 호출해준다. 혹시나 C++ 17을 사용하지 못하는 환경이라면 lock_guard를 사용할 수 있다. 사용법은 다음과 같다.

#include <mutex>

int   g_num = 0;
std::mutex g_mutex;

void Foo()
{
    for (size_t i = 0; i < 10000; ++i)
    {
        std::scoped_lock<std::mutex> sLock(g_mutex);
        ++g_num;
        --g_num;
    }
}

scoped_lock 객체가 생성되면서 lock()이 호출되고 객체가 소멸될 때 unlock()이 호출된다. mutex::lock을 직접적으로 사용하기보단 RAII가 적용된 lock_guard, scoped_lock 등을 사용하는 것이 좋다.
참고로 바로 lock을 호출하지 않도록 할 수도 있다. (아래 사진 두 번째 생성자 참고)


std::scoped_lock 클래스

image

scoped_lock과 같이 RAII 원칙이 적용된 객체를 최대한 좁은 범위에 설정하고 싶을 때 아래와 같이 괄호를 이용할 수 있다.

void Foo()
{
    ...
    {   
        std::scoped_lock<std::mutex> lock(g_mutex); // scope를 벗어나면 사라짐 
        ...
    }   // scope
    ...
}

 

2. 스레드의 실행 흐름 제어하기(sleep, wait, condition_variable)


스레드를 사용하다보면 일이 없을 때 또는 특정 조건에 작업을 중단하고 원할 때 스레드의 실행을 재개하도록 하고 싶을 수 있다. 이는 CPU를 불필요하게 점유하는 일이 없도록 하기 위해서다. 이를 위해 크게 sleep과 wait가 존재한다. sleepwait는 이름에서 알 수 있듯이 스레드를 재우고, 기다리게 한다. 즉 일시정지 시킨다. sleep은 시간이라는 특성을 이용하여 중지시키고, wait은 시간, 조건 또는 notify라는 신호를 받을때 까지 중지시킨다.


sleep은 다음과 같이 두 개가 존재한다.

  • std::this_thread::sleep_for(): 얼마동안 sleep 상태에 있을 것인지
  • std::this_thread::sleep_until(): 언제까지 sleep 상태에 있을 것인지

 

여기선 sleep_for()만 알아본다. 인자는 std::chrono 타입의 값을 받는다. 사용법은 다음과 같다. 참고로 chrono를 사용하기 위해선 <chrono> 헤더를 인클루드 해야 한다.

std::thread th1([]()
{
    // loop1
    while(true)
    {
        std::cout << "Hello from child thread \n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // ms 기준
    }
});

// loop2
while(true)
{
    std::cout << "Hello from main thread \n";
    std::this_thread::sleep_for(std::chrono::seconds(1)); // sec 기준
}

위 프로그램은 다음과 같이 동작한다. 메인 스레드가 th1 스레드를 생성하고 loop2에 진입한다. Hello from main thread을 출력하고 1초간 잠이 들고를 반복한다. th1 스레드 또한 문자열을 출력하고 1초간 잠드는 것을 반복한다. sleep 함수는 CPU에 부담을 덜 주기위한 용도로 사용될 수 있다. 그리고 sleep함수는 다른 스레드가 호출해줄 수 없다. 오로지 std::this_thread를 통해 현재 스레드가 실행하도록 해야한다. this_thread는 클래스에서 현재 객체를 가리키는 포인터 this와 비슷한 의미이다.


condition_variable(조건 변수)

waitnotify를 사용하기 위해선 condition_variable 이라는 클래스를 추가로 사용해야 한다. 이 조건 변수 클래스는 어떤 조건(condition)을 이용하여 스레드의 상태를 감지하고 제어한다. 여기서 살펴볼 것은 크게 다음과 같다.

  • wait()
  • notify_one()
  • std::unique_lock

조건 변수를 이용하여 스레드를 wait 상태로 만들고 notify라는 신호를 주어 깨우는 예제를 보자.

std::mutex g_mutex;
std::condition_variable g_controller;

// loop1
std::thread th1([]() 
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_controller.wait(lock); // notify 신호에 의해 깨어날 때까지 blocking(waiting) 상태
        std::cout << "> Message from a child: Hello? \n";
    }
});

// loop2
while(true)
{
    std::string line;
    std::getline(std::cin, line);
    if(line == "q")
    {
        th1.detach();    
        break;
    }

    std::unique_lock<std::mutex> lock(g_mutex);
    g_controller.notify_one();
}

우선 위 프로그램의 동작 먼저 살펴보면 다음과 같다. 메인 스레드가 th1 스레드를 생성하고 loop2로 진입하여 사용자로부터 문자열을 입력받는다. 동시에 th1은 loop1로 진입하자마자 wait 상태가 된다. 메인 스레드에서 문자열 입력이 확인되면 notify_one()을 통해 th1 스레드에게 신호를 주면 th1 스레드는 Hello from a child thread와 같은 문자열을 출력하고 다시 wait 상태가 된다. 이와 같은 과정을 반복하다가 메인 스레드에서 'q'를 입력받으면 루프를 탈출하고 프로그램을 종료한다. th1 스레드는 detach 되어 메인 스레드가 종료되면 같이 종료된다.

결과는 다음과 같다.

image

이제 condition_variable 의 사용법을 살펴보자. 우선 조건 변수를 사용하기 위해선 mutex가 필요하다. 따라서 mutex 타입의 변수를 하나 선언한다. 그리고 조건 변수를 하나 선언한다. 이러한 변수들은 모든 스레드가 공유할 수 있어야 하므로 이를 염두에 두고 선언해야 한다.

그리고 스레드에게 wait이라는 상태 변화를 주기 전에 뮤텍스를 통해 꼭 잠금을 걸어야 한다. 잠금이 걸려있지 않은 상태에서 wait은 정의되지 않은 행동이다. 잠금은 지금까지 봐왔던 mutex::lock, lock_guard, scoped_lock 대신 unique_lock 이라는 것을 사용해야 한다. unique_lock도 RAII 원칙이 적용된 lock이다. mutex 객체를 전달하여 unique_lock 객체를 생성하면 잠금이 걸린다. wait()을 호출하면서 unique_lock 객체를 인자로 전달한다.

이때 wait()은 잠겨있던 lock을 풀고 스레드를 waiting 상태로 만든다. 즉 wait()은 unlock + wait 이다. 이는 waiting 상태에서는 아무 동작도 할 수 없기 때문에 그동안 다른 곳에서 lock을 사용할 수 있도록 하는 것이다. 그러다가 notify 신호를 받게 되면 다시 lock이 걸리면서 waiting 상태에서 깨어난다. 만약 다른 이유로 lock을 잡을 수 없는 상황이면 lock을 잡을 수 있을 때까지 기다린다.
(i.e. 다른 곳에서 동일한 mutex로 lock을 잡고 있을 경우)

notify_one()을 호출하기 전에도 lock을 걸어주는 것을 볼 수 있다. 위의 상황에선 없어도 되지만 일단은 세트로 생각하자. Detail

 

wait에 조건(서술 함수) 추가하기

위의 프로그램에선 자식 스레드 1개 메인 스레드 1개가 실행된다. 그중에서 자식 스레드는 wait을 하고 메인 스레드는 notify를 함으로써 자식 스레드를 깨운다. 그런데 이 순서가 항상 지켜질까? 만약 wait을 하기전에 notify를 호출하면 어떻게 될까? 위 프로그램에선 메인 스레드가 먼저 실행이 되더라도 사용자로부터 입력을 받기까지의 시간이 소요되므로 자식 스레드가 항상 먼저 waiting 상태에 들어가므로 문제가 없다. 예를 들어 아래와 같이 게임에서 드랍된 아이템을 주으면 DB에 아이템 정보가 저장하는 프로그램이 있다고 생각해보자.

std::mutex g_mutex;
std::condition_variable g_ctrl
std::queue<std::string> g_itemQueue;  // 아이템 정보를 저장할 큐

void PushItemToQueue() // producer
{
    std::unique_lock<std::mutex> lock(g_mutex);
    g_itemQueue.push("gameItem");

    g_ctrl.notify_one();
}

void SaveItemToDB()    // consumer
{
    std::string item;
    {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_ctrl.wait(lock);

        item = g_itemQueue.front();
        g_itemQueue.pop();
    }
    std::cout << "Saved game item in the Database \n";
}

int main()
{
    std::thread producer(PushItemToQueue);
    std::thread consumer(SaveItemToDB);

    producer.join();
    consumer.join();

    return 0;
}

위 프로그램의 목표는 다음과 같다. 우선 consumer 스레드에 의해 SaveItemToDB() 함수가 실행되면서 wait 상태에 들어간다. 이후 producer 스레드가 PushItemToQueue() 함수를 실행하여 게임 아이템을 큐에 저장한다. 그리고 waiting 중인 스레드를 깨운다. 깨어난 스레드는 큐에서 아이템 정보를 빼낸다. 그리고 데이터 베이스에 저장이 되었다고 출력한다. 이것이 우리가 바라는 목표이다.

참고: 이처럼 하나의 스레드는 큐에 데이터를 저장만 하고 다른 스레드는 큐에 저장된 데이터를 처리만 하는 패턴을 Producer-consumer(생산자-소비자) 패턴이라 한다.

하지만 위 프로그램은 원하는대로 동작하지 않을 수 있다. 만약 producer 스레드가 함수 내에서 lock을 먼저 잡았다고 가정하면 consumer 스레드는 lock이 풀릴 때까지 waiting 상태로 진입할 수 없다. 이러한 경우 notify 신호를 받을 때 까지 무한정 기다리게 된다. 하지만 이미 notify_one()을 호출하는 함수는 종료되었기 때문에 consumer 스레드는 종료되지 않는다. sleep을 사용하여 실행 순서를 어떻게 해볼 수 있겠지만 한계가 있다.

이러한 상황을 위해 wait 함수는 Predicate(서술 함수)를 추가로 지원한다. 서술 함수란 이처럼 실행 순서가 보장되지 않는 상황에서 조건을 추가하여 그 조건에 따라 waiting 상태로 들어갈지 계속 진행할지를 결정한다. 서술 함수는 bool을 반환해야 하며 사용자가 적절한 조건을 파악하여 작성한다. 예를 들어 위 프로그램의 경우 큐에 데이터가 존재한다면 wait에서 깨어나서 처리하는 것이 올바른 로직이다. 때문에 위 프로그램에서 wait을 다음과 같이 수정한다.

g_ctrl.wait(lock, [&]() { return !g_itemQueue.empty(); }); // 람다 함수 사용


서술 함수가 true를 반환한다면 wait 상태에서 깨어나서 계속 진행을 한다. false라면 계속해서 기다린다. 위 프로그램에선 큐가 비어있다면 기다리고 큐에 데이터가 존재하면 깨어나서 데이터를 처리하는 것이 맞다. 즉 !g_itemQueue.empty()가 true라면 깨어나서 데이터를 처리한다. 위와 같이 코드를 변경하게 되면 wait 이전에 notify가 발생하더라도 조건을 판단하여 깨어날지 말지를 판단한다. 물론 wait이 먼저 호출되고 notify가 호출이 된다해도 조건을 만족하지 않으면 깨어나지 않는다.

주의할 점은 서술 함수와 notify는 별개의 조건이다. 즉 서술 함수도 만족하고 notify도 호출되어야 한다. notify가 호출 되지 않았다면 조건이 참이더라도 waiting 상태로 진입하게 된다. 아마도 notify 호출 이력(?)을 어딘가에 저장하는 것 같다.

 

3. 스레드 종료하기


std::thread의 실행을 멈추는 함수는 따로 존재하지 않는다. 우리가 상황에 맞게 스레드의 종료 조건을 설계하여 종료시켜야 하는데 이는 꽤나 까다로운 문제일 수 있다. 전형적인 방법 한 가지를 살펴보자. 바로 bool 변수를 이용하는 것이다. 예제를 보면 다음과 같다.

bool 변수 이용

mutex g_mutex;
bool bEnded = false; 

int main()
{
    thread th1([&]() {
        while(bEnded == false) // true이면 스레드 실행 종료
        {
            this_thread::sleep_for(chrono::milliseconds(2000));
            cout << "> Hello, I'm a child thread \n";
        }

        cout << "> A child thread is stopped. \n";
    });

    while(true)
    {
        string line;
        getline(cin, line);

        if(line == "q")
        {
            bEnded = true;
            th1.join();
            break;
        }
    }

    return 0;
}

위 프로그램은 th1 스레드가 2초마다 문장을 출력한다. 만약 메인 스레드에서 입력한 값이 'q'라면 bool 변수를 true로 변경하여 th1 스레드가 루프를 벗어나 종료하도록 한다. 그와 동시에 메인 스레드는 반복문을 탈출하여 프로그램을 종료한다. join을 사용하기 때문에 자식 스레드가 종료될 때까지 기다린다. 자식 스레드의 실행 도중에 강제로 종료하는 것(TerminateThread())은 금지되어 있다. 즉 실행 중인 스레드를 강제로 종료하지 못한다. 루프를 돌면서 조건을 만족하지 않으면 루프를 빠져나오거나 detach를 통해 자식 스레드를 떼어내고 메인 스레드를 종료시키던지 해야한다.




댓글