Real-time CRM for Gambling
Turning batch processing into real-time VIP detection
The Problem
A leading online gambling platform was losing valuable VIP players during gaming sessions because its CRM system operated on 4-hour batch cycles.
Sample Player Data
[
  {
    "player_id": "player_001",
    "event_type": "bet",
    "amount": 250.0,
    "timestamp": "2024-01-15T14:30:00Z",
    "game_type": "slots",
    "session_id": "sess_001"
  },
  {
    "player_id": "player_001",
    "event_type": "bet",
    "amount": 500.0,
    "timestamp": "2024-01-15T14:31:00Z",
    "game_type": "slots",
    "session_id": "sess_001"
  },
  {
    "player_id": "player_007",
    "event_type": "login",
    "amount": 0.0,
    "timestamp": "2024-01-15T14:32:00Z",
    "game_type": "poker",
    "session_id": "sess_002"
  }
]
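To exercise the pipeline with events like these, a minimal test producer can publish them to the player_events Kafka topic. This is only a sketch, assuming the kafka-python package and the kafka:9092 broker address used by the streaming job further down:

# Minimal test producer sketch (assumes kafka-python and the kafka:9092 broker /
# player_events topic referenced by the streaming job below).
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda event: json.dumps(event).encode("utf-8"),
)

sample_event = {
    "player_id": "player_001",
    "event_type": "bet",
    "amount": 250.0,
    "timestamp": "2024-01-15T14:30:00Z",
    "game_type": "slots",
    "session_id": "sess_001",
}

# Publish one event; the streaming job consumes the topic continuously.
producer.send("player_events", value=sample_event)
producer.flush()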
Where Things Went Wrong
Architectural mistakes and their business impact
The Problematic Approach
They were using a traditional database-first approach that couldn't handle real-time requirements: player activity landed in the operational database and was only aggregated by a CRM job that ran every 4 hours.
Critical Issues:
- VIP-level betting activity only surfaced hours after the gaming session had ended
- Bonus campaigns arrived far too late to prevent churn
- Heavy batch queries kept constant load on the operational database
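For contrast, the legacy flow boiled down to a scheduled batch aggregation along these lines. This is a simplified sketch, not the platform's actual code; the JDBC endpoint, table, and column names are illustrative:

# Simplified sketch of the legacy database-first approach: a job scheduled
# every 4 hours pulls bets from the operational database and aggregates them.
# The JDBC endpoint, table, and column names are illustrative placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sum as sum_

spark = SparkSession.builder.appName("BatchVIPDetection").getOrCreate()

bets = (spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://crm-db:5432/crm")  # hypothetical CRM database
    .option("dbtable", "player_bets")
    .load())

vip_candidates = (bets
    .filter(col("event_time") >= expr("current_timestamp() - INTERVAL 4 HOURS"))
    .groupBy("player_id")
    .agg(sum_("amount").alias("total_bet_amount"))
    .filter(col("total_bet_amount") > 1000))

# Results only reach the CRM on the next run, hours after the session ended.
(vip_candidates.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://crm-db:5432/crm")
    .option("dbtable", "vip_candidates")
    .mode("overwrite")
    .save())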
Our Winning Strategy
Real-time streaming with Data Lake architecture
The Triangulo Solution
We implemented a modern streaming architecture that separates processing from storage:
- Real-time Processing: Apache Spark processes events as they arrive with 1-hour sliding windows
- Data Lake Storage: Delta Lake provides cheap, reliable storage with a full audit trail
- Batch CRM Updates: Efficient micro-batch updates to the CRM system reduce database load (see the sketch after this list)
- Time Travel: Full historical data for debugging and compliance requirements
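As a sketch of the micro-batch CRM update step, the Delta table written by the streaming job can itself be read as a stream and pushed to the CRM with foreachBatch. This assumes a Delta-enabled Spark session like the one configured in the pipeline code below; the CRM JDBC endpoint and target table are hypothetical:

# Sketch of the micro-batch CRM update: read the Delta VIP table as a stream
# and push each micro-batch to the CRM database in one bulk write.
# The JDBC URL and target table are hypothetical placeholders.
vip_stream = (spark.readStream
    .format("delta")
    .load("/data/vip_players"))

def update_crm(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://crm-db:5432/crm")  # hypothetical
        .option("dbtable", "vip_alerts")
        .mode("append")
        .save())

crm_query = (vip_stream.writeStream
    .foreachBatch(update_crm)
    .option("checkpointLocation", "/checkpoints/crm_updates")
    .start())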
Code That Made It Work
Production-ready implementation
Real-time VIP Detection Pipeline
Here's the core production code that identifies VIP players in real-time:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def detect_vip_players():
    # Initialize Spark with Delta Lake
    spark = SparkSession.builder \
        .appName("RealTimeVIPDetection") \
        .config("spark.sql.extensions",
                "io.delta.sql.DeltaSparkSessionExtension") \
        .getOrCreate()

    # Read streaming player events
    player_events = (spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")  # broker address required by the Kafka source
        .option("subscribe", "player_events")
        .load()
        .selectExpr("CAST(value AS STRING)"))

    # Calculate real-time metrics
    vip_players = (player_events
        .select(
            get_json_object("value", "$.player_id").alias("player_id"),
            get_json_object("value", "$.amount").cast("double").alias("amount"),
            to_timestamp(get_json_object("value", "$.timestamp")).alias("timestamp")
        )
        .withWatermark("timestamp", "5 minutes")
        .groupBy("player_id", window("timestamp", "1 hour"))
        .agg(sum("amount").alias("hourly_bet_amount"))
        .filter(col("hourly_bet_amount") > 1000))  # VIP threshold

    # Write to Delta Lake (Delta's streaming sink supports append/complete output
    # modes; append emits each window once its watermark has passed)
    query = (vip_players
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/checkpoints/vip_detection")
        .start("/data/vip_players"))

    return query

# Start the pipeline
pipeline = detect_vip_players()
print("🎯 Real-time VIP detection started!")
The complete production implementation wraps this in a class with schema validation, sliding windows, and error handling:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import logging

class RealTimeVIPDetection:
    def __init__(self):
        self.spark = self.setup_spark_session()
        self.logger = self.setup_logging()

    def setup_spark_session(self):
        """Configure production Spark session"""
        return SparkSession.builder \
            .appName("RealTimeVIPDetection") \
            .config("spark.sql.extensions",
                    "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()

    def setup_logging(self):
        """Setup production logging"""
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(__name__)

    def create_vip_detection_pipeline(self):
        """Main VIP detection pipeline with error handling"""
        try:
            # Read from Kafka with error handling
            player_events = (self.spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka:9092")
                .option("subscribe", "player_events")
                .option("failOnDataLoss", "false")
                .option("maxOffsetsPerTrigger", 100000)
                .load())

            # Parse JSON with schema validation
            event_schema = StructType([
                StructField("player_id", StringType(), True),
                StructField("event_type", StringType(), True),
                StructField("amount", DoubleType(), True),
                StructField("timestamp", TimestampType(), True),
                StructField("game_type", StringType(), True),
                StructField("session_id", StringType(), True)
            ])

            parsed_events = (player_events
                .select(from_json(col("value").cast("string"), event_schema)
                        .alias("data"))
                .select("data.*")
                .filter(col("player_id").isNotNull()))

            # Real-time aggregation with sliding window
            vip_metrics = (parsed_events
                .withWatermark("timestamp", "10 minutes")
                .groupBy(
                    "player_id",
                    window("timestamp", "1 hour", "30 minutes")  # Sliding window
                )
                .agg(
                    sum("amount").alias("hourly_bet_amount"),
                    count("event_type").alias("event_count"),
                    approx_count_distinct("session_id").alias("session_count"),
                    min("timestamp").alias("first_event_time"),  # min/max are deterministic
                    max("timestamp").alias("last_event_time")    # (first/last are not)
                )
                .filter(col("hourly_bet_amount") > 1000))

            # Write to Delta Lake with schema merging (Delta's streaming sink supports
            # append/complete; windows are emitted once their watermark has passed)
            query = (vip_metrics
                .writeStream
                .format("delta")
                .outputMode("append")
                .option("checkpointLocation", "/checkpoints/vip_detection")
                .option("mergeSchema", "true")
                .start("/data/vip_players"))

            self.logger.info("✅ VIP detection pipeline started successfully")
            return query

        except Exception as e:
            self.logger.error(f"❌ Pipeline failed: {str(e)}")
            raise

# Production usage
if __name__ == "__main__":
    detector = RealTimeVIPDetection()
    pipeline = detector.create_vip_detection_pipeline()
    pipeline.awaitTermination()
Key Features:
- Real-time Processing: Spark Structured Streaming with 1-hour sliding windows
- Fault Tolerance: Checkpointing for exactly-once processing
- Schema Evolution: Delta Lake handles schema changes automatically
- Production Ready: Comprehensive error handling and logging
Get the Complete Code
Download the full implementation, including deployment scripts and sample data.
Summary
Production secrets and business results
The Production Success Story
After implementing our solution, the gambling platform achieved remarkable results:
- 23% Player Retention Increase: VIP players received instant bonuses, dramatically reducing churn
- Real-time Campaigns: Bonuses delivered within seconds instead of hours
- 80% Database Load Reduction: Data Lake architecture eliminated database bottlenecks
🔮 Production Secrets
Performance Tip
Use sliding windows (1h window, 30m slide) instead of tumbling windows for smoother real-time metrics and a better user experience.
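As a quick illustration of the difference, here is a self-contained comparison of the two window definitions on a toy DataFrame (a sketch; the sample data is illustrative):

# Tumbling vs. sliding windows on a tiny in-memory DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum as sum_

spark = SparkSession.builder.appName("WindowComparison").getOrCreate()

events = (spark.createDataFrame(
        [("player_001", 250.0, "2024-01-15 14:30:00"),
         ("player_001", 500.0, "2024-01-15 14:55:00")],
        ["player_id", "amount", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp")))

# Tumbling window: non-overlapping 1-hour buckets, so a player's total only
# refreshes once per hour.
tumbling = (events
    .groupBy("player_id", window("timestamp", "1 hour"))
    .agg(sum_("amount").alias("hourly_bet_amount")))

# Sliding window: the same 1-hour window re-evaluated every 30 minutes,
# giving smoother, fresher metrics.
sliding = (events
    .groupBy("player_id", window("timestamp", "1 hour", "30 minutes"))
    .agg(sum_("amount").alias("hourly_bet_amount")))

sliding.show(truncate=False)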
Cost Optimization
Data Lake storage costs $0.023/GB versus $0.115/GB for database storage, a 5x saving while gaining better capabilities.
Monitoring Secret
Track the streaming lag metric: if it grows beyond 5 minutes, you have a processing bottleneck that needs attention.
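One way to watch that lag is to compare the event-time watermark reported by the query's lastProgress against wall-clock time. A sketch, assuming the StreamingQuery object (pipeline) returned by the code above; the 5-minute threshold mirrors the tip:

# Estimate streaming lag from the event-time watermark in lastProgress.
import time
from datetime import datetime, timezone

def check_streaming_lag(query, max_lag_seconds=300):
    progress = query.lastProgress
    if not progress or "eventTime" not in progress:
        return
    watermark = progress["eventTime"].get("watermark")
    if not watermark:
        return
    # Watermark is an ISO-8601 UTC timestamp, e.g. "2024-01-15T14:30:00.000Z"
    watermark_ts = datetime.fromisoformat(watermark.replace("Z", "+00:00"))
    lag = (datetime.now(timezone.utc) - watermark_ts).total_seconds()
    if lag > max_lag_seconds:
        print(f"⚠️ Streaming lag is {lag:.0f}s - investigate the processing bottleneck")

# Poll once a minute alongside the running pipeline.
while True:
    check_streaming_lag(pipeline)
    time.sleep(60)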
Deployment Trick
Use Delta Lake time travel to debug production issues: you can query any previous state of your data.
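For example, Delta's versionAsOf and timestampAsOf read options reload the VIP table exactly as it was at an earlier point. A sketch, assuming a Delta-enabled Spark session like the one in the pipeline above; the version number and timestamp are illustrative:

# Query the /data/vip_players table at an earlier timestamp or version.
vip_yesterday = (spark.read
    .format("delta")
    .option("timestampAsOf", "2024-01-14")
    .load("/data/vip_players"))

vip_v42 = (spark.read
    .format("delta")
    .option("versionAsOf", 42)
    .load("/data/vip_players"))

# Compare against the current state to see exactly what changed.
current = spark.read.format("delta").load("/data/vip_players")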
Ready for Your Data Saga?
Let us help you implement this solution in your environment or tackle your unique data challenge.