In today’s fast-paced financial landscape, detecting transaction fraud is essential for protecting institutions and their customers. This article explores how to use Striim and scikit-learn's SGDClassifier to build a fraud detection system that combines real-time data streaming with machine learning.
Problem
Transaction fraud detection is a critical responsibility for the IT teams of financial institutions. According to the 2024 Global Financial Crime Report from Nasdaq, an estimated $485.6 billion was lost to fraud scams and bank fraud schemes globally in 2023.
AI and ML help detect fraud, and real-time streaming platforms like Striim deliver the financial data needed both to train classification models and to call them on live transactions, enhancing customer protection.
Solution
In this article, I will demonstrate how to use Striim to perform key tasks for fraud detection with machine learning:
Ingest data using a Change Data Capture (CDC) reader in real time, call the model and deliver alerts to a target such as Email, Slack, Teams or any other target supported by Striim
Train the model using a Striim Initial Load app, and retrain it automatically via REST APIs if its accuracy score decreases
Fraud Detection Approach
In typical credit card transactions, a financial institution’s data science team uses supervised learning to label data records as either fraudulent or legitimate. By carefully analyzing the data, engineers can extract key features that define a fraudulent user profile and behavior, such as personal information, number of orders, order content, payment history, geolocation, and network activity.
For this example, I’m using a dataset from Kaggle, which contains credit card transactions collected from EU retailers approximately 10 years ago. The dataset is already labeled with two classes representing fraudulent and normal transactions. Although the dataset is imbalanced, it serves well for this demonstration. Key fields include purchase value, age, browser type, source, and the class parameter, which indicates normal versus fraudulent transactions.
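The categorical fields have to be turned into numbers before a linear classifier can use them. Here is a minimal sketch of one-hot encoding a single transaction. The 12 feature names are copied from the bodyPOST template in the listing at the end of this article; the raw record keys ("source", "browser", "sex") and the sample values are my assumptions for illustration.

```python
# Category lists derived from the feature names in the bodyPOST template.
SOURCES = ("Ads", "Direct", "SEO")
BROWSERS = ("Chrome", "FireFox", "IE", "Opera", "Safari")
SEXES = ("F", "M")


def encode_transaction(record):
    """Turn one raw transaction (a dict) into the 12 numeric model features."""
    features = {
        "purchase_value": float(record["purchase_value"]),
        "age": int(record["age"]),
    }
    # One-hot encode each categorical field: 1 for the matching category, else 0.
    # The raw keys "source", "browser" and "sex" are assumed names.
    for key, categories in (
        ("source", SOURCES),
        ("browser", BROWSERS),
        ("sex", SEXES),
    ):
        for category in categories:
            features[f"{key}_{category}"] = int(record[key] == category)
    return features


# Made-up sample record, not taken from the dataset.
example = encode_transaction(
    {"purchase_value": 34, "age": 39, "source": "SEO", "browser": "Chrome", "sex": "M"}
)
print(example)
```

The dictionary keeps the features in the same order as the {0} to {11} placeholders in the POST body, which makes it easy to compare the two side by side.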
Picking Classification Model
There are many possibilities for classification using ML. In this example, I evaluated logistic regression and SGDClassifier: https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html. The two are closely related: logistic regression is a linear model that passes a weighted sum of the features through the logistic function, whereas SGDClassifier is a family of linear classifiers trained with stochastic gradient descent, which updates the model one sample at a time. SGD is widely regarded as better suited to larger datasets, which is why it was selected for this application.
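To make the idea of stochastic gradient descent concrete, here is a toy, standard-library-only sketch that fits a logistic model with per-sample updates. It is not scikit-learn's implementation and not the code used in this application; the data set, learning rate, and epoch count are made up for illustration.

```python
import math
import random


def sigmoid(z):
    # Numerically safe logistic function.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def sgd_logistic_fit(X, y, epochs=300, lr=0.1, seed=0):
    """Fit weights and bias of a logistic model with per-sample SGD updates."""
    rng = random.Random(seed)
    w = [0.0] * len(X[0])
    b = 0.0
    order = list(range(len(X)))
    for _ in range(epochs):
        rng.shuffle(order)  # visit the samples in a fresh random order each epoch
        for i in order:
            p = sigmoid(sum(wj * xj for wj, xj in zip(w, X[i])) + b)
            error = p - y[i]  # gradient of the log loss w.r.t. the linear score
            w = [wj - lr * error * xj for wj, xj in zip(w, X[i])]
            b -= lr * error
    return w, b


def predict(w, b, x):
    # Class 1 when the predicted probability reaches 0.5.
    return 1 if sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b) >= 0.5 else 0


# Tiny made-up, linearly separable data: label 1 when the single feature is large.
X = [[0.0], [0.1], [0.2], [0.8], [0.9], [1.0]]
y = [0, 0, 0, 1, 1, 1]
w, b = sgd_logistic_fit(X, y)
print([predict(w, b, x) for x in X])
```

Because each update touches only one sample, the full data set never has to be processed in a single step, which is the property that makes this style of training attractive for large or continuously arriving data.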
Accuracy Measurement
The accuracy score is a metric that measures how often a model correctly predicts the desired outcome. It is calculated by dividing the total number of correct predictions by the total number of predictions. In an ideal scenario, the best possible accuracy is 100% (or 1). However, due to the challenges of obtaining and diagnosing a high-quality dataset, data scientists typically aim for an accuracy greater than 90% (or 0.9).
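The calculation itself is short. The sketch below computes accuracy as defined above, and also recall for the fraud class, because on an imbalanced dataset a model that never predicts fraud can still score a high accuracy. The counts are made up for illustration.

```python
def accuracy(y_true, y_pred):
    """Correct predictions divided by total predictions."""
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)


def recall(y_true, y_pred, positive=1):
    """Share of actual positives (fraud) that the model caught."""
    predictions_for_positives = [p for t, p in zip(y_true, y_pred) if t == positive]
    if not predictions_for_positives:
        return 0.0
    caught = sum(1 for p in predictions_for_positives if p == positive)
    return caught / len(predictions_for_positives)


# Made-up imbalanced sample: 990 legitimate (0) and 10 fraudulent (1) transactions.
y_true = [0] * 990 + [1] * 10
always_legit = [0] * 1000  # a "model" that never flags fraud

print(accuracy(y_true, always_legit))  # 0.99
print(recall(y_true, always_legit))    # 0.0
```

For that reason it is worth looking at recall (or precision) for the fraud class alongside the accuracy score when judging whether a model is good enough.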
Training Step
Striim can read historical data from various sources, including databases, messaging systems, files, and more. In this case, the historical data is stored in a MySQL database, a highly popular data source in the FinTech industry. The architecture combines real-time data streaming with training of the ML model.
You can achieve this in Striim with an Initial Load application that has a Database Reader pointed at the transactions table in MySQL and a file target. With Striim’s flexible adapters, data can be read from virtually any database and written to a local file system, ADLS, S3 or GCS.
Once the data load is completed, the application will change its status from RUNNING to COMPLETED. A script, or in this case, a PS-made Open Processor (OP), can capture the status change and call the training Python script.
Additionally, I added a step with a CQ (Continuous Query) that allows data scientists to apply any transformation needed to put the data into a form suitable for training. This step can be easily implemented using Striim’s Flow Designer, which features a drag-and-drop interface along with the ability to code data modifications using a combination of SQL-like language and utility function calls.
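If you go the script route, the logic boils down to polling for the RUNNING to COMPLETED transition and then launching the training command. Below is a minimal sketch of that idea. The get_status callable is hypothetical: how you actually obtain the application status is up to you, and I am not showing a real Striim API call here. The demo uses a canned status sequence and a placeholder command instead of the real training script.

```python
import subprocess
import sys
import time


def wait_for_completion(get_status, poll_seconds=5.0, timeout_seconds=3600.0,
                        sleep=time.sleep):
    """Poll get_status() until it moves from RUNNING to COMPLETED.

    Returns True when the transition is seen, False on timeout.
    get_status is a hypothetical callable returning the app status as a string.
    """
    previous = None
    waited = 0.0
    while waited <= timeout_seconds:
        current = get_status()
        if previous == "RUNNING" and current == "COMPLETED":
            return True
        previous = current
        sleep(poll_seconds)
        waited += poll_seconds
    return False


def run_training(command):
    """Run the training command and return its exit code (raises on failure)."""
    return subprocess.run(command, check=True).returncode


# Demo with a canned status sequence and no real waiting.
statuses = iter(["RUNNING", "RUNNING", "COMPLETED"])
completed = wait_for_completion(lambda: next(statuses), sleep=lambda seconds: None)
if completed:
    # Placeholder command; in this article the script is /tmp/callTraining.py.
    exit_code = run_training([sys.executable, "-c", "print('training started')"])
```

Note that this sketch only reacts to a transition it has observed, so it should be started before or while the load is running.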
Model Reference Step
Once the model is trained, we can deploy it in a real-time CDC application that streams user financial transactions from an operational database. The application calls the model’s predict method, and if fraud is detected, it generates and sends an alert. Additionally, it checks the model accuracy and, if needed, initiates the retraining step described above.
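The retraining decision can be sketched as a rolling accuracy check against the 0.9 threshold. This is only one possible way to track accuracy, and it is an assumption on my part: it presumes that the true label of a transaction eventually becomes available so that it can be compared with the prediction. The class name and window size are hypothetical.

```python
from collections import deque


class AccuracyMonitor:
    """Tracks accuracy over the most recent labeled predictions."""

    def __init__(self, threshold=0.9, window=100):
        self.threshold = threshold
        self.outcomes = deque(maxlen=window)  # True for each correct prediction

    def record(self, predicted, actual):
        self.outcomes.append(predicted == actual)

    def accuracy(self):
        if not self.outcomes:
            return None  # nothing to measure yet
        return sum(self.outcomes) / len(self.outcomes)

    def needs_retraining(self):
        acc = self.accuracy()
        return acc is not None and acc < self.threshold


monitor = AccuracyMonitor(threshold=0.9, window=10)
for _ in range(10):
    monitor.record(predicted=0, actual=0)  # ten correct predictions
print(monitor.needs_retraining())          # False

monitor.record(predicted=0, actual=1)      # two missed frauds push the
monitor.record(predicted=0, actual=1)      # window down to 8 correct out of 10
print(monitor.needs_retraining())          # True
```

A bounded window keeps the score sensitive to recent drift instead of averaging it away over the model's whole lifetime.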
Model Reference App Structure
The flow begins with Striim’s CDC reader, which streams financial transactions directly from the database binary log. It then invokes the classification model trained in the previous step via a REST call. In this case, I am using an OP that executes REST POST calls containing the parsed transaction values needed for predictions. The model service returns the prediction to be parsed by a query. If fraud is detected, it generates an alert. At the same time, if the model accuracy dips below 90 percent, the Application Manager function can restart the training application, called ILMySqlApp, using an internal management REST API.
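On the other side of that REST call sits the model service. Here is a hedged sketch of the request handling only, without any web framework. The feature names come from the bodyPOST template and the response fields ("prediction" and "accuracy") from the CQs in the listing at the end of this article; handle_predict, StubModel, and the rule inside it are hypothetical stand-ins for the real trained model and service code.

```python
import json

# Feature order taken from the bodyPOST template ({0} to {11}).
FEATURE_ORDER = [
    "purchase_value", "age",
    "source_Ads", "source_Direct", "source_SEO",
    "browser_Chrome", "browser_FireFox", "browser_IE",
    "browser_Opera", "browser_Safari",
    "sex_F", "sex_M",
]


class StubModel:
    """Hypothetical stand-in for the trained classifier (not a real model)."""

    def predict(self, rows):
        # Made-up rule for the demo: flag purchases above 500 as fraud.
        return [1.0 if row[0] > 500 else 0.0 for row in rows]


def handle_predict(body, model, accuracy):
    """Parse the POST body, run the model, and build the JSON response."""
    payload = json.loads(body)
    row = [float(payload[name]) for name in FEATURE_ORDER]
    prediction = model.predict([row])[0]
    # The streaming app reads these two fields from the response.
    return json.dumps({"prediction": float(prediction), "accuracy": accuracy})


# Made-up request body with every feature set to 0 except the ones below.
request = dict.fromkeys(FEATURE_ORDER, 0)
request.update({"purchase_value": 900, "age": 41, "source_SEO": 1,
                "browser_Chrome": 1, "sex_F": 1})
response = handle_predict(json.dumps(request), StubModel(), accuracy=0.93)
print(response)
```

Returning the prediction as a float matters here, because the alert query in the listing compares the string form of the prediction against "1.0".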
Final Thoughts on Leveraging SGDClassifier and Striim for Financial Fraud Detection
This example illustrates how a real-world data streaming application can detect fraud by interacting with a classification model. The application sends alerts when fraud is detected using various Striim alert adapters, including email, web, Slack, or database. Furthermore, if the model’s quality deteriorates, it can retrain the model for further evaluation.
CREATE OR REPLACE SOURCE TransactionsReader USING Global.MysqlReader (
ConnectionURL: 'jdbc:mysql://localhost:3306/test',
Tables: 'ccorders',
……)
OUTPUT TO transactionsStream;
CREATE STREAM sgdOutput OF Global.JsonNodeEvent;
CREATE STREAM FraudAlertStream OF Global.AlertEvent;
CREATE CQ checkPrediction
INSERT INTO predStream
SELECT data.get("prediction").toString() as pred FROM sgdOutput s;;
CREATE OR REPLACE CQ checkModelAccuracy
INSERT INTO accuracyStream
SELECT
data.get("accuracy").toString() as acc
FROM sgdOutput s;;
CREATE OR REPLACE OPEN PROCESSOR CallSGDClassifier USING Global.RestCallerPOST (
updateValues: '0&1&2&3&4&5&6&7&8&9&10&11',
postUri_encrypted: 'true',
bodyPOST: '{"purchase_value" : {0}, "age": {1}, "source_Ads": {2}, "source_Direct" : {3}, "source_SEO" : {4}, "browser_Chrome" : {5}, "browser_FireFox" : {6}, "browser_IE" : {7}, "browser_Opera" : {8}, "browser_Safari" : {9}, "sex_F" : {10}, "sex_M" : {11}}',
noOfThreads: '1',
oauth: false,
basicAuth: false,
noOfRetries: '2',
connPoolSize: '1',
requestTimeoutMS: '3000',
postUri: 'http://host:5000/home' )
INSERT INTO sgdOutput
FROM transactionsStream;
CREATE SUBSCRIPTION AlertAdapter USING Global.WebAlertAdapter (
isSubscription: 'true' )
INPUT FROM FraudAlertStream;
CREATE OR REPLACE CQ generateFraudAlert
INSERT INTO FraudAlertStream
SELECT "Company XYZ", "Value", "warning", "raise", "fraud prediction alert on CC transaction"
FROM predStream p where pred = "1.0";;
CREATE OR REPLACE CQ CallTraining
INSERT INTO callOutput
SELECT com.striim.udf.app.ApplicationManager.startApplication("admin.ILMySqlApp")
FROM accuracyStream a
where TO_FLOAT(acc) < 0.9;
END APPLICATION FraudDetectionApp;
CREATE SOURCE ProcessorToStartTrainingStep USING Global.PrePostProcess (
Password_encrypted: 'true',
UserName: 'root',
SQLAfter: 'python3 /tmp/callTraining.py',
Password: 'xxxx',
ConnectionURL: 'jdbc:mysql://localhost:3306' )
OUTPUT TO m;
CREATE OR REPLACE SOURCE MySqlInitLoad USING Global.DatabaseReader (
ConnectionURL: 'jdbc:mysql://localhost:3306/test',
DatabaseProviderType: 'Default',
FetchSize: 100,
QuiesceOnILCompletion: true,
adapterName: 'DatabaseReader',
Password_encrypted: 'true',
Tables: 'mytable',
Password: 'OaSpQIzaC0M9mtmPBCNEpw==',
Username: 'root' )
OUTPUT TO myLoadOut;
CREATE CQ MyTransformationQuery
INSERT INTO myFileOutput
SELECT
to_string(data[0]) as age,
dnow() as curtime,
to_string(data[2]) as sourceOfdata,
to_string(data[0]) as browserType,
to_string(data[3]) as purchaseValue,
to_string(data[4]) as FraudClass….
FROM myLoadOut m;;
CREATE TARGET TrainFileTarget USING Global.FileWriter (
DataEncryptionKeyPassphrase: '',
flushpolicy: 'EventCount:10000,Interval:30s',
rolloveronddl: 'true',
encryptionpolicy: '',
rolloverpolicy: 'EventCount:200000,Interval:30s',
filename: 'training_Data.csv' )
FORMAT USING Global.DSVFormatter (
quotecharacter: '"',
columndelimiter: ',',
nullvalue: 'NULL',
usequotes: 'false',
rowdelimiter: '\n',
standard: 'none',
header: 'false' )
INPUT FROM myFileOutput;
END APPLICATION ILMySqlApp;