use std::{borrow::Cow, cmp::min, convert::TryFrom, io};
use bitvec::prelude::*;
use byteorder::ReadBytesExt;
use saturating::Saturating as S;
use crate::{
binlog::{
consts::{BinlogVersion, EventType, OptionalMetadataFieldType},
BinlogCtx, BinlogEvent, BinlogStruct,
},
constants::{ColumnType, GeometryType, UnknownColumnType},
io::ParseBuf,
misc::raw::{
bytes::{BareBytes, EofBytes, LenEnc, U8Bytes},
int::*,
RawBytes, RawConst, RawSeq, Skip,
},
proto::{MyDeserialize, MySerialize},
};
use super::BinlogEventHeader;
/// Error returned when the type of a column can't be resolved.
#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)]
pub enum BadColumnType {
/// The raw type byte does not map to a known `ColumnType`
/// (transparently wraps the underlying `UnknownColumnType` error).
#[error(transparent)]
Unknown(#[from] UnknownColumnType),
/// A type value derived from the column metadata is not one of the
/// expected ones. Carries the offending value.
#[error("Unexpected column type: {}", _0)]
Unexpected(u8),
}
/// Raw representation of a table map binlog event (`EventType::TABLE_MAP_EVENT`).
///
/// Fields are declared in the order they are parsed and serialized.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TableMapEvent<'a> {
/// Table identifier, stored as a 6-byte little-endian integer.
table_id: RawInt<LeU48>,
/// Event flags (2-byte little-endian integer); not interpreted in this file.
flags: RawInt<LeU16>,
/// Database name, prefixed with a one-byte length.
database_name: RawBytes<'a, U8Bytes>,
/// One byte skipped after the database name (presumably a null terminator).
__null_1: Skip<1>,
/// Table name, prefixed with a one-byte length.
table_name: RawBytes<'a, U8Bytes>,
/// One byte skipped after the table name (presumably a null terminator).
__null_2: Skip<1>,
/// Number of columns, as a length-encoded integer.
columns_count: RawInt<LenEnc>,
/// One raw type byte per column.
columns_type: RawSeq<'a, u8, ColumnType>,
/// Length-prefixed blob with the concatenated per-column metadata.
columns_metadata: RawBytes<'a, LenEnc>,
/// Per-column bit flags, `(columns_count + 7) / 8` bytes long when parsed.
null_bitmask: RawBytes<'a, BareBytes<0x2000000000000000>>,
/// Remaining bytes of the event; decoded lazily by `iter_optional_meta`.
optional_metadata: RawBytes<'a, EofBytes>,
}
impl<'a> TableMapEvent<'a> {
    /// Returns the numeric identifier of the table.
    pub fn table_id(&self) -> u64 {
        self.table_id.0
    }

    /// Returns the number of columns.
    pub fn columns_count(&self) -> u64 {
        self.columns_count.0
    }

    /// Returns the number of columns whose raw type byte is `MYSQL_TYPE_JSON`.
    pub fn json_column_count(&self) -> usize {
        let json = ColumnType::MYSQL_TYPE_JSON as u8;
        self.columns_type
            .0
            .iter()
            .filter(|&&raw| raw == json)
            .count()
    }

