icu_provider_blob/export/
blob_exporter.rs
1#![allow(clippy::expect_used)]
7
8use crate::blob_schema::*;
9use icu_provider::export::*;
10use icu_provider::{marker::DataMarkerIdHash, prelude::*};
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::sync::Mutex;
13use zerotrie::ZeroTrieSimpleAscii;
14use zerovec::maps::MutableZeroVecLike;
15use zerovec::vecs::Index32;
16use zerovec::vecs::VarZeroVecOwned;
17use zerovec::VarZeroVec;
18use zerovec::ZeroVec;
19
20use postcard::ser_flavors::{AllocVec, Flavor};
21
22pub struct BlobExporter<'w> {
25 #[allow(clippy::type_complexity)]
27 resources: Mutex<BTreeMap<DataMarkerIdHash, BTreeMap<Vec<u8>, usize>>>,
28 checksums: Mutex<BTreeMap<DataMarkerIdHash, u64>>,
29 all_markers: Mutex<BTreeSet<DataMarkerIdHash>>,
31 unique_resources: Mutex<HashMap<Vec<u8>, usize>>,
33 sink: Box<dyn std::io::Write + Sync + 'w>,
34}
35
36impl core::fmt::Debug for BlobExporter<'_> {
37 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
38 f.debug_struct("BlobExporter")
39 .field("resources", &self.resources)
40 .field("unique_resources", &self.unique_resources)
41 .field("all_markers", &self.all_markers)
42 .field("sink", &"<sink>")
43 .finish()
44 }
45}
46
47impl<'w> BlobExporter<'w> {
48 pub fn new_with_sink(sink: Box<dyn std::io::Write + Sync + 'w>) -> Self {
53 Self {
54 resources: Default::default(),
55 unique_resources: Default::default(),
56 checksums: Default::default(),
57 all_markers: Default::default(),
58 sink,
59 }
60 }
61}
62
63impl DataExporter for BlobExporter<'_> {
64 fn put_payload(
65 &self,
66 marker: DataMarkerInfo,
67 id: DataIdentifierBorrowed,
68 payload: &DataPayload<ExportMarker>,
69 ) -> Result<(), DataError> {
70 let mut serializer = postcard::Serializer {
71 output: AllocVec::new(),
72 };
73 payload.serialize(&mut serializer)?;
74 let output = serializer
75 .output
76 .finalize()
77 .expect("Failed to finalize serializer output");
78 let idx = {
79 let mut unique_resources = self.unique_resources.lock().expect("poison");
80 let len = unique_resources.len();
81 *unique_resources.entry(output).or_insert(len)
82 };
83 #[allow(clippy::expect_used)]
84 self.resources
85 .lock()
86 .expect("poison")
87 .entry(marker.id.hashed())
88 .or_default()
89 .entry({
90 let mut key = id.locale.to_string();
91 if !id.marker_attributes.is_empty() {
92 key.push(crate::blob_schema::REQUEST_SEPARATOR);
93 key.push_str(id.marker_attributes);
94 }
95 key.into_bytes()
96 })
97 .or_insert(idx);
98 Ok(())
99 }
100
101 fn flush(&self, marker: DataMarkerInfo, metadata: FlushMetadata) -> Result<(), DataError> {
102 if let Some(checksum) = metadata.checksum {
103 self.checksums
104 .lock()
105 .expect("poison")
106 .insert(marker.id.hashed(), checksum);
107 }
108 self.all_markers
109 .lock()
110 .expect("poison")
111 .insert(marker.id.hashed());
112 Ok(())
113 }
114
115 fn close(&mut self) -> Result<ExporterCloseMetadata, DataError> {
116 self.close_internal()
117 }
118}
119
120struct FinalizedBuffers {
121 vzv: VarZeroVec<'static, [u8], Index32>,
123 remap: HashMap<usize, usize>,
125}
126
127impl BlobExporter<'_> {
128 fn finalize_buffers(&mut self) -> FinalizedBuffers {
129 let sorted: Vec<(Vec<u8>, usize)> = {
134 let mut unique_resources = self.unique_resources.lock().expect("poison");
135 let mut sorted: Vec<(Vec<u8>, usize)> = unique_resources.drain().collect();
136 sorted.sort();
137 sorted
138 };
139
140 let remap: HashMap<usize, usize> = sorted
142 .iter()
143 .enumerate()
144 .map(|(new_id, (_, old_id))| (*old_id, new_id))
145 .collect();
146
147 let vzv: VarZeroVec<[u8], Index32> = {
149 let buffers: Vec<Vec<u8>> = sorted.into_iter().map(|(blob, _)| blob).collect();
150 buffers.as_slice().into()
151 };
152
153 FinalizedBuffers { vzv, remap }
154 }
155
156 fn close_internal(&mut self) -> Result<ExporterCloseMetadata, DataError> {
157 let FinalizedBuffers { mut vzv, remap } = self.finalize_buffers();
158
159 let all_markers = self.all_markers.lock().expect("poison");
160 let resources = self.resources.lock().expect("poison");
161 let checksums = self.checksums.lock().expect("poison");
162
163 let markers: ZeroVec<DataMarkerIdHash> = all_markers.iter().copied().collect();
164
165 let locales_vec: Vec<Vec<u8>> = all_markers
166 .iter()
167 .map(|marker_path_hash| {
168 (
169 resources.get(marker_path_hash),
170 checksums.get(marker_path_hash),
171 )
172 })
173 .map(|(option_sub_map, checksum)| {
174 let mut sub_map = BTreeMap::new();
175 if let Some(sub_map_wrong) = option_sub_map {
176 if let Some(&checksum) = checksum {
177 sub_map.insert(CHECKSUM_KEY, vzv.len());
178 vzv.zvl_push(checksum.to_le_bytes().as_slice());
179 }
180 sub_map.extend(sub_map_wrong.iter().map(|(key, id)| {
181 (key.as_slice(), *remap.get(id).expect("in-bound index"))
182 }));
183 }
184 ZeroTrieSimpleAscii::try_from(&sub_map)
185 .expect("in-bounds")
186 .into_store()
187 })
188 .collect();
189
190 if !markers.is_empty() {
191 if let Ok(locales_vzv) =
192 VarZeroVecOwned::<[u8]>::try_from_elements(locales_vec.as_slice())
193 {
194 let blob = BlobSchema::V003(BlobSchemaV1 {
195 markers: &markers,
196 locales: &locales_vzv,
197 buffers: &vzv,
198 });
199 log::info!("Serializing blob to output stream...");
200
201 let output = postcard::to_allocvec(&blob)?;
202 self.sink.write_all(&output)?;
203 } else {
204 log::info!("Upgrading to BlobSchema (bigger)...");
205 let locales_vzv =
206 VarZeroVecOwned::<[u8], Index32>::try_from_elements(locales_vec.as_slice())
207 .expect("Locales vector does not fit in Index32 buffer!");
208 let blob = BlobSchema::V003Bigger(BlobSchemaV1 {
209 markers: &markers,
210 locales: &locales_vzv,
211 buffers: &vzv,
212 });
213 log::info!("Serializing blob to output stream...");
214
215 let output = postcard::to_allocvec(&blob)?;
216 self.sink.write_all(&output)?;
217 }
218 }
219
220 Ok(Default::default())
221 }
222}