use std::{
    convert::{TryFrom, TryInto},
    fmt, io,
    sync::Arc,
};
use bitvec::{prelude::BitVec, slice::BitSlice};
use crate::{
    constants::{ColumnFlags, ColumnType},
    io::ParseBuf,
    misc::raw::int::*,
    packets::Column,
    proto::MyDeserialize,
    row::{new_row_raw, Row},
    value::Value,
};
use super::{
    events::{OptionalMetadataField, TableMapEvent},
    value::{BinlogValue, BinlogValueToValueError},
};
/// Bit flags carried by the value-options integer that precedes a row image
/// when a shared image is present.
///
/// The discriminants are the raw `u64` bit values, so a variant can be tested
/// against the parsed integer with `value & Variant as u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[allow(non_camel_case_types)]
#[repr(u64)]
pub enum BinlogRowValueOptions {
    /// When this bit is set, the row deserializer reads a bitmap of partial
    /// JSON columns right after the value-options integer.
    PARTIAL_JSON_UPDATES = 1,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, thiserror::Error)]
#[error("Unknown binlog version {}", _0)]
#[repr(transparent)]
pub struct UnknownBinlogRowValueOptions(pub u64);
impl From<UnknownBinlogRowValueOptions> for u64 {
    fn from(x: UnknownBinlogRowValueOptions) -> Self {
        x.0
    }
}
/// Interprets a raw `u64` as a single row-value-options variant.
///
/// Only an exact match with a variant's discriminant succeeds; every other
/// value is handed back inside [`UnknownBinlogRowValueOptions`].
impl TryFrom<u64> for BinlogRowValueOptions {
    type Error = UnknownBinlogRowValueOptions;

    fn try_from(value: u64) -> Result<Self, Self::Error> {
        if value == Self::PARTIAL_JSON_UPDATES as u64 {
            Ok(Self::PARTIAL_JSON_UPDATES)
        } else {
            Err(UnknownBinlogRowValueOptions(value))
        }
    }
}
/// A single row decoded from a binlog rows event.
///
/// `Debug` is implemented by hand (not derived) so that each value is printed
/// under its column name.
#[derive(Clone, PartialEq)]
pub struct BinlogRow {
    /// One slot per column of the row; a slot becomes `None` once its value
    /// has been moved out with `take`.
    values: Vec<Option<BinlogValue<'static>>>,
    /// Column descriptors, shared cheaply between clones through `Arc`.
    columns: Arc<[Column]>,
}
impl BinlogRow {
    /// Creates a row from its value slots and the matching column descriptors.
    pub fn new(values: Vec<Option<BinlogValue<'static>>>, columns: Arc<[Column]>) -> Self {
        Self { values, columns }
    }

    /// Returns the number of value slots in the row (taken slots included).
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` if the row has no value slots.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Returns the column descriptors as a borrowed slice.
    pub fn columns_ref(&self) -> &[Column] {
        &self.columns
    }

    /// Returns a new shared handle to the column descriptors.
    pub fn columns(&self) -> Arc<[Column]> {
        Arc::clone(&self.columns)
    }

    /// Returns a reference to the value at `index`.
    ///
    /// Yields `None` when `index` is out of range or the value was already
    /// taken.
    pub fn as_ref(&self, index: usize) -> Option<&BinlogValue<'static>> {
        self.values.get(index).and_then(Option::as_ref)
    }

    /// Moves the value at `index` out of the row, leaving an empty slot.
    ///
    /// Yields `None` when `index` is out of range or the value was already
    /// taken.
    ///
    /// The returned value is `'static` (it is stored that way), so it does not
    /// keep the row borrowed and several values can be taken and held at once.
    pub fn take(&mut self, index: usize) -> Option<BinlogValue<'static>> {
        self.values.get_mut(index).and_then(Option::take)
    }

    /// Consumes the row and returns all of its values.
    ///
    /// # Panics
    ///
    /// Panics if any value was previously removed with [`BinlogRow::take`].
    pub fn unwrap(self) -> Vec<BinlogValue<'static>> {
        self.values
            .into_iter()
            .map(|slot| slot.expect("Can't unwrap row if some of columns was taken"))
            .collect()
    }

    // Stores `value` in the slot at `index`, replacing whatever was there.
    // Panics if `index` is out of range.
    #[doc(hidden)]
    pub fn place(&mut self, index: usize, value: BinlogValue<'static>) {
        self.values[index] = Some(value);
    }
}
impl<'de> MyDeserialize<'de> for BinlogRow {
    const SIZE: Option<usize> = None;