    /// Returns the null bitmask as a bit slice with one bit per column.
    ///
    /// # Panics
    ///
    /// Panics if the stored bitmask holds fewer than `columns_count` bits.
    pub fn null_bitmask(&'a self) -> &'a BitSlice<u8> {
        let num_bits = self.columns_count() as usize;
        let bits = BitSlice::from_slice(self.null_bitmask.as_bytes());
        &bits[..num_bits]
    }

    /// Returns the database name as raw bytes.
    pub fn database_name_raw(&'a self) -> &'a [u8] {
        self.database_name.as_bytes()
    }

    /// Returns the database name as a string (see `RawBytes::as_str`).
    pub fn database_name(&'a self) -> Cow<'a, str> {
        self.database_name.as_str()
    }

    /// Returns the table name as raw bytes.
    pub fn table_name_raw(&'a self) -> &'a [u8] {
        self.table_name.as_bytes()
    }

    /// Returns the table name as a string (see `RawBytes::as_str`).
    pub fn table_name(&'a self) -> Cow<'a, str> {
        self.table_name.as_str()
    }

    /// Returns the type of the column at `col_idx` exactly as stored in the event.
    ///
    /// Returns `Ok(None)` if there is no such column and an error if the
    /// stored type byte is unknown.
    pub fn get_raw_column_type(
        &self,
        col_idx: usize,
    ) -> Result<Option<ColumnType>, UnknownColumnType> {
        match self.columns_type.get(col_idx) {
            Some(raw) => raw.get().map(Some),
            None => Ok(None),
        }
    }

    /// Returns the type of the column at `col_idx` after adjusting it with
    /// the help of the column metadata (see `get_real_type`).
    ///
    /// Returns `Ok(None)` if there is no such column.
    pub fn get_column_type(&self, col_idx: usize) -> Result<Option<ColumnType>, BadColumnType> {
        match self.columns_type.get(col_idx) {
            None => Ok(None),
            Some(raw) => {
                let column_type = raw.get()?;
                self.get_real_type(col_idx, column_type).map(Some)
            }
        }
    }

    /// Returns the metadata bytes of the column at `col_idx`.
    ///
    /// The metadata blob has no index, so it is walked from the start: the
    /// size of every preceding entry is obtained from its column type.
    /// Returns `None` if the column doesn't exist, if a type on the way is
    /// unknown, or if the metadata can't be extracted.
    pub fn get_column_metadata(&self, col_idx: usize) -> Option<&[u8]> {
        let blob = self.columns_metadata.as_bytes();
        let mut offset = 0;

        // Skip over the metadata of all preceding columns.
        for i in 0..col_idx {
            let ty = self.columns_type.get(i)?.get().ok()?;
            let (_, len) = ty.get_metadata(blob.get(offset..)?, false)?;
            offset += len;
        }

        let ty = self.columns_type.get(col_idx)?.get().ok()?;
        ty.get_metadata(blob.get(offset..)?, false)
            .map(|(metadata, _)| metadata)
    }

    /// Returns an iterator over the optional metadata fields of this event.
    pub fn iter_optional_meta(&'a self) -> OptionalMetadataIter<'a> {
        let data = self.optional_metadata.as_bytes();
        OptionalMetadataIter {
            columns: &self.columns_type,
            data,
        }
    }

    /// Converts this event into one that owns all of its buffers.
    pub fn into_owned(self) -> TableMapEvent<'static> {
        let TableMapEvent {
            table_id,
            flags,
            database_name,
            __null_1,
            table_name,
            __null_2,
            columns_count,
            columns_type,
            columns_metadata,
            null_bitmask,
            optional_metadata,
        } = self;

        TableMapEvent {
            table_id,
            flags,
            database_name: database_name.into_owned(),
            __null_1,
            table_name: table_name.into_owned(),
            __null_2,
            columns_count,
            columns_type: columns_type.into_owned(),
            columns_metadata: columns_metadata.into_owned(),
            null_bitmask: null_bitmask.into_owned(),
            optional_metadata: optional_metadata.into_owned(),
        }
    }

    /// Maps the stored column type to the type actually used for the column.
    ///
    /// * `MYSQL_TYPE_DATE` is reported as `MYSQL_TYPE_NEWDATE`.
    /// * `MYSQL_TYPE_STRING` is refined using the first metadata byte and
    ///   may turn out to be `ENUM`, `SET` or `STRING`.
    /// * Everything else is returned unchanged.
    fn get_real_type(
        &self,
        col_idx: usize,
        column_type: ColumnType,
    ) -> Result<ColumnType, BadColumnType> {
        match column_type {
            ColumnType::MYSQL_TYPE_DATE => Ok(ColumnType::MYSQL_TYPE_NEWDATE),
            ColumnType::MYSQL_TYPE_STRING => match self.get_column_metadata(col_idx) {
                Some(metadata_bytes) => {
                    let f1 = metadata_bytes[0];
                    // A non-zero first byte carries the real type (without the 0x30 bits).
                    let real_type = if f1 != 0 {
                        f1 | 0x30
                    } else {
                        column_type as u8
                    };
                    match real_type {
                        247 => Ok(ColumnType::MYSQL_TYPE_ENUM),
                        248 => Ok(ColumnType::MYSQL_TYPE_SET),
                        254 => Ok(ColumnType::MYSQL_TYPE_STRING),
                        x => Err(BadColumnType::Unexpected(x)),
                    }
                }
                // No metadata available: keep the stored type.
                None => Ok(column_type),
            },
            _ => Ok(column_type),
        }
    }
}
impl<'de> MyDeserialize<'de> for TableMapEvent<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = BinlogCtx<'de>;

    /// Parses a table map event from `buf`.
    ///
    /// The table id is read as a 4-byte integer when the format description
    /// event reports a header length of 6 for this event type, and as a
    /// 6-byte integer otherwise.
    fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let header_len = ctx.fde.get_event_type_header_length(Self::EVENT_TYPE);
        let table_id = if header_len == 6 {
            let short_id: RawInt<LeU32> = buf.parse(())?;
            RawInt::new(short_id.0 as u64)
        } else {
            buf.parse(())?
        };

        let flags = buf.parse(())?;
        let database_name = buf.parse(())?;
        let __null_1 = buf.parse(())?;
        let table_name = buf.parse(())?;
        let __null_2 = buf.parse(())?;
        let columns_count: RawInt<LenEnc> = buf.parse(())?;
        let num_columns = columns_count.0;

        // Struct fields are evaluated in the order written, which is the wire order.
        Ok(Self {
            table_id,
            flags,
            database_name,
            __null_1,
            table_name,
            __null_2,
            columns_count,
            // One type byte per column.
            columns_type: buf.parse(num_columns as usize)?,
            columns_metadata: buf.parse(())?,
            // One bit per column, rounded up to whole bytes.
            null_bitmask: buf.parse(((num_columns + 7) / 8) as usize)?,
            optional_metadata: buf.parse(())?,
        })
    }
}
impl MySerialize for TableMapEvent<'_> {
    /// Appends all fields to `buf` in declaration order.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            table_id,
            flags,
            database_name,
            __null_1,
            table_name,
            __null_2,
            columns_count,
            columns_type,
            columns_metadata,
            null_bitmask,
            optional_metadata,
        } = self;

        table_id.serialize(buf);
        flags.serialize(buf);
        database_name.serialize(buf);
        __null_1.serialize(buf);
        table_name.serialize(buf);
        __null_2.serialize(buf);
        columns_count.serialize(buf);
        columns_type.serialize(buf);
        columns_metadata.serialize(buf);
        null_bitmask.serialize(buf);
        optional_metadata.serialize(buf);
    }
}
// Ties `TableMapEvent` to its binlog event type.
impl<'a> BinlogEvent<'a> for TableMapEvent<'a> {
/// Event type of a table map event.
const EVENT_TYPE: EventType = EventType::TABLE_MAP_EVENT;
}
impl<'a> BinlogStruct<'a> for TableMapEvent<'a> {
    /// Returns the length in bytes of the serialized event body.
    ///
    /// The sum is computed with saturating arithmetic and capped so that
    /// the body plus `BinlogEventHeader::LEN` never exceeds `u32::MAX`.
    fn len(&self, _version: BinlogVersion) -> usize {
        let columns_count = self.columns_count() as usize;

        let mut len = S(0);
        len += S(6); // table_id
        len += S(2); // flags
        len += S(1); // database_name length byte
        len += S(min(self.database_name.0.len(), u8::MAX as usize));
        len += S(1); // byte skipped after database_name
        len += S(1); // table_name length byte
        len += S(min(self.table_name.0.len(), u8::MAX as usize));
        len += S(1); // byte skipped after table_name
        len += S(crate::misc::lenenc_int_len(self.columns_count()) as usize);
        len += S(columns_count); // one type byte per column
        len += S(crate::misc::lenenc_str_len(self.columns_metadata.as_bytes()) as usize);
        // Null bitmask: one bit per column rounded up to whole bytes. This must
        // match the `(columns_count + 7) / 8` bytes consumed by `deserialize`.
        len += S((columns_count + 7) / 8);
        len += S(self.optional_metadata.len());

        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
    }
}
/// Default charset followed by the raw list of columns that deviate from it.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DefaultCharset<'a> {
/// Default charset, as a length-encoded integer.
default_charset: RawInt<LenEnc>,
/// Remaining bytes: back-to-back `NonDefaultCharset` entries
/// (decoded lazily by `iter_non_default`).
non_default: RawBytes<'a, EofBytes>,
}
impl<'a> DefaultCharset<'a> {
    /// Returns the default charset, truncated to `u16`.
    pub fn default_charset(&self) -> u16 {
        let raw = self.default_charset.0;
        raw as u16
    }

