PySpark: Interview Questions (Coding)— Part 1

Journey through Interview Scenarios

Pravash
8 min readJul 17, 2024

Navigating through a PySpark interview can be a challenging endeavor, especially when faced with scenario-based questions that test not only one’s theoretical knowledge but also practical problem-solving skills.

In this article, I’ll share several questions I personally encountered during interviews and also important ones. I believe these questions will provide insight into the types of inquiries encountered, whether in online assessments or face-to-face interviews.

Q1. ClickStream

Given a clickstream of user activity data , find the relevant user session for each click event.

click_time | user_id
2018–01–01 11:00:00 | u1
2018–01–01 12:00:00 | u1
2018–01–01 13:00:00 | u1
2018–01–01 13:00:00 | u1
2018–01–01 14:00:00 | u1
2018–01–01 15:00:00 | u1
2018–01–01 11:00:00 | u2
2018–01–02 11:00:00 | u2

session definition:
1. session expires after inactivity of 30mins, because of inactivity no clickstream will be generated
2. session remain active for total of 2 hours

Sol -

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, lag, when, lit, concat, sum, monotonically_increasing_id
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder \
.appName("ClickStreamSession") \
.getOrCreate()

# Define the schema for the clickstream data
schema = "click_time STRING, user_id STRING"

# Sample clickstream data
data = [
("2018-01-01 11:00:00", "u1"),
("2018-01-01 12:00:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 14:00:00", "u1"),
("2018-01-01 15:00:00", "u1"),
("2018-01-01 11:00:00", "u2"),
("2018-01-02 11:00:00", "u2")
]

# Create a DataFrame from the given sample data
clickstream_df = spark.createDataFrame(data, schema=schema)

# Convert click_time to Unix timestamp for easier calculations
clickstream_df = clickstream_df.withColumn("click_timestamp", unix_timestamp("click_time"))

session_window = Window.partitionBy("user_id").orderBy("click_timestamp")

# Getting the previous row value using lag
clickstream_df = clickstream_df.withColumn("prev_click_timestamp", lag("click_timestamp", 1).over(session_window))

# Difference between click time and dividing that with 60
clickstream_df = clickstream_df.withColumn("timestamp_diff", (col("click_timestamp")-col("prev_click_timestamp"))/60)

# Updating null with 0
clickstream_df = clickstream_df.withColumn("timestamp_diff", when(col("timestamp_diff").isNull(), 0).otherwise(col("timestamp_diff")))

# Check for new session
clickstream_df = clickstream_df.withColumn("session_new", when(col("timestamp_diff") > 30, 1).otherwise(0))

# New session names
clickstream_df = clickstream_df.withColumn("session_new_name", concat(col("user_id"), lit("--S"), sum(col("session_new")).over(session_window)))
clickstream_df.show()

Q2. Max Salary

Ask is to find the job titles of the highest-paid employees. output should include the highest-paid title or multiple titles with the same salary.

Input —

worker:
|worker_id|first_name|last_name|salary|joining_date| department|
| 1| John| Doe| 10000| 2023–01–01|Engineering|
| 2| Jane| Smith| 12000| 2022–12–01| Marketing|
| 3| Alice| Johnson| 12000| 2022–11–01|Engineering|

title:
|worker_ref_id|worker_title|affected_from|
| 1| Engineer| 2022–01–01|
| 2| Manager| 2022–01–01|
| 3| Engineer| 2022–01–01|

Output —
|worker_id|first_name|last_name|best_paid_title|salary|
| 3| Alice| Johnson| Engineer| 12000|
| 2| Jane| Smith| Manager| 12000|

Sol -

from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql.functions import rank

spark = SparkSession.builder \
.appName("HighestPaidJobTitles") \
.getOrCreate()

worker_data = [(1, 'John', 'Doe', 10000, '2023-01-01', 'Engineering'),
(2, 'Jane', 'Smith', 12000, '2022-12-01', 'Marketing'),
(3, 'Alice', 'Johnson', 12000, '2022-11-01', 'Engineering')]
columns = ['worker_id', 'first_name', 'last_name', 'salary', 'joining_date', 'department']
worker = spark.createDataFrame(worker_data, columns)

