{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark=SparkSession.builder.appName('Basics').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.read.json('people.json')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+\n",
      "| age|   name|\n",
      "+----+-------+\n",
      "|null|Michael|\n",
      "|  30|   Andy|\n",
      "|  19| Justin|\n",
      "+----+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- age: long (nullable = true)\n",
      " |-- name: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['age', 'name']"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.columns\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[summary: string, age: string, name: string]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+-------+\n",
      "|summary|               age|   name|\n",
      "+-------+------------------+-------+\n",
      "|  count|                 2|      3|\n",
      "|   mean|              24.5|   null|\n",
      "| stddev|7.7781745930520225|   null|\n",
      "|    min|                19|   Andy|\n",
      "|    max|                30|Michael|\n",
      "+-------+------------------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.describe().show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "if schema is not crt we can change it like if its integer or string"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import StructField,StringType,IntegerType,StructType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_schema =[StructField('age',IntegerType(),True),\n",
    "             StructField('name',StringType(),True)]\n",
    "#field,datatype to change to , is there null present =True or false"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "final_struct=StructType(fields=data_schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    " df=spark.read.json('people.json',schema=final_struct)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- age: integer (nullable = true)\n",
      " |-- name: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<'age'>"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#  grab data from this like grab age vs selsct age\n",
    "df['age']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.column.Column"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(df['age'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# that is a column 'Column' object is not callable..if i want dataframe we use select"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+\n",
      "| age|\n",
      "+----+\n",
      "|null|\n",
      "|  30|\n",
      "|  19|\n",
      "+----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select('age').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(age=None, name='Michael'), Row(age=30, name='Andy')]"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.head(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Row(age=None, name='Michael')"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.head(2)[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the above line is a row object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+\n",
      "| age|   name|\n",
      "+----+-------+\n",
      "|null|Michael|\n",
      "|  30|   Andy|\n",
      "|  19| Justin|\n",
      "+----+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(['age','name']).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+------+\n",
      "| age|   name|newage|\n",
      "+----+-------+------+\n",
      "|null|Michael|  null|\n",
      "|  30|   Andy|    30|\n",
      "|  19| Justin|    19|\n",
      "+----+-------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# add a new column\n",
    "df.withColumn('newage',df['age']).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# a copy of age will be new age\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+----------+\n",
      "| age|   name|double_age|\n",
      "+----+-------+----------+\n",
      "|null|Michael|      null|\n",
      "|  30|   Andy|        60|\n",
      "|  19| Justin|        38|\n",
      "+----+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.withColumn('double_age',df['age']*2).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+\n",
      "| age|   name|\n",
      "+----+-------+\n",
      "|null|Michael|\n",
      "|  30|   Andy|\n",
      "|  19| Justin|\n",
      "+----+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-------+\n",
      "|my_new_age|   name|\n",
      "+----------+-------+\n",
      "|      null|Michael|\n",
      "|        30|   Andy|\n",
      "|        19| Justin|\n",
      "+----------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.withColumnRenamed('age','my_new_age').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "# pure sql to interact with the data frame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "# sql etmporary view"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.createOrReplaceTempView('people')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "results=spark.sql(\"SELECT * FROM people\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+\n",
      "| age|   name|\n",
      "+----+-------+\n",
      "|null|Michael|\n",
      "|  30|   Andy|\n",
      "|  19| Justin|\n",
      "+----+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "results.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "results=spark.sql(\"SELECT * FROM people WHERE age=30\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+----+\n",
      "|age|name|\n",
      "+---+----+\n",
      "| 30|Andy|\n",
      "+---+----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "results.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# new lesson on dataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark=SparkSession.builder.appName('ops').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.read.csv('appl_stock.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Date: string (nullable = true)\n",
      " |-- Open: double (nullable = true)\n",
      " |-- High: double (nullable = true)\n",
      " |-- Low: double (nullable = true)\n",
      " |-- Close: double (nullable = true)\n",
      " |-- Volume: integer (nullable = true)\n",
      " |-- Adj Close: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Row(Date='2010-01-04', Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)"
      ]
     },
     "execution_count": 39,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|\n",
      "|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|\n",
      "|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|\n",
      "|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|\n",
      "|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|\n",
      "|2010-01-11|212.79999700000002|        213.000002|        208.450005|210.11000299999998|115557400|         27.221758|\n",
      "|2010-01-12|209.18999499999998|209.76999500000002|        206.419998|        207.720001|148614900|          26.91211|\n",
      "|2010-01-13|        207.870005|210.92999500000002|        204.099998|        210.650002|151473000|          27.29172|\n",
      "|2010-01-14|210.11000299999998|210.45999700000002|        209.020004|            209.43|108223500|         27.133657|\n",
      "|2010-01-15|210.92999500000002|211.59999700000003|        205.869999|            205.93|148516900|26.680197999999997|\n",
      "|2010-01-19|        208.330002|215.18999900000003|        207.240004|        215.039995|182501900|27.860484999999997|\n",
      "|2010-01-20|        214.910006|        215.549994|        209.500002|            211.73|153038200|         27.431644|\n",
      "|2010-01-21|        212.079994|213.30999599999998|        207.210003|        208.069996|152038600|         26.957455|\n",
      "|2010-01-22|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|\n",
      "|2010-01-25|202.51000200000001|        204.699999|        200.190002|        203.070002|266424900|26.309658000000002|\n",
      "|2010-01-26|205.95000100000001|        213.710005|        202.580004|        205.940001|466777500|         26.681494|\n",
      "|2010-01-27|        206.849995|            210.58|        199.530001|        207.880005|430642100|26.932840000000002|\n",
      "|2010-01-28|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|\n",
      "|2010-01-29|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|\n",
      "|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|\n",
      "|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|\n",
      "|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|\n",
      "|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|\n",
      "|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|\n",
      "|2010-01-11|212.79999700000002|        213.000002|        208.450005|210.11000299999998|115557400|         27.221758|\n",
      "|2010-01-12|209.18999499999998|209.76999500000002|        206.419998|        207.720001|148614900|          26.91211|\n",
      "|2010-01-13|        207.870005|210.92999500000002|        204.099998|        210.650002|151473000|          27.29172|\n",
      "|2010-01-14|210.11000299999998|210.45999700000002|        209.020004|            209.43|108223500|         27.133657|\n",
      "|2010-01-15|210.92999500000002|211.59999700000003|        205.869999|            205.93|148516900|26.680197999999997|\n",
      "|2010-01-19|        208.330002|215.18999900000003|        207.240004|        215.039995|182501900|27.860484999999997|\n",
      "|2010-01-20|        214.910006|        215.549994|        209.500002|            211.73|153038200|         27.431644|\n",
      "|2010-01-21|        212.079994|213.30999599999998|        207.210003|        208.069996|152038600|         26.957455|\n",
      "|2010-01-22|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|\n",
      "|2010-01-25|202.51000200000001|        204.699999|        200.190002|        203.070002|266424900|26.309658000000002|\n",
      "|2010-01-26|205.95000100000001|        213.710005|        202.580004|        205.940001|466777500|         26.681494|\n",
      "|2010-01-27|        206.849995|            210.58|        199.530001|        207.880005|430642100|26.932840000000002|\n",
      "|2010-01-28|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|\n",
      "|2010-01-29|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|\n",
      "|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(\"Close < 500\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|              Open|\n",
      "+------------------+\n",
      "|        213.429998|\n",
      "|        214.599998|\n",
      "|        214.379993|\n",
      "|            211.75|\n",
      "|        210.299994|\n",
      "|212.79999700000002|\n",
      "|209.18999499999998|\n",
      "|        207.870005|\n",
      "|210.11000299999998|\n",
      "|210.92999500000002|\n",
      "|        208.330002|\n",
      "|        214.910006|\n",
      "|        212.079994|\n",
      "|206.78000600000001|\n",
      "|202.51000200000001|\n",
      "|205.95000100000001|\n",
      "|        206.849995|\n",
      "|        204.930004|\n",
      "|        201.079996|\n",
      "|192.36999699999998|\n",
      "+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(\"Close < 500\").select('Open').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+------------------+\n",
      "|              Open|             Close|\n",
      "+------------------+------------------+\n",
      "|        213.429998|        214.009998|\n",
      "|        214.599998|        214.379993|\n",
      "|        214.379993|        210.969995|\n",
      "|            211.75|            210.58|\n",
      "|        210.299994|211.98000499999998|\n",
      "|212.79999700000002|210.11000299999998|\n",
      "|209.18999499999998|        207.720001|\n",
      "|        207.870005|        210.650002|\n",
      "|210.11000299999998|            209.43|\n",
      "|210.92999500000002|            205.93|\n",
      "|        208.330002|        215.039995|\n",
      "|        214.910006|            211.73|\n",
      "|        212.079994|        208.069996|\n",
      "|206.78000600000001|            197.75|\n",
      "|202.51000200000001|        203.070002|\n",
      "|205.95000100000001|        205.940001|\n",
      "|        206.849995|        207.880005|\n",
      "|        204.930004|        199.289995|\n",
      "|        201.079996|        192.060003|\n",
      "|192.36999699999998|        194.729998|\n",
      "+------------------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(\"Close < 500\").select(['Open','Close']).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+\n",
      "|   Volume|\n",
      "+---------+\n",
      "|123432400|\n",
      "|150476200|\n",
      "|138040000|\n",
      "|119282800|\n",
      "|111902700|\n",
      "|115557400|\n",
      "|148614900|\n",
      "|151473000|\n",
      "|108223500|\n",
      "|148516900|\n",
      "|182501900|\n",
      "|153038200|\n",
      "|152038600|\n",
      "|220441900|\n",
      "|266424900|\n",
      "|466777500|\n",
      "|430642100|\n",
      "|293375600|\n",
      "|311488100|\n",
      "|187469100|\n",
      "+---------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(df['Close'] <500).select('Volume').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+------------------+----------+----------+----------+---------+------------------+\n",
      "|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|\n",
      "+----------+------------------+----------+----------+----------+---------+------------------+\n",
      "|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|\n",
      "|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|\n",
      "|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|\n",
      "+----------+------------------+----------+----------+----------+---------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter((df['Close'] <200) & (df['Open']>200)).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|\n",
      "|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|\n",
      "|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|\n",
      "|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|\n",
      "|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|\n",
      "|2010-02-08|        195.690006|197.88000300000002|        193.999994|194.11999699999998|119567700|           25.1501|\n",
      "|2010-02-09|        196.419996|        197.499994|        194.749998|196.19000400000002|158221700|         25.418289|\n",
      "|2010-02-10|        195.889997|             196.6|            194.26|195.12000700000002| 92590400|          25.27966|\n",
      "|2010-02-11|        194.880001|        199.750006|194.05999599999998|        198.669994|137586400|         25.739595|\n",
      "|2010-02-23|        199.999998|        201.330002|        195.709993|        197.059998|143773700|         25.531005|\n",
      "|2014-06-09|         92.699997|         93.879997|             91.75|         93.699997| 75415000|         88.906324|\n",
      "|2014-06-10|         94.730003|         95.050003|             93.57|             94.25| 62777000|         89.428189|\n",
      "|2014-06-11|         94.129997|         94.760002|         93.470001|         93.860001| 45681000|         89.058142|\n",
      "|2014-06-12|         94.040001|         94.120003|         91.900002|         92.290001| 54749000|         87.568463|\n",
      "|2014-06-13|         92.199997|         92.440002|         90.879997|         91.279999| 54525000|         86.610132|\n",
      "|2014-06-16|         91.510002|             92.75|         91.449997|         92.199997| 35561000|         87.483064|\n",
      "|2014-06-17|         92.309998|         92.699997|         91.800003| 92.08000200000001| 29726000| 87.36920699999999|\n",
      "|2014-06-18|         92.269997|         92.290001|         91.349998|             92.18| 33514000|          87.46409|\n",
      "|2014-06-19|         92.290001|         92.300003|         91.339996|         91.860001| 35528000|         87.160461|\n",
      "|2014-06-20|         91.849998|         92.550003|         90.900002|         90.910004|100898000|         86.259066|\n",
      "+----------+------------------+------------------+------------------+------------------+---------+------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter((df['Close'] <200) & ~(df['Open']>200)).show()\n",
    "# ~ for not"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+------------------+----------+------+------+---------+---------+\n",
      "|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|\n",
      "+----------+------------------+----------+------+------+---------+---------+\n",
      "|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|\n",
      "+----------+------------------+----------+------+------+---------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.filter(df['Low'] ==197.16).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]"
      ]
     },
     "execution_count": 54,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.filter(df['Low'] ==197.16).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {},
   "outputs": [],
   "source": [
    "result=df.filter(df['Low'] ==197.16).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {},
   "outputs": [],
   "source": [
    "row=result[0]\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 59,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'Date': '2010-01-22',\n",
       " 'Open': 206.78000600000001,\n",
       " 'High': 207.499996,\n",
       " 'Low': 197.16,\n",
       " 'Close': 197.75,\n",
       " 'Volume': 220441900,\n",
       " 'Adj Close': 25.620401}"
      ]
     },
     "execution_count": 59,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row.asDict()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "220441900"
      ]
     },
     "execution_count": 60,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row.asDict()['Volume']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## next lesson"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark=SparkSession.builder.appName('aggs').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.read.csv('sales_info.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   GOOG|Charlie|120.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|     FB|   Carl|870.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- Company: string (nullable = true)\n",
      " |-- Person: string (nullable = true)\n",
      " |-- Sales: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyspark.sql.group.GroupedData at 0x7f0762785250>"
      ]
     },
     "execution_count": 65,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.groupBy(\"Company\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------------+\n",
      "|Company|       avg(Sales)|\n",
      "+-------+-----------------+\n",
      "|   APPL|            370.0|\n",
      "|   GOOG|            220.0|\n",
      "|     FB|            610.0|\n",
      "|   MSFT|322.3333333333333|\n",
      "+-------+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy(\"Company\").mean().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|sum(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|    1480.0|\n",
      "|   GOOG|     660.0|\n",
      "|     FB|    1220.0|\n",
      "|   MSFT|     967.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy(\"Company\").sum().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|max(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|     750.0|\n",
      "|   GOOG|     340.0|\n",
      "|     FB|     870.0|\n",
      "|   MSFT|     600.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy(\"Company\").max().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|Company|count|\n",
      "+-------+-----+\n",
      "|   APPL|    4|\n",
      "|   GOOG|    3|\n",
      "|     FB|    2|\n",
      "|   MSFT|    3|\n",
      "+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy(\"Company\").count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+\n",
      "|sum(Sales)|\n",
      "+----------+\n",
      "|    4327.0|\n",
      "+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.agg({'Sales':'sum'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 72,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.agg({'column name':'function'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 73,
   "metadata": {},
   "outputs": [],
   "source": [
    "group_data=df.groupBy('Company')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 74,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+----------+\n",
      "|Company|max(Sales)|\n",
      "+-------+----------+\n",
      "|   APPL|     750.0|\n",
      "|   GOOG|     340.0|\n",
      "|     FB|     870.0|\n",
      "|   MSFT|     600.0|\n",
      "+-------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "group_data.agg({'Sales':'max'}).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 75,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import functions form spark\n",
    "from pyspark.sql.functions import countDistinct,avg,stddev"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 76,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------------------+\n",
      "|count(DISTINCT Sales)|\n",
      "+---------------------+\n",
      "|                   11|\n",
      "+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(countDistinct('Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+\n",
      "|       avg(Sales)|\n",
      "+-----------------+\n",
      "|360.5833333333333|\n",
      "+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(avg('Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+\n",
      "|    Average Sales|\n",
      "+-----------------+\n",
      "|360.5833333333333|\n",
      "+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(avg('Sales').alias('Average Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|stddev_samp(Sales)|\n",
      "+------------------+\n",
      "|250.08742410799007|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.select(stddev('Sales')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import format_number"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 82,
   "metadata": {},
   "outputs": [],
   "source": [
    "sales_std =df.select(stddev('Sales').alias('std'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------------------+\n",
      "|format_number(std, 2)|\n",
      "+---------------------+\n",
      "|               250.09|\n",
      "+---------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sales_std.select(format_number('std',2)).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 85,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+\n",
      "|   STD|\n",
      "+------+\n",
      "|250.09|\n",
      "+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sales_std.select(format_number('std',2).alias('STD')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 86,
   "metadata": {},
   "outputs": [],
   "source": [
    "# sort by sales coumn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 87,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|   GOOG|Charlie|120.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|     FB|   Carl|870.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.orderBy(\"Sales\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 89,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-------+-----+\n",
      "|Company| Person|Sales|\n",
      "+-------+-------+-----+\n",
      "|     FB|   Carl|870.0|\n",
      "|   APPL|   Mike|750.0|\n",
      "|   MSFT|   Tina|600.0|\n",
      "|     FB|  Sarah|350.0|\n",
      "|   APPL|  Chris|350.0|\n",
      "|   GOOG|  Frank|340.0|\n",
      "|   APPL|   John|250.0|\n",
      "|   MSFT|Vanessa|243.0|\n",
      "|   GOOG|    Sam|200.0|\n",
      "|   APPL|  Linda|130.0|\n",
      "|   MSFT|    Amy|124.0|\n",
      "|   GOOG|Charlie|120.0|\n",
      "+-------+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.orderBy(df['Sales'].desc()).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}