    /// Iterates over the non-default charset entries.
    ///
    /// Entries are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_non_default(&'a self) -> impl Iterator<Item = io::Result<NonDefaultCharset>> + 'a {
        let mut buf = ParseBuf(self.non_default.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let item = buf.parse_unchecked(());
            failed = item.is_err();
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for DefaultCharset<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads the default charset, then takes the rest of `buf` as the
    /// non-default entries.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let default_charset = buf.parse(())?;
        let non_default = buf.parse(())?;
        Ok(Self {
            default_charset,
            non_default,
        })
    }
}
impl MySerialize for DefaultCharset<'_> {
    /// Writes the default charset followed by the raw non-default entries.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            default_charset,
            non_default,
        } = self;
        default_charset.serialize(buf);
        non_default.serialize(buf);
    }
}
/// A (column index, charset) pair; both values are length-encoded integers.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NonDefaultCharset {
/// Index of the column.
column_index: RawInt<LenEnc>,
/// Charset of the column.
charset: RawInt<LenEnc>,
}
impl NonDefaultCharset {
    /// Creates a new entry for the given column index and charset.
    pub fn new(column_index: u64, charset: u16) -> Self {
        let column_index = RawInt::new(column_index);
        let charset = RawInt::new(u64::from(charset));
        Self {
            column_index,
            charset,
        }
    }

