{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "fa72488b",
   "metadata": {},
   "source": [
    "启用Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c5d540ee",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession \n",
    "from pyspark import SparkConf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "533b447f-7d54-4c0b-bf34-eab0db0dd8d3",
   "metadata": {},
   "outputs": [],
   "source": [
    "conf = SparkConf().setAppName(\"CustomerLoss\").setMaster(\"yarn\")\n",
    "conf.set(\"spark.driver.bindAddress\", \"0.0.0.0\")\n",
    "conf.set(\"spark.executor.memory\", \"1g\")\n",
    "conf.set(\"spark.executor.cores\", \"2\")\n",
    "conf.set(\"spark.executor.instances\", \"1\")\n",
    "conf.set(\"spark.driver.memory\", \"1g\")\n",
    "spark = SparkSession.builder.config(conf = conf).getOrCreate() \n",
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "43aa6c6a",
   "metadata": {},
   "source": [
    "数据准备"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f8db9054",
   "metadata": {},
   "outputs": [],
   "source": [
    "!hdfs dfs -mkdir -p /user/mqmrx/data\n",
    "!hdfs dfs -put data/userlostprob_train.txt /user/mqmrx/data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bb44dc8c",
   "metadata": {},
   "outputs": [],
   "source": [
    "file = '/user/mqmrx/data/userlostprob_train.txt'\n",
    "df = spark.read.csv(file, header=True, sep='\\t', inferSchema=True, nullValue='NULL')\n",
    "df.createOrReplaceTempView('ctripTable')\n",
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "46566696",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.first()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ffaa1bee",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "05bdad69",
   "metadata": {},
   "source": [
    "数据探索"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "936c1377",
   "metadata": {},
   "outputs": [],
   "source": [
    "sql = 'select label, count(*) as cnt from ctripTable group by label order by label'\n",
    "df_label = spark.sql(sql).toPandas()\n",
    "df_label"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9b9e9a0d",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from matplotlib import pyplot as plt\n",
    "\n",
    "os.makedirs('figures', exist_ok=True)\n",
    "plt.figure(figsize=(5, 4), dpi=300)\n",
    "plt.bar(['Stay(0)', 'Loss(1)'], df_label['cnt'], color=['#3498db', '#e74c3c'])\n",
    "for i, v in enumerate(df_label['cnt']):\n",
    "    plt.text(i, v, str(v), ha='center', va='bottom')\n",
    "plt.title('Label Distribution')\n",
    "plt.savefig('figures/label_dist.png')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "13d40eb6",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from pyspark.sql.functions import col, sum as _sum, when\n",
    "\n",
    "total = df.count()\n",
    "miss = df.select([\n",
    "    (_sum(when(col(c).isNull(), 1).otherwise(0)) * 100.0 / total).alias(c)\n",
    "    for c in df.columns\n",
    "]).first().asDict()\n",
    "miss_s = pd.Series(miss).sort_values(ascending=False)\n",
    "miss_s.head(15).round(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4165ccd2",
   "metadata": {},
   "source": [
    "数据清洗"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "99c47b9b",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler\n",
    "\n",
    "drop_cols = ['sampleid', 'd', 'arrival']\n",
    "df_clean = df.drop(*drop_cols)\n",
    "feat_cols = [c for c in df_clean.columns if c != 'label']\n",
    "\n",
    "imputer = Imputer(inputCols=feat_cols, outputCols=feat_cols, strategy='mean')\n",
    "assembler = VectorAssembler(inputCols=feat_cols, outputCol='features_raw', handleInvalid='skip')\n",
    "scaler = StandardScaler(inputCol='features_raw', outputCol='features', withMean=True, withStd=True)\n",
    "\n",
    "prep = Pipeline(stages=[imputer, assembler, scaler])\n",
    "prep_model = prep.fit(df_clean)\n",
    "df_ready = prep_model.transform(df_clean).select('label', 'features')\n",
    "df_ready.show(3, truncate=80)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "24ed36de",
   "metadata": {},
   "source": [
    "拆分样本"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2d08d4d4",
   "metadata": {},
   "outputs": [],
   "source": [
    "train, test = df_ready.randomSplit([0.8, 0.2], seed=42)\n",
    "train.cache()\n",
    "test.cache()\n",
    "print('训练集:', train.count(), '测试集:', test.count())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2af98108",
   "metadata": {},
   "source": [
    "逻辑回归"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "de3c59f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator\n",
    "\n",
    "eval_acc = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')\n",
    "eval_auc = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')\n",
    "\n",
    "lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=20)\n",
    "lr_model = lr.fit(train)\n",
    "lr_pred = lr_model.transform(test)\n",
    "print('LR Accuracy:', round(eval_acc.evaluate(lr_pred), 4))\n",
    "print('LR AUC:', round(eval_auc.evaluate(lr_pred), 4))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2e3c9015",
   "metadata": {},
   "outputs": [],
   "source": [
    "随机森林"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4ddc6da1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import RandomForestClassifier\n",
    "\n",
    "rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=50, maxDepth=8, seed=42)\n",
    "rf_model = rf.fit(train)\n",
    "rf_pred = rf_model.transform(test)\n",
    "print('RF Accuracy:', round(eval_acc.evaluate(rf_pred), 4))\n",
    "print('RF AUC:', round(eval_auc.evaluate(rf_pred), 4))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a1fbd009",
   "metadata": {},
   "source": [
    "决策树"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fb0a5ed1",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import DecisionTreeClassifier\n",
    "\n",
    "dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=8, seed=42)\n",
    "dt_model = dt.fit(train)\n",
    "dt_pred = dt_model.transform(test)\n",
    "print('DT Accuracy:', round(eval_acc.evaluate(dt_pred), 4))\n",
    "print('DT AUC:', round(eval_auc.evaluate(dt_pred), 4))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "290fe1aa",
   "metadata": {},
   "source": [
    "模型对比"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1593f2a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "results = pd.DataFrame({\n",
    "    'Model': ['LR', 'RF', 'DT'],\n",
    "    'Accuracy': [eval_acc.evaluate(lr_pred), eval_acc.evaluate(rf_pred), eval_acc.evaluate(dt_pred)],\n",
    "    'AUC': [eval_auc.evaluate(lr_pred), eval_auc.evaluate(rf_pred), eval_auc.evaluate(dt_pred)]\n",
    "}).round(4)\n",
    "print(results)\n",
    "\n",
    "plt.figure(figsize=(6, 4), dpi=300)\n",
    "x = range(len(results))\n",
    "w = 0.3\n",
    "plt.bar([i - w/2 for i in x], results['Accuracy'], w, label='Accuracy')\n",
    "plt.bar([i + w/2 for i in x], results['AUC'], w, label='AUC')\n",
    "plt.xticks(list(x), results['Model'])\n",
    "plt.ylim(0, 1)\n",
    "plt.title('Model Comparison')\n",
    "plt.legend()\n",
    "plt.savefig('figures/model_compare.png')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "76f86719",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.13.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
