AIOps with Spark
Forecasting metric values for cloud resources to find anomalies and optimize costs
- The Team
- Introduction
- A peek into the Dataset
- Architecture
- Environment and Cluster details
- Importing required libraries
- Defining Helper Functions
- Training the model
- Streaming data
Every day, people run computational processes that demand a large amount of online resources throughout the day, the month, or the year. The amount of data produced per day is massive, and it is important to know the exact usage of resources over time so that they can be made available to users as and when required, thereby improving efficiency. In this project, we take DevOps monitoring data and analyze how performance varies throughout the day by performing a time series analysis on the data using Spark RDDs. Apache Spark is a unified analytics engine for big data and machine learning, developed at UC Berkeley in 2009. Spark's creators also founded Databricks in 2013, and we use Azure Databricks here to execute our program.
The data is obtained from AWS CloudWatch, and is present in parquet format.
A sample of our dataset is shown below. It consists of 12 unique columns:
- Index: The row index, numbering all records from 0.
- Provider: The source from which the resources are provided; in this case, Amazon Web Services.
- Service_Name: The type of service being requested.
- Account_ID: The account from which the request for the resource came.
- Region: The regional host/zone from which the resources are served.
- Resource_ID: Unique ID given to every resource sent to the user.
- Asset_ID: The unique identity given to the asset that complements the resource.
- Timestamp: The time at which the metric value was recorded.
- Value: The recorded value of the metric.
- Unit: The unit of the number in the Value column, e.g. Percent, Count, Bytes.
- Metric_Name: Identifies the metric for which a particular value is reported.
- Statistic: Denotes how the data was aggregated, e.g. count, sum, average.
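To take a quick peek at the dataset yourself, the file can be loaded and inspected as sketched below. This is only a minimal sketch; the path is the one used later in this notebook, so adjust it to wherever your own export lives.
# Sketch: load the CloudWatch export and inspect its columns and a few rows
sample_df = (spark.read
             .option("header", "true")
             .option("inferSchema", True)
             .csv("/FileStore/tables/i_0a1c7dc126cb6ac8b_CPUUtilization.csv"))
sample_df.printSchema()              # lists the columns described above
sample_df.show(5, truncate=False)    # prints the first few records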
The whole project has two phases:
- Training Phase
- Production Phase
In the Training Phase, we read the data stored in the Databricks File System (DBFS) and use it to train a Facebook Prophet model. This model is used for forecasting the metric values of different cloud resources.
In the Production Phase, we use structured streaming to get the latest data, and use it with our model. This can be used to further train the model, or make future forecasts.
Inside Spark, we train the model and make plots to visualize the forecasts.
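For the visualizations, fbprophet ships its own plotting helpers. The lines below are a minimal sketch of how a single fitted model and its forecast (as produced by the helper functions defined later) could be plotted; they are illustrative rather than the exact notebook cell.
# Sketch: visualize one model's forecast and its components
fig_forecast = model.plot(forecast)               # history plus predictions with uncertainty band
fig_components = model.plot_components(forecast)  # trend and seasonality terms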
After this, the forecasts are finally stored back in DBFS as parquet files.
DBFS
Databricks File System (DBFS) is a distributed file system mounted into an Azure Databricks workspace and available on Azure Databricks clusters. DBFS is an abstraction on top of scalable object storage and offers the following benefits:
- Allows you to mount storage objects so that you can seamlessly access data without requiring credentials.
- Allows you to interact with object storage using directory and file semantics instead of storage URLs.
- Persists files to object storage, so you won’t lose data after you terminate a cluster.
As mentioned above, we have used DBFS for initially storing our files.
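For reference, the files uploaded to DBFS can be listed straight from a notebook cell with dbutils; a small sketch using the folder this project reads from:
# Sketch: list the uploaded metric files under /FileStore/tables
display(dbutils.fs.ls("/FileStore/tables/"))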
The project has been done on Azure Databricks, which allows us to set up an Apache Spark™ environment in minutes, autoscale it, and collaborate on shared projects in an interactive workspace. Azure Databricks supports Python, Scala, SQL, and more.
The cluster contains 1 driver and 2 workers, as shown above.
import numpy as np
import pandas as pd
from fbprophet import Prophet
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct, window
from pyspark.sql.types import FloatType, StructField, StructType, StringType, TimestampType
from sklearn.metrics import mean_squared_error
from pathlib import Path
Defining the path where the data is stored.
path = Path("/FileStore/tables/")
file_name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"
import pyarrow
pyarrow.__version__
Retrieves the data from the given path and cleans it.
def retrieve_data(file_name):
    """Load the metrics CSV at `file_name` as a pyspark.sql.DataFrame."""
    df = (spark.read
          .option("header", "true")
          .option("inferSchema", value=True)
          .format("csv")
          .load(file_name))
    # Drop any null values in case they exist
    df = df.dropna()
    # Rename timestamp to ds and value to y, as expected by fbprophet
    df = df.select(
        df['timestamp'].alias('ds'),
        df['service_name'],
        df['value'].cast(FloatType()).alias('y'),
        df['metric_name']
    )
    return df
Retrieves the data as a stream from the given path and returns hourly counts.
def retrieve_datastream(file_name):
    """Read the CSV at `file_name` as a stream and return hourly counts per value."""
    # Infer the schema with a static read, since readStream requires an explicit schema
    schema = (spark.read
              .option("header", "true")
              .option("inferSchema", value=True)
              .format("csv")
              .load(file_name)
              .schema)
    streamingInputDF = (
        spark
        .readStream
        .schema(schema)
        .option("header", "true")
        .format("csv")
        .load(file_name))
    streamingCountsDF = (
        streamingInputDF
        .groupBy(
            streamingInputDF.value,
            window(streamingInputDF.timestamp, "1 hour"))
        .count()
    )
    return streamingCountsDF
Function to convert each of the SQL Rows into a Python dictionary. We append each row's dictionary to a list and convert the final output into a pandas DataFrame.
def transform_data(row):
"""Transform data from pyspark.sql.Row to python dict to be used in rdd."""
data = row['data']
app = row['service_name']
mt = row['metric_name']
# Transform [pyspark.sql.Dataframe.Row] -> [dict]
data_dicts = []
for d in data:
data_dicts.append(d.asDict())
# Convert into pandas dataframe for fbprophet
data = pd.DataFrame(data_dicts)
data['ds'] = pd.to_datetime(data['ds'])
return {
'service_name': app,
'metric_name': mt,
'data': data
}
Function used to split the data into a training dataset and a testing dataset, based on the value of the timestamp. A 'start_datetime' threshold marks the beginning of the test window, while 'max_datetime' (the latest timestamp in the data) marks its end.
def partition_data(d):
"""Split data into training and testing based on timestamp."""
# Extract data from pd.Dataframe
data = d['data']
# Find max timestamp and extract timestamp for start of day
max_datetime = pd.to_datetime(max(data['ds']))
# start_datetime = max_datetime.replace(hour=00, minute=00, second=00)
start_datetime = '10/10/20 00:00'
# Extract training data
train_data = data[data['ds'] < start_datetime]
# Account for zeros in data while still applying uniform transform
# train_data['y'] = train_data['y'].apply(lambda x: np.log(x + 1))
# Assign train/test split
d['test_data'] = data.loc[(data['ds'] >= start_datetime)
& (data['ds'] <= max_datetime)]
d['train_data'] = train_data
return d
Function that creates a Prophet model out of the data we feed into it. Prophet accepts several constructor arguments (here we use the defaults) and outputs a time series forecast for the data.
def create_model(d):
"""Create Prophet model using each input grid parameter set."""
m = Prophet()
d['model'] = m
return d
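We rely on the defaults here; if tuning were required, fbprophet exposes several constructor arguments. The variant below is only a sketch with illustrative values, not the settings used in this project.
def create_tuned_model(d):
    """Variant of create_model with explicit (illustrative) hyperparameters."""
    m = Prophet(
        changepoint_prior_scale=0.05,  # flexibility of the trend
        seasonality_mode='additive',   # how seasonal effects combine with the trend
        daily_seasonality=True,        # 5-minute CPU metrics usually show a daily cycle
        weekly_seasonality=True
    )
    d['model'] = m
    return d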
Once the Prophet model is initialized, we can fit it on the training dataset produced by the previous function.
def train_model(d):
"""Fit the model using the training data."""
model = d['model']
train_data = d['train_data']
model.fit(train_data)
d['model'] = model
return d
Function used to test whether the trained model performs well, by forecasting values for the held-out test timestamps.
def test_model(d):
"""Run the forecast method on the model to make future predictions."""
test_data = d['test_data']
model = d['model']
t = test_data['ds']
t = pd.DataFrame(t)
t.columns = ['ds']
predictions = model.predict(t)
d['predictions'] = predictions
return d
Function to forecast the metric values up to a defined horizon; here, 576 five-minute periods, i.e. two days ahead.
def make_forecast(d):
    """Execute the forecast method on the model to make future predictions."""
    model = d['model']
    # Build a future dataframe covering 576 five-minute periods (two days)
    future = model.make_future_dataframe(
        periods=576, freq='5min', include_history=False)
    # Shift the forecast window forward by one day
    future['ds'] = future['ds'] + pd.DateOffset(days=1)
    forecast = model.predict(future)
    d['forecast'] = forecast
    return d
Function used to normalize the prediction values. The exponential back-transform is left commented out, since the log transform of the training data is also commented out above.
def normalize_predictions(d):
"""Normalize predictions using np.exp()."""
predictions = d['predictions']
# predictions['yhat'] = np.exp(predictions['yhat']) - 1
d['predictions'] = predictions
return d
Function used to normalize the values of our forecast. Since np.exp overflows to infinity for inputs above roughly 709.782, such values would be replaced with None using lambda functions; as with the previous function, the back-transform is currently commented out.
def normalize_forecast(d):
"""Normalize predictions using np.exp().
Note: np.exp(>709.782) = inf, so replace value with None
"""
forecast = d['forecast']
# forecast['yhat'] = forecast['yhat'].apply(
# lambda x: np.exp(x) - 1 if x < 709.782 else None)
# forecast['yhat_lower'] = forecast['yhat_lower'].apply(
# lambda x: np.exp(x) - 1 if x < 709.782 else None)
# forecast['yhat_upper'] = forecast['yhat_upper'].apply(
# lambda x: np.exp(x) - 1 if x < 709.782 else None)
d['forecast'] = forecast
return d
Function used to calculate the mean squared error of the given test data with respect to the predicted outcomes.
def calc_error(d):
"""Calculate error using mse (mean squared error)."""
test_data = d['test_data']
predictions = d['predictions']
results = mean_squared_error(test_data['y'], predictions['yhat'])
d['mse'] = results
return d
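Since the MSE is in squared units, it can be handy to also report the RMSE, which is in the same units as the metric itself. The helper below is a small optional sketch, not part of the pipeline above.
def calc_rmse(d):
    """Optional: add RMSE (same units as the metric) alongside the MSE."""
    d['rmse'] = float(np.sqrt(d['mse']))
    return d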
Function used to reduce each record to a key/value pair: the key joins service_name and metric_name, and the value holds the previously calculated forecast and mean squared error.
def reduce_data_scope(d):
"""Return a tuple (service_name + , + metric_type, {})."""
return (
d['service_name'] + ',' + d['metric_name'],
{
'forecast': d['forecast'],
'mse': d['mse'],
},
)
Function to flatten the RDD into tuples which will be converted into DataFrame Rows. It checks each float to see whether it is a NumPy datatype (since it could also be None); if so, it converts it to a scalar Python datatype so that it can be persisted into a database, since most databases don't know how to interpret NumPy datatypes.
def expand_predictions(d):
    """Flatten rdd into tuples which will be converted into dataframe Rows.

    Checks each float to see if it is a np datatype, since it could be None.
    If it is an np datatype then it is converted to a scalar python datatype
    so that it can be persisted into a database, since most don't know how to
    interpret np datatypes.
    """
    service_metric, data = d
    service, metric = service_metric.split(',')
    return [
        (
            service,
            metric,
            p['ds'].to_pydatetime(),
            p['yhat'].item() if isinstance(p['yhat'], np.generic) else p['yhat'],
            p['yhat_lower'].item() if isinstance(p['yhat_lower'], np.generic) else p['yhat_lower'],
            p['yhat_upper'].item() if isinstance(p['yhat_upper'], np.generic) else p['yhat_upper'],
            data['mse'].item() if isinstance(data['mse'], np.generic) else data['mse'],
        ) for i, p in data['forecast'].iterrows()
    ]
file_name = "/FileStore/tables/i_0a1c7dc126cb6ac8b_CPUUtilization.csv"
Reading the CSV file from the Databricks File System (DBFS).
df1 = spark.read.format("csv").load(file_name, header="true", inferSchema="true")
The call below registers a temporary view, making the data accessible via SQL as shown in the query that follows.
df1.createOrReplaceTempView("data_ec2")
%sql
select timestamp, value from data_ec2 where metric_name = 'CPUUtilization';
df1
Initializing the Spark configuration and Spark session.
conf = (SparkConf()
.setMaster("local[*]")
.setAppName("EC2 Training"))
spark = (SparkSession
.builder
.config(conf=conf)
.getOrCreate())
sc = spark.sparkContext
sc.setLogLevel("INFO")
Retrieve data from DBFS.
df = retrieve_data(file_name)
Group data by service_name and metric_name to aggregate the time series for each service-metric combination.
df = df.groupBy('service_name', 'metric_name')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))
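Each row of this aggregated DataFrame holds the full time series for one service-metric combination, and one Prophet model will be fit per row. With many combinations it can help to repartition before the RDD pipeline below so the work spreads across the workers; a sketch, not required for a small number of series:
# Sketch: check and optionally increase parallelism before per-series model training
print(df.rdd.getNumPartitions())
df = df.repartition(sc.defaultParallelism)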
Chaining the previously defined functions as RDD transformations (via lambda functions) to train our Prophet models and produce forecasts.
df = (df.rdd
.map(lambda r: transform_data(r))
.map(lambda d: partition_data(d))
# prophet cant handle data with < 2 training examples
.filter(lambda d: len(d['train_data']) > 2)
.map(lambda d: create_model(d))
.map(lambda d: train_model(d))
.map(lambda d: test_model(d))
.map(lambda d: make_forecast(d))
.map(lambda d: normalize_forecast(d))
.map(lambda d: normalize_predictions(d))
.map(lambda d: calc_error(d))
.map(lambda d: reduce_data_scope(d))
.flatMap(lambda d: expand_predictions(d)))
Defining the schema used to convert the RDD back into a DataFrame.
schema = StructType([
StructField("service_name", StringType(), True),
StructField("metric_name", StringType(), True),
StructField("ds", TimestampType(), True),
StructField("yhat", FloatType(), True),
StructField("yhat_lower", FloatType(), True),
StructField("yhat_upper", FloatType(), True),
StructField("mse", FloatType(), True)
])
df = spark.createDataFrame(df, schema)
df
df.show(n=5)
df.createOrReplaceTempView("forecasts")
%sql
select ds, yhat from forecasts
We create a temporary variable to store the pandas DataFrame and slice it to show the output.
temp = df.toPandas()
temp[:30]
Writing the output forecasts to a parquet file stored in DBFS.
df.write.options(header=True).parquet(f'{file_name}_output.parquet', mode='overwrite')
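To sanity-check the persisted output, the parquet file can be read straight back from DBFS; a quick sketch:
# Sketch: read the stored forecasts back and inspect a few rows
saved = spark.read.parquet(f'{file_name}_output.parquet')
saved.show(5)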
Below, we define the input path where new data gets added and use Structured Streaming to test our model on the incoming data.
from pyspark.sql.types import *
from pyspark.sql.functions import *
inputPath = "/FileStore/tables/"
name = "i_0a1c7dc126cb6ac8b_CPUUtilization.csv"
schema = spark.read.option("header", "true").option("inferSchema", value=True).format("csv").load(inputPath+name).schema
streamingInputDF = (
spark
.readStream
.schema(schema)
.option("header", "true").format("csv").load(inputPath+'*.csv'))
streamingCountsDF = (
streamingInputDF
.groupBy(
streamingInputDF.value,
window(streamingInputDF.timestamp, "1 hour"))
.count()
)
query = (
    streamingCountsDF
    .writeStream
    .format("memory")        # memory = store as an in-memory table (for testing only)
    .queryName("count")      # count = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)
%sql select value, window, count from count
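Since the in-memory sink is only meant for testing, the streaming query can be monitored and stopped from Python once we are done; a short sketch using the standard StreamingQuery API:
# Sketch: inspect the running query, then stop it when testing is finished
print(query.status)          # shows whether the trigger is active and data is available
query.processAllAvailable()  # block until all currently available input has been processed
query.stop()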