    /// Returns the column index.
    pub fn column_index(&self) -> u64 {
        self.column_index.0
    }

    /// Returns the charset, truncated to `u16`.
    pub fn charset(&self) -> u16 {
        let raw = self.charset.0;
        raw as u16
    }
}
impl<'de> MyDeserialize<'de> for NonDefaultCharset {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads the column index and then the charset.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let column_index = buf.parse(())?;
        let charset = buf.parse(())?;
        Ok(Self {
            column_index,
            charset,
        })
    }
}
impl MySerialize for NonDefaultCharset {
    /// Writes the column index followed by the charset.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            column_index,
            charset,
        } = self;
        column_index.serialize(buf);
        charset.serialize(buf);
    }
}
/// Raw list of column charsets.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnCharsets<'a> {
/// Back-to-back length-encoded integers (decoded lazily by `iter_charsets`).
charsets: RawBytes<'a, EofBytes>,
}
impl<'a> ColumnCharsets<'a> {
    /// Iterates over the charsets, each truncated to `u16`.
    ///
    /// Values are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_charsets(&'a self) -> impl Iterator<Item = io::Result<u16>> + 'a {
        let mut buf = ParseBuf(self.charsets.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let parsed = buf.parse::<RawInt<LenEnc>>(());
            failed = parsed.is_err();
            Some(parsed.map(|charset| charset.0 as u16))
        })
    }
}
impl<'de> MyDeserialize<'de> for ColumnCharsets<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw charset list.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let charsets = buf.parse(())?;
        Ok(Self { charsets })
    }
}
impl MySerialize for ColumnCharsets<'_> {
    /// Writes the raw charset list.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { charsets } = self;
        charsets.serialize(buf);
    }
}
/// Name of a single column.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnName<'a> {
/// Name bytes, stored with a length-encoded length prefix.
name: RawBytes<'a, LenEnc>,
}
impl<'a> ColumnName<'a> {
    /// Creates a column name from the given bytes.
    pub fn new(name: impl Into<Cow<'a, [u8]>>) -> Self {
        let name = RawBytes::new(name);
        Self { name }
    }

