icu_provider_blob/export/
blob_exporter.rs

1// This file is part of ICU4X. For terms of use, please see the file
2// called LICENSE at the top level of the ICU4X source tree
3// (online at: https://github.com/unicode-org/icu4x/blob/main/LICENSE ).
4
// This is the "export" feature, and there are many internal invariants
6#![allow(clippy::expect_used)]
7
8use crate::blob_schema::*;
9use icu_provider::export::*;
10use icu_provider::{marker::DataMarkerIdHash, prelude::*};
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::sync::Mutex;
13use zerotrie::ZeroTrieSimpleAscii;
14use zerovec::maps::MutableZeroVecLike;
15use zerovec::vecs::Index32;
16use zerovec::vecs::VarZeroVecOwned;
17use zerovec::VarZeroVec;
18use zerovec::ZeroVec;
19
20use postcard::ser_flavors::{AllocVec, Flavor};
21
/// A data exporter that writes data to a single-file blob.
/// See the module-level docs for an example.
pub struct BlobExporter<'w> {
    /// Map of marker path hash -> locale byte string -> blob ID
    #[allow(clippy::type_complexity)]
    resources: Mutex<BTreeMap<DataMarkerIdHash, BTreeMap<Vec<u8>, usize>>>,
    /// Map of marker path hash -> checksum reported via `FlushMetadata`
    checksums: Mutex<BTreeMap<DataMarkerIdHash, u64>>,
    /// All marker path hashes seen via `flush`
    all_markers: Mutex<BTreeSet<DataMarkerIdHash>>,
    /// Map from serialized blob bytes to blob ID (deduplicates identical payloads)
    unique_resources: Mutex<HashMap<Vec<u8>, usize>>,
    /// Output stream that receives the serialized blob when the exporter is closed
    sink: Box<dyn std::io::Write + Sync + 'w>,
}
35
36impl core::fmt::Debug for BlobExporter<'_> {
37    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
38        f.debug_struct("BlobExporter")
39            .field("resources", &self.resources)
40            .field("unique_resources", &self.unique_resources)
41            .field("all_markers", &self.all_markers)
42            .field("sink", &"<sink>")
43            .finish()
44    }
45}
46
impl<'w> BlobExporter<'w> {
    /// Creates a [`BlobExporter`] that writes to the given I/O stream.
    ///
    /// Nothing is written until the exporter is closed; the serialized blob is
    /// emitted to `sink` at that point.
    pub fn new_with_sink(sink: Box<dyn std::io::Write + Sync + 'w>) -> Self {
        Self {
            resources: Default::default(),
            unique_resources: Default::default(),
            checksums: Default::default(),
            all_markers: Default::default(),
            sink,
        }
    }
}
62
63impl DataExporter for BlobExporter<'_> {
64    fn put_payload(
65        &self,
66        marker: DataMarkerInfo,
67        id: DataIdentifierBorrowed,
68        payload: &DataPayload<ExportMarker>,
69    ) -> Result<(), DataError> {
70        let mut serializer = postcard::Serializer {
71            output: AllocVec::new(),
72        };
73        payload.serialize(&mut serializer)?;
74        let output = serializer
75            .output
76            .finalize()
77            .expect("Failed to finalize serializer output");
78        let idx = {
79            let mut unique_resources = self.unique_resources.lock().expect("poison");
80            let len = unique_resources.len();
81            *unique_resources.entry(output).or_insert(len)
82        };
83        #[allow(clippy::expect_used)]
84        self.resources
85            .lock()
86            .expect("poison")
87            .entry(marker.id.hashed())
88            .or_default()
89            .entry({
90                let mut key = id.locale.to_string();
91                if !id.marker_attributes.is_empty() {
92                    key.push(crate::blob_schema::REQUEST_SEPARATOR);
93                    key.push_str(id.marker_attributes);
94                }
95                key.into_bytes()
96            })
97            .or_insert(idx);
98        Ok(())
99    }
100
101    fn flush(&self, marker: DataMarkerInfo, metadata: FlushMetadata) -> Result<(), DataError> {
102        if let Some(checksum) = metadata.checksum {
103            self.checksums
104                .lock()
105                .expect("poison")
106                .insert(marker.id.hashed(), checksum);
107        }
108        self.all_markers
109            .lock()
110            .expect("poison")
111            .insert(marker.id.hashed());
112        Ok(())
113    }
114
115    fn close(&mut self) -> Result<ExporterCloseMetadata, DataError> {
116        self.close_internal()
117    }
118}
119
/// Canonicalized blob buffers produced by `BlobExporter::finalize_buffers`.
struct FinalizedBuffers {
    /// Lexicographically sorted blob bytes; the index in the vec is the new (canonical) blob ID
    vzv: VarZeroVec<'static, [u8], Index32>,
    /// Map from old (insertion-order) ID to new (sorted-order) ID
    remap: HashMap<usize, usize>,
}
126
127impl BlobExporter<'_> {
128    fn finalize_buffers(&mut self) -> FinalizedBuffers {
129        // The blob IDs are unstable due to the parallel nature of datagen.
130        // In order to make a canonical form, we sort them lexicographically now.
131
132        // This is a sorted list of blob to old ID; the index in the vec is the new ID
133        let sorted: Vec<(Vec<u8>, usize)> = {
134            let mut unique_resources = self.unique_resources.lock().expect("poison");
135            let mut sorted: Vec<(Vec<u8>, usize)> = unique_resources.drain().collect();
136            sorted.sort();
137            sorted
138        };
139
140        // This is a map from old ID to new ID
141        let remap: HashMap<usize, usize> = sorted
142            .iter()
143            .enumerate()
144            .map(|(new_id, (_, old_id))| (*old_id, new_id))
145            .collect();
146
147        // Convert the sorted list to a VarZeroVec
148        let vzv: VarZeroVec<[u8], Index32> = {
149            let buffers: Vec<Vec<u8>> = sorted.into_iter().map(|(blob, _)| blob).collect();
150            buffers.as_slice().into()
151        };
152
153        FinalizedBuffers { vzv, remap }
154    }
155
156    fn close_internal(&mut self) -> Result<ExporterCloseMetadata, DataError> {
157        let FinalizedBuffers { mut vzv, remap } = self.finalize_buffers();
158
159        let all_markers = self.all_markers.lock().expect("poison");
160        let resources = self.resources.lock().expect("poison");
161        let checksums = self.checksums.lock().expect("poison");
162
163        let markers: ZeroVec<DataMarkerIdHash> = all_markers.iter().copied().collect();
164
165        let locales_vec: Vec<Vec<u8>> = all_markers
166            .iter()
167            .map(|marker_path_hash| {
168                (
169                    resources.get(marker_path_hash),
170                    checksums.get(marker_path_hash),
171                )
172            })
173            .map(|(option_sub_map, checksum)| {
174                let mut sub_map = BTreeMap::new();
175                if let Some(sub_map_wrong) = option_sub_map {
176                    if let Some(&checksum) = checksum {
177                        sub_map.insert(CHECKSUM_KEY, vzv.len());
178                        vzv.zvl_push(checksum.to_le_bytes().as_slice());
179                    }
180                    sub_map.extend(sub_map_wrong.iter().map(|(key, id)| {
181                        (key.as_slice(), *remap.get(id).expect("in-bound index"))
182                    }));
183                }
184                ZeroTrieSimpleAscii::try_from(&sub_map)
185                    .expect("in-bounds")
186                    .into_store()
187            })
188            .collect();
189
190        if !markers.is_empty() {
191            if let Ok(locales_vzv) =
192                VarZeroVecOwned::<[u8]>::try_from_elements(locales_vec.as_slice())
193            {
194                let blob = BlobSchema::V003(BlobSchemaV1 {
195                    markers: &markers,
196                    locales: &locales_vzv,
197                    buffers: &vzv,
198                });
199                log::info!("Serializing blob to output stream...");
200
201                let output = postcard::to_allocvec(&blob)?;
202                self.sink.write_all(&output)?;
203            } else {
204                log::info!("Upgrading to BlobSchema (bigger)...");
205                let locales_vzv =
206                    VarZeroVecOwned::<[u8], Index32>::try_from_elements(locales_vec.as_slice())
207                        .expect("Locales vector does not fit in Index32 buffer!");
208                let blob = BlobSchema::V003Bigger(BlobSchemaV1 {
209                    markers: &markers,
210                    locales: &locales_vzv,
211                    buffers: &vzv,
212                });
213                log::info!("Serializing blob to output stream...");
214
215                let output = postcard::to_allocvec(&blob)?;
216                self.sink.write_all(&output)?;
217            }
218        }
219
220        Ok(Default::default())
221    }
222}