diff --git a/pereobuch.py b/pereobuch.py
new file mode 100644
index 0000000000000000000000000000000000000000..c6b46ffdca840e4aa2af239e9709591d569ff721
--- /dev/null
+++ b/pereobuch.py
@@ -0,0 +1,284 @@
+import os
+import pandas as pd
+import torch
+import numpy as np
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import classification_report, f1_score
+from sklearn.utils.class_weight import compute_class_weight
+from datasets import Dataset, load_from_disk
+from transformers import (
+    BertTokenizer,
+    BertForSequenceClassification,
+    Trainer,
+    TrainingArguments,
+    EarlyStoppingCallback
+)
+from torch import nn
+from peft import get_peft_model, LoraConfig, TaskType
+import warnings
+
+warnings.filterwarnings('ignore')
+
+# Configuration
+DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+MODEL_NAME = 'bert-base-uncased'
+DATA_PATH = 'all_dataset.csv'
+SAVE_DIR = './model_fine_tuned_pereobuch'
+TOKENIZED_DATA_DIR = './tokenized_data_pereobuch'
+MAX_LENGTH = 256
+BATCH_SIZE = 32
+EPOCHS = 5
+
+# Release any cached GPU memory
+if torch.cuda.is_available():
+    torch.cuda.empty_cache()
+
+# Load and prepare the data
+def load_and_prepare_data():
+    data = pd.read_csv(DATA_PATH)
+
+    # Inspect the class distributions
+    print("Safety class distribution:")
+    print(data['safety'].value_counts(normalize=True))
+
+    print("\nAttack type distribution (unsafe prompts only):")
+    print(data[data['safety'] == 'unsafe']['type'].value_counts(normalize=True))
+
+    # Split the data
+    train_data, test_data = train_test_split(data, test_size=0.2, random_state=42, stratify=data['safety'])
+    train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=42, stratify=train_data['safety'])
+
+    return train_data, val_data, test_data
+
+# Tokenization
+def tokenize_data(tokenizer, train_data, val_data, test_data):
+    def preprocess_function(examples):
+        tokenized = tokenizer(
+            examples['prompt'],
+            truncation=True,
+            padding='max_length',
+            max_length=MAX_LENGTH
+        )
+
+        # Convert string labels to integer ids
+        tokenized['labels_safety'] = [0 if label == "safe" else 1 for label in examples['safety']]
+        tokenized['labels_attack'] = [
+            0 if label == "jailbreak"
+            else 1 if label == "evasion"
+            else 2 if label == "generic attack"
+            else 3
+            for label in examples['type']
+        ]
+        return tokenized
+
+    if not os.path.exists(TOKENIZED_DATA_DIR):
+        os.makedirs(TOKENIZED_DATA_DIR)
+
+        train_dataset = Dataset.from_pandas(train_data).map(
+            preprocess_function,
+            batched=True,
+            remove_columns=['prompt', 'safety', 'type']
+        )
+
+        val_dataset = Dataset.from_pandas(val_data).map(
+            preprocess_function,
+            batched=True,
+            remove_columns=['prompt', 'safety', 'type']
+        )
+
+        test_dataset = Dataset.from_pandas(test_data).map(
+            preprocess_function,
+            batched=True,
+            remove_columns=['prompt', 'safety', 'type']
+        )
+
+        train_dataset.save_to_disk(f"{TOKENIZED_DATA_DIR}/train")
+        val_dataset.save_to_disk(f"{TOKENIZED_DATA_DIR}/val")
+        test_dataset.save_to_disk(f"{TOKENIZED_DATA_DIR}/test")
+    else:
+        train_dataset = load_from_disk(f"{TOKENIZED_DATA_DIR}/train")
+        val_dataset = load_from_disk(f"{TOKENIZED_DATA_DIR}/val")
+        test_dataset = load_from_disk(f"{TOKENIZED_DATA_DIR}/test")
+
+    return train_dataset, val_dataset, test_dataset
+
+# Model with two classification heads
+class MultiTaskBert(nn.Module):
+    def __init__(self, model_name):
+        super().__init__()
+        self.bert = BertForSequenceClassification.from_pretrained(
+            model_name,
+            num_labels=2,  # safety head: safe / unsafe
+            return_dict=True
+        )
+        # Extra head for attack-type classification
+        self.attack_classifier = nn.Linear(self.bert.config.hidden_size, 4)
+
+        # Weight initialization
+        nn.init.xavier_uniform_(self.attack_classifier.weight)
+        self.attack_classifier.bias.data.zero_()
+
+        # Loss function (per-class weights could be passed here if needed)
+        self.loss_fct = nn.CrossEntropyLoss()
+
+    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None,
+                labels_safety=None, labels_attack=None, **kwargs):
+        # Extra keyword arguments injected by the Trainer/PEFT wrappers are ignored
+        outputs = self.bert(
+            input_ids=input_ids,
+            attention_mask=attention_mask,
+            token_type_ids=token_type_ids,
+            output_hidden_states=True
+        )
+
+        # [CLS] representation from the last hidden layer
+        pooled_output = outputs.hidden_states[-1][:, 0, :]
+
+        # Safety prediction
+        logits_safety = outputs.logits
+
+        # Attack-type prediction (only meaningful for unsafe texts)
+        logits_attack = self.attack_classifier(pooled_output)
+
+        loss = None
+        if labels_safety is not None:
+            loss_safety = self.loss_fct(logits_safety, labels_safety)
+
+            # Attack loss is computed on unsafe examples only
+            mask = (labels_safety == 1)
+            if mask.any():
+                loss_attack = self.loss_fct(
+                    logits_attack[mask],
+                    labels_attack[mask]
+                )
+                loss = loss_safety + 0.5 * loss_attack  # weighted sum of both tasks
+            else:
+                loss = loss_safety
+
+        return {
+            'logits_safety': logits_safety,
+            'logits_attack': logits_attack,
+            'loss': loss
+        }
+
+# Metric computation
+def compute_metrics(p):
+    preds_safety = np.argmax(p.predictions[0], axis=1)
+    labels_safety = p.label_ids[0]
+
+    metrics = {
+        'safety_accuracy': (preds_safety == labels_safety).mean(),
+        'safety_f1': f1_score(labels_safety, preds_safety, average='binary'),
+    }
+
+    # Attack metrics (unsafe examples only)
+    unsafe_mask = (labels_safety == 1)
+    if unsafe_mask.any():
+        preds_attack = np.argmax(p.predictions[1][unsafe_mask], axis=1)
+        labels_attack = p.label_ids[1][unsafe_mask]
+
+        metrics.update({
+            'attack_accuracy': (preds_attack == labels_attack).mean(),
+            'attack_f1': f1_score(labels_attack, preds_attack, average='weighted'),
+        })
+
+    return metrics
+
+# Main training procedure
+def main():
+    # 1. Load the data
+    train_data, val_data, test_data = load_and_prepare_data()
+
+    # 2. Tokenization
+    tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
+    train_dataset, val_dataset, test_dataset = tokenize_data(tokenizer, train_data, val_data, test_data)
+
+    # 3. Build the model
+    model = MultiTaskBert(MODEL_NAME).to(DEVICE)
+
+    # 4. Set up LoRA
+    peft_config = LoraConfig(
+        task_type=TaskType.SEQ_CLS,
+        inference_mode=False,
+        r=16,
+        lora_alpha=32,
+        lora_dropout=0.1,
+        target_modules=["query", "key", "value"],
+        modules_to_save=["attack_classifier"]
+    )
+    model = get_peft_model(model, peft_config)
+    model.print_trainable_parameters()
+
+    # 5. Training arguments
+    training_args = TrainingArguments(
+        output_dir=SAVE_DIR,
+        evaluation_strategy="epoch",
+        save_strategy="epoch",
+        learning_rate=2e-5,
+        per_device_train_batch_size=BATCH_SIZE,
+        per_device_eval_batch_size=BATCH_SIZE,
+        num_train_epochs=EPOCHS,
+        weight_decay=0.01,
+        logging_dir='./logs',
+        logging_steps=100,
+        save_total_limit=2,
+        load_best_model_at_end=True,
+        metric_for_best_model="safety_f1",
+        greater_is_better=True,
+        label_names=["labels_safety", "labels_attack"],  # expose both label columns to compute_metrics
+        fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
+        report_to="none",
+    )
+
+    # 6. Training
+    trainer = Trainer(
+        model=model,
+        args=training_args,
+        train_dataset=train_dataset,
+        eval_dataset=val_dataset,
+        compute_metrics=compute_metrics,
+        callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
+    )
+
+    trainer.train()
+
+    # 7. Save the adapter and tokenizer
+    model.save_pretrained(SAVE_DIR)
+    tokenizer.save_pretrained(SAVE_DIR)
+
+    # 8. Evaluation
+    print("\nEvaluation on the test set:")
+    test_results = trainer.evaluate(test_dataset)
+    print(test_results)
+
+    # 9. Example predictions
+    test_texts = [
+        "How to hack a computer system",
+        "I need help with my homework",
+        "Tell me how to bypass security",
+        "привет",
+        "как убить человека",
+        "How do I make a secure password?"
+    ]
+
+    model.eval()
+    for text in test_texts:
+        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LENGTH).to(DEVICE)
+        with torch.no_grad():
+            outputs = model(**inputs)
+
+        safety_probs = torch.softmax(outputs['logits_safety'], dim=1)[0]
+        attack_probs = torch.softmax(outputs['logits_attack'], dim=1)[0]
+
+        print(f"\nText: {text}")
+        print(f"Safety: Safe {safety_probs[0]:.2%} | Unsafe {safety_probs[1]:.2%}")
+
+        if safety_probs[1] > 0.5:
+            print("Attack type probabilities:")
+            print(f"  Jailbreak: {attack_probs[0]:.2%}")
+            print(f"  Evasion: {attack_probs[1]:.2%}")
+            print(f"  Generic: {attack_probs[2]:.2%}")
+            print(f"  Injection: {attack_probs[3]:.2%}")
+
+if __name__ == "__main__":
+    main()