    /// Returns the name as raw bytes.
    pub fn name_raw(&'a self) -> &'a [u8] {
        let Self { name } = self;
        name.as_bytes()
    }

    /// Returns the name as a string (see `RawBytes::as_str`).
    pub fn name(&'a self) -> Cow<'a, str> {
        let Self { name } = self;
        name.as_str()
    }
}
impl<'de> MyDeserialize<'de> for ColumnName<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads one length-prefixed name.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let name = buf.parse(())?;
        Ok(Self { name })
    }
}
impl MySerialize for ColumnName<'_> {
    /// Writes the length-prefixed name.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { name } = self;
        name.serialize(buf);
    }
}
/// Raw list of column names.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ColumnNames<'a> {
/// Back-to-back `ColumnName` entries (decoded lazily by `iter_names`).
names: RawBytes<'a, EofBytes>,
}
impl<'a> ColumnNames<'a> {
    /// Iterates over the column names.
    ///
    /// Names are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_names(&'a self) -> impl Iterator<Item = io::Result<ColumnName<'a>>> + 'a {
        let mut buf = ParseBuf(self.names.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let item = buf.parse(());
            failed = item.is_err();
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for ColumnNames<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of names.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let names = buf.parse(())?;
        Ok(Self { names })
    }
}
impl MySerialize for ColumnNames<'_> {
    /// Writes the raw list of names.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { names } = self;
        names.serialize(buf);
    }
}
/// Raw list of string-value groups, one `SetStrValues` per entry.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetsStrValues<'a> {
/// Back-to-back `SetStrValues` entries (decoded lazily by `iter_values`).
values: RawBytes<'a, EofBytes>,
}
impl<'a> SetsStrValues<'a> {
    /// Iterates over the groups of string values.
    ///
    /// Groups are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_values(&'a self) -> impl Iterator<Item = io::Result<SetStrValues<'a>>> + 'a {
        let mut buf = ParseBuf(self.values.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let item = buf.parse(());
            failed = item.is_err();
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for SetsStrValues<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of groups.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let values = buf.parse(())?;
        Ok(Self { values })
    }
}
impl MySerialize for SetsStrValues<'_> {
    /// Writes the raw list of groups.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { values } = self;
        values.serialize(buf);
    }
}
/// A single string value (one entry of `SetStrValues`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValue<'a> {
/// Value bytes, stored with a length-encoded length prefix.
value: RawBytes<'a, LenEnc>,
}
impl<'a> SetStrValue<'a> {
    /// Creates a value from the given bytes.
    pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
        let value = RawBytes::new(value);
        Self { value }
    }

    /// Returns the value as raw bytes.
    pub fn value_raw(&'a self) -> &'a [u8] {
        let Self { value } = self;
        value.as_bytes()
    }

    /// Returns the value as a string (see `RawBytes::as_str`).
    pub fn value(&'a self) -> Cow<'a, str> {
        let Self { value } = self;
        value.as_str()
    }
}
impl<'de> MyDeserialize<'de> for SetStrValue<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads one length-prefixed value.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let value = buf.parse(())?;
        Ok(Self { value })
    }
}
impl MySerialize for SetStrValue<'_> {
    /// Writes the length-prefixed value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { value } = self;
        value.serialize(buf);
    }
}
/// A counted group of string values.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SetStrValues<'a> {
/// Number of values, as a length-encoded integer.
num_variants: RawInt<LenEnc>,
/// The values themselves; `deserialize` reads exactly `num_variants` of them.
values: Vec<SetStrValue<'a>>,
}
impl<'a> SetStrValues<'a> {
    /// Returns the stored number of variants.
    pub fn num_variants(&self) -> u64 {
        let Self { num_variants, .. } = self;
        num_variants.0
    }

    /// Returns the values of this group.
    pub fn values(&'a self) -> &'a [SetStrValue<'a>] {
        self.values.as_slice()
    }
}
impl<'de> MyDeserialize<'de> for SetStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let num_variants: RawInt<LenEnc> = buf.parse(())?;
let mut values = Vec::with_capacity(num_variants.0 as usize);
for _ in 0..num_variants.0 {
values.push(buf.parse(())?);
}
Ok(Self {
num_variants,
values,
})
}
}
impl MySerialize for SetStrValues<'_> {
    /// Writes the stored number of variants followed by every value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            num_variants,
            values,
        } = self;
        num_variants.serialize(buf);
        values.iter().for_each(|value| value.serialize(&mut *buf));
    }
}
/// Raw list of string-value groups, one `EnumStrValues` per entry.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumsStrValues<'a> {
/// Back-to-back `EnumStrValues` entries (decoded lazily by `iter_values`).
values: RawBytes<'a, EofBytes>,
}
impl<'a> EnumsStrValues<'a> {
    /// Iterates over the groups of string values.
    ///
    /// Groups are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_values(&'a self) -> impl Iterator<Item = io::Result<EnumStrValues<'a>>> + 'a {
        let mut buf = ParseBuf(self.values.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let item = buf.parse(());
            failed = item.is_err();
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for EnumsStrValues<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of groups.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let values = buf.parse(())?;
        Ok(Self { values })
    }
}
impl MySerialize for EnumsStrValues<'_> {
    /// Writes the raw list of groups.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { values } = self;
        values.serialize(buf);
    }
}
/// A single string value (one entry of `EnumStrValues`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValue<'a> {
/// Value bytes, stored with a length-encoded length prefix.
value: RawBytes<'a, LenEnc>,
}
impl<'a> EnumStrValue<'a> {
    /// Creates a value from the given bytes.
    pub fn new(value: impl Into<Cow<'a, [u8]>>) -> Self {
        let value = RawBytes::new(value);
        Self { value }
    }