title_data = [(1, 'Engineer', '2022-01-01'),
(2, 'Manager', '2022-01-01'),
(3, 'Engineer', '2022-01-01')]
columns = ['worker_ref_id', 'worker_title', 'affected_from']
title = spark.createDataFrame(title_data, columns)

joined_df = worker.join(title, worker.worker_id == title.worker_ref_id)

ranked_df = joined_df.withColumn("salary_rank", f.rank().over(Window.orderBy(joined_df["salary"].desc())))
highest_paid_df = ranked_df.filter(ranked_df["salary_rank"] == 1)
result_df = highest_paid_df.select("worker_id", "first_name", "last_name", "worker_title", "salary").withColumnRenamed('worker_title', 'best_paid_title')
result_df.show()

spark.stop()

Q3. Highest and Lowest Salary

You have been asked to find the employees with the highest and lowest salary from the below sample data for worker and t.The output includes a column salary_type that categorizes the output by:
‘Highest Salary’ represents the highest salary
‘Lowest Salary’ represents the lowest salary

worker:
|worker_id|first_name|last_name|salary|joining_date| department|
| 1| John| Doe| 5000| 2023–01–01|Engineering|
| 2| Jane| Smith| 6000| 2023–01–15| Marketing|
| 3| Alice| Johnson| 4500| 2023–02–05|Engineering|

title:
|worker_ref_id|worker_title|affected_from|
| 1| Engineer| 2022–01–01|
| 2| Manager| 2022–01–01|
| 3| Engineer| 2022–01–01|

Sol -

from pyspark.sql import SparkSession
from pyspark.sql.functions import max, min, when

spark = SparkSession.builder \
.appName("HighestLowestSalaryEmployees") \
.getOrCreate()

worker_data = [
(1, 'John', 'Doe', 5000, '2023-01-01', 'Engineering'),
(2, 'Jane', 'Smith', 6000, '2023-01-15', 'Marketing'),
(3, 'Alice', 'Johnson', 4500, '2023-02-05', 'Engineering')
]
title_data = [
(1, 'Engineer', '2022-01-01'),
(2, 'Manager', '2022-01-01'),
(3, 'Engineer', '2022-01-01')
]
worker_columns = ['worker_id', 'first_name', 'last_name', 'salary', 'joining_date', 'department']
title_columns = ['worker_ref_id', 'worker_title', 'affected_from']
worker_df = spark.createDataFrame(worker_data, worker_columns)
title_df = spark.createDataFrame(title_data, title_columns)
worker_df.show()
title_df.show()

joined_df = worker_df.join(title_df, worker_df.worker_id == title_df.worker_ref_id, "inner")
result_df = joined_df.groupBy("worker_id", "first_name", "last_name", "salary", "department") \
.agg(
max("salary").alias("max_salary"),
min("salary").alias("min_salary")
)

result_df = result_df.withColumn("salary_type",
when(result_df["salary"] == result_df["max_salary"], "Highest Salary")
.when(result_df["salary"] == result_df["min_salary"], "Lowest Salary")
.otherwise(None))
result_df.show()

spark.stop()

Q4. Pivot the Column into Rows

Given Input -
StudentID, StudentName , AScore, BScore,CScore
123, A, 30, 31, 32
124, B, 40, 41, 42

Get the output in below format -
StudentID, StudentName , Subject , Score
123, A, AScore, 30
123, A, BScore, 31
123, A, CScore, 32
124, B, AScore, 40
124, B, BScore, 41
124, B, SScore, 42

Sol -

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName("Transform Data") \
.getOrCreate()

data = [
(123, "A", 30, 31, 32),
(124, "B", 40, 41, 42),
(125, "B", 50, 51, 52)
]

df = spark.createDataFrame(data, ["StudentID", "StudentName", "AScore", "BScore", "CScore"])

pivot_df = df.selectExpr(
"StudentID",
"StudentName",
"stack(3, 'AScore', AScore, 'BScore', BScore, 'CScore', CScore) as (Subject, Score)"
)

pivot_df.show()

