#![allow(clippy::expect_used)]
use crate::blob_schema::*;
use icu_provider::export::*;
use icu_provider::{marker::DataMarkerPathHash, prelude::*};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Mutex;
use zerotrie::ZeroTrieSimpleAscii;
use zerovec::vecs::Index32;
use zerovec::vecs::VarZeroVecOwned;
use zerovec::VarZeroVec;
use zerovec::ZeroVec;
use postcard::ser_flavors::{AllocVec, Flavor};
/// Per-marker maps from request key bytes to buffer index, keyed by marker path hash.
type ResourceMap = BTreeMap<DataMarkerPathHash, BTreeMap<Vec<u8>, usize>>;

/// A [`DataExporter`] that collects postcard-serialized payloads in memory and, when closed,
/// writes them to `sink` as a single `BlobSchema` blob.
pub struct BlobExporter<'w> {
    /// For each marker, the request key (locale, optionally followed by the request separator
    /// and marker attributes) mapped to the index of its buffer in `unique_resources`.
    resources: Mutex<ResourceMap>,
    /// Every marker that has been flushed; only these markers are written to the blob.
    all_markers: Mutex<BTreeSet<DataMarkerPathHash>>,
    /// Deduplicated serialized payloads, each mapped to the index it was first assigned.
    unique_resources: Mutex<HashMap<Vec<u8>, usize>>,
    /// Destination for the finished blob.
    sink: Box<dyn std::io::Write + Sync + 'w>,
}
impl core::fmt::Debug for BlobExporter<'_> {
    /// Prints the collected state; the sink is an opaque writer, so a placeholder is shown.
    fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut builder = formatter.debug_struct("BlobExporter");
        builder.field("resources", &self.resources);
        builder.field("unique_resources", &self.unique_resources);
        builder.field("all_markers", &self.all_markers);
        builder.field("sink", &"<sink>");
        builder.finish()
    }
}
impl<'w> BlobExporter<'w> {
pub fn new_with_sink(sink: Box<dyn std::io::Write + Sync + 'w>) -> Self {
Self {
resources: Default::default(),
unique_resources: Default::default(),
all_markers: Default::default(),
sink,
}
}
}
impl DataExporter for BlobExporter<'_> {
    /// Serializes `payload` with postcard, deduplicates the bytes, and records which buffer
    /// answers the request identified by `marker` and `id`.
    ///
    /// If the same marker/identifier pair is put more than once, the first recorded buffer
    /// index is kept.
    fn put_payload(
        &self,
        marker: DataMarkerInfo,
        id: DataIdentifierBorrowed,
        payload: &DataPayload<ExportMarker>,
    ) -> Result<(), DataError> {
        // Encode the payload into an owned byte buffer.
        let mut postcard_serializer = postcard::Serializer {
            output: AllocVec::new(),
        };
        payload.serialize(&mut postcard_serializer)?;
        let encoded = postcard_serializer
            .output
            .finalize()
            .expect("Failed to finalize serializer output");

        // Identical byte buffers share one index; a new buffer gets the next free index.
        let buffer_index = {
            let mut unique = self.unique_resources.lock().expect("poison");
            let next_index = unique.len();
            *unique.entry(encoded).or_insert(next_index)
        };

        // Request key: the locale string, plus the separator and attributes when present.
        let mut request_key = id.locale.to_string();
        if !id.marker_attributes.is_empty() {
            request_key.push(crate::blob_schema::REQUEST_SEPARATOR);
            request_key.push_str(id.marker_attributes);
        }

        let mut resources = self.resources.lock().expect("poison");
        let per_marker = resources.entry(marker.path.hashed()).or_default();
        per_marker
            .entry(request_key.into_bytes())
            .or_insert(buffer_index);
        Ok(())
    }

    /// Marks `marker` as complete so that it is included in the blob on close.
    fn flush(&self, marker: DataMarkerInfo, _metadata: FlushMetadata) -> Result<(), DataError> {
        let mut flushed = self.all_markers.lock().expect("poison");
        flushed.insert(marker.path.hashed());
        Ok(())
    }

    /// Writes the blob to the sink; the work is done by the inherent `close_internal`.
    fn close(&mut self) -> Result<ExporterCloseMetadata, DataError> {
        self.close_internal()
    }
}
/// Result of `BlobExporter::finalize_buffers`.
struct FinalizedBuffers {
    /// All unique serialized payloads, sorted bytewise and packed into one `VarZeroVec`.
    vzv: VarZeroVec<'static, [u8], Index32>,
    /// Maps a buffer's original index (as stored in `resources`) to its position in `vzv`.
    remap: HashMap<usize, usize>,
}
impl BlobExporter<'_> {
    /// Drains the deduplicated payload buffers, sorts them bytewise, and packs them into a
    /// single `VarZeroVec`.
    ///
    /// Also returns a map from each buffer's original index (as recorded in `resources`) to its
    /// position in the sorted output.
    fn finalize_buffers(&mut self) -> FinalizedBuffers {
        // `&mut self` already guarantees exclusive access, so no lock is needed.
        let mut sorted: Vec<(Vec<u8>, usize)> = self
            .unique_resources
            .get_mut()
            .expect("poison")
            .drain()
            .collect();
        // Buffers were unique HashMap keys, so no two entries compare equal and an unstable
        // sort yields exactly the same order as a stable one.
        sorted.sort_unstable();
        let remap: HashMap<usize, usize> = sorted
            .iter()
            .enumerate()
            .map(|(new_id, (_, old_id))| (*old_id, new_id))
            .collect();
        let buffers: Vec<Vec<u8>> = sorted.into_iter().map(|(blob, _)| blob).collect();
        let vzv: VarZeroVec<[u8], Index32> = buffers.as_slice().into();
        FinalizedBuffers { vzv, remap }
    }

    /// Assembles the collected payloads into a postcard-encoded `BlobSchema`, writes it to the
    /// sink, and flushes the sink.
    ///
    /// The accumulated state is consumed, so the exporter holds no data afterwards. Nothing is
    /// written when no marker has been flushed.
    ///
    /// # Errors
    ///
    /// Propagates postcard serialization errors and I/O errors from writing or flushing the
    /// sink.
    ///
    /// # Panics
    ///
    /// Panics if a per-marker ZeroTrie cannot be built, or if the locale tries do not fit even
    /// in an `Index32` `VarZeroVec`.
    fn close_internal(&mut self) -> Result<ExporterCloseMetadata, DataError> {
        let FinalizedBuffers { vzv, remap } = self.finalize_buffers();
        let all_markers = std::mem::take(self.all_markers.get_mut().expect("poison"));
        let mut resources = std::mem::take(self.resources.get_mut().expect("poison"));

        if all_markers.is_empty() {
            return Ok(Default::default());
        }

        let markers: ZeroVec<DataMarkerPathHash> = all_markers.iter().copied().collect();

        // One ZeroTrie per flushed marker (in marker order), mapping request keys to indices
        // into the sorted buffer list.
        let locales_vec: Vec<Vec<u8>> = all_markers
            .iter()
            .map(|marker_path_hash| match resources.remove(marker_path_hash) {
                Some(mut sub_map) => {
                    for id in sub_map.values_mut() {
                        *id = *remap.get(&*id).expect("in-bound index");
                    }
                    ZeroTrieSimpleAscii::try_from(&sub_map)
                        .expect("in-bounds")
                        .into_store()
                }
                // Marker flushed without any payloads: store an empty trie.
                None => ZeroTrieSimpleAscii::default().into_store(),
            })
            .collect();

        // Prefer the compact (default-index) locales table; fall back to Index32 when it is
        // too large.
        let output = match VarZeroVecOwned::<[u8]>::try_from_elements(locales_vec.as_slice()) {
            Ok(locales_vzv) => {
                log::info!("Serializing blob to output stream...");
                postcard::to_allocvec(&BlobSchema::V003(BlobSchemaV3 {
                    markers: &markers,
                    locales: &locales_vzv,
                    buffers: &vzv,
                }))?
            }
            Err(_) => {
                log::info!("Upgrading to BlobSchemaV3 (bigger)...");
                let locales_vzv =
                    VarZeroVecOwned::<[u8], Index32>::try_from_elements(locales_vec.as_slice())
                        .expect("Locales vector does not fit in Index32 buffer!");
                log::info!("Serializing blob to output stream...");
                postcard::to_allocvec(&BlobSchema::V003Bigger(BlobSchemaV3 {
                    markers: &markers,
                    locales: &locales_vzv,
                    buffers: &vzv,
                }))?
            }
        };

        self.sink.write_all(&output)?;
        // Flush explicitly: this is the last point where a buffered-write error can be
        // reported, since errors raised when the sink is dropped are silently discarded.
        std::io::Write::flush(&mut self.sink)?;
        Ok(Default::default())
    }
}