본문 바로가기

IT

ConcurrentLinkedQueue

728x90
반응형

출처 : https://juejin.cn/post/6844903906288336909

소스 코드를 읽고 실행 과정에 따라 분석하기 전에 결론을 내립니다. tail은 반드시 대상의 진정한 끝점을 가리키지 않는다., 우리는 분석 후에 이 기능을 찾을 것입니다.

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
public boolean offer(E e) {
    //(1)e가 null인 경우 null 포인터 예외를 던집니다.
    checkNotNull(e);
    //(2)새 Node 노드를 생성하면 Unsafe 클래스의 putObject 메소드가 Node 생성자에서 호출됩니다.
    final Node<E> newNode = new Node<E>(e);
    //(3)꼬리 노드에서 새 노드 삽입
    for (Node<E> t = tail, p = t;;) {
        //q는 꼬리 노드의 다음 노드이지만 멀티 스레딩에서 다른 스레드가 꼬리 노드를 수정하면 이 스레드에서 p!=null을 볼 수 있습니다(후자의 CAS가 수행)
        Node<E> q = p.next;
        //(4)q가 null이면 p가 이제 꼬리 노드임을 의미하므로 더하기를 수행할 수 있습니다.
        if (q == null) {
            //(5)여기서 cas는 p 노드의 다음 노드를 newNode로 설정하는 데 사용됩니다.
            //(null을 전달하고 p의 다음이 null인지 비교하고 null이면 newNode 옆에 설정)
            if (p.casNext(null, newNode)) {
                //(6)다음은 꼬리 노드를 업데이트하는 코드입니다.
                //CAS가 성공적으로 실행된 후 p(원래 연결 목록의 꼬리) 노드의 다음 노드는 이미 newNode입니다. 여기서 꼬리 노드는 newNode로 설정됩니다.
                if (p != t) // hop two nodes at a time
                    // p가 t와 같지 않으면 다른 스레드가 꼬리를 먼저 업데이트함을 의미합니다.
                    // q==null 분기로 이동하지 않습니다.
                    // p에 의해 얻은 값은 t 이후의 값일 수 있습니다.
                    // 꼬리 원자를 새 노드로 업데이트
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        //제거된 경우
        else if (p == q)
            //(7)다중 스레드 동작시 다른 스레드가 poll 방식을 사용하여 요소를 제거한 후 head의 next를 head로 바꿀 수 있으므로 새로운 head를 찾아야 합니다: 여기서는 뒤에 poll 방식의 설명도표를 참고하여 이해하십시오.
            p = (t != (t = tail)) ? t : head;
        else
            // (8)쿼리 끝 노드
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 

​ 위는 제안 방법의 구현 및 주석이며 여기서는 단일 스레드 실행과 다중 스레드 실행의 두 가지 경우로 나누고 위의 소스 코드 구현에 따라 전체 프로세스를 단계별로 분석합니다. 먼저 단일 스레드 실행 프로세스에 대해 설명합니다.

단일 스레드 실행

​ 싱글 쓰레드 환경에서 실행한 후, 그 방법에 따라 단계별로 직접 판단을 실행할 수 있습니다.

  1. 먼저 스레드가 제공 메서드를 호출할 때 코드(1)에서 null이 아닌 검사가 수행되고 null에 대해 예외가 throw되고 null에 대해 실행되지 않습니다.(2)
  2. 코드(2) Node<E> newNode = new Node<E>(e)는 item을 생성자의 매개변수로 사용하여 새 노드를 생성합니다.
  3. 코드 (3) for (Node<E> t = tail, p = t;;)은 대기열의 끝 부분에서 스핀 주기를 시작하여 대기열의 끝 부분에서 새 노드가 추가되도록 합니다.
  4. 획득 tail한 next노드( q), 이때 큐 상황은 다음 그림( 기본 구성 방법에서 head 와 tail 을 모두 가리키는 것은 null 인 item 의 노드입니다)과 같다. 이때 qnull을 가리킨다.

5. 코드 (4) if (q == null)의 실행 판단 q==null이 참

6. 코드 (5) if (p.casNext(null, newNode))에서의 실행은 pCAS에 의해 생성된 다음 노드를 업데이트하는 것 newNode입니다. (CAS는 p의 다음이 null인지 여부를 결정하고 null인 경우에만 업데이트됩니다 newNode)

7. 이때 p==t, tail을 업데이트하는 코드 블록(6)은 실행되지 않고 casTail(t, newNode), 제공 메소드에서 종료됩니다. 이때 대기열 상황은 다음과 같습니다.


8. 그런 다음 이 스레드가 실행되지만 꼬리는 변경되지 않았습니다. 실제로 두 번째 제안이 있을 때 p=tail,p.next!=null코드 (8)이 실행 p = (p != t && t != (t = tail)) ? t : q되고 간략하게 분석 됩니다.

  • p != t  : p는 꼬리, t는 꼬리이므로false
  • t != (t = tail)  : 분명히 거짓
 

9. 그래서 결과는 p=q가 되고 다음 싸이클을 수행한 후 판정 p.next이 null이 되므로 CAS가 성공할 수 있고 그로 인해 p!=t tail 노드가 업데이트됩니다.

​ 그래서 위에서 주어진 결론이 여기에 반영됩니다. 즉 tail은 항상 행렬의 끝점을 가리키지 않는다. 실제로 꼬리 노드를 다른 방법으로 꼬리 지점으로 만들 수 있습니다. 즉, 다음과 같습니다.

if (e == null)
    throw new NullPointerException();
Node<E> n = new Node<E>(e);
for (;;) {
    Node<E> t = tail;
    if (t.casNext(null, n) && casTail(t, n)) {
        return true;
    }
}


​ 하지만 enqueue 작업이 많은 경우 꼬리가 가리키는 노드를 CAS 모드에서 매번 업데이트해야 하므로 데이터 양이 많을 때 성능에 큰 영향을 미칩니다. 따라서 최종 구현에서는 CAS 연산을 줄임으로써 많은 수의 enqueue 연산의 성능을 향상시킨다. (그러나 거리가 더 길다. long의 부정적인 효과는 루프 본문이 필요하기 때문에 대기열에 넣을 때마다 꼬리 노드를 찾는 데 시간이 더 오래 걸린다는 것입니다 한 번 더 순환하여 테일 노드를 지정합니다. (진짜 테일 노드를 가리킨 다음 newNode를 추가합니다). 사실 앞서 멤버 속성을 분석했을 때 tail이 volatile에 의해 수정되고 CAS 방식은 본질적으로 volatile 변수에 대한 읽기 및 쓰기 작업이며 volatile의 쓰기 작업 비용이 읽기 작업의 비용보다 크다는 것도 알고 있었습니다. 그래서 Concurrent Linked Queue는 온라인 상에서 volatile 변수에 대한 읽기 동작 횟수를 증가시켜 상대적으로 쓰기 동작을 줄인다. 다음은 단일 스레드가 제공 메서드를 실행할 때 꼬리가 가리키는 변경 사항의 개략도입니다.


다중 스레드 실행

​ 위에서 설명한 단일 스레드의 실행이 다중 스레드 환경에서 실행될 때 일어나는 일, 여기에서는 두 개의 스레드가 동시에 실행된다고 가정합니다.

 

사례 1

​, 여기서 분석된 것은 사실 여러 스레드가 CAS 업데이트 p.next 노드에 수행된다고 가정하는 코드이다. threadA가 offer(item1)를 호출하고, threadB가 offer(item2)를 호출하여 해당 p.casNext(null, newNode)위치 로 실행된다고 가정하고 아래를 살펴보겠습니다.

  • threadA가 위의 코드 줄을 먼저 실행하고 성공적으로 업데이트한다고 가정하는 CAS 작업의 원자성 p.next는 newNode입니다.
  • 이때, threadB는 CAS 비교( p.next!=null)에서 자연히 실패하므로 다음 루프에서 tail 노드를 재획득한 후 업데이트를 시도하게 된다.

이때 대기열은 다음과 같습니다.


  • threadB가 테일 노드를 얻은 후 q!=null( q=p.next,p=tail)
  • p==q너무 판단을 계속 false하므로 코드를 실행하십시오 (8)
  • p = (p != t && t != (t = tail)) ? t : q이 코드 분석
    1. p != t: p는 꼬리, t는 꼬리이므로false
    2. t != (t = tail): 분명히 거짓
    3. 따라서 위의 삼항 연산의 결과는 p=q다음 그림과 같이 입니다.

  • 그런 다음 루프를 다시 실행합니다. 이번에 p.next는 null이므로 코드 (5)를 실행할 수 있습니다 p.casNext(null,newNode). 이 때 CAS가 판단하여 p.next == null설정할 수 있습니다.p.next=Node(item2)
  • CAS가 성공하면 p!=t위 그림과 같이 판단하므로 tail을 Node(item2)로 설정할 수 있습니다. 그런 다음 제안을 종료합니다. 이때 대기열 상황은 다음과 같습니다.
두 스레드 가 사례1 모두 처음에 얻어지면 p=tail,p.next=null둘 다 CAS를 실행하고 추가를 시도 newNode하지만 첫 번째 루프에서 한 스레드만 성공적으로 추가하고 true()를 반환할 수 (하지만 이 때 꼬리는 변하지 않았으며 싱글 쓰레드 요약과 마찬가지로 꼬리와 실제 꼬리 노드의 차이는 1 또는 0입니다.)있으므로 다른 스레드는 다시 시도합니다. 두 번째 루프, 이번에는 p의 포인트, 즉 p = (p != t && t != (t = tail)) ? t : q코드를 변경합니다. 그러면 세 번째 사이클에서 CAS를 성공적으로 추가할 수 있습니다(물론 여기서는 가상의 2개 스레드 상황을 분석하고 있습니다. 실제 멀티스레딩 환경은 확실히 더 복잡하지만 논리는 여전히 유사합니다)


사례 2

​ 여기서 분석은 주로 p = (p != t && t != (t = tail)) ? t : q코드의 또 다른 상황, p=t즉 상황입니다.

  • p != t사실이다,
  • t != (t = tail) : 또한 true (왼쪽의 t는 재순환 시작 시 얻은 tail을 가리키는 정보이며, 그 꼬리를 검색하여 괄호 안의 t에 대입한다. 이때, 다른 volatile스레드가 변경했을 가능성이 있습니다. tail)

​ 그러면 결과는 p가 큐의 꼬리 노드 꼬리를 다시 가리키게 됩니다. 이런 상황을 상상해 봅시다.


 

사실 이것은 중복 루프를 피하기 위해 , volatile의 가시성를 사용하는 것입니다. 현재 큐의 꼬리 노드에 요소를 추가하려는 스레드를 빠르게 찾습니다. 그림과 같이 이 시점에서 threadA가 변수 tail을 읽고 threadB가 이 때 여러 개의 Node를 추가했다고 가정하면 이 시점에서 tail 포인터를 수정한 다음 이 시점에서 스레드 A가 t=tail을 다시 실행할 때 , t는 다른 노드를 가리키므로 threadA t가 가리키는 노드는 전후에 두 번 읽은 변수가 다른데, t != (t = tail)이것이 참이고, t p != t가 가리키는 노드의 변경도 참이므로 이 실행 결과 코드 행은 p와 t의 최신 t 포인터가 동일한 노드를 가리키고 t가 이때 큐의 실제 꼬리 노드이기도 합니다. 그런 다음 큐의 실제 꼬리 노드를 찾았으므로 제안 작업을 수행할 수 있습니다.


사례 3

​ 위에서 멀티 스레딩으로 요소를 추가하는 작업에 대해 설명했으므로 두 스레드가 모두 poll 메서드를 제공하고 호출할 때 제안 메서드의 코드 블록(7)이 여기에서 호출됩니다. poll 방식은 아직 언급하지 않았기 때문에 여기서는 먼저 코드를 설명하지 않겠습니다. poll 방식이 여러 쓰레드에서 실행될 경우에는 offer-poll-offer로 설명을 하고 그 다음에는 offer 방식을 실행할 수도 있습니다. 코드 라인.

else if (p == q) //(7)다중 스레드 동작 시 poll 방식으로 요소를 제거한 후 head의 next를 head로 바꿀 수 있으므로 새로운 head를 찾아야 합니다.
 p = (t != (t = tail)) ? t : head;


 

Poll 방법

poll 방식은 queue의 선두에 있는 element를 가져와서 제거하는 방식으로 queue가 비어 있으면 null을 반환한다. 먼저 poll 방식의 소스코드를 보고 single thread와 multi-thread에서 실행을 분석해보자. 따로 실.

public E poll() {
    //기호
    restartFromHead:
    for (;;) {//스핀 사이클
        for (Node<E> h = head, p = h, q;;) {
            //(1)현재 노드의 item 저장
            E item = p.item;
            //(2)만약 현재 노드의 값이 null이 아니라면, null로 바꿉니다.
            if (item != null && p.casItem(item, null)) {
                //(3)CAS가 성공하면 현재 노드를 표시하고 체인 테이블에서 제거합니다.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //(4)대기열이 비어 있으면 null을 반환합니다.
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //(5)현재 노드가 참조되어 있으면 새 큐 헤더 노드를 찾습니다.
            else if (p == q)
                continue restartFromHead;
            else
                p = q; //다음 루프로 이동하여 p가 가리키는 위치를 변경합니다.
        }
    }
}
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

 

우리는 위에서 poll 메소드의 소스 코드를 보았고, 이제 이 메소드의 구현에 따라 예시로 이해하게 될 것이다.

단일 스레드 실행

폴 작업은 큐의 헤드에서 요소를 가져오는 것이므로 다음과 같습니다.

  • 헤드 노드에서 루프를 시작하고 먼저 for (Node<E> h = head, p = h, q;;)현재 큐의 헤드 노드를 가져옵니다. 물론 처음에 큐가 비어 있으면 다음과 같이 됩니다.

헤드 노드는 센티넬 노드로 존재하기 때문에 코드 (4)를 실행하게 되는데 else if ((q = p.next) == null), 큐가 비어있기 때문에 직접 실행하게 되며 updateHead(h, p), updateHead메소드에서 판단되면 h=pnull을 직접 리턴하게 된다.

  • 위는 queue가 비어있는 경우이고, queue가 비어있지 않을 때 현재 queue 상황은 다음과 같다고 가정합니다.
  • 따라서 코드 (4)의 판단 결과 else if ((q = p.next) == null)는 거짓이고,
  • 따라서 다음 판단이 실행 else if (p == q)되고 판단 결과는 여전히 거짓입니다.
  • p=q다음 순환 대기열 상태가 완료된 후 마지막 실행 은 다음과 같습니다.

 
새로운 주기에서는 item!=null이라고 판단할 수 있으므로 CAS 메서드를 사용하여 item을 null로 설정합니다(이는 단일 스레드의 경우 테스트). 따라서 계속 실행 if(p!=h)하고 판단 결과는 참입니다. . 따라서 if: 의 내용을 실행합니다. updateHead(h, ((q = p.next) != null) ? q : p)이것이 의미하는 바는 무엇입니까? 아래와 같이 결과는 q=null이므로 들어오는 매개변수는 p입니다(p가 가리키는 위치는 위 그림에 표시됨)


//updateHead메서드의 매개변수(Node h,Node p)
q = p.next;
if(null != q) {
//두 번째 매개변수는 q입니다.
} else {
    //두 번째 매개변수는 p
}

그런 다음 updateHead 메서드를 실행합니다. 여기서 메서드의 세부 정보를 다시 살펴봐야 합니다.

final void updateHead(Node<E> h, Node<E> p) {
    //만약에! =p, CAS 방식으로 헤드 노드를 p로 설정
    if (h != p && casHead(h, p))
        //다음은 h 노드의 다음 노드를 자신(h)으로 설정하는 것입니다.
        h.lazySetNext(h);
}
//Node 클래스의 메소드
void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

 

따라서 이를 실행한 후 다음 그림과 같이 대기열의 상태는 무엇입니까? 실행이 완료되면 제거된 item1 요소를 반환합니다.

offer, poll의 다중 스레드 실행

​ 위는 단일 스레드에서 poll 메소드를 호출하는 실행 과정을 분석한 것입니다. 사실 청약 방식이 도입된 지 얼마 되지 않은 시점에서 아직 풀리지 않은 함정이 있다. 아래 설명된 상황

  • 원래 대기열에 item1 요소가 있다고 가정합니다.

  • thread1이 offer 메소드를 호출할 때 다른 스레드가 poll 메소드를 호출하여 head 노드를 제거한다고 가정하면, 위의 분석에 따르면 poll 메소드가 호출된 후의 큐는 다음과 같습니다.
  •  

 

(여기서 offer의 실행 흐름을 상기하자) 따라서 thread1이 계속 실행되면 for (Node<E> t = tail, p = t;;)위 그림과 같이 tail이 가리키는 위치를 얻게 되지만, tail이 가리키는 노드의 다음 포인터가 가리키는 위치는 여전히 자기 자신이다. . 따라서 Node<E> q = p.next실행 후 q=tail=p. 따라서 offer 방법에서 다음과 같은 판단이 수행됩니다.

else if (p == q)
    //(7)다중 스레드 동작 시 poll 방식으로 요소를 제거한 후 head의 next를 head로 바꿀 수 있으므로 새로운 head를 찾아야 합니다.
    p = (t != (t = tail)) ? t : head;

p = (t != (t = tail)) ? t : head또는 아래와 같이 이 문장을 간단히 분석 합니다. 간단한 분석을 통해 poll 메소드가 호출된 후 p가 새로운 헤드 노드를 가리키고(위 그림의 헤드 노드), 오퍼를 호출한 스레드가 정상적으로 노드를 추가할 수 있다는 결론을 내릴 수 있다. 여전히 위와 동일합니다. (그러면 꼬리가 큐 노드의 끝을 가리키는 시점은 언제일까? 실제로 제안 메소드를 호출하여 요소를 추가 한 후 꼬리가 가리키는 위치가 이후에 업데이트된다고 p.casNext(null, newNode)판단 한다)p != t


//맨 처음에 얻은 t=tail
t=tail; //for 중간값 t
//...... offer의 다른 코드
if(t != (t = tail)) { //이것은 여전히 같습니다: tail은 volatile이므로 tail 변수를 다시 읽어옵니다.
    p = t; //즉, 꼬리 노드는 변경되지 않은 상태로 유지됩니다(위 그림의 폴링을 수행한 후 상황에 따르면 꼬리를 가리키는 위치는 변경되지 않았으므로 p는 t에 할당되지 않습니다).
} else {
    p = head; //이때 헤드가 가리키는 새 헤드 노드에 유의하십시오.
}

 

poll, poll의 다중 스레드 실행

​ 그렇게 많이 분석한 결과, offer 방식과 마찬가지로 poll에 아직 분석되지 않은 코드가 하나 있다는 것을 알게 되었기 때문에 다이어그램을 통해 분석하고 이 코드 프레임워크를 먼저 살펴보겠습니다.


//기호
restartFromHead:
for (;;) {//스핀 사이클
    for (Node<E> h = head, p = h, q;;) {
        //...other code
        //이것은 스핀 순환체 중의 하나이다.
        else if (p == q)
            continue restartFromHead;
    }
}

또는 두 개의 스레드가 이제 poll 메서드를 실행한다고 가정합니다.

  • 초기 경우의 대기열 상태는 다음과 같습니다.

 

  • if (item != null && p.casItem(item, null))다음 그림과 같이 threadA가 poll 메서드를 실행하고 이 블록을 성공적으로 실행하고 item1을 null로 설정했다고 가정합니다.

  • 그러나 threadA는 updateHead 메소드를 실행하지 않았고, 이때 threadB가 poll을 수행한 후 p는 아래 그림과 같이 위 그림의 head를 가리킨다.

 

이후 threadA는 updateHead 메소드를 실행하여 헤드의 포인트를 갱신하고, 원래 헤드의 다음 노드를 자신을 가리키게 하고, B 쓰레드가 실행 q=p.next되어 자연스럽게 같은 p==q결과가 나오므로 이 때 해당 헤드로 점프해야 한다. 최신 헤드를 다시 가져오기 위한 외부 루프. 노드, 그리고 계속 실행

 

poll 방법 요약

poll 메서드는 head 요소를 제거할 때 CAS 연산을 사용하여 head 노드의 항목을 null로 설정한 다음 플러시하고 head 노드의 포인팅 위치를 설정하여 queue 요소를 삭제하는 효과를 얻습니다. 이때 원래 헤드 센티넬 노드는 격리된 노드이며 재활용됩니다. 물론 스레드가 poll 메서드를 실행하고 헤드 노드가 수정되었음을 발견하면(위에서 언급한 경우), 새 노드를 다시 얻기 위해 가장 바깥쪽 루프로 점프해야 합니다.

728x90
반응형