Note:
"stack(3, 'AScore', AScore, 'BScore', BScore, 'CScore', CScore)": This part uses the stack function to pivot the DataFrame. The first argument 3 indicates the number of key-value pairs to stack. The subsequent arguments are key-value pairs where the first argument is the key (in this case, column name) and the second argument is the value (the column value). So, for each row, three key-value pairs are created: ('AScore', AScore), ('BScore', BScore), and ('CScore', CScore). The stack function stacks these key-value pairs into rows

Q5. Repeat IDs

Given Input —
id
1
2
3

Output —
id
1
2
2
3
3
3

Sol -

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, explode, split
spark = SparkSession.builder \
.appName("Repeat ID") \
.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["id"])

output_df = df.selectExpr("explode(sequence(1, id)) as id")

output_df.show()

Q6. Group Rows

Input —
col1|col2|col3
alpha| aa| 1
alpha| aa| 2
beta| bb| 3
beta| bb| 4
beta| bb| 5

Output —
col1|col2|col3_list
alpha| aa| [1, 2]
beta| bb|[3, 4, 5]

Sol -

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName("HighestLowestSalaryEmployees") \
.getOrCreate()

data = [("alpha", "aa", 1),
("alpha", "aa", 2),
("beta", "bb", 3),
("beta", "bb", 5),
("beta", "bb", 4)]
schema = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, schema=schema)
df.show()
df_grouped = df.groupBy("col1", "col2").agg(collect_list("col3").alias("col3_list"))
df_grouped.show()
spark.stop()

Q7. Json Data

Read below json file —
[
{
“dept_id”: 102,
“e_id”: [
10201,
10202
]
},
{
“dept_id”: 101,
“e_id”: [
10101,
10102,
10103
]
}
]

output —
dept_id | e_id
101 | 10101
101 | 10102
101 | 10103
102 | 10201
102 | 10202

Sol -

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("ReadJSON") \
.getOrCreate()

df = spark.read.option("multiline", "true").json('sample_data.json')
df_exploded = df.selectExpr("dept_id", "explode(e_id) as e_id")
# df_exploded = df.select("dept_id", explode('e_id').alias('e_id'))
df_exploded.show()

Note:
Need to use option("multiline", "true"), Otherwise this will below exception —
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column(named _corrupt_record by default).

This is because PySpark considers every record in a JSON file as a fully qualified record in a single line. PySpark JSON data source API provides the multiline option to read records from multiple lines.

Q8. Employee Total Hours Inside

From the below data, find the total hours employee was inside office.

Input —
emp_id| punch_time|flag
11114|1900–01–01 08:30:00| I
11114|1900–01–01 10:30:00| O
11114|1900–01–01 11:30:00| I
11114|1900–01–01 15:30:00| O
11115|1900–01–01 09:30:00| I
11115|1900–01–01 17:30:00| O

Sol —

import datetime
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName("TotalInTime") \
.getOrCreate()

_data = [
(11114, datetime.datetime.strptime('08:30:00.00', "%H:%M:%S.%f"), "I"),
(11114, datetime.datetime.strptime('10:30:00.00', "%H:%M:%S.%f"), 'O'),
(11114, datetime.datetime.strptime('11:30:00.00', "%H:%M:%S.%f"), 'I'),
(11114, datetime.datetime.strptime('15:30:00.00', "%H:%M:%S.%f"), 'O'),
(11115, datetime.datetime.strptime('09:30:00.00', "%H:%M:%S.%f"), 'I'),
(11115, datetime.datetime.strptime('17:30:00.00', "%H:%M:%S.%f"), 'O')
]

# Schema
_schema = StructType([
StructField('emp_id', LongType(), True),
StructField('punch_time', TimestampType(), True),
StructField('flag', StringType(), True)
])

df = spark.createDataFrame(data=_data, schema=_schema)

window_agg = Window.partitionBy('emp_id').orderBy(col('punch_time'))

df = df.withColumn('prev_time', lag(col('punch_time')).over(window_agg))

df = df.withColumn('time_diff', (col('punch_time').cast('long') - col('prev_time').cast('long'))/3600)

df = df.groupBy('emp_id').agg(sum(when(col('flag') == 'O', col('time_diff')).otherwise(0)).alias('total_time'))
print(df.show())

