use futures_util::FutureExt;
use priority_queue::PriorityQueue;
use tokio::sync::mpsc;
use std::{
    cmp::{Ordering, Reverse},
    collections::VecDeque,
    convert::TryFrom,
    hash::{Hash, Hasher},
    pin::Pin,
    str::FromStr,
    sync::{atomic, Arc, Mutex},
    task::{Context, Poll, Waker},
    time::{Duration, Instant},
};
use crate::{
    conn::{pool::futures::*, Conn},
    error::*,
    opts::{Opts, PoolOpts},
    queryable::transaction::{Transaction, TxOpts, TxStatus},
};
mod recycler;
pub mod futures;
mod ttl_check_inerval;
#[derive(Debug)]
struct IdlingConn {
    since: Instant,
    conn: Conn,
}
impl IdlingConn {
    fn elapsed(&self) -> Duration {
        self.since.elapsed()
    }
}
impl From<Conn> for IdlingConn {
    fn from(conn: Conn) -> Self {
        Self {
            since: Instant::now(),
            conn,
        }
    }
}
#[derive(Debug)]
struct Exchange {
    waiting: Waitlist,
    available: VecDeque<IdlingConn>,
    exist: usize,
    recycler: Option<(mpsc::UnboundedReceiver<Option<Conn>>, PoolOpts)>,
}
impl Exchange {
    fn spawn_futures_if_needed(&mut self, inner: &Arc<Inner>) {
        use recycler::Recycler;
        use ttl_check_inerval::TtlCheckInterval;
        if let Some((dropped, pool_opts)) = self.recycler.take() {
            tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
            if pool_opts.inactive_connection_ttl() > Duration::from_secs(0) {
                tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
            }
        }
    }
}
#[derive(Default, Debug)]
struct Waitlist {
    queue: PriorityQueue<QueuedWaker, QueueId>,
}
impl Waitlist {
    fn push(&mut self, w: Waker, queue_id: QueueId) {
        self.queue.push(
            QueuedWaker {
                queue_id,
                waker: Some(w),
            },
            queue_id,
        );
    }
    fn pop(&mut self) -> Option<Waker> {
        match self.queue.pop() {
            Some((qw, _)) => Some(qw.waker.unwrap()),
            None => None,
        }
    }
    fn remove(&mut self, id: QueueId) {
        let tmp = QueuedWaker {
            queue_id: id,
            waker: None,
        };
        self.queue.remove(&tmp);
    }
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct QueueId(Reverse<u64>);
impl QueueId {
    fn next() -> Self {
        static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
        let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
        QueueId(Reverse(id))
    }
}
#[derive(Debug)]
struct QueuedWaker {
    queue_id: QueueId,
    waker: Option<Waker>,
}
impl Eq for QueuedWaker {}
impl PartialEq for QueuedWaker {
    fn eq(&self, other: &Self) -> bool {
        self.queue_id == other.queue_id
    }
}
impl Ord for QueuedWaker {
    fn cmp(&self, other: &Self) -> Ordering {
        self.queue_id.cmp(&other.queue_id)
    }
}
impl PartialOrd for QueuedWaker {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Hash for QueuedWaker {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.queue_id.hash(state)
    }
}
#[derive(Debug)]
pub struct Inner {
    close: atomic::AtomicBool,
    closed: atomic::AtomicBool,
    exchange: Mutex<Exchange>,
}
#[derive(Debug, Clone)]
pub struct Pool {
    opts: Opts,
    inner: Arc<Inner>,
    drop: mpsc::UnboundedSender<Option<Conn>>,
}
impl Pool {
    pub fn new<O>(opts: O) -> Pool
    where
        Opts: TryFrom<O>,
        <Opts as TryFrom<O>>::Error: std::error::Error,
    {
        let opts = Opts::try_from(opts).unwrap();
        let pool_opts = opts.pool_opts().clone();
        let (tx, rx) = mpsc::unbounded_channel();
        Pool {
            opts,
            inner: Arc::new(Inner {
                close: false.into(),
                closed: false.into(),
                exchange: Mutex::new(Exchange {
                    available: VecDeque::with_capacity(pool_opts.constraints().max()),
                    waiting: Waitlist::default(),
                    exist: 0,
                    recycler: Some((rx, pool_opts)),
                }),
            }),
            drop: tx,
        }
    }
    pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
        let opts = Opts::from_str(url.as_ref())?;
        Ok(Pool::new(opts))
    }
    pub fn get_conn(&self) -> GetConn {
        GetConn::new(self)
    }
    pub async fn start_transaction(&self, options: TxOpts) -> Result<Transaction<'static>> {
        let conn = self.get_conn().await?;
        Transaction::new(conn, options).await
    }
    pub fn disconnect(self) -> DisconnectPool {
        DisconnectPool::new(self)
    }
    fn return_conn(&mut self, conn: Conn) {
        if conn.inner.stream.is_some()
            && !conn.inner.disconnected
            && !conn.expired()
            && conn.inner.tx_status == TxStatus::None
            && !conn.has_pending_result()
            && !self.inner.close.load(atomic::Ordering::Acquire)
        {
            let mut exchange = self.inner.exchange.lock().unwrap();
            if exchange.available.len() < self.opts.pool_opts().active_bound() {
                exchange.available.push_back(conn.into());
                if let Some(w) = exchange.waiting.pop() {
                    w.wake();
                }
                return;
            }
        }
        self.send_to_recycler(conn);
    }
    fn send_to_recycler(&self, conn: Conn) {
        if let Err(conn) = self.drop.send(Some(conn)) {
            let conn = conn.0.unwrap();
            if !self.inner.closed.load(atomic::Ordering::SeqCst) {
                assert!(conn.inner.pool.is_none());
                drop(conn);
            } else {
                unreachable!("Recycler exited while connections still exist");
            }
        }
    }
    fn cancel_connection(&self) {
        let mut exchange = self.inner.exchange.lock().unwrap();
        exchange.exist -= 1;
        if let Some(w) = exchange.waiting.pop() {
            w.wake();
        }
    }
    fn poll_new_conn(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        queued: bool,
        queue_id: QueueId,
    ) -> Poll<Result<GetConnInner>> {
        self.poll_new_conn_inner(cx, queued, queue_id)
    }
    fn poll_new_conn_inner(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        queued: bool,
        queue_id: QueueId,
    ) -> Poll<Result<GetConnInner>> {
        let mut exchange = self.inner.exchange.lock().unwrap();
        if self.inner.close.load(atomic::Ordering::Acquire) {
            return Err(Error::Driver(DriverError::PoolDisconnected)).into();
        }
        exchange.spawn_futures_if_needed(&self.inner);
        if !exchange.waiting.is_empty() && !queued {
            exchange.waiting.push(cx.waker().clone(), queue_id);
            return Poll::Pending;
        }
        while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
            if !conn.expired() {
                return Poll::Ready(Ok(GetConnInner::Checking(
                    async move {
                        conn.stream_mut()?.check().await?;
                        Ok(conn)
                    }
                    .boxed(),
                )));
            } else {
                self.send_to_recycler(conn);
            }
        }
        if exchange.exist < self.opts.pool_opts().constraints().max() {
            exchange.exist += 1;
            return Poll::Ready(Ok(GetConnInner::Connecting(
                Conn::new(self.opts.clone()).boxed(),
            )));
        }
        exchange.waiting.push(cx.waker().clone(), queue_id);
        Poll::Pending
    }
    fn unqueue(&self, queue_id: QueueId) {
        let mut exchange = self.inner.exchange.lock().unwrap();
        exchange.waiting.remove(queue_id);
    }
}
impl Drop for Conn {
    fn drop(&mut self) {
        self.inner.infile_handler = None;
        if std::thread::panicking() {
            if let Some(pool) = self.inner.pool.take() {
                pool.cancel_connection();
            }
            return;
        }
        if let Some(mut pool) = self.inner.pool.take() {
            pool.return_conn(self.take());
        } else if self.inner.stream.is_some() && !self.inner.disconnected {
            crate::conn::disconnect(self.take());
        }
    }
}
#[cfg(test)]
mod test {
    use futures_util::{
        future::{join_all, select, select_all, try_join_all},
        try_join, FutureExt,
    };
    use mysql_common::row::Row;
    use tokio::time::{sleep, timeout};
    use std::{
        cmp::Reverse,
        task::{RawWaker, RawWakerVTable, Waker},
        time::Duration,
    };
    use crate::{
        conn::pool::{Pool, QueueId, Waitlist, QUEUE_END_ID},
        opts::PoolOpts,
        prelude::*,
        test_misc::get_opts,
        PoolConstraints, TxOpts,
    };
    macro_rules! conn_ex_field {
        ($conn:expr, $field:tt) => {
            ex_field!($conn.inner.pool.as_ref().unwrap(), $field)
        };
    }
    macro_rules! ex_field {
        ($pool:expr, $field:tt) => {
            $pool.inner.exchange.lock().unwrap().$field
        };
    }
    #[test]
    fn should_not_hang() -> super::Result<()> {
        pub struct Database {
            pool: Pool,
        }
        impl Database {
            pub async fn disconnect(self) -> super::Result<()> {
                self.pool.disconnect().await?;
                Ok(())
            }
        }
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let database = Database {
            pool: Pool::new(get_opts()),
        };
        runtime.block_on(database.disconnect())
    }
    #[tokio::test]
    async fn should_track_conn_if_disconnected_outside_of_a_pool() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        let conn = pool.get_conn().await?;
        conn.disconnect().await?;
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn should_connect() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        pool.get_conn().await?.ping().await?;
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn should_reconnect() -> super::Result<()> {
        let mut master = crate::Conn::new(get_opts()).await?;
        async fn test(master: &mut crate::Conn, opts: crate::OptsBuilder) -> super::Result<()> {
            const NUM_CONNS: usize = 5;
            let pool = Pool::new(opts);
            let connections = (0..NUM_CONNS).map(|_| {
                async {
                    let mut conn = pool.get_conn().await?;
                    conn.ping().await?;
                    crate::Result::Ok(conn)
                }
                .boxed()
            });
            let ids = try_join_all(connections)
                .await?
                .into_iter()
                .map(|conn| conn.id())
                .collect::<Vec<_>>();
            pool.get_conn().await?;
            for id in ids {
                master.query_drop(format!("KILL {}", id)).await?;
            }
            assert_eq!(ex_field!(pool, available).len(), NUM_CONNS);
            sleep(Duration::from_millis(500)).await;
            let _conn = pool.get_conn().await?;
            assert_eq!(ex_field!(pool, available).len(), 0);
            drop(_conn);
            pool.disconnect().await
        }
        println!("Check socket/pipe..");
        test(&mut master, get_opts()).await?;
        println!("Check tcp..");
        test(&mut master, get_opts().prefer_socket(false)).await?;
        master.disconnect().await
    }
    #[tokio::test]
    async fn should_reuse_connections() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 1).unwrap();
        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
        let pool = Pool::new(opts);
        let mut conn = pool.get_conn().await?;
        let server_version = conn.server_version();
        let connection_id = conn.id();
        for _ in 0..16 {
            drop(conn);
            conn = pool.get_conn().await?;
            println!("CONN connection_id={}", conn.id());
            assert!(conn.id() == connection_id || server_version < (5, 7, 2));
        }
        Ok(())
    }
    #[tokio::test]
    #[ignore]
    async fn can_handle_the_pressure() {
        let pool = Pool::new(get_opts());
        for _ in 0..10i32 {
            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
            for i in 0..10_000 {
                let pool = pool.clone();
                let tx = tx.clone();
                tokio::spawn(async move {
                    let _ = pool.get_conn().await.unwrap();
                    tx.send(i).unwrap();
                });
            }
            drop(tx);
            while let Some(_) = rx.recv().await {}
        }
        drop(pool);
    }
    #[tokio::test]
    async fn should_start_transaction() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 1).unwrap();
        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
        let pool = Pool::new(opts);
        "CREATE TEMPORARY TABLE tmp(id int)".ignore(&pool).await?;
        let mut tx = pool.start_transaction(TxOpts::default()).await?;
        tx.exec_batch("INSERT INTO tmp (id) VALUES (?)", vec![(1_u8,), (2_u8,)])
            .await?;
        tx.exec_drop("SELECT * FROM tmp", ()).await?;
        drop(tx);
        let row_opt = pool
            .get_conn()
            .await?
            .query_first("SELECT COUNT(*) FROM tmp")
            .await?;
        assert_eq!(row_opt, Some((0u8,)));
        pool.get_conn().await?.query_drop("DROP TABLE tmp").await?;
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn should_check_inactive_connection_ttl() -> super::Result<()> {
        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;
        const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
        const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default()
            .with_constraints(constraints)
            .with_inactive_connection_ttl(INACTIVE_CONNECTION_TTL)
            .with_ttl_check_interval(TTL_CHECK_INTERVAL);
        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
        let conns = try_join_all(conns).await?;
        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
        drop(conns);
        sleep(Duration::from_millis(100)).await;
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);
        sleep(TTL_CHECK_INTERVAL).await;
        sleep(Duration::from_millis(500)).await;
        assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);
        Ok(())
    }
    #[tokio::test]
    async fn aa_should_hold_bounds2() -> super::Result<()> {
        use std::cmp::min;
        const POOL_MIN: usize = 5;
        const POOL_MAX: usize = 10;
        let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(constraints);
        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let pool_clone = pool.clone();
        let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
        let mut conns = try_join_all(conns).await?;
        assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
        while !conns.is_empty() {
            let _ = conns.pop();
            sleep(Duration::from_millis(50)).await;
            let dropped = POOL_MAX - conns.len();
            let idle = min(dropped, POOL_MIN);
            let expected = conns.len() + idle;
            let have = ex_field!(pool_clone, exist);
            assert_eq!(have, expected);
        }
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn should_hold_bounds1() -> super::Result<()> {
        let constraints = PoolConstraints::new(1, 2).unwrap();
        let opts = get_opts().pool_opts(PoolOpts::default().with_constraints(constraints));
        let pool = Pool::new(opts);
        let pool_clone = pool.clone();
        let (conn1, conn2) = try_join!(pool.get_conn(), pool.get_conn()).unwrap();
        assert_eq!(conn_ex_field!(conn1, exist), 2);
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);
        drop(conn1);
        drop(conn2);
        let conn1 = pool_clone.get_conn().await?;
        assert_eq!(conn_ex_field!(conn1, available).len(), 0);
        drop(conn1);
        assert!(ex_field!(pool, available).len() <= 1);
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn should_hold_bounds_on_error() -> super::Result<()> {
        let pool = Pool::new("mysql://255.255.255.255");
        assert!(try_join!(pool.get_conn(), pool.get_conn()).is_err());
        assert_eq!(ex_field!(pool, exist), 0);
        Ok(())
    }
    #[tokio::test]
    async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        let mut conn = pool.get_conn().await?;
        let wait_timeout_orig: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        conn.query_drop("SET GLOBAL wait_timeout = 3").await?;
        conn.disconnect().await?;
        let mut conn = pool.get_conn().await?;
        let wait_timeout: Option<usize> = conn.query_first("SELECT @@wait_timeout").await?;
        let id1: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        drop(conn);
        assert_eq!(wait_timeout, Some(3));
        assert_eq!(ex_field!(pool, exist), 1);
        sleep(Duration::from_secs(6)).await;
        let mut conn = pool.get_conn().await?;
        let id2: Option<usize> = conn.query_first("SELECT CONNECTION_ID()").await?;
        assert_eq!(ex_field!(pool, exist), 1);
        assert_ne!(id1, id2);
        conn.exec_drop("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
            .await?;
        drop(conn);
        pool.disconnect().await?;
        Ok(())
    }
    #[tokio::test]
    async fn droptest() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(conns);
        drop(pool);
        let pool = Pool::new(get_opts());
        let conns = try_join_all((0..10).map(|_| pool.get_conn()))
            .await
            .unwrap();
        drop(pool);
        drop(conns);
        Ok(())
    }
    #[test]
    fn drop_impl_for_conn_should_not_panic_within_unwind() {
        use tokio::runtime;
        const PANIC_MESSAGE: &str = "ORIGINAL_PANIC";
        let result = std::panic::catch_unwind(|| {
            runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async {
                    let pool = Pool::new(get_opts());
                    let _conn = pool.get_conn().await.unwrap();
                    std::panic::panic_any(PANIC_MESSAGE);
                });
        });
        assert_eq!(
            *result.unwrap_err().downcast::<&str>().unwrap(),
            "ORIGINAL_PANIC",
        );
    }
    #[test]
    fn should_not_panic_on_unclean_shutdown() {
        for _ in 0..10 {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let (tx, rx) = tokio::sync::oneshot::channel();
            rt.block_on(async move {
                let pool = Pool::new(get_opts());
                let mut c = pool.get_conn().await.unwrap();
                tokio::spawn(async move {
                    let _ = rx.await;
                    let _ = c.query_drop("SELECT 1").await;
                });
            });
            drop(rt);
            let _ = tx.send(());
        }
    }
    #[test]
    fn should_perform_clean_shutdown() {
        for _ in 0..10 {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let (tx, rx) = tokio::sync::oneshot::channel();
            let jh = rt.spawn(async move {
                let pool = Pool::new(get_opts());
                let mut c = pool.get_conn().await.unwrap();
                tokio::spawn(async move {
                    let _ = rx.await;
                    let _ = c.query_drop("SELECT 1").await;
                });
                let _ = pool.disconnect().await;
            });
            let _ = tx.send(());
            rt.block_on(jh).unwrap();
        }
    }
    #[tokio::test]
    async fn issue_126_should_cleanup_errors_in_multiresult_sets() -> super::Result<()> {
        let pool_constraints = PoolConstraints::new(0, 1).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        for _ in 0u8..100 {
            pool.get_conn()
                .await?
                .query_iter("DO '42'; BLABLA;")
                .await?;
        }
        Ok(())
    }
    #[tokio::test]
    async fn should_ignore_non_fatal_errors_while_returning_to_a_pool() -> super::Result<()> {
        let pool_constraints = PoolConstraints::new(1, 1).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let id = pool.get_conn().await?.id();
        for _ in 0u8..10 {
            let mut conn = pool.get_conn().await?;
            conn.query_iter("DO '42'; BLABLA;").await?;
            assert_eq!(id, conn.id());
        }
        Ok(())
    }
    #[tokio::test]
    async fn should_remove_waker_of_cancelled_task() {
        let pool_constraints = PoolConstraints::new(1, 1).unwrap();
        let pool_opts = PoolOpts::default().with_constraints(pool_constraints);
        let pool = Pool::new(get_opts().pool_opts(pool_opts));
        let only_conn = pool.get_conn().await.unwrap();
        let join_handle = tokio::spawn(timeout(Duration::from_secs(1), pool.get_conn()));
        sleep(Duration::from_secs(2)).await;
        match join_handle.await.unwrap() {
            Err(_elapsed) => (),
            _ => panic!("unexpected Ok()"),
        }
        drop(only_conn);
        assert_eq!(0, pool.inner.exchange.lock().unwrap().waiting.queue.len());
    }
    #[tokio::test]
    async fn should_work_if_pooled_connection_operation_is_cancelled() -> super::Result<()> {
        let pool = Pool::new(get_opts());
        join_all((0..10).map(|_| pool.get_conn())).await;
        async fn op(pool: &Pool) {
            let _: Option<Row> = pool
                .get_conn()
                .await
                .unwrap()
                .exec_first("SELECT ?, ?", (42, "foo"))
                .await
                .unwrap();
        }
        let mut max_delay = 0_u128;
        for _ in 0..10_usize {
            let start = std::time::Instant::now();
            op(&pool).await;
            max_delay = std::cmp::max(max_delay, start.elapsed().as_micros());
        }
        for _ in 0_usize..128 {
            let fut = select_all((0_usize..5).map(|_| op(&pool).boxed()));
            let delay_micros = rand::random::<u128>() % max_delay;
            select(
                sleep(Duration::from_micros(delay_micros as u64)).boxed(),
                fut,
            )
            .await;
            sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }
    #[test]
    fn waitlist_integrity() {
        const DATA: *const () = &();
        const NOOP_CLONE_FN: unsafe fn(*const ()) -> RawWaker = |_| RawWaker::new(DATA, &RW_VTABLE);
        const NOOP_FN: unsafe fn(*const ()) = |_| {};
        static RW_VTABLE: RawWakerVTable =
            RawWakerVTable::new(NOOP_CLONE_FN, NOOP_FN, NOOP_FN, NOOP_FN);
        let w = unsafe { Waker::from_raw(RawWaker::new(DATA, &RW_VTABLE)) };
        let mut waitlist = Waitlist::default();
        assert_eq!(0, waitlist.queue.len());
        waitlist.push(w.clone(), QueueId(Reverse(4)));
        waitlist.push(w.clone(), QueueId(Reverse(2)));
        waitlist.push(w.clone(), QueueId(Reverse(8)));
        waitlist.push(w.clone(), QUEUE_END_ID);
        waitlist.push(w.clone(), QueueId(Reverse(10)));
        waitlist.remove(QueueId(Reverse(8)));
        assert_eq!(4, waitlist.queue.len());
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(2, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(4, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(10, id.0 .0);
        let (_, id) = waitlist.queue.pop().unwrap();
        assert_eq!(QUEUE_END_ID, id);
        assert_eq!(0, waitlist.queue.len());
    }
    #[cfg(feature = "nightly")]
    mod bench {
        use futures_util::future::{FutureExt, TryFutureExt};
        use tokio::runtime::Runtime;
        use crate::{prelude::Queryable, test_misc::get_opts, Pool, PoolConstraints, PoolOpts};
        use std::time::Duration;
        #[bench]
        fn get_conn(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();
            let pool = Pool::new(get_opts());
            bencher.iter(|| {
                let fut = pool
                    .get_conn()
                    .and_then(|mut conn| async { conn.ping().await.map(|_| conn) });
                runtime.block_on(fut).unwrap();
            });
            runtime.block_on(pool.disconnect()).unwrap();
        }
        #[bench]
        fn new_conn_on_pool_soft_boundary(bencher: &mut test::Bencher) {
            let mut runtime = Runtime::new().unwrap();
            let pool_constraints = PoolConstraints::new(0, 1).unwrap();
            let pool_opts = PoolOpts::default()
                .with_constraints(pool_constraints)
                .with_inactive_connection_ttl(Duration::from_secs(1));
            let pool = Pool::new(get_opts().pool_opts(pool_opts));
            bencher.iter(|| {
                let fut = pool.get_conn().map(drop);
                runtime.block_on(fut);
            });
            runtime.block_on(pool.disconnect()).unwrap();
        }
    }
}