    /// Returns the value as raw bytes.
    pub fn value_raw(&'a self) -> &'a [u8] {
        let Self { value } = self;
        value.as_bytes()
    }

    /// Returns the value as a string (see `RawBytes::as_str`).
    pub fn value(&'a self) -> Cow<'a, str> {
        let Self { value } = self;
        value.as_str()
    }
}
impl<'de> MyDeserialize<'de> for EnumStrValue<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads one length-prefixed value.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let value = buf.parse(())?;
        Ok(Self { value })
    }
}
impl MySerialize for EnumStrValue<'_> {
    /// Writes the length-prefixed value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { value } = self;
        value.serialize(buf);
    }
}
/// A counted group of string values.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EnumStrValues<'a> {
/// Number of values, as a length-encoded integer.
num_variants: RawInt<LenEnc>,
/// The values themselves; `deserialize` reads exactly `num_variants` of them.
values: Vec<EnumStrValue<'a>>,
}
impl<'a> EnumStrValues<'a> {
    /// Returns the stored number of variants.
    pub fn num_variants(&self) -> u64 {
        let Self { num_variants, .. } = self;
        num_variants.0
    }

    /// Returns the values of this group.
    pub fn values(&'a self) -> &'a [EnumStrValue<'a>] {
        self.values.as_slice()
    }
}
impl<'de> MyDeserialize<'de> for EnumStrValues<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();
fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let num_variants: RawInt<LenEnc> = buf.parse(())?;
let mut values = Vec::with_capacity(num_variants.0 as usize);
for _ in 0..num_variants.0 {
values.push(buf.parse(())?);
}
Ok(Self {
num_variants,
values,
})
}
}
impl MySerialize for EnumStrValues<'_> {
    /// Writes the stored number of variants followed by every value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            num_variants,
            values,
        } = self;
        num_variants.serialize(buf);
        values.iter().for_each(|value| value.serialize(&mut *buf));
    }
}
/// Raw list of geometry types.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct GeometryTypes<'a> {
/// Back-to-back length-encoded integers (decoded lazily by `iter_geometry_types`).
geometry_types: RawBytes<'a, EofBytes>,
}
impl<'a> GeometryTypes<'a> {
    /// Iterates over the geometry types.
    ///
    /// Each length-encoded integer is truncated to `u8` and converted into
    /// a `GeometryType`. A value that isn't a valid geometry type yields an
    /// `InvalidData` error and iteration continues with the next value. A
    /// failure to read the integer itself is yielded and stops the iterator.
    pub fn iter_geometry_types(&'a self) -> impl Iterator<Item = io::Result<GeometryType>> + 'a {
        let mut buf = ParseBuf(self.geometry_types.as_bytes());
        let mut broken = false;
        std::iter::from_fn(move || {
            if broken || buf.is_empty() {
                return None;
            }
            let item = match buf.parse::<RawInt<LenEnc>>(()) {
                Ok(raw) => GeometryType::try_from(raw.0 as u8)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)),
                Err(err) => {
                    broken = true;
                    Err(err)
                }
            };
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for GeometryTypes<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of geometry types.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let geometry_types = buf.parse(())?;
        Ok(Self { geometry_types })
    }
}
impl MySerialize for GeometryTypes<'_> {
    /// Writes the raw list of geometry types.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { geometry_types } = self;
        geometry_types.serialize(buf);
    }
}
/// Raw list of indexes that make up a simple primary key.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SimplePrimaryKey<'a> {
/// Back-to-back length-encoded integers (decoded lazily by `iter_indexes`).
indexes: RawBytes<'a, EofBytes>,
}
impl<'a> SimplePrimaryKey<'a> {
    /// Iterates over the stored indexes.
    ///
    /// Values are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_indexes(&'a self) -> impl Iterator<Item = io::Result<u64>> + 'a {
        let mut buf = ParseBuf(self.indexes.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let parsed = buf.parse::<RawInt<LenEnc>>(());
            failed = parsed.is_err();
            Some(parsed.map(|index| index.0))
        })
    }
}
impl<'de> MyDeserialize<'de> for SimplePrimaryKey<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of indexes.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let indexes = buf.parse(())?;
        Ok(Self { indexes })
    }
}
impl MySerialize for SimplePrimaryKey<'_> {
    /// Writes the raw list of indexes.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { indexes } = self;
        indexes.serialize(buf);
    }
}
/// Raw list of primary key parts with prefix lengths.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PrimaryKeysWithPrefix<'a> {
/// Back-to-back `PrimaryKeyWithPrefix` entries (decoded lazily by `iter_keys`).
data: RawBytes<'a, EofBytes>,
}
impl<'a> PrimaryKeysWithPrefix<'a> {
    /// Iterates over the stored key parts.
    ///
    /// Entries are parsed lazily. After the first parse error (which is
    /// yielded) the iterator stops.
    pub fn iter_keys(&'a self) -> impl Iterator<Item = io::Result<PrimaryKeyWithPrefix>> + 'a {
        let mut buf = ParseBuf(self.data.as_bytes());
        let mut failed = false;
        std::iter::from_fn(move || {
            if failed || buf.is_empty() {
                return None;
            }
            let item = buf.parse(());
            failed = item.is_err();
            Some(item)
        })
    }
}
impl<'de> MyDeserialize<'de> for PrimaryKeysWithPrefix<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Takes the rest of `buf` as the raw list of key parts.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let data = buf.parse(())?;
        Ok(Self { data })
    }
}
impl MySerialize for PrimaryKeysWithPrefix<'_> {
    /// Writes the raw list of key parts.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { data } = self;
        data.serialize(buf);
    }
}
/// A (column index, prefix length) pair; both values are length-encoded integers.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PrimaryKeyWithPrefix {
/// Index of the column.
column_index: RawInt<LenEnc>,
/// Prefix length for the column.
prefix_length: RawInt<LenEnc>,
}
impl PrimaryKeyWithPrefix {
    /// Creates a new entry for the given column index and prefix length.
    pub fn new(column_index: u64, prefix_length: u64) -> Self {
        let column_index = RawInt::new(column_index);
        let prefix_length = RawInt::new(prefix_length);
        Self {
            column_index,
            prefix_length,
        }
    }

