1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//! Busy handler (when the database is locked)
use std::convert::TryInto;
use std::mem;
use std::os::raw::{c_int, c_void};
use std::panic::catch_unwind;
use std::ptr;
use std::time::Duration;

use crate::ffi;
use crate::{Connection, InnerConnection, Result};

impl Connection {
    /// Install SQLite's built-in sleeping busy handler with the given timeout.
    ///
    /// When a table is locked, the handler sleeps repeatedly until at least
    /// `timeout` (measured in whole milliseconds) of sleeping has
    /// accumulated.
    ///
    /// Passing a zero duration turns off all busy handlers.
    ///
    /// A database connection can only have one busy handler at a time. If a
    /// handler was previously installed with
    /// [`busy_handler`](Connection::busy_handler), calling this routine
    /// clears it.
    ///
    /// Newly created connections currently have a default busy timeout of
    /// 5000ms, but this may be subject to change.
    ///
    /// # Panics
    ///
    /// Panics with the message `"too big"` if `timeout`, expressed in whole
    /// milliseconds, does not fit in an `i32`.
    pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
        // `as_millis` yields the whole-millisecond count as a `u128`, so the
        // only way this can fail is the narrowing to `i32`.
        let ms: i32 = match timeout.as_millis().try_into() {
            Ok(millis) => millis,
            Err(_) => panic!("too big"),
        };
        self.db.borrow_mut().busy_timeout(ms)
    }

    /// Install (or remove) a callback that decides how `SQLITE_BUSY` is
    /// handled.
    ///
    /// With `None`, no callback is registered and `SQLITE_BUSY` is returned
    /// immediately upon encountering the lock.
    ///
    /// With `Some(f)`, `f` receives the number of times the busy handler has
    /// already been invoked for the same locking event:
    ///
    /// * returning `true` makes SQLite try to access the database again, and
    ///   the cycle repeats;
    /// * returning `false` stops further attempts and `SQLITE_BUSY` is
    ///   returned to the application.
    ///
    /// A panic raised inside the callback is caught and treated as if the
    /// callback had returned `false`.
    ///
    /// Only one busy handler can be defined per database connection, so
    /// setting a new one clears any handler set before. Note that calling
    /// [`busy_timeout()`](Connection::busy_timeout) or evaluating
    /// `PRAGMA busy_timeout=N` also changes the busy handler and therefore
    /// clears a handler installed here.
    ///
    /// Newly created connections default to a
    /// [`busy_timeout()`](Connection::busy_timeout) handler with a timeout
    /// of 5000ms, although this is subject to change.
    pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
        /// C-ABI trampoline handed to SQLite; forwards to the Rust callback
        /// that was smuggled through the user-data pointer.
        unsafe extern "C" fn busy_handler_callback(p_arg: *mut c_void, count: c_int) -> c_int {
            // SAFETY: `p_arg` is the `fn(i32) -> bool` that `busy_handler`
            // cast to `*mut c_void` when registering this trampoline.
            let user_fn: fn(i32) -> bool = mem::transmute(p_arg);
            // A panic must not unwind into C: map it to `false` (stop retrying).
            let retry = catch_unwind(|| user_fn(count)).unwrap_or_default();
            c_int::from(retry)
        }

        let conn = self.db.borrow_mut();
        let rc = if let Some(user_fn) = callback {
            // SAFETY: the trampoline above is the only consumer of the
            // user-data pointer and turns it back into the same fn pointer.
            unsafe {
                ffi::sqlite3_busy_handler(
                    conn.db(),
                    Some(busy_handler_callback),
                    user_fn as *mut c_void,
                )
            }
        } else {
            // SAFETY: registering no callback with a null user-data pointer.
            unsafe { ffi::sqlite3_busy_handler(conn.db(), None, ptr::null_mut()) }
        };
        conn.decode_result(rc)
    }
}

impl InnerConnection {
    /// Forward `timeout` (in milliseconds) to `sqlite3_busy_timeout` for this
    /// connection's handle and translate the returned status code into a
    /// `Result`.
    #[inline]
    fn busy_timeout(&mut self, timeout: c_int) -> Result<()> {
        // SAFETY: assumes `self.db` is the live handle owned by this
        // connection — NOTE(review): confirm against `InnerConnection`.
        let rc = unsafe { ffi::sqlite3_busy_timeout(self.db, timeout) };
        self.decode_result(rc)
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Duration;

    use crate::{Connection, ErrorCode, Result, TransactionBehavior};

    /// While one connection holds an exclusive transaction, a query on a
    /// second connection to the same file must fail with `DatabaseBusy`.
    #[test]
    fn test_default_busy() -> Result<()> {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let mut holder = Connection::open(&db_path)?;
        let exclusive = holder.transaction_with_behavior(TransactionBehavior::Exclusive)?;

        let contender = Connection::open(&db_path)?;
        // The row callback must never run: the query is expected to fail.
        let outcome: Result<()> =
            contender.query_row("PRAGMA schema_version", [], |_| unreachable!());
        assert_eq!(
            outcome.unwrap_err().sqlite_error_code(),
            Some(ErrorCode::DatabaseBusy)
        );

        exclusive.rollback()
    }

    /// With a 1s busy timeout, a query should succeed once the other thread
    /// releases its exclusive transaction after roughly 100ms.
    #[test]
    #[ignore] // FIXME: unstable
    fn test_busy_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let waiter = Connection::open(&db_path).unwrap();
        waiter.busy_timeout(Duration::from_secs(1)).unwrap();

        // Zero-capacity channel: used to signal that the lock has been taken.
        let (locked_tx, locked_rx) = sync_channel(0);
        let holder = thread::spawn(move || {
            let mut conn = Connection::open(&db_path).unwrap();
            let exclusive = conn
                .transaction_with_behavior(TransactionBehavior::Exclusive)
                .unwrap();
            locked_tx.send(1).unwrap();
            thread::sleep(Duration::from_millis(100));
            exclusive.rollback().unwrap();
        });

        // Do not query until the holder thread reports it owns the lock.
        assert_eq!(locked_rx.recv().unwrap(), 1);
        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");

        holder.join().unwrap();
    }

    /// A custom busy handler that always asks for a retry should be invoked
    /// while the other thread holds the lock, and the query should
    /// eventually succeed.
    #[test]
    #[ignore] // FIXME: unstable
    fn test_busy_handler() {
        // Records whether the handler below was ever entered.
        static CALLED: AtomicBool = AtomicBool::new(false);
        fn busy_handler(_: i32) -> bool {
            CALLED.store(true, Ordering::Relaxed);
            thread::sleep(Duration::from_millis(100));
            true
        }

        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("test.db3");

        let waiter = Connection::open(&db_path).unwrap();
        waiter.busy_handler(Some(busy_handler)).unwrap();

        // Zero-capacity channel: used to signal that the lock has been taken.
        let (locked_tx, locked_rx) = sync_channel(0);
        let holder = thread::spawn(move || {
            let mut conn = Connection::open(&db_path).unwrap();
            let exclusive = conn
                .transaction_with_behavior(TransactionBehavior::Exclusive)
                .unwrap();
            locked_tx.send(1).unwrap();
            thread::sleep(Duration::from_millis(100));
            exclusive.rollback().unwrap();
        });

        // Do not query until the holder thread reports it owns the lock.
        assert_eq!(locked_rx.recv().unwrap(), 1);
        let _ = waiter
            .query_row("PRAGMA schema_version", [], |row| row.get::<_, i32>(0))
            .expect("unexpected error");
        assert!(CALLED.load(Ordering::Relaxed));

        holder.join().unwrap();
    }
}