Source code for pipeline.code.dintegrate.main

'''
@Time: 2024/02/11 09:11:30
@Author: Shangzhengii 
@Version: 1.0
@Contact: huangshangzheng@ibp.ac.cn
@Desc: Using to integrate Allen sample and single-nucleus dataset.
'''
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import os

import anndata
import scanpy as sc
from tqdm import tqdm
from scipy import stats

from collections import Counter
from pipeline.code.dintegrate.data_loader import load_data
from multiprocessing import Pool


[docs]class DataIntegrator(object): """Single-nucleus dataset and AHBA dataset integration processor. For weighted integration of Allen Brain Atlas sample expression data with single-nucleus RNA-seq data to construct integrated expression matrices. Supports parallel processing and optimized downsampling. """
[docs] def __init__(self, config: None) -> None: """ Initialize the data integration processor. Parameters ---------- config : object Configuration object containing parameters for integration. """ self.cfg = config
[docs] def run(self)-> None: """ Execute the integration pipeline. Steps: - Construct the cell pool if cfg.pool is True. - Clean the pool to remove repeatedly assigned cells. - Perform weighted integration over multiple iterations. - Generate final integrated anndata files. """ if self.cfg.pool: args = [row for _, row in align_csv.iterrows()] with Pool(self.cfg.jobs) as p: list(tqdm( p.imap(self.pool_construction, args), total=len(align_csv), desc='Constructing the pool......')) if self.cfg.integrate: self.clean_pool() regions = align_csv['brain_region'].tolist() for k in range(self.cfg.iterations): with Pool(self.cfg.jobs) as p: list(tqdm( p.imap(self.weighted_average, regions), total=len(regions), desc=f'Generating integrated dataset (round {k+1}/{self.cfg.iterations})')) self.generate_integrated_anndata(k_=k)
[docs] def pool_construction(self,row) -> None: """ Retain smooth cells with expression similarity in the top 10% of AHBA regional mean expression values for the sampling pool. Parameters ---------- row : pandas.Series A row from the alignment CSV representing a brain region mapping. """ pool_csv = pd.DataFrame() corr_list = [] cell_id = [] region_ = [] allen_region = row['brain_region'] sn_region = row['sn_region'] Allen_ex_region = ahba_mean_ex.loc[allen_region].values h19_data_region = h19_data[h19_data.index.isin(sn_region.split('\\'))] for i,j in enumerate(h19_data_region.iloc[:,:-1].values): corr = stats.pearsonr(Allen_ex_region,j)[0] if corr >= thre_df.loc[allen_region,'correlation']: pool_csv = pool_csv.append(pd.Series(j),ignore_index=True) corr_list.append(corr) cell_id.append(h19_data_region.iloc[i,-1]) region_.append((allen_region,sn_region)) pool_csv['corr']=corr_list pool_csv['cell_id'] = cell_id pool_csv['sample_region'] = region_ save_path = './{0}/{1}.csv'.format(self.cfg.data_files['pool_s_path'],allen_region) os.makedirs(os.path.dirname(save_path),exist_ok=True) pool_csv.to_csv(save_path)
[docs] def clean_pool(self) -> None: """ Screening of repeatedly assigned cells. Cells that are assigned to more than a configured threshold number of regions are removed from the integration pool. """ csv_paths = [ os.path.join(self.cfg.data_files['pool_s_path'], f"{region}.csv") for region in align_csv['brain_region'] ] align_dataframe = pd.concat([pd.read_csv(p,index_col=0) for p in csv_paths]) cell_counts = Counter(align_dataframe['cell_id']) over_assigned = [cell for cell, count in cell_counts.items() if count > self.cfg.screening_thre] filtered_df = align_dataframe[~align_dataframe['cell_id'].isin(over_assigned)] filtered_df.index = ( filtered_df['sample_region'] .str.split(',') .str[0] .str.replace(r"[(']", "", regex=True) ) final_df = filtered_df.iloc[:, :-3] final_df.columns = gene_filter final_df.rename_axis('index', inplace=True) self.align_dataframe = final_df.copy()
[docs] def weighted_average(self,brain_region): """ Perform weighted integration of cell expressions from the pool with samples. Parameters ---------- brain_region : str The brain region for which integration is performed. """ region_sample = sample_ex.loc[[brain_region]] region_pool = self.align_dataframe.loc[[brain_region]] pool_data = self.downsample_data(region_pool) if self.cfg.downsample else region_pool results = [] save_path = f"./{self.cfg.data_files['integrated_s_path']}/{brain_region}.csv" os.makedirs(os.path.dirname(save_path), exist_ok=True) for sample_idx, sample_exp in enumerate(region_sample.values): sample_norm = self._normalize(sample_exp) for cell_idx, cell_exp in enumerate(pool_data.values): cell_norm = self._normalize(cell_exp) integrated_name = f'{brain_region}_sample{sample_idx}_cell{cell_idx}' if stats.pearsonr(sample_norm, cell_norm)[0] > self.cfg.corr_threshold: weighted_avg = self.cfg.ahba_weight * sample_norm + self.cfg.sn_weight * cell_norm else: weighted_avg = sample_norm results.append(pd.Series(weighted_avg, name=integrated_name)) result_df = pd.concat(results, axis=1).T result_df.to_csv(save_path)
[docs] def downsample_data(self,region_pool): """ Downsample cells from the constructed pool. Parameters ---------- region_pool : pandas.DataFrame DataFrame of pooled cells for a brain region. Returns ------- pandas.DataFrame Downsampled pool DataFrame. """ if region_pool.values.shape[0] >= self.cfg.downsample_n: region_pool_sample = region_pool.sample(n=self.cfg.downsample_n) else: region_pool_sample = region_pool.copy() return region_pool_sample
def _normalize(self, x): x_min, x_max = np.min(x), np.max(x) return 2 * (x - x_min) / (x_max - x_min) - 1
[docs] def generate_integrated_anndata(self, k_=0): """ Generate integrated AnnData object and save as h5ad file. Reads the integrated data of all brain regions, optionally performs z-score normalization, shuffles samples, and stores the result as an h5ad file. Parameters ---------- k_ : int, optional Iteration index for output file naming (default is 0). """ integrated_files = [ f"./{self.cfg.data_files['integrated_s_path']}/{region}.csv" for region in align_csv['brain_region'] ] integrated_data = pd.concat([pd.read_csv(f,index_col=0) for f in tqdm(integrated_files, desc='Loading region data')]) if self.cfg.zscore: adata = anndata.AnnData(integrated_data.values) sc.pp.scale(adata, max_value=self.cfg.max_value) integrated_data = pd.DataFrame( adata.X, index=integrated_data.index, columns=integrated_data.columns ) shuffled_data = integrated_data.sample(frac=1) shuffled_index = shuffled_data.index integrated_adata = anndata.AnnData( X=shuffled_data.values, obs=pd.DataFrame({'region_index': shuffled_index}), var=pd.DataFrame({'Gene': gene_filter}) ) os.makedirs(self.cfg.data_files['integrated_s_path'], exist_ok=True) integrated_adata.write(os.path.join(self.cfg.data_files['integrated_s_path'], f'Integrated_dataset_{k_}.h5ad'))
if __name__=='__main__': global align_csv, ahba_mean_ex, sample_ex, h19_data, thre_df, gene_filter, config align_csv, ahba_mean_ex, sample_ex, h19_data, thre_df, gene_filter, config = load_data() # Initialize processor integrator = DataIntegrator(config=config) # Execute pipeline integrator.run()