# Source code for bioneuralnet.downstream_task.dpmon

import os
import re
import logging
import statistics
import tempfile
from typing import Optional, List

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import numpy as np
import networkx as nx
from torch_geometric.data import Data
from torch_geometric.transforms import RandomNodeSplit
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray import train
from ray.train import Checkpoint

from bioneuralnet.network_embedding import GCN, GAT, SAGE, GIN

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class DPMON:
    def __init__(
        self,
        adjacency_matrix: pd.DataFrame,
        omics_list: List[pd.DataFrame],
        phenotype_data: pd.DataFrame,  
        clinical_data: pd.DataFrame,  
        model: str = 'GCN',
        gnn_hidden_dim: int = 16,
        layer_num: int = 5,
        nn_hidden_dim1: int = 8,
        nn_hidden_dim2: int = 8,
        epoch_num: int = 5,
        repeat_num: int = 5,
        lr: float = 0.01,
        weight_decay: float = 1e-4,
        tune: bool = False,
        gpu: bool = False,
        cuda: int = 0,
        output_dir: Optional[str] = None
    ):
        if clinical_data is None or clinical_data.empty:
            raise ValueError("Clinical data is required and cannot be empty.")

        if phenotype_data is None or phenotype_data.empty or 'finalgold_visit' not in phenotype_data.columns:
            raise ValueError("Phenotype data must contain 'finalgold_visit' and cannot be empty.")

        if not omics_list or any(df.empty for df in omics_list):
            raise ValueError("All provided omics data files must be non-empty.")

        if adjacency_matrix is None or adjacency_matrix.empty:
            raise ValueError("Adjacency matrix is required and cannot be empty.")

        self.adjacency_matrix = adjacency_matrix
        self.omics_list = omics_list
        self.phenotype_data = phenotype_data
        self.clinical_data = clinical_data
        self.model = model
        self.gnn_hidden_dim = gnn_hidden_dim
        self.layer_num = layer_num
        self.nn_hidden_dim1 = nn_hidden_dim1
        self.nn_hidden_dim2 = nn_hidden_dim2
        self.epoch_num = epoch_num
        self.repeat_num = repeat_num
        self.lr = lr
        self.weight_decay = weight_decay
        self.tune = tune
        self.gpu = gpu
        self.cuda = cuda
        self.output_dir = output_dir if output_dir else f"dpmon_output_{os.getpid()}"

        os.makedirs(self.output_dir, exist_ok=True)
        logger.info("Initialized DPMON with the provided parameters.")


    def run(self) -> pd.DataFrame:
        """
        Execute the DPMON pipeline for disease prediction.

        **Steps:**

        1. **Combining Omics and Phenotype Data**:
           - Merges the provided omics datasets and ensures that the phenotype
             (`finalgold_visit`) column is included.

        2. **Tuning or Training**:
           - **Tuning**: If `tune=True`, performs hyperparameter tuning using Ray Tune
             and returns an empty DataFrame.
           - **Training**: If `tune=False`, runs standard training to generate predictions.

        3. **Predictions**:
           - If training is performed, returns a DataFrame of predictions with
             'Actual' and 'Predicted' columns.

        **Returns**: pd.DataFrame

        - If `tune=False`, a DataFrame containing disease phenotype predictions for each sample.
        - If `tune=True`, an empty DataFrame, since no predictions are generated.

        **Raises**:

        - **ValueError**: If the input data is improperly formatted or missing.
        - **Exception**: For any unforeseen errors encountered during preprocessing,
          tuning, or training.

        **Notes**:

        - DPMON relies on internally generated embeddings (via GNNs), node correlations,
          and a downstream neural network.
        - Ensure that the adjacency matrix and omics data are properly aligned and that
          clinical/phenotype data match the sample indices.

        **Example**:

        .. code-block:: python

            dpmon = DPMON(adjacency_matrix, omics_list, phenotype_data, clinical_data, model='GCN')
            predictions = dpmon.run()
            print(predictions.head())
        """
        logger.info("Starting DPMON run.")
        dpmon_params = {
            'model': self.model,
            'gnn_hidden_dim': self.gnn_hidden_dim,
            'layer_num': self.layer_num,
            'nn_hidden_dim1': self.nn_hidden_dim1,
            'nn_hidden_dim2': self.nn_hidden_dim2,
            'epoch_num': self.epoch_num,
            'repeat_num': self.repeat_num,
            'lr': self.lr,
            'weight_decay': self.weight_decay,
            'gpu': self.gpu,
            'cuda': self.cuda,
            'tune': self.tune,
        }

        combined_omics = pd.concat(self.omics_list, axis=1)
        if 'finalgold_visit' not in combined_omics.columns:
            combined_omics = combined_omics.merge(
                self.phenotype_data[['finalgold_visit']],
                left_index=True, right_index=True
            )

        if self.tune:
            logger.info("Running hyperparameter tuning for DPMON.")
            run_hyperparameter_tuning(
                dpmon_params, self.adjacency_matrix, combined_omics, self.clinical_data
            )
            return pd.DataFrame()

        logger.info("Running standard training for DPMON.")
        predictions = run_standard_training(
            dpmon_params, self.adjacency_matrix, combined_omics, self.clinical_data,
            output_dir=self.output_dir
        )
        logger.info("DPMON run completed.")
        return predictions
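
# Illustrative only: a minimal sketch of the input alignment DPMON expects, built
# from synthetic data. All names below ("g1".."g3", "S0".."S5", "age") are
# hypothetical; only the 'finalgold_visit' column name is a hard requirement of
# this module. This shows alignment only: a real network needs enough nodes for
# the internal train/val/test split performed in build_omics_networks_tg.
def _example_inputs():
    samples = [f"S{i}" for i in range(6)]
    omics = pd.DataFrame(np.random.rand(6, 3), index=samples, columns=["g1", "g2", "g3"])
    # Adjacency rows/columns name the network nodes; omics columns must cover them.
    adjacency = pd.DataFrame(
        [[0.0, 1.0, 0.0],
         [1.0, 0.0, 0.5],
         [0.0, 0.5, 0.0]],
        index=omics.columns, columns=omics.columns,
    )
    # Phenotype and clinical rows must share the omics sample index.
    phenotype = pd.DataFrame({"finalgold_visit": [0, 1, 0, 1, 0, 1]}, index=samples)
    clinical = pd.DataFrame({"age": [50.0, 60.0, 55.0, 65.0, 58.0, 62.0]}, index=samples)
    return adjacency, [omics], phenotype, clinical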

def setup_device(gpu, cuda):
    if gpu:
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        os.environ['CUDA_VISIBLE_DEVICES'] = str(cuda)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if torch.cuda.is_available():
            logger.info(f'Using GPU {cuda}')
        else:
            logger.warning(f'GPU {cuda} requested but not available, using CPU')
    else:
        device = torch.device('cpu')
        logger.info('Using CPU')
    return device


def slice_omics_datasets(omics_dataset: pd.DataFrame, adjacency_matrix: pd.DataFrame) -> List[pd.DataFrame]:
    logger.info("Slicing omics dataset based on network nodes.")
    omics_network_nodes_names = adjacency_matrix.index.tolist()

    # Clean omics dataset columns (renamed in place) so they match the sanitized
    # network node names.
    clean_columns = []
    for node in omics_dataset.columns:
        node_clean = re.sub(r'[^0-9a-zA-Z_]', '.', node)
        if not node_clean[0].isalpha():
            node_clean = 'X' + node_clean
        clean_columns.append(node_clean)
    omics_dataset.columns = clean_columns

    missing_nodes = set(omics_network_nodes_names) - set(omics_dataset.columns)
    if missing_nodes:
        logger.error(f"Nodes missing in omics data: {missing_nodes}")
        raise ValueError("Missing nodes in omics dataset.")

    selected_columns = omics_network_nodes_names + ['finalgold_visit']
    return [omics_dataset[selected_columns]]


def build_omics_networks_tg(adjacency_matrix: pd.DataFrame, omics_datasets: List[pd.DataFrame], clinical_data: pd.DataFrame) -> List[Data]:
    logger.info("Building PyTorch Geometric Data object from adjacency matrix.")
    omics_network_nodes_names = adjacency_matrix.index.tolist()
    G = nx.from_pandas_adjacency(adjacency_matrix)
    node_mapping = {node_name: idx for idx, node_name in enumerate(omics_network_nodes_names)}
    G = nx.relabel_nodes(G, node_mapping)
    num_nodes = len(node_mapping)
    logger.info(f"Number of nodes in network: {num_nodes}")

    if clinical_data is not None and not clinical_data.empty:
        clinical_vars = clinical_data.columns.tolist()
        logger.info(f"Using clinical vars for node features: {clinical_vars}")
        omics_dataset = omics_datasets[0]
        missing_nodes = set(omics_network_nodes_names) - set(omics_dataset.columns)
        if missing_nodes:
            raise ValueError("Missing nodes for correlation computation.")
        # Each node's feature vector is its absolute correlation with every
        # clinical variable.
        node_features = []
        for node_name in omics_network_nodes_names:
            correlations = []
            for var in clinical_vars:
                corr_value = abs(omics_dataset[node_name].corr(clinical_data[var].astype('float64')))
                correlations.append(corr_value)
            node_features.append(correlations)
        x = torch.tensor(node_features, dtype=torch.float)
    else:
        x = torch.randn((num_nodes, 10), dtype=torch.float)
        logger.info("No clinical data provided or empty. Using random features.")

    # Duplicate each undirected edge in both directions for PyTorch Geometric.
    edge_index = torch.tensor(list(G.edges()), dtype=torch.long).t().contiguous()
    edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)
    edge_weight = torch.tensor(
        [data.get('weight', 1.0) for _, _, data in G.edges(data=True)],
        dtype=torch.float
    )
    edge_weight = torch.cat([edge_weight, edge_weight], dim=0)

    data = Data(x=x, edge_index=edge_index, edge_attr=edge_weight)
    num_val_nodes = 10 if num_nodes >= 30 else 5
    num_test_nodes = 12 if num_nodes >= 30 else 5
    transform = RandomNodeSplit(num_val=num_val_nodes, num_test=num_test_nodes)
    data = transform(data)
    return [data]
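
# Illustrative only: a hypothetical helper mirroring the column-cleaning rule in
# slice_omics_datasets, so the sanitization is easy to check in isolation. The
# input names below are made up.
def _example_clean_name(name: str) -> str:
    cleaned = re.sub(r'[^0-9a-zA-Z_]', '.', name)              # non-alphanumerics -> '.'
    return cleaned if cleaned[0].isalpha() else 'X' + cleaned  # prefix leading non-letters

# _example_clean_name('HLA-DRB1') -> 'HLA.DRB1'
# _example_clean_name('7SK')      -> 'X7SK'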

def run_standard_training(dpmon_params, adjacency_matrix, combined_omics, clinical_data, output_dir):
    device = setup_device(dpmon_params['gpu'], dpmon_params['cuda'])
    omics_dataset = slice_omics_datasets(combined_omics, adjacency_matrix)
    omics_networks_tg = build_omics_networks_tg(adjacency_matrix, omics_dataset, clinical_data)

    accuracies = []
    last_predictions_df = pd.DataFrame()
    for omics_data, omics_network in zip(omics_dataset, omics_networks_tg):
        for i in range(dpmon_params['repeat_num']):
            logger.info(f"Training iteration {i+1}/{dpmon_params['repeat_num']}")
            model = NeuralNetwork(
                model_type=dpmon_params['model'],
                gnn_input_dim=omics_network.x.shape[1],
                gnn_hidden_dim=dpmon_params['gnn_hidden_dim'],
                gnn_layer_num=dpmon_params['layer_num'],
                ae_encoding_dim=1,
                nn_input_dim=omics_data.drop(['finalgold_visit'], axis=1).shape[1],
                nn_hidden_dim1=dpmon_params['nn_hidden_dim1'],
                nn_hidden_dim2=dpmon_params['nn_hidden_dim2'],
                nn_output_dim=omics_data['finalgold_visit'].nunique()
            ).to(device)
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.Adam(model.parameters(), lr=dpmon_params['lr'],
                                   weight_decay=dpmon_params['weight_decay'])

            train_features = torch.FloatTensor(omics_data.drop(['finalgold_visit'], axis=1).values).to(device)
            train_labels = {
                'labels': torch.LongTensor(omics_data['finalgold_visit'].values.copy()).to(device),
                'omics_network': omics_network.to(device)
            }

            accuracy = train_model(model, criterion, optimizer, train_features, train_labels,
                                   dpmon_params['epoch_num'])
            accuracies.append(accuracy)

            model_path = os.path.join(output_dir, f'dpm_model_iter_{i+1}.pth')
            torch.save(model.state_dict(), model_path)
            logger.info(f"Model saved to {model_path}")

            model.eval()
            with torch.no_grad():
                predictions, _ = model(train_features, omics_network.to(device))
                _, predicted = torch.max(predictions, 1)

            predictions_path = os.path.join(output_dir, f'predictions_iter_{i+1}.csv')
            predictions_df = pd.DataFrame({
                'Actual': omics_data['finalgold_visit'].values,
                'Predicted': predicted.cpu().numpy()
            })
            predictions_df.to_csv(predictions_path, index=False)
            logger.info(f"Predictions saved to {predictions_path}")
            last_predictions_df = predictions_df

    if accuracies:
        max_accuracy = max(accuracies)
        avg_accuracy = sum(accuracies) / len(accuracies)
        std_accuracy = statistics.stdev(accuracies) if len(accuracies) > 1 else 0.0
        logger.info(f"Best Accuracy: {max_accuracy:.4f}")
        logger.info(f"Average Accuracy: {avg_accuracy:.4f}")
        logger.info(f"Standard Deviation of Accuracy: {std_accuracy:.4f}")
    return last_predictions_df
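
# Illustrative only: a hypothetical helper showing how one of the checkpoints
# written above could be reloaded for inference. The file name and the
# hyperparameters must match those used when the state dict was saved; the
# argument names here mirror run_standard_training but are not part of the API.
def _example_load_for_inference(output_dir, dpmon_params, omics_data, omics_network):
    model = NeuralNetwork(
        model_type=dpmon_params['model'],
        gnn_input_dim=omics_network.x.shape[1],
        gnn_hidden_dim=dpmon_params['gnn_hidden_dim'],
        gnn_layer_num=dpmon_params['layer_num'],
        ae_encoding_dim=1,
        nn_input_dim=omics_data.drop(['finalgold_visit'], axis=1).shape[1],
        nn_hidden_dim1=dpmon_params['nn_hidden_dim1'],
        nn_hidden_dim2=dpmon_params['nn_hidden_dim2'],
        nn_output_dim=omics_data['finalgold_visit'].nunique()
    )
    state = torch.load(os.path.join(output_dir, 'dpm_model_iter_1.pth'), map_location='cpu')
    model.load_state_dict(state)
    model.eval()
    features = torch.FloatTensor(omics_data.drop(['finalgold_visit'], axis=1).values)
    with torch.no_grad():
        scores, _ = model(features, omics_network)
    return scores.argmax(dim=1)  # predicted class per sample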

def run_hyperparameter_tuning(dpmon_params, adjacency_matrix, combined_omics, clinical_data):
    device = setup_device(dpmon_params['gpu'], dpmon_params['cuda'])
    omics_dataset = slice_omics_datasets(combined_omics, adjacency_matrix)
    omics_networks_tg = build_omics_networks_tg(adjacency_matrix, omics_dataset, clinical_data)

    pipeline_configs = {
        'gnn_layer_num': tune.choice([2, 4, 8, 16, 32, 64, 128]),
        'gnn_hidden_dim': tune.choice([4, 8, 16, 32, 64, 128]),
        'lr': tune.loguniform(1e-4, 1e-1),
        'weight_decay': tune.loguniform(1e-4, 1e-1),
        'nn_hidden_dim1': tune.choice([4, 8, 16, 32, 64, 128]),
        'nn_hidden_dim2': tune.choice([4, 8, 16, 32, 64, 128]),
        'num_epochs': tune.choice([2, 16, 64, 512, 1024, 4096, 8192]),
    }
    reporter = CLIReporter(metric_columns=["loss", "accuracy", "training_iteration"])
    scheduler = ASHAScheduler(metric="loss", mode="min", grace_period=10, reduction_factor=2)
    gpu_resources = 1 if dpmon_params['gpu'] else 0

    for omics_data, omics_network_tg in zip(omics_dataset, omics_networks_tg):
        os.environ['TUNE_DISABLE_STRICT_METRIC_CHECKING'] = '1'
        logger.info(f"Starting hyperparameter tuning for dataset shape: {omics_data.shape}")

        def tune_train_n(config):
            model = NeuralNetwork(
                model_type=dpmon_params['model'],
                gnn_input_dim=omics_network_tg.x.shape[1],
                gnn_hidden_dim=config['gnn_hidden_dim'],
                gnn_layer_num=config['gnn_layer_num'],
                ae_encoding_dim=1,
                nn_input_dim=omics_data.drop(['finalgold_visit'], axis=1).shape[1],
                nn_hidden_dim1=config['nn_hidden_dim1'],
                nn_hidden_dim2=config['nn_hidden_dim2'],
                nn_output_dim=omics_data['finalgold_visit'].nunique()
            ).to(device)
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.Adam(model.parameters(), lr=config['lr'],
                                   weight_decay=config['weight_decay'])
            train_features = torch.FloatTensor(omics_data.drop(['finalgold_visit'], axis=1).values).to(device)
            train_labels = {
                'labels': torch.LongTensor(omics_data['finalgold_visit'].values.copy()).to(device),
                'omics_network': omics_network_tg.to(device)
            }

            for epoch in range(config['num_epochs']):
                model.train()
                optimizer.zero_grad()
                outputs, _ = model(train_features, train_labels['omics_network'])
                loss = criterion(outputs, train_labels['labels'])
                loss.backward()
                optimizer.step()

                _, predicted = torch.max(outputs, 1)
                total = train_labels['labels'].size(0)
                correct = (predicted == train_labels['labels']).sum().item()
                accuracy = correct / total
                metrics = {"loss": loss.item(), "accuracy": accuracy}
                with tempfile.TemporaryDirectory() as tempdir:
                    torch.save(
                        {"epoch": epoch, "model_state": model.state_dict()},
                        os.path.join(tempdir, "checkpoint.pt"),
                    )
                    train.report(metrics=metrics, checkpoint=Checkpoint.from_directory(tempdir))

            model.eval()
            with torch.no_grad():
                outputs, _ = model(train_features, train_labels['omics_network'])
                loss = criterion(outputs, train_labels['labels'])
                _, predicted = torch.max(outputs, 1)
                total = train_labels['labels'].size(0)
                correct = (predicted == train_labels['labels']).sum().item()
                accuracy = correct / total
            metrics = {"loss": loss.item(), "accuracy": accuracy}
            with tempfile.TemporaryDirectory() as tempdir:
                torch.save(
                    {"epoch": config['num_epochs'], "model_state": model.state_dict()},
                    os.path.join(tempdir, "checkpoint.pt"),
                )
                train.report(metrics=metrics, checkpoint=Checkpoint.from_directory(tempdir))

        result = tune.run(
            tune_train_n,
            resources_per_trial={"cpu": 2, "gpu": gpu_resources},
            config=pipeline_configs,
            num_samples=10,
            scheduler=scheduler,
            name='Hyperparameter_Tuning',
            progress_reporter=reporter,
            keep_checkpoints_num=1,
            checkpoint_score_attr='min-loss'
        )
        best_trial = result.get_best_trial("loss", "min", "last")
        logger.info("Best trial config: {}".format(best_trial.config))
        logger.info("Best trial final loss: {}".format(best_trial.last_result["loss"]))
        logger.info("Best trial final accuracy: {}".format(best_trial.last_result["accuracy"]))
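
# Illustrative only: the search space above is fixed by this module; a narrower
# space such as the sketch below (values chosen arbitrarily for illustration) is
# one way to cut tuning time when compute is limited. It reuses the same Ray Tune
# sampling helpers already imported here.
_example_small_search_space = {
    'gnn_layer_num': tune.choice([2, 4]),
    'gnn_hidden_dim': tune.choice([8, 16]),
    'lr': tune.loguniform(1e-3, 1e-2),
    'weight_decay': tune.loguniform(1e-4, 1e-3),
    'nn_hidden_dim1': tune.choice([8, 16]),
    'nn_hidden_dim2': tune.choice([8, 16]),
    'num_epochs': tune.choice([16, 64]),
}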

def train_model(model, criterion, optimizer, train_data, train_labels, epoch_num):
    model.train()
    for epoch in range(epoch_num):
        optimizer.zero_grad()
        outputs, _ = model(train_data, train_labels['omics_network'])
        loss = criterion(outputs, train_labels['labels'])
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 10 == 0 or epoch == 0:
            logger.info(f"Epoch [{epoch+1}/{epoch_num}], Loss: {loss.item():.4f}")

    model.eval()
    with torch.no_grad():
        predictions, _ = model(train_data, train_labels['omics_network'])
        _, predicted = torch.max(predictions, 1)
        accuracy = (predicted == train_labels['labels']).sum().item() / len(train_labels['labels'])
        logger.info(f"Training Accuracy: {accuracy:.4f}")
    return accuracy


class NeuralNetwork(nn.Module):
    def __init__(self, model_type, gnn_input_dim, gnn_hidden_dim, gnn_layer_num, ae_encoding_dim,
                 nn_input_dim, nn_hidden_dim1, nn_hidden_dim2, nn_output_dim):
        super(NeuralNetwork, self).__init__()
        if model_type == 'GCN':
            self.gnn = GCN(input_dim=gnn_input_dim, hidden_dim=gnn_hidden_dim, layer_num=gnn_layer_num)
        elif model_type == 'GAT':
            self.gnn = GAT(input_dim=gnn_input_dim, hidden_dim=gnn_hidden_dim, layer_num=gnn_layer_num)
        elif model_type == 'SAGE':
            self.gnn = SAGE(input_dim=gnn_input_dim, hidden_dim=gnn_hidden_dim, output_dim=gnn_hidden_dim, layer_num=gnn_layer_num)
        elif model_type == 'GIN':
            self.gnn = GIN(input_dim=gnn_input_dim, hidden_dim=gnn_hidden_dim, output_dim=gnn_hidden_dim, layer_num=gnn_layer_num)
        else:
            raise ValueError(f"Unsupported GNN model type: {model_type}")
        self.autoencoder = Autoencoder(input_dim=gnn_hidden_dim, encoding_dim=ae_encoding_dim)
        self.dim_averaging = DimensionAveraging()
        self.predictor = DownstreamTaskNN(nn_input_dim, nn_hidden_dim1, nn_hidden_dim2, nn_output_dim)

    def forward(self, omics_dataset, omics_network_tg):
        # Embed network nodes, compress each embedding to one value per node,
        # then scale every omics feature column by its node's value before the
        # downstream prediction.
        omics_network_nodes_embedding = self.gnn(omics_network_tg)
        omics_network_nodes_embedding_ae = self.autoencoder(omics_network_nodes_embedding)
        omics_network_nodes_embedding_avg = self.dim_averaging(omics_network_nodes_embedding_ae)
        omics_dataset_with_embeddings = torch.mul(
            omics_dataset,
            omics_network_nodes_embedding_avg.expand(omics_dataset.shape[1], omics_dataset.shape[0]).t()
        )
        predictions = self.predictor(omics_dataset_with_embeddings)
        return predictions, omics_dataset_with_embeddings


class Autoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 8), nn.ReLU(),
            nn.Linear(8, 4), nn.ReLU(),
            nn.Linear(4, encoding_dim)
        )
        # The decoder is defined for completeness but unused: forward() returns
        # only the encoding.
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 4), nn.ReLU(),
            nn.Linear(4, 8), nn.ReLU(),
            nn.Linear(8, input_dim)
        )

    def forward(self, x):
        return self.encoder(x)


class DimensionAveraging(nn.Module):
    def __init__(self):
        super(DimensionAveraging, self).__init__()

    def forward(self, x):
        return torch.mean(x, dim=1, keepdim=True)


class DownstreamTaskNN(nn.Module):
    def __init__(self, input_size, hidden_dim1, hidden_dim2, output_dim):
        super(DownstreamTaskNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_dim1)
        self.bn1 = nn.BatchNorm1d(hidden_dim1)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
        self.bn2 = nn.BatchNorm1d(hidden_dim2)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_dim2, output_dim)
        # Note: nn.CrossEntropyLoss applies log-softmax internally, so training on
        # these softmax outputs (rather than raw logits) dampens gradients; kept
        # as-is to preserve the module's published behavior.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        x = self.softmax(x)
        return x
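
# Illustrative only: a minimal forward pass through NeuralNetwork on synthetic
# tensors, showing how node embeddings rescale the omics matrix. All sizes are
# arbitrary, and it assumes the bundled GCN accepts a PyG Data object with
# edge_attr, as the training code above does.
def _example_forward_pass():
    num_samples, num_nodes = 8, 5
    x = torch.randn(num_nodes, 3)                                    # 3 node features
    edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 4]])
    edge_index = torch.cat([edge_index, edge_index.flip(0)], dim=1)  # both directions
    edge_attr = torch.ones(edge_index.shape[1])                      # unit edge weights
    network = Data(x=x, edge_index=edge_index, edge_attr=edge_attr)
    omics = torch.randn(num_samples, num_nodes)                      # one column per node
    model = NeuralNetwork(
        model_type='GCN', gnn_input_dim=3, gnn_hidden_dim=16, gnn_layer_num=2,
        ae_encoding_dim=1, nn_input_dim=num_nodes,
        nn_hidden_dim1=8, nn_hidden_dim2=8, nn_output_dim=2
    )
    model.eval()  # BatchNorm layers use running stats in eval mode
    with torch.no_grad():
        scores, scaled_omics = model(omics, network)
    # scores: (8, 2) class probabilities; scaled_omics: (8, 5) embedding-weighted omics
    return scores, scaled_omics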