    /// Returns the column index.
    pub fn column_index(&self) -> u64 {
        let Self { column_index, .. } = self;
        column_index.0
    }

    /// Returns the prefix length.
    pub fn prefix_length(&self) -> u64 {
        let Self { prefix_length, .. } = self;
        prefix_length.0
    }
}
impl<'de> MyDeserialize<'de> for PrimaryKeyWithPrefix {
    const SIZE: Option<usize> = None;
    type Ctx = ();

    /// Reads the column index and then the prefix length.
    fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let column_index = buf.parse(())?;
        let prefix_length = buf.parse(())?;
        Ok(Self {
            column_index,
            prefix_length,
        })
    }
}
impl MySerialize for PrimaryKeyWithPrefix {
    /// Writes the column index followed by the prefix length.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self {
            column_index,
            prefix_length,
        } = self;
        column_index.serialize(buf);
        prefix_length.serialize(buf);
    }
}
/// A single decoded optional metadata field of a table map event,
/// as produced by `OptionalMetadataIter`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptionalMetadataField<'a> {
/// Bit flags with one bit per numeric column
/// (columns for which `ColumnType::is_numeric_type` holds).
Signedness(
&'a BitSlice<u8, Msb0>,
),
/// Payload of a `DEFAULT_CHARSET` field.
DefaultCharset(DefaultCharset<'a>),
/// Payload of a `COLUMN_CHARSET` field.
ColumnCharset(ColumnCharsets<'a>),
/// Payload of a `COLUMN_NAME` field.
ColumnName(ColumnNames<'a>),
/// Payload of a `SET_STR_VALUE` field.
SetStrValue(SetsStrValues<'a>),
/// Payload of an `ENUM_STR_VALUE` field.
EnumStrValue(EnumsStrValues<'a>),
/// Payload of a `GEOMETRY_TYPE` field.
GeometryType(GeometryTypes<'a>),
/// Payload of a `SIMPLE_PRIMARY_KEY` field.
SimplePrimaryKey(SimplePrimaryKey<'a>),
/// Payload of a `PRIMARY_KEY_WITH_PREFIX` field.
PrimaryKeyWithPrefix(PrimaryKeysWithPrefix<'a>),
/// Payload of an `ENUM_AND_SET_DEFAULT_CHARSET` field.
EnumAndSetDefaultCharset(DefaultCharset<'a>),
/// Payload of an `ENUM_AND_SET_COLUMN_CHARSET` field.
EnumAndSetColumnCharset(ColumnCharsets<'a>),
/// Bit flags with one bit per column.
ColumnVisibility(
&'a BitSlice<u8, Msb0>,
),
}
/// Iterator over the optional metadata fields of a table map event.
///
/// Created by `TableMapEvent::iter_optional_meta`.
#[derive(Debug)]
pub struct OptionalMetadataIter<'a> {
/// Raw column types of the event; used to size the bit-flag fields.
columns: &'a RawSeq<'a, u8, ColumnType>,
/// Optional metadata bytes that haven't been consumed yet.
data: &'a [u8],
}
impl<'a> OptionalMetadataIter<'a> {
    /// Reads one type-length-value entry from the remaining data.
    ///
    /// Both the type and the length are read as single bytes. If fewer
    /// than `length` bytes remain, the remaining data is dropped and an
    /// `UnexpectedEof` error is returned.
    fn read_tlv(&mut self) -> io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])> {
        let field_type = self.data.read_u8()?;
        let value_len = usize::from(self.data.read_u8()?);

        let remaining: &'a [u8] = self.data;
        if value_len > remaining.len() {
            self.data = &[];
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "can't read tlv value",
            ));
        }

        let (value, rest) = remaining.split_at(value_len);
        self.data = rest;
        Ok((RawConst::new(field_type), value))
    }

    /// Returns the next type-length-value entry, or `None` if all data
    /// has been consumed.
    fn next_tlv(
        &mut self,
    ) -> Option<io::Result<(RawConst<u8, OptionalMetadataFieldType>, &'a [u8])>> {
        if self.data.is_empty() {
            None
        } else {
            Some(self.read_tlv())
        }
    }

    /// Returns the total number of columns.
    fn num_columns(&self) -> usize {
        self.columns.0.len()
    }

    /// Counts the columns whose type is known and satisfies `f`.
    fn count_columns(&self, f: fn(&ColumnType) -> bool) -> usize {
        self.columns
            .0
            .iter()
            .filter(|&&raw| match ColumnType::try_from(raw) {
                Ok(ty) => f(&ty),
                // Unknown type bytes are never counted.
                Err(_) => false,
            })
            .count()
    }
}
impl<'a> Iterator for OptionalMetadataIter<'a> {
    type Item = io::Result<OptionalMetadataField<'a>>;

    /// Decodes the next optional metadata field.
    ///
    /// Yields an error if the entry can't be read, if its type is unknown,
    /// or if its payload doesn't parse. Returns `None` once all data has
    /// been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        use OptionalMetadataFieldType::*;

        /// Reads a bit-flag payload of exactly `num_bits` bits (rounded up to
        /// whole bytes) and fails if any bytes are left in `buf` afterwards.
        fn read_flags<'b>(
            buf: &mut ParseBuf<'b>,
            num_bits: usize,
        ) -> io::Result<&'b BitSlice<u8, Msb0>> {
            let num_flags_bytes = (num_bits + 7) / 8;
            let bytes: &'b [u8] = buf.parse(num_flags_bytes)?;
            if !buf.is_empty() {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "bytes remaining on stream",
                ));
            }
            let bits = BitSlice::from_slice(bytes);
            Ok(&bits[..num_bits])
        }

        let tlv = self.next_tlv()?;
        let field = tlv.and_then(|(raw_type, value)| {
            let mut v = ParseBuf(value);
            let field_type = raw_type.get().map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Unknown optional metadata field type",
                )
            })?;

            let field = match field_type {
                SIGNEDNESS => {
                    // One bit per numeric column.
                    let num_numeric = self.count_columns(ColumnType::is_numeric_type);
                    OptionalMetadataField::Signedness(read_flags(&mut v, num_numeric)?)
                }
                DEFAULT_CHARSET => OptionalMetadataField::DefaultCharset(v.parse(())?),
                COLUMN_CHARSET => OptionalMetadataField::ColumnCharset(v.parse(())?),
                COLUMN_NAME => OptionalMetadataField::ColumnName(v.parse(())?),
                SET_STR_VALUE => OptionalMetadataField::SetStrValue(v.parse(())?),
                ENUM_STR_VALUE => OptionalMetadataField::EnumStrValue(v.parse(())?),
                GEOMETRY_TYPE => OptionalMetadataField::GeometryType(v.parse(())?),
                SIMPLE_PRIMARY_KEY => OptionalMetadataField::SimplePrimaryKey(v.parse(())?),
                PRIMARY_KEY_WITH_PREFIX => {
                    OptionalMetadataField::PrimaryKeyWithPrefix(v.parse(())?)
                }
                ENUM_AND_SET_DEFAULT_CHARSET => {
                    OptionalMetadataField::EnumAndSetDefaultCharset(v.parse(())?)
                }
                ENUM_AND_SET_COLUMN_CHARSET => {
                    OptionalMetadataField::EnumAndSetColumnCharset(v.parse(())?)
                }
                COLUMN_VISIBILITY => {
                    // One bit per column.
                    let num_columns = self.num_columns();
                    OptionalMetadataField::ColumnVisibility(read_flags(&mut v, num_columns)?)
                }
            };
            Ok(field)
        });

        Some(field)
    }
}