The Team

Manikya Bardhan - PES2201800351
Rishab Kashyap - PES2201800065

Introduction

Every day, people run computational workloads that demand large amounts of online resources throughout the day, month, or year. The volume of data produced is massive, and knowing exactly how resources are used over time makes it possible to provide them to users as and when required, thereby improving efficiency. In this project, we take DevOps monitoring data and analyze how performance varies throughout the day by performing a time series analysis on the data using Spark RDDs. Apache Spark is a unified analytics engine for big data and machine learning, originally developed at UC Berkeley in 2009. Its creators went on to found Databricks in 2013, and we use Azure Databricks here to execute our program.

A peek into the Dataset

The data is obtained from AWS CloudWatch, and is present in parquet format.

Our dataset consists of 12 unique columns, described below (a quick schema check in Spark follows the list):

  • Index: The row number of each record, starting from 0.
  • Provider: The source that provides the resources; in this case, Amazon Web Services.
  • Service_Name: The type of service being requested.
  • Account_ID: The account from which the request for the resource came.
  • Region: The regional host/zone from which the resources are served.
  • Resource_ID: Unique ID given to every resource sent to the user.
  • Asset_ID: The unique identity given to the asset that complements the resource.
  • Timestamp: The time at which the value was recorded.
  • Value: The measured value of the metric.
  • Unit: The unit of the number in the Value column (e.g. Percent, Count, Bytes).
  • Metric_Name: Identifies the metric to which a particular value belongs.
  • Statistic: Denotes how the data was aggregated (e.g. count, sum, average).
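
As a quick sanity check, the columns above can be verified against the loaded data by printing the Spark schema. This is a minimal sketch, assuming the CSV copy of the data at the DBFS path used later in this report; df_check is an illustrative name.

df_check = (spark.read
            .option("header", "true")
            .option("inferSchema", True)
            .csv("/FileStore/tables/i_0a1c7dc126cb6ac8b_CPUUtilization.csv"))
df_check.printSchema()  # should list the 12 columns described above
df_check.show(5)        # peek at the first few rows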

Architecture

Overview

The whole project has two phases:

  • Training Phase
  • Production Phase

In the Training Phase, we read the input data stored in the Databricks File System and use it to train a Facebook Prophet model. This model is used for forecasting the metric values of different cloud resources.

In the Production Phase, we use structured streaming to get the latest data, and use it with our model. This can be used to further train the model, or make future forecasts.

Inside spark, we train the model, and make plots to visualize the forecasts.

After this, the forecasts are finally written back to DBFS as a Parquet file.

DBFS

Databricks File System (DBFS) is a distributed file system mounted into an Azure Databricks workspace and available on Azure Databricks clusters. DBFS is an abstraction on top of scalable object storage and offers the following benefits:

  • Allows you to mount storage objects so that you can seamlessly access data without requiring credentials.
  • Allows you to interact with object storage using directory and file semantics instead of storage URLs.
  • Persists files to object storage, so you won’t lose data after you terminate a cluster.

As mentioned above, we use DBFS for initially storing our files.
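
For reference, the files staged in DBFS can be listed directly from a notebook cell using the dbutils helper (a small sketch; the path is the one used throughout this report):

display(dbutils.fs.ls("/FileStore/tables/"))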

Environment and Cluster details

The project has been done on Azure Databricks, which allows us to set up an Apache Spark™ environment in minutes, autoscale, and collaborate on shared projects in an interactive workspace. Azure Databricks supports Python, Scala, SQL, and more.

The cluster contains 1 driver node and 2 worker nodes.
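
The runtime can also be confirmed from a notebook cell (a small sketch; the output depends on the cluster configuration):

print(spark.version)                          # Spark version of the attached cluster
print(spark.sparkContext.defaultParallelism)  # total cores available across the workers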

Importing required libraries

import numpy as np
import pandas as pd
from fbprophet import Prophet
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct, window
from pyspark.sql.types import FloatType, StructField, StructType, StringType, TimestampType
from sklearn.metrics import mean_squared_error
from pathlib import Path
Importing plotly failed. Interactive plots will not work.

Defining path where data is stored.

path = Path("/FileStore/tables/")
file_name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"
import pyarrow
pyarrow.__version__
Out[2]: '0.14.1'

Defining Helper Functions

Retrieves the data from path and cleans it.

def retrieve_data(file_name):
    """Load sample data from ./data/original-input.csv as a pyspark.sql.DataFrame."""
#     df = (spark.read
#           .option("header", "true")
#           .option("inferSchema", value=True)
#           .csv("./data/input.csv"))
    df = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(file_name)

    # Drop any null values in case they exist
    df = df.dropna()

    # Rename timestamp to ds and total to y for fbprophet
    df = df.select(
        df['timestamp'].alias('ds'),
        df['service_name'],
        df['value'].cast(FloatType()).alias('y'),
        df['metric_name']
    )

    return df

Retrieves the data as a stream from path.

def retrieve_datastream(file_name):
  schema = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(file_name).schema

  streamingInputDF = (
    spark
      .readStream
      .schema(schema)
      .option("header", "true").format("csv").load(file_name))


  streamingCountsDF = (
    streamingInputDF
      .groupBy(
        streamingInputDF.value,
        window(streamingInputDF.timestamp, "1 hour"))
      .count()
  )

  # Return the windowed counts so the caller can start a streaming query on them
  return streamingCountsDF

Function to convert each of the SQL rows into a Python dictionary. We append each converted row to a list and convert the final output into a pandas dataframe.

def transform_data(row):
    """Transform data from pyspark.sql.Row to python dict to be used in rdd."""
    data = row['data']
    app = row['service_name']
    mt = row['metric_name']

    # Transform [pyspark.sql.Dataframe.Row] -> [dict]
    data_dicts = []
    for d in data:
        data_dicts.append(d.asDict())

    # Convert into pandas dataframe for fbprophet
    data = pd.DataFrame(data_dicts)
    data['ds'] = pd.to_datetime(data['ds'])

    return {
        'service_name': app,
        'metric_name': mt,
        'data': data
    }

Function used to split the data into a training dataset and a testing dataset based on the timestamp. A 'start_datetime' threshold marks the beginning of the test window; everything before it is used for training, and 'max_datetime' (the latest timestamp in the data) marks the end of the test window.

def partition_data(d):
    """Split data into training and testing based on timestamp."""
    # Extract data from pd.Dataframe
    data = d['data']

    # Find max timestamp and extract timestamp for start of day
    max_datetime = pd.to_datetime(max(data['ds']))
#     start_datetime = max_datetime.replace(hour=00, minute=00, second=00)
    # Hard-coded split point: test data starts at this timestamp
    start_datetime = '10/10/20 00:00'
    # Extract training data
    train_data = data[data['ds'] < start_datetime]

    # Account for zeros in data while still applying uniform transform
#     train_data['y'] = train_data['y'].apply(lambda x: np.log(x + 1))

    # Assign train/test split
    d['test_data'] = data.loc[(data['ds'] >= start_datetime)
                              & (data['ds'] <= max_datetime)]
    d['train_data'] = train_data

    return d

Function that creates a Prophet model from the data we pass to it. Here Prophet is instantiated with its default arguments; the model is fitted and used for forecasting in the next steps.

def create_model(d):
    """Create Prophet model using each input grid parameter set."""
    m = Prophet()
    d['model'] = m

    return d

Once the Prophet model is initialized, we fit it on the training dataset produced by the previous function.

def train_model(d):
    """Fit the model using the training data."""
    model = d['model']
    train_data = d['train_data']
    model.fit(train_data)
    d['model'] = model

    return d

Function used to test the trained model by predicting values for the timestamps in the test set; these predictions are later compared against the actual test values.

def test_model(d):
    """Run the forecast method on the model to make future predictions."""
    test_data = d['test_data']
    model = d['model']
    t = test_data['ds']
    t = pd.DataFrame(t)
    t.columns = ['ds']

    predictions = model.predict(t)
    d['predictions'] = predictions

    return d

Function to forecast the metric values over a defined horizon: 576 periods at 5-minute frequency (i.e. two days), shifted one day into the future.

def make_forecast(d):
    """Execute the forecast method on the model to make future predictions."""
    model = d['model']
    future = model.make_future_dataframe(
        periods=576, freq='5min', include_history=False)
    # Shift each future timestamp forward by one day
    future = pd.DataFrame(future['ds'].apply(pd.DateOffset(1)))
    forecast = model.predict(future)
    d['forecast'] = forecast
    

    return d

Function intended to normalize the predicted values by inverting the log transform. Since the log transform on the training data is commented out above, the inverse transform here is also left commented out, and the predictions pass through unchanged.

def normalize_predictions(d):
    """Normalize predictions using np.exp()."""
    predictions = d['predictions']
#     predictions['yhat'] = np.exp(predictions['yhat']) - 1
    d['predictions'] = predictions
    return d

Function intended to normalize the forecasted values in the same way. As the docstring notes, np.exp() overflows to infinity for inputs above ~709.782, so such values would be replaced with None via lambda functions; like the previous function, the transform is currently commented out and the forecast passes through unchanged.

def normalize_forecast(d):
    """Normalize predictions using np.exp().
    Note:  np.exp(>709.782) = inf, so replace value with None
    """
    forecast = d['forecast']
#     forecast['yhat'] = forecast['yhat'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
#     forecast['yhat_lower'] = forecast['yhat_lower'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
#     forecast['yhat_upper'] = forecast['yhat_upper'].apply(
#         lambda x: np.exp(x) - 1 if x < 709.782 else None)
    d['forecast'] = forecast
    return d

Function used to calculate the mean squared error of the given test data with respect to the predicted outcomes.

def calc_error(d):
    """Calculate error using mse (mean squared error)."""
    test_data = d['test_data']
    predictions = d['predictions']
    results = mean_squared_error(test_data['y'], predictions['yhat'])
    d['mse'] = results

    return d

Function used to reduce each record to a key of the form 'service_name,metric_name' paired with a dictionary containing only the forecast and its mean squared error.

def reduce_data_scope(d):
    """Return a tuple (service_name + , + metric_type, {})."""
    return (
        d['service_name'] + ',' + d['metric_name'],
        {
            'forecast': d['forecast'],
            'mse': d['mse'],
        },
    )

Function to flatten each RDD record into tuples, which will later be converted into DataFrame rows. Each float is checked to see whether it is a NumPy datatype (it could also be None); if so, it is converted to a native Python scalar so that it can be persisted to a database, since most databases do not know how to interpret NumPy datatypes.

def expand_predictions(d):
    """Flatten rdd into tuple which will be converted into a dataframe.Row.
    Checks each float to see if it is a np datatype, since it could be None.
    If it is an np datatype then it will convert to scalar python datatype
    so that it can be persisted into a database, since most dont know how to
    interpret np python datatypes.
    """
    service_metric, data = d
    service, metric = service_metric.split(',')
    return [
        (
            service,
            metric,
            p['ds'].to_pydatetime(),
            np.asscalar(p['yhat']) if isinstance(
                p['yhat'], np.generic) else p['yhat'],
            np.asscalar(p['yhat_lower']) if isinstance(
                p['yhat_lower'], np.generic) else p['yhat_lower'],
            np.asscalar(p['yhat_upper']) if isinstance(
                p['yhat_upper'], np.generic) else p['yhat_upper'],
            np.asscalar(data['mse']) if isinstance(
                data['mse'], np.generic) else data['mse'],
        ) for i, p in data['forecast'].iterrows()
    ]
file_name = "/FileStore/tables/i_0a1c7dc126cb6ac8b_CPUUtilization.csv"

Reading the CSV file from the Databricks File System (DBFS).

df1 = spark.read.format("csv").load(file_name, header="true", inferSchema="true")

The call below registers a temporary view, making the DataFrame accessible via SQL as shown in the query that follows.

df1.createOrReplaceTempView("data_ec2")
%sql
select timestamp, value from data_ec2 where metric_name = 'CPUUtilization';

df1
Out[32]: DataFrame[_c0: int, provider: string, service_name: string, account_id: bigint, region: string, resource_id: string, asset_id: string, timestamp: string, value: double, unit: string, metric_name: string, statistic: string]

Training the model

Initializing the spark configuration and spark session.

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("EC2 Training"))

spark = (SparkSession
         .builder
         .config(conf=conf)
         .getOrCreate())
sc = spark.sparkContext
sc.setLogLevel("INFO")

Retrieve data from DBFS

df = retrieve_data(file_name)

Group the data by service_name and metric_name to aggregate the time series for each service-metric combination.

df = df.groupBy('service_name', 'metric_name')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))

Chaining the previously defined functions as RDD transformations (via lambda functions) to train a Prophet model and produce forecasts and error metrics for each group.

df = (df.rdd
      .map(lambda r: transform_data(r))
      .map(lambda d: partition_data(d))
      # Prophet can't handle data with < 2 training examples
      .filter(lambda d: len(d['train_data']) > 2)
      .map(lambda d: create_model(d))
      .map(lambda d: train_model(d))
      .map(lambda d: test_model(d))
      .map(lambda d: make_forecast(d))
      .map(lambda d: normalize_forecast(d))
      .map(lambda d: normalize_predictions(d))
      .map(lambda d: calc_error(d))
      .map(lambda d: reduce_data_scope(d))
      .flatMap(lambda d: expand_predictions(d)))

Defining the schema used to convert the RDD of forecast tuples back into a DataFrame.

schema = StructType([
    StructField("service_name", StringType(), True),
    StructField("metric_name", StringType(), True),
    StructField("ds", TimestampType(), True),
    StructField("yhat", FloatType(), True),
    StructField("yhat_lower", FloatType(), True),
    StructField("yhat_upper", FloatType(), True),
    StructField("mse", FloatType(), True)
])
df = spark.createDataFrame(df, schema)
df
Out[46]: DataFrame[service_name: string, metric_name: string, ds: timestamp, yhat: float, yhat_lower: float, yhat_upper: float, mse: float]
df.show(n=5)

df.createOrReplaceTempView("forecasts")
%sql
select ds, yhat from forecasts

We create a temporary variable to store the pandas dataframe and slice it to show the output.

temp = df.toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py:88: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below: PyArrow >= 0.15.1 must be installed; however, your version was 0.14.1. Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true. warnings.warn(msg)
temp[:30]
service_name metric_name ds yhat yhat_lower yhat_upper mse
0 ec2 CPUUtilization 2020-10-11 00:00:00 4.619425 2.152688 7.399229 14.689586
1 ec2 CPUUtilization 2020-10-11 00:05:00 4.819132 2.049825 7.313528 14.689586
2 ec2 CPUUtilization 2020-10-11 00:10:00 5.025560 2.346648 7.663989 14.689586
3 ec2 CPUUtilization 2020-10-11 00:15:00 5.238224 2.600470 8.062807 14.689586
4 ec2 CPUUtilization 2020-10-11 00:20:00 5.456621 2.759909 8.302505 14.689586
5 ec2 CPUUtilization 2020-10-11 00:25:00 5.680228 3.041043 8.384025 14.689586
6 ec2 CPUUtilization 2020-10-11 00:30:00 5.908505 3.247892 8.670483 14.689586
7 ec2 CPUUtilization 2020-10-11 00:35:00 6.140900 3.661118 8.664343 14.689586
8 ec2 CPUUtilization 2020-10-11 00:40:00 6.376847 3.758875 8.888179 14.689586
9 ec2 CPUUtilization 2020-10-11 00:45:00 6.615771 3.907142 9.169059 14.689586
10 ec2 CPUUtilization 2020-10-11 00:50:00 6.857092 4.183938 9.378720 14.689586
11 ec2 CPUUtilization 2020-10-11 00:55:00 7.100224 4.212289 9.687737 14.689586
12 ec2 CPUUtilization 2020-10-11 01:00:00 7.344578 4.710319 10.216467 14.689586
13 ec2 CPUUtilization 2020-10-11 01:05:00 7.589569 5.146057 10.505490 14.689586
14 ec2 CPUUtilization 2020-10-11 01:10:00 7.834610 5.086944 10.616975 14.689586
15 ec2 CPUUtilization 2020-10-11 01:15:00 8.079126 5.313694 10.592289 14.689586
16 ec2 CPUUtilization 2020-10-11 01:20:00 8.322544 5.868225 11.002398 14.689586
17 ec2 CPUUtilization 2020-10-11 01:25:00 8.564303 5.880727 11.178074 14.689586
18 ec2 CPUUtilization 2020-10-11 01:30:00 8.803857 6.183046 11.552999 14.689586
19 ec2 CPUUtilization 2020-10-11 01:35:00 9.040670 6.286131 11.792228 14.689586
20 ec2 CPUUtilization 2020-10-11 01:40:00 9.274226 6.631486 11.928005 14.689586
21 ec2 CPUUtilization 2020-10-11 01:45:00 9.504026 6.949830 12.192855 14.689586
22 ec2 CPUUtilization 2020-10-11 01:50:00 9.729593 7.086974 12.337980 14.689586
23 ec2 CPUUtilization 2020-10-11 01:55:00 9.950470 7.122755 12.677289 14.689586
24 ec2 CPUUtilization 2020-10-11 02:00:00 10.166224 7.448644 12.650644 14.689586
25 ec2 CPUUtilization 2020-10-11 02:05:00 10.376447 7.896529 13.116122 14.689586
26 ec2 CPUUtilization 2020-10-11 02:10:00 10.580760 7.970045 13.270639 14.689586
27 ec2 CPUUtilization 2020-10-11 02:15:00 10.778807 8.094746 13.579990 14.689586
28 ec2 CPUUtilization 2020-10-11 02:20:00 10.970263 8.249172 13.512366 14.689586
29 ec2 CPUUtilization 2020-10-11 02:25:00 11.154834 8.614250 13.950434 14.689586
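
The toPandas warning shown above only concerns the Arrow-accelerated conversion path; the fallback still produces correct results. If desired, it could be avoided either by installing a newer pyarrow (a sketch, assuming a runtime that supports notebook-scoped %pip installs) or by disabling the Arrow path:

%pip install "pyarrow>=0.15.1"

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")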

Writing the output forecasts to a Parquet file stored in DBFS.

df.write.options(header=True).parquet(f'{file_name}_output.parquet', mode='overwrite')
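
To verify the write, the stored forecasts can be read back from DBFS (a minimal sketch using the same output path; forecasts_check is an illustrative name):

forecasts_check = spark.read.parquet(f'{file_name}_output.parquet')
forecasts_check.show(5)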

Streaming data

The input path where new data gets added is defined below. Structured Streaming is used to pick up this data for testing our model.

from pyspark.sql.types import *
from pyspark.sql.functions import *

inputPath = "/FileStore/tables/"
name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"

schema = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(inputPath+name).schema

streamingInputDF = (
  spark
    .readStream
    .schema(schema)
    .option("header", "true").format("csv").load(inputPath+'*.csv'))


streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.value,
      window(streamingInputDF.timestamp, "1 hour"))
    .count()
)
query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table (for testing only)
    .queryName("count")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

%sql select value, window, count from count
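
Once testing is complete, the streaming query can be monitored and stopped from the notebook using the query handle created above (a small sketch):

query.status        # whether the stream is initializing, active, or stopped
query.lastProgress  # metrics for the most recent micro-batch
query.stop()        # stop the stream and release the in-memory table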