    /// Deserialization context:
    ///
    /// * number of columns to iterate over,
    /// * column bitmap - a set bit means the column is present in this image,
    /// * whether a shared image (value options, optionally followed by the
    ///   partial JSON bitmap) precedes the row data,
    /// * the table map event supplying column types and metadata.
    type Ctx = (u64, &'de BitSlice<u8>, bool, &'de TableMapEvent<'de>);

    /// Parses one row image from `buf`.
    ///
    /// Consumed in order: the shared image (only when `have_shared_image`),
    /// a null bitmap holding one bit per column present in the image, then
    /// the values of the present, non-NULL columns. Each produced column is
    /// named `@<index>` and carries `UNSIGNED_FLAG` when the table map's
    /// signedness metadata marks it unsigned.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` when the table map event has no (or an unknown)
    /// type for a column that is present in the image, and propagates any
    /// error raised while reading from `buf`.
    fn deserialize(
        (num_columns, cols, have_shared_image, table_info): Self::Ctx,
        buf: &mut ParseBuf<'de>,
    ) -> io::Result<Self> {
        let mut values: Vec<Option<BinlogValue<'static>>> = vec![];
        let mut columns = vec![];

        // Read the shared image if there is one: value options first, then
        // (if requested by the options) the bitmap of partial JSON columns.
        let mut partial_cols = if have_shared_image {
            let value_options = *buf.parse::<RawInt<LenEnc>>(())?;
            if value_options & BinlogRowValueOptions::PARTIAL_JSON_UPDATES as u64 > 0 {
                let json_columns_count = table_info.json_column_count();
                let partial_columns_len = (json_columns_count + 7) / 8;
                let partial_columns: &[u8] = buf.parse(partial_columns_len)?;
                let partial_columns = BitSlice::<u8>::from_slice(partial_columns);
                Some(partial_columns.into_iter().take(json_columns_count))
            } else {
                None
            }
        } else {
            None
        };

        // The null bitmap has one bit per column present in the image,
        // rounded up to whole bytes; drop the padding bits.
        let num_bits = cols.count_ones();
        let bitmap_len = (num_bits + 7) / 8;
        let bitmap_buf: &[u8] = buf.parse(bitmap_len)?;
        let mut null_bitmap = BitVec::<u8>::from_slice(bitmap_buf);
        null_bitmap.truncate(num_bits);

        // Index into `null_bitmap`: advances once per column present in the image.
        let mut image_idx = 0;

        let signedness = table_info.iter_optional_meta().find_map(|m| {
            m.map(|f| match f {
                OptionalMetadataField::Signedness(bit_slice) => Some(bit_slice),
                _ => None,
            })
            .unwrap_or(None)
        });
        // Index into `signedness`: advances once per numeric column of the table.
        let mut numeric_index = 0;

        for i in 0..(num_columns as usize) {
            let in_image = cols.get(i).as_deref().copied().unwrap_or(false);

            let column_type = match table_info.get_column_type(i) {
                Ok(Some(ty)) => ty,
                // A column absent from the image has nothing to decode, so a
                // missing or unknown type for it must not fail the whole row.
                Ok(None) | Err(_) if !in_image => continue,
                // TableMapEvent must define the type of every column in the image.
                Ok(None) => {
                    return Err(io::Error::new(io::ErrorKind::InvalidData, "No column type"))
                }
                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e)),
            };

            // The partial-JSON bitmap holds one bit per JSON column of the
            // table and the signedness bitmap one bit per numeric column of
            // the table, whether or not the column is part of this image.
            // Both cursors therefore advance before the image check below;
            // otherwise a skipped column would shift the bits of every later
            // column.
            let is_partial = column_type == ColumnType::MYSQL_TYPE_JSON
                && partial_cols
                    .as_mut()
                    .and_then(|bits| bits.next().as_deref().copied())
                    .unwrap_or(false);

            let is_unsigned = if column_type.is_numeric_type() {
                let is_unsigned = signedness
                    .as_ref()
                    .and_then(|bits| bits.get(numeric_index).as_deref().copied())
                    .unwrap_or_default();
                numeric_index += 1;
                is_unsigned
            } else {
                false
            };

            if !in_image {
                continue;
            }

            let column_meta = table_info.get_column_metadata(i).unwrap_or(&[]);

            let mut column_flags = ColumnFlags::empty();
            if is_unsigned {
                column_flags |= ColumnFlags::UNSIGNED_FLAG;
            }

            let column = Column::new(column_type)
                .with_name(format!("@{}", i).as_bytes())
                .with_flags(column_flags)
                .with_schema(table_info.database_name_raw())
                .with_org_table(table_info.table_name_raw())
                .with_table(table_info.table_name_raw());
            columns.push(column);

            // A missing null bit is treated as NULL, so no value bytes are read.
            if null_bitmap
                .get(image_idx)
                .as_deref()
                .copied()
                .unwrap_or(true)
            {
                values.push(Some(BinlogValue::Value(Value::NULL)));
            } else {
                let ctx = (column_type, column_meta, is_unsigned, is_partial);
                values.push(Some(buf.parse::<BinlogValue>(ctx)?.into_owned()));
            }

            image_idx += 1;
        }

        Ok(BinlogRow::new(values, columns.into_boxed_slice().into()))
    }
}
/// Renders the row as a struct whose fields are the column names.
///
/// Slots whose value has been moved out are shown as the string `"<taken>"`.
impl fmt::Debug for BinlogRow {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("BinlogRow");
        let pairs = self.values.iter().zip(self.columns.iter());
        for (slot, col) in pairs {
            let name = col.name_str();
            if let Some(value) = slot {
                out.field(name.as_ref(), value);
            } else {
                out.field(name.as_ref(), &"<taken>");
            }
        }
        out.finish()
    }
}
/// Error returned when a `BinlogRow` cannot be converted into a `Row`.
///
/// It records which value failed and the underlying value-conversion error.
#[derive(Debug, thiserror::Error)]
#[error(
    "Can't convert BinlogRow to Row at column offset {}: {}",
    column_offset,
    error
)]
pub struct BinlogRowToRowError {
    /// Zero-based position, within the row's values, of the value that failed.
    pub column_offset: usize,
    /// The error reported by the value conversion.
    pub error: BinlogValueToValueError,
}
impl TryFrom<BinlogRow> for Row {
    type Error = BinlogRowToRowError;
    fn try_from(binlog_row: BinlogRow) -> Result<Self, Self::Error> {
        let mut values = Vec::with_capacity(binlog_row.values.len());
        for (column_offset, value) in binlog_row.values.into_iter().enumerate() {
            match value {
                Some(x) => {
                    values.push(Some(x.try_into().map_err(|error| BinlogRowToRowError {
                        column_offset,
                        error,
                    })?))
                }
                None => values.push(None),
            }
        }
        Ok(new_row_raw(values, binlog_row.columns))
    }
}