From 2dbdfe173c3b237487db4e12e507cd471ad5bcb2 Mon Sep 17 00:00:00 2001 From: Pouriya Jahanbakhsh Date: Sat, 1 Apr 2023 02:21:44 +0330 Subject: [PATCH 1/2] ref(navi/dr_transform): fix clippy & formatting issues --- navi/dr_transform/src/all_config.rs | 3 +- navi/dr_transform/src/converter.rs | 293 ++++++++++++---------------- navi/dr_transform/src/util.rs | 6 +- 3 files changed, 133 insertions(+), 169 deletions(-) diff --git a/navi/dr_transform/src/all_config.rs b/navi/dr_transform/src/all_config.rs index 426d11cef5..29451bfd44 100644 --- a/navi/dr_transform/src/all_config.rs +++ b/navi/dr_transform/src/all_config.rs @@ -44,6 +44,5 @@ pub struct RenamedFeatures { } pub fn parse(json_str: &str) -> Result { - let all_config: AllConfig = serde_json::from_str(json_str)?; - return std::result::Result::Ok(all_config); + serde_json::from_str(json_str) } diff --git a/navi/dr_transform/src/converter.rs b/navi/dr_transform/src/converter.rs index 30d3ad0a64..578d766fd6 100644 --- a/navi/dr_transform/src/converter.rs +++ b/navi/dr_transform/src/converter.rs @@ -16,8 +16,7 @@ use segdense::util; use thrift::protocol::{TBinaryInputProtocol, TSerializable}; use thrift::transport::TBufferChannel; -use crate::{all_config}; -use crate::all_config::AllConfig; +use crate::{all_config, all_config::AllConfig}; pub fn log_feature_match( dr: &DataRecord, @@ -27,26 +26,22 @@ pub fn log_feature_match( // Note the following algorithm matches features from config using linear search. // Also the record source is MinDataRecord. This includes only binary and continous features for now. - for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap().into_iter() { + for (feature_id, feature_value) in dr.continuous_features.as_ref().unwrap() { debug!( - "{} - Continous Datarecord => Feature ID: {}, Feature value: {}", - dr_type, feature_id, feature_value + "{dr_type} - Continuous Datarecord => Feature ID: {feature_id}, Feature value: {feature_value}" ); for input_feature in &seg_dense_config.cont.input_features { if input_feature.feature_id == *feature_id { - debug!("Matching input feature: {:?}", input_feature) + debug!("Matching input feature: {input_feature:?}") } } } - for feature_id in dr.binary_features.as_ref().unwrap().into_iter() { - debug!( - "{} - Binary Datarecord => Feature ID: {}", - dr_type, feature_id - ); + for feature_id in dr.binary_features.as_ref().unwrap() { + debug!("{dr_type} - Binary Datarecord => Feature ID: {feature_id}"); for input_feature in &seg_dense_config.binary.input_features { if input_feature.feature_id == *feature_id { - debug!("Found input feature: {:?}", input_feature) + debug!("Found input feature: {input_feature:?}") } } } @@ -96,15 +91,13 @@ impl BatchPredictionRequestToTorchTensorConverter { reporting_feature_ids: Vec<(i64, &str)>, register_metric_fn: Option, ) -> BatchPredictionRequestToTorchTensorConverter { - let all_config_path = format!("{}/{}/all_config.json", model_dir, model_version); - let seg_dense_config_path = format!( - "{}/{}/segdense_transform_spec_home_recap_2022.json", - model_dir, model_version - ); + let all_config_path = format!("{model_dir}/{model_version}/all_config.json"); + let seg_dense_config_path = + format!("{model_dir}/{model_version}/segdense_transform_spec_home_recap_2022.json"); let seg_dense_config = util::load_config(&seg_dense_config_path); let all_config = all_config::parse( &fs::read_to_string(&all_config_path) - .unwrap_or_else(|error| panic!("error loading all_config.json - {}", error)), + .unwrap_or_else(|error| panic!("error loading all_config.json - {error}")), ) .unwrap(); @@ -138,11 +131,11 @@ impl BatchPredictionRequestToTorchTensorConverter { let (discrete_feature_metrics, continuous_feature_metrics) = METRICS.get_or_init(|| { let discrete = HistogramVec::new( HistogramOpts::new(":navi:feature_id:discrete", "Discrete Feature ID values") - .buckets(Vec::from(&[ - 0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, + .buckets(Vec::from([ + 0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0, 1000.0, 10000.0, 100000.0, - ] as &'static [f64])), + ])), &["feature_id"], ) .expect("metric cannot be created"); @@ -151,18 +144,18 @@ impl BatchPredictionRequestToTorchTensorConverter { ":navi:feature_id:continuous", "continuous Feature ID values", ) - .buckets(Vec::from(&[ - 0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, - 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, 500.0, - 1000.0, 10000.0, 100000.0, - ] as &'static [f64])), + .buckets(Vec::from([ + 0.0f64, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 110.0, + 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0, 190.0, 200.0, 250.0, 300.0, + 500.0, 1000.0, 10000.0, 100000.0, + ])), &["feature_id"], ) .expect("metric cannot be created"); - register_metric_fn.map(|r| { + if let Some(r) = register_metric_fn { r(&discrete); r(&continuous); - }); + } (discrete, continuous) }); @@ -171,16 +164,13 @@ impl BatchPredictionRequestToTorchTensorConverter { for (feature_id, feature_type) in reporting_feature_ids.iter() { match *feature_type { - "discrete" => discrete_features_to_report.insert(feature_id.clone()), - "continuous" => continuous_features_to_report.insert(feature_id.clone()), - _ => panic!( - "Invalid feature type {} for reporting metrics!", - feature_type - ), + "discrete" => discrete_features_to_report.insert(*feature_id), + "continuous" => continuous_features_to_report.insert(*feature_id), + _ => panic!("Invalid feature type {feature_type} for reporting metrics!"), }; } - return BatchPredictionRequestToTorchTensorConverter { + BatchPredictionRequestToTorchTensorConverter { all_config, seg_dense_config, all_config_path, @@ -193,7 +183,7 @@ impl BatchPredictionRequestToTorchTensorConverter { continuous_features_to_report, discrete_feature_metrics, continuous_feature_metrics, - }; + } } fn get_feature_id(feature_name: &str, seg_dense_config: &Root) -> i64 { @@ -203,7 +193,7 @@ impl BatchPredictionRequestToTorchTensorConverter { return feature.feature_id; } } - return -1; + -1 } fn parse_batch_prediction_request(bytes: Vec) -> BatchPredictionRequest { @@ -211,7 +201,7 @@ impl BatchPredictionRequestToTorchTensorConverter { let mut bc = TBufferChannel::with_capacity(bytes.len(), 0); bc.set_readable_bytes(&bytes); let mut protocol = TBinaryInputProtocol::new(bc, true); - return BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap(); + BatchPredictionRequest::read_from_in_protocol(&mut protocol).unwrap() } fn get_embedding_tensors( @@ -228,45 +218,43 @@ impl BatchPredictionRequestToTorchTensorConverter { let mut working_set = vec![0 as f32; total_size]; let mut bpr_start = 0; for (bpr, &bpr_end) in bprs.iter().zip(batch_size) { - if bpr.common_features.is_some() { - if bpr.common_features.as_ref().unwrap().tensors.is_some() { - if bpr - .common_features - .as_ref() - .unwrap() - .tensors - .as_ref() - .unwrap() - .contains_key(&feature_id) + if bpr.common_features.is_some() + && bpr.common_features.as_ref().unwrap().tensors.is_some() + && bpr + .common_features + .as_ref() + .unwrap() + .tensors + .as_ref() + .unwrap() + .contains_key(&feature_id) + { + let source_tensor = bpr + .common_features + .as_ref() + .unwrap() + .tensors + .as_ref() + .unwrap() + .get(&feature_id) + .unwrap(); + let tensor = match source_tensor { + GeneralTensor::FloatTensor(float_tensor) => + //Tensor::of_slice( { - let source_tensor = bpr - .common_features - .as_ref() - .unwrap() - .tensors - .as_ref() - .unwrap() - .get(&feature_id) - .unwrap(); - let tensor = match source_tensor { - GeneralTensor::FloatTensor(float_tensor) => - //Tensor::of_slice( - { - float_tensor - .floats - .iter() - .map(|x| x.into_inner() as f32) - .collect::>() - } - _ => vec![0 as f32; cols], - }; + float_tensor + .floats + .iter() + .map(|x| x.into_inner() as f32) + .collect::>() + } + _ => vec![0 as f32; cols], + }; - // since the tensor is found in common feature, add it in all batches - for row in bpr_start..bpr_end { - for col in 0..cols { - working_set[row * cols + col] = tensor[col]; - } - } + // since the tensor is found in common feature, add it in all batches + for row in bpr_start..bpr_end { + for col in 0..cols { + working_set[row * cols + col] = tensor[col]; } } } @@ -300,7 +288,7 @@ impl BatchPredictionRequestToTorchTensorConverter { } bpr_start = bpr_end; } - return Array2::::from_shape_vec([rows, cols], working_set).unwrap(); + Array2::::from_shape_vec([rows, cols], working_set).unwrap() } // Todo : Refactor, create a generic version with different type and field accessors @@ -310,9 +298,9 @@ impl BatchPredictionRequestToTorchTensorConverter { // (INT64 --> INT64, DataRecord.discrete_feature) fn get_continuous(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { // These need to be part of model schema - let rows: usize = batch_ends[batch_ends.len() - 1]; - let cols: usize = 5293; - let full_size: usize = (rows * cols).try_into().unwrap(); + let rows = batch_ends[batch_ends.len() - 1]; + let cols = 5293; + let full_size = rows * cols; let default_val = f32::NAN; let mut tensor = vec![default_val; full_size]; @@ -337,55 +325,48 @@ impl BatchPredictionRequestToTorchTensorConverter { .unwrap(); for feature in common_features { - match self.feature_mapper.get(feature.0) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - if idx < cols { - // Set value in each row - for r in bpr_start..bpr_end { - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - tensor[flat_index] = feature.1.into_inner() as f32; - } + if let Some(f_info) = self.feature_mapper.get(feature.0) { + let idx = f_info.index_within_tensor as usize; + if idx < cols { + // Set value in each row + for r in bpr_start..bpr_end { + let flat_index = r * cols + idx; + tensor[flat_index] = feature.1.into_inner() as f32; } } - None => (), } if self.continuous_features_to_report.contains(feature.0) { self.continuous_feature_metrics .with_label_values(&[feature.0.to_string().as_str()]) - .observe(feature.1.into_inner() as f64) + .observe(feature.1.into_inner()) } else if self.discrete_features_to_report.contains(feature.0) { self.discrete_feature_metrics .with_label_values(&[feature.0.to_string().as_str()]) - .observe(feature.1.into_inner() as f64) + .observe(feature.1.into_inner()) } } } // Process the batch of datarecords for r in bpr_start..bpr_end { - let dr: &DataRecord = - &bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()]; + let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start]; if dr.continuous_features.is_some() { for feature in dr.continuous_features.as_ref().unwrap() { - match self.feature_mapper.get(&feature.0) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - if flat_index < tensor.len() && idx < cols { - tensor[flat_index] = feature.1.into_inner() as f32; - } + if let Some(f_info) = self.feature_mapper.get(feature.0) { + let idx = f_info.index_within_tensor as usize; + let flat_index = r * cols + idx; + if flat_index < tensor.len() && idx < cols { + tensor[flat_index] = feature.1.into_inner() as f32; } - None => (), } if self.continuous_features_to_report.contains(feature.0) { self.continuous_feature_metrics .with_label_values(&[feature.0.to_string().as_str()]) - .observe(feature.1.into_inner() as f64) + .observe(feature.1.into_inner()) } else if self.discrete_features_to_report.contains(feature.0) { self.discrete_feature_metrics .with_label_values(&[feature.0.to_string().as_str()]) - .observe(feature.1.into_inner() as f64) + .observe(feature.1.into_inner()) } } } @@ -393,22 +374,19 @@ impl BatchPredictionRequestToTorchTensorConverter { bpr_start = bpr_end; } - return InputTensor::FloatTensor( - Array2::::from_shape_vec( - [rows.try_into().unwrap(), cols.try_into().unwrap()], - tensor, - ) - .unwrap() - .into_dyn(), - ); + InputTensor::FloatTensor( + Array2::::from_shape_vec([rows, cols], tensor) + .unwrap() + .into_dyn(), + ) } fn get_binary(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { // These need to be part of model schema - let rows: usize = batch_ends[batch_ends.len() - 1]; - let cols: usize = 149; - let full_size: usize = (rows * cols).try_into().unwrap(); - let default_val: i64 = 0; + let rows = batch_ends[batch_ends.len() - 1]; + let cols = 149; + let full_size = rows * cols; + let default_val = 0; let mut v = vec![default_val; full_size]; @@ -432,55 +410,48 @@ impl BatchPredictionRequestToTorchTensorConverter { .unwrap(); for feature in common_features { - match self.feature_mapper.get(feature) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - if idx < cols { - // Set value in each row - for r in bpr_start..bpr_end { - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - v[flat_index] = 1; - } + if let Some(f_info) = self.feature_mapper.get(feature) { + let idx = f_info.index_within_tensor as usize; + if idx < cols { + // Set value in each row + for r in bpr_start..bpr_end { + let flat_index = r * cols + idx; + v[flat_index] = 1; } } - None => (), } } } // Process the batch of datarecords for r in bpr_start..bpr_end { - let dr: &DataRecord = - &bpr.individual_features_list[usize::try_from(r - bpr_start).unwrap()]; + let dr: &DataRecord = &bpr.individual_features_list[r - bpr_start]; if dr.binary_features.is_some() { for feature in dr.binary_features.as_ref().unwrap() { - match self.feature_mapper.get(&feature) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - v[flat_index] = 1; - } - None => (), + if let Some(f_info) = self.feature_mapper.get(feature) { + let idx = f_info.index_within_tensor as usize; + let flat_index = r * cols + idx; + v[flat_index] = 1; } } } } bpr_start = bpr_end; } - return InputTensor::Int64Tensor( - Array2::::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v) + InputTensor::Int64Tensor( + Array2::::from_shape_vec([rows, cols], v) .unwrap() .into_dyn(), - ); + ) } #[allow(dead_code)] fn get_discrete(&self, bprs: &[BatchPredictionRequest], batch_ends: &[usize]) -> InputTensor { // These need to be part of model schema - let rows: usize = batch_ends[batch_ends.len() - 1]; - let cols: usize = 320; - let full_size: usize = (rows * cols).try_into().unwrap(); - let default_val: i64 = 0; + let rows = batch_ends[batch_ends.len() - 1]; + let cols = 320; + let full_size = rows * cols; + let default_val = 0; let mut v = vec![default_val; full_size]; @@ -504,18 +475,15 @@ impl BatchPredictionRequestToTorchTensorConverter { .unwrap(); for feature in common_features { - match self.feature_mapper.get(feature.0) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - if idx < cols { - // Set value in each row - for r in bpr_start..bpr_end { - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - v[flat_index] = *feature.1; - } + if let Some(f_info) = self.feature_mapper.get(feature.0) { + let idx = f_info.index_within_tensor as usize; + if idx < cols { + // Set value in each row + for r in bpr_start..bpr_end { + let flat_index = r * cols + idx; + v[flat_index] = *feature.1; } } - None => (), } if self.discrete_features_to_report.contains(feature.0) { self.discrete_feature_metrics @@ -527,18 +495,15 @@ impl BatchPredictionRequestToTorchTensorConverter { // Process the batch of datarecords for r in bpr_start..bpr_end { - let dr: &DataRecord = &bpr.individual_features_list[usize::try_from(r).unwrap()]; + let dr: &DataRecord = &bpr.individual_features_list[r]; if dr.discrete_features.is_some() { for feature in dr.discrete_features.as_ref().unwrap() { - match self.feature_mapper.get(&feature.0) { - Some(f_info) => { - let idx = f_info.index_within_tensor as usize; - let flat_index: usize = (r * cols + idx).try_into().unwrap(); - if flat_index < v.len() && idx < cols { - v[flat_index] = *feature.1; - } + if let Some(f_info) = self.feature_mapper.get(feature.0) { + let idx = f_info.index_within_tensor as usize; + let flat_index = r * cols + idx; + if flat_index < v.len() && idx < cols { + v[flat_index] = *feature.1; } - None => (), } if self.discrete_features_to_report.contains(feature.0) { self.discrete_feature_metrics @@ -550,11 +515,11 @@ impl BatchPredictionRequestToTorchTensorConverter { } bpr_start = bpr_end; } - return InputTensor::Int64Tensor( - Array2::::from_shape_vec([rows.try_into().unwrap(), cols.try_into().unwrap()], v) + InputTensor::Int64Tensor( + Array2::::from_shape_vec([rows, cols], v) .unwrap() .into_dyn(), - ); + ) } fn get_user_embedding( @@ -604,7 +569,7 @@ impl Converter for BatchPredictionRequestToTorchTensorConverter { .map(|bpr| bpr.individual_features_list.len()) .scan(0usize, |acc, e| { //running total - *acc = *acc + e; + *acc += e; Some(*acc) }) .collect::>(); diff --git a/navi/dr_transform/src/util.rs b/navi/dr_transform/src/util.rs index 8c87731856..541663ecbe 100644 --- a/navi/dr_transform/src/util.rs +++ b/navi/dr_transform/src/util.rs @@ -12,11 +12,11 @@ pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec> { for line in io::BufReader::new(file).lines() { match base64::decode(line.unwrap().trim()) { Ok(payload) => result.push(payload), - Err(err) => println!("error decoding line {}", err), + Err(err) => println!("error decoding line {err}"), } } - println!("reslt len: {}", result.len()); - return result; + println!("result len: {}", result.len()); + return result } pub fn save_to_npy(data: &[T], save_to: String) { let mut writer = WriteOptions::new() From ee5e7fc18dc0e971a6c02826b196294048765817 Mon Sep 17 00:00:00 2001 From: Pouriya Jahanbakhsh Date: Sat, 1 Apr 2023 02:33:51 +0330 Subject: [PATCH 2/2] feat(navi/dr_transform): add filename:line to file reader error message --- navi/dr_transform/src/util.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/navi/dr_transform/src/util.rs b/navi/dr_transform/src/util.rs index 541663ecbe..83b99805a0 100644 --- a/navi/dr_transform/src/util.rs +++ b/navi/dr_transform/src/util.rs @@ -9,15 +9,17 @@ use std::{ pub fn load_batch_prediction_request_base64(file_name: &str) -> Vec> { let file = File::open(file_name).expect("could not read file"); let mut result = vec![]; - for line in io::BufReader::new(file).lines() { + for (mut line_count, line) in io::BufReader::new(file).lines().enumerate() { + line_count += 1; match base64::decode(line.unwrap().trim()) { Ok(payload) => result.push(payload), - Err(err) => println!("error decoding line {err}"), + Err(err) => println!("error decoding line {file_name}:{line_count} - {err}"), } } println!("result len: {}", result.len()); - return result + result } + pub fn save_to_npy(data: &[T], save_to: String) { let mut writer = WriteOptions::new() .default_dtype()