Q9. Employee with Manager

From the given data set, Fetch the manager and their employees

Input —
employee_id|first_name|manager_id
4529| Nancy| 4125
4238| John| 4329
4329| Martina| 4125
4009| Klaus| 4329
4125| Mafalda| NULL
4500| Jakub| 4529
4118| Moira| 4952
4012| Jon| 4952
4952| Sandra| 4529
4444| Seamus| 4329

Output —
manager_id|manager_name|count
4125| Mafalda| 2
4952| Sandra| 2
4329| Martina| 3
4529| Nancy| 2

Sol —

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName("EmployeesWithManagers") \
.getOrCreate()

data = [('4529', 'Nancy', '4125'),
('4238','John', '4329'),
('4329', 'Martina', '4125'),
('4009', 'Klaus', '4329'),
('4125', 'Mafalda', 'NULL'),
('4500', 'Jakub', '4529'),
('4118', 'Moira', '4952'),
('4012', 'Jon', '4952'),
('4952', 'Sandra', '4529'),
('4444', 'Seamus', '4329')]
schema = ['employee_id', 'first_name', 'manager_id']

df = spark.createDataFrame(data=data, schema=schema)
df = spark.createDataFrame(data=data, schema=schema)

# Self-join the DataFrame to get manager names
result_df = df.alias("e").join(df.alias("m"), col("e.manager_id") == col("m.employee_id"), "inner") \
.select(col("e.employee_id"), col("e.first_name"),
col("e.manager_id"), col("m.first_name").alias("manager_name"))

result_df.groupBy("manager_id", "manager_name").count().show()

spark.stop()

Q10. Count the Frequency of each Word

Write a word count function using pyspark that covers following steps —
1. read the data from the input.txt file
2. lower the words
3. remove any punctuations
4. Remove the None, blank strings and digits
5. sort the word sin descending order
6. write it to a csv file

Test Input —
test_data = [
“Hello world! This is a test.”,
“Spark is awesome, isn’t it?”,
“Test test test.”
“Test 0 21.”
]

Output —
word|count
test| 4
is| 2
a| 1
awesome| 1
spark| 1
this| 1
world| 1
hello| 1
isn| 1
it| 1

Sol —

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType


def word_count(output_file_path):
# Initialize SparkSession
spark = SparkSession.builder \
.appName("WordCount") \
.getOrCreate()

# Test data
test_data = [
"Hello world! This is a test.",
"Spark is awesome, isn't it?",
"Test test test."
"Test 0 21."
]

# Create DataFrame from test data
input_df = spark.createDataFrame(test_data, StringType())

input_df = input_df.select(split(col("value"), " ").alias("line"))
input_df = input_df.select(explode(col("line")).alias("value"))

# lower the words
input_df = input_df.withColumn("value", lower(col("value")))

# Split text into words, remove punctuations
words_df = input_df.select(regexp_extract(col("value"), "[a-z]+", 0).alias("word"))

# Remove None or blank strings
words_df = words_df.filter(col("word").isNotNull() & (col("word") != "") & (~col("value").rlike("^\d+$")))

# Perform word count
word_count_df = words_df.groupBy("word").count()

# Sort the words in descending order
word_count_df = word_count_df.orderBy(col("count").desc())
word_count_df.show()

# Write the word count results to CSV file
word_count_df.coalesce(1).write.csv(output_file_path, header=True)

# Stop SparkSession
spark.stop()


# Usage example:
output_file_path = "output_word_count_result.csv"
word_count(output_file_path)

Winding Up

I hope these questions will give an idea about the types of questions asked and also confident to navigate PySpark interviews and demonstrate your expertise in handling big data processing tasks. Since all these questions involves different PySpark functions which are really important to know and understand their usage if you are asked to generate the required output. And Yes there multiple ways to answer them, I have only shared my approach.

I will share more questions in the subsequent parts of PySpark Interview Questions articles. I hope this encouraged you to delve deeper into PySpark’s functionalities, and engage in hands-on practice.

--

--

Pravash
Pravash

Written by Pravash

I am a passionate Data Engineer and Technology Enthusiast. Here I am using this platform to share my knowledge and experience on tech stacks.