
Main Camp_Week 20 (Mon)_TIL (Machine Learning Model Optimization & Code Cleanup)

Wat_zy 2026. 1. 26. 09:56

✅ What I Did Today

Final project


✏️ What I Learned Today

Today I optimized the machine learning model and organized the code written during the data analysis, going over how the whole analysis process was carried out.

 

Library Installation & Setup

Library installation
#!pip install transformers sentencepiece accelerate
#!pip install catboost
#!pip install koreanize_matplotlib
#!pip install pandas numpy tqdm

Library imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import koreanize_matplotlib
import seaborn as sns
import torch
import os
import gc
import joblib
from transformers import T5Tokenizer, T5EncoderModel
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score,
    matthews_corrcoef, confusion_matrix
)
from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from sklearn.preprocessing import OneHotEncoder

import warnings
warnings.filterwarnings('ignore')

transformers, sentencepiece, accelerate: libraries used together as a set when working with large language models (LLMs) or protein language models (PLMs); a short loading/tokenizing sketch follows this list

transformers: the core tool for loading and running models such as ESM-1b, ProtBert, and ProtT5, which are used to turn protein sequences into numbers

sentencepiece: the tokenizer algorithm that splits text or sequence data into small units (tokens) that the model can understand

accelerate: a tool that speeds up model training and inference by using GPU/TPU resources efficiently
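
To make these roles concrete, here is a minimal sketch of loading the ProtT5 tokenizer used later in this post and turning a short protein sequence into token IDs. The example sequence is illustrative; the checkpoint name and the space-separated, X-substituted input format are the same ones used in the embedding code further down.

from transformers import T5Tokenizer

# SentencePiece-based tokenizer for the ProtT5 encoder checkpoint used below
tokenizer = T5Tokenizer.from_pretrained(
    "Rostlab/prot_t5_xl_half_uniref50-enc", do_lower_case=False
)

# Illustrative sequence: rare residues (U, Z, O, B) mapped to X,
# residues separated by spaces as ProtT5 expects
seq = "MKTAYIAKQRQISFVK"
spaced = " ".join(list(seq.replace("U", "X").replace("Z", "X").replace("O", "X").replace("B", "X")))

encoded = tokenizer(spaced, add_special_tokens=True)
print(encoded["input_ids"])                                   # one ID per residue plus the end-of-sequence token
print(tokenizer.convert_ids_to_tokens(encoded["input_ids"]))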

 

Data Preprocessing

# Load the data
df = pd.read_csv('/content/train.csv')

# Drop columns not used for prediction
cols_to_drop = [
    'number_of_tested', 'number_of_responses', 'reference_date', 'qualitative_label',
    'reference_journal', 'reference_title', 'reference_IRI', 'antigen_code'
]

df = df.drop(columns=cols_to_drop)

# Rename columns
columns = {'epitope_seq': 'epitope', 'antigen_seq': 'antigen', 'start_position': 'start',
           'end_position': 'end', 'assay_method_technique': 'method', 'assay_group': 'assay',
        'disease_type': 'disease', 'disease_state': 'state'}

df.rename(columns=columns, inplace=True)

# Keep only antigens with length ≥ 16 aa
df = df[df['antigen'].str.len() >= 16].copy()
df = df.reset_index(drop=True)

# Keep only epitopes with 8 ≤ length ≤ 16 aa
df = df[df['epitope'].str.len().between(8, 16)].copy()
df = df.reset_index(drop=True)

# Drop rows where the epitope does not match its slice of the antigen
def is_match(row):
    return row['antigen'][int(row['start'])-1 : int(row['end'])] == row['epitope']

match_mask = df.apply(is_match, axis=1)
mismatch_ids = df.loc[~match_mask, 'id'].tolist()

df = df[match_mask].reset_index(drop=True)

# Drop rows with missing state values
df = df.dropna(subset=['state']).copy()

# Tag assay methods, diseases, and states into coarse groups
df = df[df["assay"].isin(["antibody binding", "qualitative binding"])].copy()

METHOD_GROUPS = {
    "Immunoassay": [
        "elisa", "western blot", "radio immuno assay", "ria",
        "immuno staining", "immunohistochemistry",
        "immunoprecipitation", "elispot", "binding assay"
    ],
    "High Throughput": [
        "microarray", "phage display", "high throughput"
    ]
}

def tag_method(method):
    m = method.lower()
    for group, keywords in METHOD_GROUPS.items():
        if any(k in m for k in keywords):
            return group
    return "Other"

DISEASE_GROUPS = {
    "Disease": [
        "occurrence of infectious disease",
        "occurrence of allergy",
        "occurrence of autoimmune disease",
        "occurrence of cancer",
        "occurrence of disease"
    ],
    "Healthy/Exposed": [
        "exposure without evidence for disease",
        "documented exposure",
        "environmental exposure"
    ],
    "Induced/Medical": [
        "administration in vivo",
        "transplant",
        "transfusion"
    ],
    "Naive": [
        "no immunization"
    ]
}

def tag_disease(disease):
    d = disease.lower()
    for group, keywords in DISEASE_GROUPS.items():
        if any(k in d for k in keywords):
            return group
    return "Other"

STATE_KEYWORDS = {
    "Allergy": [
        "allergy", "allergic", "hypersensitivity",
        "asthma", "rhinitis", "dermatitis"
    ],
    "Infectious": [
        "infection", "virus", "viral", "bacterial",
        "parasitic", "influenza", "hepatitis",
        "fever", "malaria", "chagas", "syndrome"
    ],
    "Autoimmune": [
        "autoimmune", "diabetes", "sclerosis",
        "lupus", "arthritis", "celiac"
    ],
    "Cancer": [
        "cancer", "carcinoma", "tumor",
        "melanoma", "leukemia", "lymphoma"
    ],
    "Healthy": [
        "healthy", "normal", "naive", "control", "donor"
    ]
}

def tag_state(text):
    t = text.lower()
    for group, keywords in STATE_KEYWORDS.items():
        if any(k in t for k in keywords):
            return group
    return "Other/Unknown"
    
df["method"] = df["method"].apply(tag_method)
df["disease"] = df["disease"].apply(tag_disease)
df["state"] = df["state"].apply(tag_state)

df = df[df["method"].isin(["Immunoassay", "High Throughput"])].copy()
df = df[df['state'] != 'Other/Unknown'].copy()
df = df[df['disease'] != 'Other'].copy()

df.reset_index(drop=True, inplace=True)
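
A quick sanity check of the tagging helpers above (the inputs are illustrative strings, not rows from the actual data):

# Any string containing one of the group keywords falls into that group
print(tag_method("ELISA development"))                    # Immunoassay
print(tag_method("peptide microarray"))                   # High Throughput
print(tag_disease("occurrence of infectious disease"))    # Disease
print(tag_state("chronic hepatitis B infection"))         # Infectious
print(tag_state("some unrelated description"))            # Other/Unknown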

 

Creating the FASTA Files

def create_fasta_core(df):
    valid_rows = []
    
    for _, row in df.iterrows():
        seq = row['antigen']
        start, end = int(row['start']) - 1, int(row['end'])
        
        # Center of the epitope, then a fixed 16-residue window around it
        center = start + (end - start) // 2
        w_start, w_end = center - 8, center + 8
        
        # Shift (rather than shrink) the window at the sequence boundaries
        if w_start < 0:
            w_start, w_end = 0, 16
        elif w_end > len(seq):
            w_end, w_start = len(seq), len(seq) - 16
            
        context_seq = seq[w_start:w_end]
        
        # Keep only rows with a full 16-residue context window
        if len(context_seq) == 16:
            row['context_seq'] = context_seq
            valid_rows.append(row)

    df_final = pd.DataFrame(valid_rows).reset_index(drop=True)

    with open("epitopes.fasta", "w") as f_epi, open("antigens.fasta", "w") as f_ant:
        for _, row in df_final.iterrows():
            f_epi.write(f">{row['id']}\n{row['epitope']}\n")
            f_ant.write(f">{row['id']}\n{row['context_seq']}\n")

    return df_final
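
A worked example of the windowing arithmetic above, with illustrative positions:

# Illustrative values: epitope at 1-based positions 100 to 110 in a 300-aa antigen
start, end = 100 - 1, 110                  # same 0-/1-based convention as in the function
center = start + (end - start) // 2        # 99 + 5 = 104
w_start, w_end = center - 8, center + 8    # window [96, 112) -> exactly 16 residues

# At the edges the window is clamped by shifting it:
#   w_start < 0          -> window becomes [0, 16)
#   w_end > len(antigen) -> window becomes [len(antigen) - 16, len(antigen))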

 

Creating the Metadata File

def create_metadata_core(df):
    valid_rows = []
    
    for _, row in df.iterrows():
        seq = row['antigen']
        start, end = int(row['start']) - 1, int(row['end'])
        
        center = start + (end - start) // 2
        w_start, w_end = center - 8, center + 8
        
        if w_start < 0:
            w_start, w_end = 0, 16
        elif w_end > len(seq):
            w_end, w_start = len(seq), len(seq) - 16
            
        sliced = seq[w_start:w_end]
        
        if len(sliced) == 16:
            new_row = row.copy()
            new_row['antigen'] = sliced
            valid_rows.append(new_row)

    df_final = pd.DataFrame(valid_rows)
    target_cols = ['id', 'label', 'epitope', 'antigen', 'assay', 'method', 'disease', 'state']
    
    df_final = df_final[[c for c in target_cols if c in df_final.columns]].reset_index(drop=True)
    df_final.to_csv("train_metadata.csv", index=False)
    
    return df_final
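
Both helpers apply the same windowing and filtering, so they can be called back-to-back on the preprocessed DataFrame. A minimal usage sketch (the variable names here are illustrative):

df_fasta = create_fasta_core(df)      # writes epitopes.fasta and antigens.fasta
df_meta = create_metadata_core(df)    # writes train_metadata.csv

# Both keep only rows with a full 16-residue window, so the row counts should agree
assert len(df_fasta) == len(df_meta)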

 

ProtT5-XL-U50 Embeddings

def extract_embeddings_core(fasta_file, save_path):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc", do_lower_case=False)
    model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc").to(device)
    if device.type == 'cuda': model.half()
    model.eval()

    with open(fasta_file, "r") as f:
        seqs = [line.strip() for line in f if not line.startswith(">")]
    processed_seqs = [" ".join(list(seq.replace("U","X").replace("Z","X").replace("O","X").replace("B","X"))) for seq in seqs]

    all_embeddings = []
    batch_size = 128

    for i in range(0, len(processed_seqs), batch_size):
        batch = processed_seqs[i : i + batch_size]
        inputs = tokenizer.batch_encode_plus(batch, add_special_tokens=True, padding="longest", return_tensors="pt").to(device)
        
        with torch.no_grad():
            output = model(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'])
            
        mask = inputs['attention_mask'].unsqueeze(-1)
        # Mask-weighted mean pooling over token embeddings -> one vector per sequence
        pooled = (output.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1)
        all_embeddings.append(pooled.float().cpu().numpy())

    np.save(save_path, np.vstack(all_embeddings))
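
A usage sketch for the two FASTA files written earlier. The antigen output file name matches the file loaded in the next section; the epitope output file name is an assumption shown for completeness:

# Antigen context windows -> embedding_antigen.npy (loaded below for model training)
extract_embeddings_core("antigens.fasta", "embedding_antigen.npy")

# Epitope sequences -> file name here is hypothetical; adjust as needed
extract_embeddings_core("epitopes.fasta", "embedding_epitope.npy")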

 

Searching for the Optimal Threshold

X_antigen = np.load("embedding_antigen.npy").astype("float32")
df = pd.read_csv("train_metadata.csv")
y = df["label"].values.astype('int8')

cat_features = ['assay', 'method', 'state', 'disease']
X_cat = df[cat_features].astype(str)

X_tr_ant, X_te_ant, X_tr_cat, X_te_cat, y_tr, y_te = train_test_split(
    X_antigen, X_cat, y, test_size=0.2, stratify=y, random_state=42
)

X_train = pd.concat([pd.DataFrame(X_tr_ant), X_tr_cat.reset_index(drop=True)], axis=1)
X_test = pd.concat([pd.DataFrame(X_te_ant), X_te_cat.reset_index(drop=True)], axis=1)

del X_antigen, X_tr_ant, X_te_ant, X_tr_cat, X_te_cat; gc.collect()

final_params = {
    'iterations': 1000,
    'learning_rate': 0.1502244848675105,
    'depth': 7,
    'l2_leaf_reg': 9,
    'random_strength': 0.006377488070571219,
    'bagging_temperature': 0.815153925527,
    'cat_features': cat_features,
    'scale_pos_weight': 93.14303050576892,
    'random_state': 42,
    'verbose': 0,
    'early_stopping_rounds': 50,
    'allow_writing_files': False
}

model_cat = CatBoostClassifier(**final_params)
model_cat.fit(X_train, y_tr)

y_proba = model_cat.predict_proba(X_test)[:, 1]
thresholds = np.arange(0.01, 0.99, 0.001)

candidates = []
for th in thresholds:
    pred_temp = (y_proba >= th).astype(int)
    if np.sum(pred_temp) == 0: continue
    
    rec = recall_score(y_te, pred_temp)
    mcc = matthews_corrcoef(y_te, pred_temp)
    prec = precision_score(y_te, pred_temp, zero_division=0)
    
    if rec >= 0.80 and mcc >= 0.3:
        candidates.append((th, prec, rec, mcc))

if candidates:
    candidates.sort(key=lambda x: (x[1], x[2], x[3]), reverse=True) 
    best_th = candidates[0][0]
else:
    recs = [recall_score(y_te, (y_proba >= t).astype(int)) for t in thresholds]
    best_th = thresholds[np.argmin(np.abs(np.array(recs) - 0.80))]

y_pred = (y_proba >= best_th).astype(int)
tn, fp, fn, tp = confusion_matrix(y_te, y_pred).ravel()

print(f"✅ 최적 임계값: {best_th:.4f}")
print(f"✅ Recall: {recall_score(y_te, y_pred):.4f} / MCC: {matthews_corrcoef(y_te, y_pred):.4f}")
print(f"✅ Confusion Matrix: TP={tp}, FP={fp}, TN={tn}, FN={fn}")
xgb_params = {
    'n_estimators': 651,
    'learning_rate': 0.11315872880741529,
    'max_depth': 9,
    'min_child_weight': 9,
    'gamma': 0.9220645919247981,
    'subsample': 0.8254055947982024,
    'colsample_bytree': 0.7187242526421143,
    'reg_alpha': 0.38223013399106515,
    'reg_lambda': 2.9959267450953574e-05,
    'scale_pos_weight': 19.440386429589005,
    'tree_method': 'hist',
    'random_state': 42,
    'verbosity': 0,
    'n_jobs': -1
}

# X_antigen was freed above, so reload it before the XGBoost split
X_antigen = np.load("embedding_antigen.npy").astype("float32")

X_tr_ant, X_te_ant, X_tr_cat, X_te_cat, y_tr, y_te = train_test_split(
    X_antigen, X_cat, y, test_size=0.2, stratify=y, random_state=42
)

encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore", dtype=np.float32)
X_train = np.hstack([X_tr_ant, encoder.fit_transform(X_tr_cat)])
X_test = np.hstack([X_te_ant, encoder.transform(X_te_cat)])

del X_tr_ant, X_te_ant, X_tr_cat, X_te_cat; gc.collect()

model_xgb = XGBClassifier(**xgb_params)
model_xgb.fit(X_train, y_tr)

y_proba = model_xgb.predict_proba(X_test)[:, 1]
thresholds = np.arange(0.1, 0.99, 0.001)

candidates = []
for th in thresholds:
    pred_temp = (y_proba >= th).astype(int)
    if np.sum(pred_temp) == 0: continue
    
    prec = precision_score(y_te, pred_temp, zero_division=0)
    mcc = matthews_corrcoef(y_te, pred_temp)
    
    if prec >= 0.70 and mcc >= 0.30:
        candidates.append((th, prec, mcc))

if candidates:
    candidates.sort(key=lambda x: (x[2], x[1]), reverse=True)
    best_th = candidates[0][0]
else:
    precs = [precision_score(y_te, (y_proba >= t).astype(int), zero_division=0) for t in thresholds]
    best_th = thresholds[np.argmin(np.abs(np.array(precs) - 0.70))]

y_pred = (y_proba >= best_th).astype(int)
tn, fp, fn, tp = confusion_matrix(y_te, y_pred).ravel()

print(f"🎯 Best Threshold: {best_th:.4f}")
print(f"📊 Precision: {precision_score(y_te, y_pred):.4f} / MCC: {matthews_corrcoef(y_te, y_pred):.4f}")
print(f"📌 TP: {tp}, FP: {fp}, TN: {tn}, FN: {fn}")

 

Stage-wise Model Training

import numpy as np
import pandas as pd
import gc
from catboost import CatBoostClassifier

X_antigen = np.load("embedding_antigen.npy").astype("float32")

df_train = pd.read_csv("train_metadata.csv")
y_train = df_train["label"].values.astype('int8')

cat_features = ['assay', 'method', 'state', 'disease']

X_train_final = pd.concat([
    pd.DataFrame(X_antigen), 
    df_train[cat_features].astype(str)
], axis=1)

X_train_final.columns = [str(col) for col in X_train_final.columns]

del X_antigen; gc.collect()

final_params = {
    'iterations': 1000,
    'learning_rate': 0.1502244848675105,
    'depth': 7,
    'l2_leaf_reg': 9,
    'random_strength': 0.006377488070571219,
    'bagging_temperature': 0.815153925527,
    'cat_features': cat_features,
    'scale_pos_weight': 93.14303050576892, 
    'random_state': 42,
    'verbose': 100,
    'early_stopping_rounds': 50,
    'allow_writing_files': False
}

print(f"🚀 Stage 1 CatBoost 학습 시작 (Antigen Only 모드 | 차원: {X_train_final.shape})")
model_s1_cat = CatBoostClassifier(**final_params)
model_s1_cat.fit(X_train_final, y_train)

print("✅ Stage 1 모델 학습 완료!")
X_antigen = np.load("embedding_antigen.npy").astype("float32")
df_train = pd.read_csv("train_metadata.csv")
y_train = df_train["label"].values.astype('int8')

target_features = ['assay', 'method', 'state', 'disease']
encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore", dtype=np.float32)
X_cat_encoded = encoder.fit_transform(df_train[target_features])

X_train_final = np.hstack([X_antigen, X_cat_encoded])

del X_antigen, X_cat_encoded; gc.collect()

xgb_params = {
    'n_estimators': 651,
    'learning_rate': 0.11315872880741529,
    'max_depth': 9,
    'min_child_weight': 9,
    'gamma': 0.9220645919247981,
    'subsample': 0.8254055947982024,
    'colsample_bytree': 0.7187242526421143,
    'reg_alpha': 0.38223013399106515,
    'reg_lambda': 2.9959267450953574e-05,
    'scale_pos_weight': 19.440386429589005,
    'tree_method': 'hist',
    'random_state': 42,
    'verbosity': 0,
    'n_jobs': -1
}

print(f"🚀 Stage 2 XGBoost 학습 시작 (차원: {X_train_final.shape})")
model_s2_xgb = XGBClassifier(**xgb_params)
model_s2_xgb.fit(X_train_final, y_train)

print("✅ Stage 2 모델 학습 완료!")

 

Checking the Modeling Results on the Test Data

MODEL_DIR = "Antibody_Dashboard/models"
model_s1 = joblib.load(os.path.join(MODEL_DIR, "stage1_catboost.pkl"))
model_s2 = joblib.load(os.path.join(MODEL_DIR, "stage2_xgboost.pkl"))
enc_s2 = joblib.load(os.path.join(MODEL_DIR, "encoder_s2.pkl"))

THRESHOLD_S1, THRESHOLD_FINAL = 0.4620, 0.9570
W1, W2 = 0.4, 0.6

X_test_antigen = np.load("test_embedding_antigen.npy").astype("float32")
df_test = pd.read_csv("test_metadata.csv")

X_input_s1 = pd.concat([
    pd.DataFrame(X_test_antigen, columns=[str(i) for i in range(1024)]),
    df_test[['assay', 'method', 'state', 'disease']].reset_index(drop=True)
], axis=1)

probs_s1 = model_s1.predict_proba(X_input_s1)[:, 1]
pass_mask = probs_s1 >= THRESHOLD_S1

if pass_mask.any():
    df_passed = df_test[pass_mask].copy()
    X_ant_passed = X_test_antigen[pass_mask]
    
    X_cat_s2 = enc_s2.transform(df_passed[['assay', 'method', 'state', 'disease']])
    X_input_s2 = np.hstack([X_ant_passed, X_cat_s2])
    
    probs_s2 = model_s2.predict_proba(X_input_s2)[:, 1]
    
    final_scores = (probs_s1[pass_mask] * W1) + (probs_s2 * W2)
    df_passed['Final_Score'] = final_scores
    df_passed['S1_Prob'] = probs_s1[pass_mask]
    df_passed['S2_Prob'] = probs_s2

    top_5 = df_passed[final_scores >= THRESHOLD_FINAL].sort_values(by='Final_Score', ascending=False).head(5)

    print("\n" + "="*70)
    print(f"🏆 TOP 5 항체-항원 결합 예측 후보 (Threshold: {THRESHOLD_FINAL})")
    print("="*70)
    
    if not top_5.empty:
        display_cols = ['id', 'antigen_code', 'Final_Score', 'S1_Prob', 'S2_Prob', 'method', 'state']
        existing_cols = [c for c in display_cols if c in top_5.columns]
        
        top_5_display = top_5[existing_cols].reset_index(drop=True)
        top_5_display.index = top_5_display.index + 1
        
        print(top_5_display)
        print("="*70)
    else:
        print("⚠️ 설정한 임계값(THRESHOLD_FINAL)을 넘는 후보가 없습니다.")
        print("   임계값을 낮추거나 데이터를 다시 확인해 주세요.")
else:
    print("❌ Stage 1을 통과한 데이터가 하나도 없습니다.")

 

I organized the overall data analysis workflow, and to implement the dashboard features that so far exist only as ideas, I am changing and adding code piece by piece.

 


✏️ Today's Question

1. What is a tokenizer algorithm?

Since computers can only process numbers, text has to be split into meaningful pieces, and each piece assigned a unique ID.

A tokenizer is the algorithm that splits data such as natural language or protein sequences into tokens, the small units a computer (or model) can understand.
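
A toy character-level sketch of the idea (the vocabulary and sequence are illustrative; real tokenizers such as SentencePiece learn subword units from data instead of using single characters):

# Tiny vocabulary mapping each amino-acid letter to an integer ID
vocab = {aa: i for i, aa in enumerate("ACDEFGHIKLMNPQRSTVWY")}

def encode(seq):
    # Split the sequence into single-character tokens and look up their IDs
    return [vocab[ch] for ch in seq]

print(encode("MKTAYIAK"))  # [10, 8, 16, 0, 19, 7, 0, 8]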


📌 Still To Do

Final project