/*
    This file is part of KittenDB-Engine Library.

    KittenDB-Engine Library is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    KittenDB-Engine Library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with KittenDB-Engine Library.  If not, see <http://www.gnu.org/licenses/>.

    Copyright 2014-2016 Telegraph Inc
              2014-2016 Nikolai Durov
              2014      Andrey Lopatin
*/

char disable_linker_warning_about_empty_file_mp_queue_cpp;

#ifdef TG_LCR_QUEUE
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <linux/futex.h>
#include <sys/syscall.h>

#include "mp-queue.h"

#undef assert
#ifndef assert
#define assert(x) x
#endif

volatile int mpq_blocks_allocated, mpq_blocks_allocated_max, mpq_blocks_allocations, mpq_blocks_true_allocations,
    mpq_blocks_wasted, mpq_blocks_prepared;
volatile int mpq_small_blocks_allocated, mpq_small_blocks_allocated_max;

__thread int mpq_this_thread_id;
__thread void **thread_hazard_pointers;
volatile int mpq_threads;

struct mp_queue MqGarbageBlocks, MqPreparedBlocks;
struct mp_queue MqGarbageSmallBlocks, MqPreparedSmallBlocks;

static inline void barrier(void) {
  asm volatile("" : : : "memory");
}
static inline void mfence(void) {
  asm volatile("mfence" : : : "memory");
}

/* hazard pointers, one per thread */

void *mqb_hazard_ptr[MAX_MPQ_THREADS][THREAD_HPTRS] __attribute__((aligned(64)));

int is_hazard_ptr(void *ptr, int a, int b) {
  barrier();
  int k = mpq_threads, q = mpq_this_thread_id;
  barrier();
  int i, j, r = 0;
  for (j = a; j <= b; j++) {
    if (mqb_hazard_ptr[q][j] == ptr) {
      r = 1;
      break;
    }
  }
  for (i = 1; i <= k; i++) {
    if (i == q) {
      continue;
    }
    for (j = a; j <= b; j++) {
      if (mqb_hazard_ptr[i][j] == ptr) {
        barrier();
        return r + 2;
      }
    }
  }
  barrier();
  return r;
}
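
/* How the hazard-pointer check above is used: every thread publishes the
   block pointers it is currently dereferencing in its own row of
   mqb_hazard_ptr[][]; is_hazard_ptr(ptr, a, b) scans slots a..b of every
   registered thread and returns 0 when nobody holds ptr, 1 when only the
   calling thread holds it, and 2 or more when some other thread does.
   Callers below treat "<= 1" as "safe to free immediately" and anything
   larger as "park the block in a garbage queue instead". */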

void clear_thread_ids() {
  mpq_threads = 0;
  mpq_this_thread_id = 0;
}
/* initialize this thread id and return it */
int get_this_thread_id(void) {
  int i = mpq_this_thread_id;
  if (i) {
    return i;
  }
  i = __sync_fetch_and_add(&mpq_threads, 1) + 1;
  assert(i > 0 && i < MAX_MPQ_THREADS);
  thread_hazard_pointers = mqb_hazard_ptr[i];
  return mpq_this_thread_id = i;
}

/* custom semaphore implementation using futexes */

int mp_sem_post(mp_sem_t *sem) {
  __sync_fetch_and_add(&sem->value, 1);
  if (sem->waiting > 0) {
    syscall(__NR_futex, &sem->value, FUTEX_WAKE, 1, NULL, 0, 0);
  }
  return 0;
}

int mp_sem_wait(mp_sem_t *sem) {
  int v = sem->value, q = 0;
  while (1) {
    if (v > 0) {
      v = __sync_fetch_and_add(&sem->value, -1);
      if (v > 0) {
        return 0;
      }
      v = __sync_add_and_fetch(&sem->value, 1);
    } else {
      if (v < 0 && q++ < 10) {
        barrier();
        v = sem->value;
        continue;
      }
      __sync_fetch_and_add(&sem->waiting, 1);
      syscall(__NR_futex, &sem->value, FUTEX_WAIT, v, NULL, 0, 0);
      __sync_fetch_and_add(&sem->waiting, -1);
      v = sem->value;
      q = 0;
    }
  }
}

int mp_sem_trywait(mp_sem_t *sem) {
  int v = sem->value;
  if (v > 0) {
    v = __sync_fetch_and_add(&sem->value, -1);
    if (v > 0) {
      return 0;
    }
    __sync_fetch_and_add(&sem->value, 1);
  }
  return -1;
}
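
/* Summary of the futex-based semaphore above: mp_sem_post() increments
   sem->value and issues FUTEX_WAKE only when sem->waiting records a sleeper;
   mp_sem_wait() first tries to claim a positive count with an atomic
   decrement (undoing it if the count raced below zero), spins briefly while
   the count is negative, and otherwise sleeps in FUTEX_WAIT until a post
   arrives; mp_sem_trywait() returns 0 only if it managed to consume a
   positive count and -1 otherwise. */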

/* functions for one mp_queue_block */

// may invoke mpq_pop()/mpq_push() if allow_recursion=1
struct mp_queue_block *alloc_mpq_block(mqn_value_t first_val, int allow_recursion, int is_small) {
  is_small = 0;
  struct mp_queue_block *QB = 0;
  int prepared = 0, align_bytes = 0;
  long size = (is_small ? MPQ_SMALL_BLOCK_SIZE : MPQ_BLOCK_SIZE);
  if (allow_recursion) {
    QB = mpq_pop(is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, MPQF_RECURSIVE);
    if (QB) {
      if (!is_hazard_ptr(QB, 0, 2)) {
        // reclaiming garbage
        assert(QB->mqb_magic == MQ_BLOCK_GARBAGE_MAGIC);
        __sync_fetch_and_add(&mpq_blocks_wasted, -1);
        align_bytes = QB->mqb_align_bytes;
      } else {
        mpq_push(is_small ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, MPQF_RECURSIVE);
        QB = 0;
      }
    }
    if (!QB) {
      QB = mpq_pop(is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, MPQF_RECURSIVE);
      if (QB) {
        assert(QB->mqb_magic == MQ_BLOCK_PREPARED_MAGIC);
        prepared = 1;
        __sync_fetch_and_add(&mpq_blocks_prepared, -1);
        align_bytes = QB->mqb_align_bytes;
      }
    }
  }
  if (!QB) {
    char *new_block = malloc(offsetof(struct mp_queue_block, mqb_nodes) + size * (2 * sizeof(void *)) +
                             MPQ_BLOCK_ALIGNMENT - sizeof(void *));
    assert(new_block);
    assert(!((long)new_block & (sizeof(void *) - 1)));
    align_bytes = -(int)(long)new_block & (MPQ_BLOCK_ALIGNMENT - 1);
    QB = (struct mp_queue_block *)(new_block + align_bytes);

    __sync_fetch_and_add(&mpq_blocks_true_allocations, 1);
    if (is_small) {
      int t = __sync_fetch_and_add(&mpq_small_blocks_allocated, 1);
      if (t >= mpq_small_blocks_allocated_max) {
        __sync_bool_compare_and_swap(&mpq_small_blocks_allocated_max, mpq_small_blocks_allocated_max, t + 1);
      }
    } else {
      int t = __sync_fetch_and_add(&mpq_blocks_allocated, 1);
      if (t >= mpq_blocks_allocated_max) {
        __sync_bool_compare_and_swap(&mpq_blocks_allocated_max, mpq_blocks_allocated_max, t + 1);
      }
    }
  } else {
    assert(QB->mqb_size == size);
  }
  __sync_fetch_and_add(&mpq_blocks_allocations, 1);

  memset(QB, 0, offsetof(struct mp_queue_block, mqb_nodes));
  QB->mqb_align_bytes = align_bytes;
  QB->mqb_size = size;

  QB->mqb_nodes[0].idx = MQN_SAFE;
  QB->mqb_nodes[0].val = first_val;

  if (!prepared) {
    long i;
    for (i = 1; i < size; i++) {
      QB->mqb_nodes[i].idx = MQN_SAFE + i;
      QB->mqb_nodes[i].val = 0;
    }
  }

  if (first_val) {
    QB->mqb_tail = 1;
  }

  QB->mqb_magic = MQ_BLOCK_USED_MAGIC;
  return QB;
}
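
/* Allocation order used above: first try to reclaim a block from the global
   garbage queue (possible only when no thread still holds a hazard pointer
   to it), then take a pre-initialized block from the prepared queue, and
   only as a last resort malloc() a fresh block aligned to
   MPQ_BLOCK_ALIGNMENT.  Note that the "is_small = 0;" at the top currently
   forces every block to the full MPQ_BLOCK_SIZE, so the small-block path is
   effectively disabled. */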

void free_mpq_block(struct mp_queue_block *QB) {
  assert(QB->mqb_magic == MQ_BLOCK_USED_MAGIC);
  assert((unsigned)QB->mqb_align_bytes < MPQ_BLOCK_ALIGNMENT && !(QB->mqb_align_bytes & (sizeof(void *) - 1)));
  QB->mqb_magic = MQ_BLOCK_FREE_MAGIC;
  if (QB->mqb_size == MPQ_SMALL_BLOCK_SIZE) {
    __sync_fetch_and_add(&mpq_small_blocks_allocated, -1);
  } else {
    assert(QB->mqb_size == MPQ_BLOCK_SIZE);
    __sync_fetch_and_add(&mpq_blocks_allocated, -1);
  }
  free((char *)QB - QB->mqb_align_bytes);
}

static inline void mpq_fix_state(struct mp_queue_block *QB) {
  long h, t;
  while (1) {
    barrier();
    t = QB->mqb_tail;
    barrier();
    h = QB->mqb_head;
    barrier();
    if ((unsigned long)h <= (unsigned long)t) {
      break;
    }
    if (QB->mqb_tail != t) {
      continue;
    }
    // here tail < head ; try to advance tail to head
    // (or to some value h such that tail < h <= head)
    if (__sync_bool_compare_and_swap(&QB->mqb_tail, t, h)) {
      break;
    }
  }
}
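
/* mpq_fix_state() is called after an unsuccessful pop sweep: the head
   counter may then have overtaken the tail counter, and advancing mqb_tail
   up to mqb_head with a single CAS keeps later pushes from stepping through
   the already-consumed slots one fetch-and-add at a time. */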

mqn_value_t mpq_block_pop(struct mp_queue_block *QB) {
  // fprintf (stderr, "%d:mpq_block_pop(%p)\n", mpq_this_thread_id, QB);
  long size = QB->mqb_size;
  while (1) {
    long h = __sync_fetch_and_add(&QB->mqb_head, 1);
    // fprintf (stderr, "%d:  mpq_block_pop(%ld)\n", mpq_this_thread_id, h);
    mpq_node_t *node = &QB->mqb_nodes[h & (size - 1)];
    while (1) {
      mpq_node_t d, e;
      barrier();
      mqn_value_t val = node->val;
      barrier();
      long safe_idx = node->idx;
      barrier();
      long idx = safe_idx & MQN_IDX_MASK;
      if (idx > h) {
        break;
      }
      d.val = val;
      d.idx = safe_idx;
      if (val) {
        if (idx == h) {
          e.idx = safe_idx + size;
          e.val = 0;
          if (__sync_bool_compare_and_swap(&node->pair, d.pair, e.pair)) {
            // fprintf (stderr, "%d:  mpq_block_pop(%ld) -> %lx\n", mpq_this_thread_id, h, (long) val);
            return val;
          }
        } else {
          e.val = val;
          e.idx = idx;  // clear 'safe' flag
          if (__sync_bool_compare_and_swap(&node->pair, d.pair, e.pair)) {
            break;
          }
        }
      } else {
        e.idx = (safe_idx & MQN_SAFE) + h + size;
        e.val = 0;
        if (__sync_bool_compare_and_swap(&node->pair, d.pair, e.pair)) {
          break;
        }
      }
      /* somebody changed this element while we were inspecting it, make another loop iteration */
    }
    barrier();
    long t = QB->mqb_tail & MQN_IDX_MASK;
    barrier();
    if (t <= h + 1) {
      mpq_fix_state(QB);
      return 0;
    }
    /* now try again with a new value of h */
  }
}

long mpq_block_push(struct mp_queue_block *QB, mqn_value_t val) {
  int iterations = 0;
  long size = QB->mqb_size;
  // fprintf (stderr, "%d:mpq_block_push(%p)\n", mpq_this_thread_id, QB);
  while (1) {
    long t = __sync_fetch_and_add(&QB->mqb_tail, 1);
    // fprintf (stderr, "%d:  mpq_block_push(%ld)\n", mpq_this_thread_id, t);
    if (t & MQN_SAFE) {
      return -1L;  // bad luck
    }
    mpq_node_t *node = &QB->mqb_nodes[t & (size - 1)];
    barrier();
    mqn_value_t old_val = node->val;
    barrier();
    long safe_idx = node->idx;
    barrier();
    long idx = safe_idx & MQN_IDX_MASK;
    if (!old_val && idx <= t && ((safe_idx & MQN_SAFE) || QB->mqb_head <= t)) {
      mpq_node_t d, e;
      d.idx = safe_idx;
      d.val = 0;
      e.idx = MQN_SAFE + t;
      e.val = val;
      if (__sync_bool_compare_and_swap(&node->pair, d.pair, e.pair)) {
        // fprintf (stderr, "%d:  mpq_block_push(%ld) <- %lx\n", mpq_this_thread_id, t, (long) val);
        return t;  // pushed OK
      }
    }
    barrier();
    long h = QB->mqb_head;
    barrier();
    if (t - h >= size || ++iterations > 10) {
      __sync_fetch_and_or(&QB->mqb_tail, MQN_SAFE);  // closing queue
      return -1L;                                    // bad luck
    }
  }
}
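
/* Node encoding used by the two block-level routines above: each slot packs
   an index word and a value word that are updated together through a single
   compare-and-swap on node->pair.  The index word carries the cycle of the
   slot (it grows by `size` on every wrap-around) plus the MQN_SAFE flag; a
   pop that finds no value bumps the slot's index to the next cycle so that
   a late push skips it, and a push that finds the block full or cannot make
   progress within a bounded number of attempts sets MQN_SAFE in mqb_tail,
   closing the whole block so that mpq_push() below appends a fresh one. */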

/* functions for mp_queue = list of mp_queue_block's */
void init_mp_queue(struct mp_queue *MQ) {
  assert(MQ->mq_magic != MQ_MAGIC && MQ->mq_magic != MQ_MAGIC_SEM);
  memset(MQ, 0, sizeof(struct mp_queue));
  MQ->mq_head = MQ->mq_tail = alloc_mpq_block(0, 0, 1);
  MQ->mq_magic = MQ_MAGIC;

  if (!MqGarbageBlocks.mq_magic) {
    init_mp_queue(&MqGarbageBlocks);
    init_mp_queue(&MqGarbageSmallBlocks);
  } else if (!MqPreparedBlocks.mq_magic) {
    init_mp_queue(&MqPreparedBlocks);
    init_mp_queue(&MqPreparedSmallBlocks);
  }
}

void init_mp_queue_w(struct mp_queue *MQ) {
  init_mp_queue(MQ);
#if MPQ_USE_POSIX_SEMAPHORES
  sem_init(&MQ->mq_sem, 0, 0);
#endif
  MQ->mq_magic = MQ_MAGIC_SEM;
}

struct mp_queue *alloc_mp_queue(void) {
  struct mp_queue *MQ = NULL;
  assert(!posix_memalign((void **)&MQ, 64, sizeof(*MQ)));
  memset(MQ, 0, sizeof(*MQ));
  init_mp_queue(MQ);
  return MQ;
}

struct mp_queue *alloc_mp_queue_w(void) {
  struct mp_queue *MQ = NULL;
  assert(!posix_memalign((void **)&MQ, 64, sizeof(*MQ)));
  memset(MQ, 0, sizeof(*MQ));
  init_mp_queue_w(MQ);
  return MQ;
}
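
/* Illustrative lifecycle of a waitable queue (a sketch only; it assumes
   mqn_value_t is a pointer-sized value, as its use throughout this file
   suggests, and `job` stands for any caller-owned pointer):

     struct mp_queue *Q = alloc_mp_queue_w();  // queue + semaphore, for blocking pops
     mpq_push_w(Q, job, 0);                    // producer side
     void *work = mpq_pop_w(Q, 0);             // consumer side, blocks until a value arrives
     ...
     free_mp_queue(Q);                         // only when no other thread still uses Q
*/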

/* invoke only if sure that nobody else may be using this mp_queue in parallel */
void clear_mp_queue(struct mp_queue *MQ) {
  assert(MQ->mq_magic == MQ_MAGIC || MQ->mq_magic == MQ_MAGIC_SEM);
  assert(MQ->mq_head && MQ->mq_tail);
  struct mp_queue_block *QB = MQ->mq_head, *QBN;
  for (QB = MQ->mq_head; QB; QB = QBN) {
    QBN = QB->mqb_next;
    assert(QB->mqb_next || QB == MQ->mq_tail);
    QB->mqb_next = 0;
    free_mpq_block(QB);
  }
  MQ->mq_head = MQ->mq_tail = 0;
  MQ->mq_magic = 0;
}

void free_mp_queue(struct mp_queue *MQ) {
  clear_mp_queue(MQ);
  free(MQ);
}

// may invoke mpq_push() to discard new empty block
mqn_value_t mpq_pop(struct mp_queue *MQ, int flags) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0];
  long r = ((flags & MPQF_RECURSIVE) != 0);
  struct mp_queue_block *QB;
  mqn_value_t v;
  while (1) {
    QB = MQ->mq_head;
    barrier();
    hptr[r] = QB;
    barrier();
    __sync_synchronize();
    if (MQ->mq_head != QB) {
      continue;
    }

    v = mpq_block_pop(QB);
    if (v) {
      break;
    }
    barrier();
    if (!QB->mqb_next) {
      QB = 0;
      break;
    }
    v = mpq_block_pop(QB);
    if (v) {
      break;
    }
    if (__sync_bool_compare_and_swap(&MQ->mq_head, QB, QB->mqb_next)) {
      // want to free QB here, but this is complicated if somebody else holds a pointer
      if (is_hazard_ptr(QB, 0, 2) <= 1) {
        free_mpq_block(QB);
      } else {
        __sync_fetch_and_add(&mpq_blocks_wasted, 1);
        // ... put QB into some GC queue? ...
        QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC;
        mpq_push(QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB,
                 flags & MPQF_RECURSIVE);
      }
    }
  }
  if (flags & MPQF_STORE_PTR) {
    hptr[2] = QB;
  }
  hptr[r] = 0;
  return v;
}
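
/* Shape of mpq_pop() above: the current head block is protected with a
   hazard-pointer slot (slot 0 normally, slot 1 for MPQF_RECURSIVE calls made
   from the block allocator), the block is tried twice, and only an empty
   block that already has a successor is unlinked from mq_head.  The unlinked
   block is freed right away when no other thread holds a hazard pointer to
   it; otherwise it is tagged MQ_BLOCK_GARBAGE_MAGIC and parked in the global
   garbage queues for a later attempt.  With MPQF_STORE_PTR the block that
   served the pop (or NULL) is left in hptr[2] for the caller. */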

/* 1 = definitely empty (for some serialization), 0 = possibly non-empty;
   may invoke mpq_push() to discard empty block */
int mpq_is_empty(struct mp_queue *MQ) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][0];
  struct mp_queue_block *QB;
  while (1) {
    QB = MQ->mq_head;
    barrier();
    *hptr = QB;
    barrier();
    __sync_synchronize();
    if (MQ->mq_head != QB) {
      continue;
    }
    barrier();
    long h = QB->mqb_head;
    barrier();
    long t = QB->mqb_tail;
    barrier();
    if (!(t & MQN_SAFE)) {
      *hptr = 0;
      return t <= h;
    }
    t &= MQN_IDX_MASK;
    if (t > h) {
      *hptr = 0;
      return 0;
    }
    barrier();
    if (!QB->mqb_next) {
      *hptr = 0;
      return 1;
    }
    if (__sync_bool_compare_and_swap(&MQ->mq_head, QB, QB->mqb_next)) {
      // want to free QB here, but this is complicated if somebody else holds a pointer
      if (is_hazard_ptr(QB, 0, 2) <= 1) {
        free_mpq_block(QB);
      } else {
        __sync_fetch_and_add(&mpq_blocks_wasted, 1);
        // ... put QB into some GC queue? ...
        QB->mqb_magic = MQ_BLOCK_GARBAGE_MAGIC;
        mpq_push(QB->mqb_size == MPQ_SMALL_BLOCK_SIZE ? &MqGarbageSmallBlocks : &MqGarbageBlocks, QB, 0);
      }
    }
  }
  *hptr = 0;
  return 0;
}

/* may invoke mpq_alloc_block (which recursively invokes mpq_pop)
   or mpq_push() (without needing to hold hazard pointer) to deal with blocks */
long mpq_push(struct mp_queue *MQ, mqn_value_t val, int flags) {
  void **hptr = mqb_hazard_ptr[get_this_thread_id()];
  long r = ((flags & MPQF_RECURSIVE) != 0);
  while (1) {
    struct mp_queue_block *QB = MQ->mq_tail;
    barrier();
    hptr[r] = QB;
    barrier();
    __sync_synchronize();
    if (MQ->mq_tail != QB) {
      continue;
    }

    if (QB->mqb_next) {
      __sync_bool_compare_and_swap(&MQ->mq_tail, QB, QB->mqb_next);
      continue;
    }
    long pos = mpq_block_push(QB, val);
    if (pos >= 0) {
      if (flags & MPQF_STORE_PTR) {
        hptr[2] = QB;
      }
      hptr[r] = 0;
      return pos;
    }
#define DBG(c)  // fprintf (stderr, "[%d] pushing %lx to %p,%p: %c\n", mpq_this_thread_id, (long) val, MQ, QB, c);
    DBG('A');
    /*
    if (__sync_fetch_and_add (&QB->mqb_next_allocators, 1)) {
      // somebody else will allocate next block; busy wait instead of spurious alloc/free
      DBG('B')
      while (!QB->mqb_next) {
        barrier ();
      }
      DBG('C')
      continue;
    }
    */
    int is_small = (QB == MQ->mq_head);
    struct mp_queue_block *NQB;
    if (!r) {
      assert(!hptr[1]);
      NQB = alloc_mpq_block(val, 1, is_small);
      assert(!hptr[1]);
    } else {
      NQB = alloc_mpq_block(val, 0, is_small);
    }
    assert(hptr[r] == QB);
    DBG('D')
    if (__sync_bool_compare_and_swap(&QB->mqb_next, 0, NQB)) {
      __sync_bool_compare_and_swap(&MQ->mq_tail, QB, NQB);
      DBG('E')
      if (flags & MPQF_STORE_PTR) {
        hptr[2] = NQB;
      }
      hptr[r] = 0;
      return 0;
    } else {
      DBG('F');
      NQB->mqb_magic = MQ_BLOCK_PREPARED_MAGIC;
      mpq_push(is_small ? &MqPreparedSmallBlocks : &MqPreparedBlocks, NQB, 0);
      __sync_fetch_and_add(&mpq_blocks_prepared, 1);
    }
  }
#undef DBG
}
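
/* Shape of mpq_push() above: the tail block is protected with a hazard
   pointer, mq_tail is helped forward whenever it lags behind mqb_next, and
   the value is pushed into the tail block.  When that block is closed, a
   replacement block already seeded with the value is allocated and linked in
   with a CAS on mqb_next; if the CAS is lost to another thread, the spare
   block is not freed but tagged MQ_BLOCK_PREPARED_MAGIC and parked in the
   prepared queues, and the loop simply retries the push against the new
   tail. */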

mqn_value_t mpq_pop_w(struct mp_queue *MQ, int flags) {
  assert(MQ->mq_magic == MQ_MAGIC_SEM);
  int s = -1, iterations = flags & MPQF_MAX_ITERATIONS;
  while (iterations-- > 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_trywait(&MQ->mq_sem);
#else
    s = mp_sem_trywait(&MQ->mq_sem);
#endif
    if (!s) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert(errno == EAGAIN || errno == EINTR);
#endif
  }
  while (s < 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_wait(&MQ->mq_sem);
#else
    s = mp_sem_wait(&MQ->mq_sem);
#endif
    if (!s) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert(errno == EAGAIN);
#endif
  }
  mqn_value_t v = mpq_pop(MQ, flags);
  assert(v);
  return v;
}

mqn_value_t mpq_pop_nw(struct mp_queue *MQ, int flags) {
  assert(MQ->mq_magic == MQ_MAGIC_SEM);
  int s = -1, iterations = flags & MPQF_MAX_ITERATIONS;
  while (iterations-- > 0) {
#if MPQ_USE_POSIX_SEMAPHORES
    s = sem_trywait(&MQ->mq_sem);
#else
    s = mp_sem_trywait(&MQ->mq_sem);
#endif
    if (s >= 0) {
      break;
    }
#if MPQ_USE_POSIX_SEMAPHORES
    assert(errno == EAGAIN || errno == EINTR);
#endif
  }
  if (s < 0) {
    return 0;
  }
  mqn_value_t v = mpq_pop(MQ, flags);
  assert(v);
  return v;
}

long mpq_push_w(struct mp_queue *MQ, mqn_value_t v, int flags) {
  assert(MQ->mq_magic == MQ_MAGIC_SEM);
  long res = mpq_push(MQ, v, flags);
#if MPQ_USE_POSIX_SEMAPHORES
  assert(sem_post(&MQ->mq_sem) >= 0);
#else
  assert(mp_sem_post(&MQ->mq_sem) >= 0);
#endif
  return res;
}
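
/* The *_w / *_nw wrappers above pair every pushed value with one semaphore
   unit: mpq_push_w() posts after the push, mpq_pop_w() blocks in
   sem_wait()/mp_sem_wait() until a unit is available, and mpq_pop_nw() makes
   at most (flags & MPQF_MAX_ITERATIONS) trywait attempts and returns 0
   instead of blocking.  Illustrative non-blocking poll (a sketch; `niter` is
   a hypothetical small attempt count encoded in the bits selected by
   MPQF_MAX_ITERATIONS, and handle() is a hypothetical consumer callback):

     void *work = mpq_pop_nw(Q, niter);
     if (work) {
       handle(work);
     }
*/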

void *get_ptr_multithread_copy(void **ptr, void (*incref)(void *ptr)) {
  void **hptr = &mqb_hazard_ptr[get_this_thread_id()][COMMON_HAZARD_PTR_NUM];
  assert(*hptr == NULL);

  void *R;
  while (1) {
    R = *ptr;
    barrier();
    *hptr = R;
    barrier();
    mfence();

    if (R != *ptr) {
      continue;
    }

    incref(R);

    barrier();
    *hptr = NULL;

    break;
  }
  return R;
}
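
/* get_ptr_multithread_copy() reuses the hazard-pointer machinery for
   reference-counted objects: it reads *ptr, publishes the candidate in the
   shared COMMON_HAZARD_PTR_NUM slot, re-checks that *ptr has not changed,
   and only then calls the caller-supplied incref(), so the object cannot be
   released between the read and the reference-count increment as long as
   the releasing side also consults is_hazard_ptr(). */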
#endif