Hi folks an experimental query to perform MITRE ATT&CK classifications with data from an external repository (GIT)
While we build out the backend to allow us to run with thousands of classification heuristics and richer more complex machine learning classifiers I wanted to experiment with some of the tools for virtual table creation that I wrote.
Often enough we want to process a large volume of data that may exist outside of osquery. perhaps it is from an external threat intelligence site, some internal file share or from a log file that resides on the device. To get that information into a virtual table so you can perform joins I wrote this little query. It will load the data and use GREP and SPLIT to convert it into a table.
LOAD A CSV FROM A REMOTE LOCATION
-- LOAD CSV from GIT LOCATION
-- VARIABLE $$URL$$ URL
WITH
Remote_CSV_file(Line, str) AS (
SELECT '', (SELECT result from curl where url = '$$URL$$') ||char(10)
UNION ALL
SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM Remote_CSV_file WHERE str!=''
),
-- Create Table for Remote_CSV_file
Remote_Loaded_Table (Col1, Col2, Col3, Col4, Col5, Col6) AS (
SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6
FROM Remote_CSV_file WHERE Line != ''
LIMIT 5
)
SELECT * FROM Remote_Loaded_Table
LOAD A CSV FROM LOCAL FILE SYSTEM
-- LOAD CSV from LOCAL SYSTEM
-- VARIABLE $$File Path$$ STRING
WITH
-- LOAD CSV from LOCAL SYSTEM
Local_CSV_file AS (
SELECT line FROM grep WHERE pattern = ',' AND path = '$$File Path$$'
),
-- Create Table for Local_CSV_file
Local_Loaded_Table AS (
SELECT SPLIT(Line,',',0) Col1, SPLIT(Line,',',1) Col2, SPLIT(Line,',',2) Col3, SPLIT(Line,',',3) Col4, SPLIT(Line,',',4) Col2, SPLIT(Line,',',5) Col6
FROM Local_CSV_file WHERE Line != ''
LIMIT 5
)
SELECT * FROM Local_Loaded_Table
With the ability to load virtual tables from the system or from a remote location I then proceeded to consolidate over 600 IOC mapping rules into two files. The first contains the full MITRE MAP INFORMATION the second contains the RULES I want to process.
MITRE MAP
https://github.com/karlrackerman/Query_Library/raw/main/MITRE_ID_MAP.CSV
CMDLINE IOC DETECTION RULES
https://github.com/karlrackerman/Query_Library/raw/main/TTP_Rules.CSV
Wit those in place I can now use a query that creates the virtual table to bring that data down to the device when I run a query and I can then process the rules and compare it to cmdlines I am seeing from other tables. You get a nice MITRE Classification whenever you perform a generic search for process information.
-- Generic Search
-- VARIABLE: $$Begin Search on date$$ DATE
-- VARIABLE: $$Hours to Search$$ STRING
-- VARIABLE: $$command line$$ STRING
-- VARIABLE: $$process name$$ STRING
-- VARIABLE: $$parent process name$$ STRING
-- VARIABLE: $$user name$$ STRING
-- In order to avoid the watchdog on the device, we will query the journals in in 4 hour chunks (3600 seconds)
WITH RECURSIVE
Time_Interval(x) AS (
VALUES ( CAST($$Begin Search on date$$ AS INT) )
UNION ALL
SELECT x+14400 FROM Time_Interval WHERE x < CAST($$Begin Search on date$$ AS INT) + CAST( $$Hours to Search$$ * 3600 AS INT)
),
-- Get map from GIT LOCATION
mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS (
WITH mitre_map_file(Line, str) AS (
SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10)
UNION ALL
SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!=''
)
SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique
FROM mitre_map_file WHERE Line != ''
),
-- Get detection rules from GIT LOCATION
mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS (
WITH mitre_rule_file(Line, str) AS (
SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10)
UNION ALL
SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!=''
)
SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator
FROM mitre_rule_file WHERE Line != ''
),
-- Map the methods to the description
Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator
FROM mitre_map mm
JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT)
)
SELECT
CAST( replace(datetime(spj.time,'unixepoch'),' ','T') AS TEXT)Date_Time, -- add the T to help excel understand this is a date and time
CAST( users.username AS TEXT) User_Name,
CAST( (SELECT processname FROM sophos_process_journal spj2 WHERE spj2.sophosPID = spj.parentSophosPID) AS TEXT) Parent_Process_Name,
CAST( spj.processname AS TEXT) Process_Name,
CAST( spj.cmdline AS TEXT) CmdLine,
CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE spj.processname LIKE '%'||process AND spj.cmdline LIKE indicator) AS TEXT) Mitre_Info,
-- SHOW a pretty bar whre the size depends on the execution duration Duration bar is a sqrt function based on execution time
CASE CAST( (CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END)/15 AS INT)
WHEN 0 THEN '│'
ELSE printf('%.' || CAST(CAST(SQRT(CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS INT)/15) AS INT) AS TEXT) ||'c', '█')
END Execution_Duration,
CAST( CASE spj.eventType WHEN 0 THEN strftime('%s', 'now') - spj.processStartTime ELSE MAX(spj.endtime) - spj.processStartTime END AS TEXT) Seconds,
CAST( spj.sophosPid AS TEXT) SophosPID,
CAST( spj.parentSophosPID AS TEXT) Parent_SophosPID,
CAST( spj.SHA256 AS TEXT) SHA256,
CAST( CASE spj.eventType WHEN 0 THEN 'Process Running' ELSE 'Process Stopped' END AS TEXT)ProcessStatus,
CAST( spj.pathname AS TEXT) PathName
FROM Time_Interval t
LEFT JOIN sophos_process_journal spj ON spj.time > t.x AND spj.time < t.x+14400
LEFT JOIN users ON uuid LIKE sid
WHERE
-- SEARCH AND FILTER CRITERIA
LOWER(spj.cmdline) LIKE LOWER('%$$command line$$%') AND
LOWER(users.username) LIKE LOWER('%$$user name$$%') AND
LOWER(spj.processname) LIKE LOWER('%$$process name$$%') AND
LOWER(Parent_Process_Name) LIKE LOWER('%$$parent process name$$%')
GROUP BY SophosPID
You can also add that into a more exhaustive classifier for all activity.
-- MITRE_ATT&CK Cmdline mappings -- Get map from GIT LOCATION WITH mitre_map_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!='' ), -- Create Table for Mitre_MAP mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS ( SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique FROM mitre_map_file WHERE Line != '' ), -- Get detection rules from GIT LOCATION mitre_rule_file(Line, str) AS ( SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10) UNION ALL SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!='' ), -- Create Table for Detection_Rules mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS ( SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator FROM mitre_rule_file WHERE Line != '' ), -- Map the methods to the description Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator FROM mitre_map mm JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT) ) -- Perform the evaluations SELECT REPLACE(DATETIME(spj.time,'unixepoch'),' ','T') date_time, Rule_Map.Info, (SELECT username FROM users WHERE users.uuid = spj.sid) AS username, spj.processname AS process_name, spj.cmdline AS cmd_line, spj.sophosPID AS sophos_pid, spj.pathname AS path_name, (SELECT spj2.processname FROM sophos_process_journal spj2 WHERE spj2.sophospid = spj.parentSophosPID AND spj2.eventtype = 0) parent_name FROM sophos_process_journal spj JOIN Rule_Map ON spj.processname LIKE Rule_Map.process AND spj.cmdline LIKE Rule_Map.Indicator WHERE spj.eventType = 0 AND spj.time > $$Begin search on$$ AND spj.time < $$Begin search on$$ + $$Number of hours to search$$ * 3600
And now that we have PIVOTS you can go from one of the Sophos PIDS to a process three that includes the MITRE enrichments as well
-- Process Tree with MITRE enrichment for CMD LINE
-- VARIABLE $$sophosPID$$ SophosPID
-- Generate Process Tree
WITH RECURSIVE
get_ancestors(sophosPID, level, startTime, cmdLine, procName, parentSophosPID, sha256, sid) AS (
SELECT sophosPID, 0, processStartTime, cmdLine, processName, parentSophosPID, sha256, sid
FROM sophos_process_journal
WHERE sophosPID = '$$sophosPID$$' AND eventtype = 0
UNION ALL
SELECT p.sophosPID, get_ancestors.level - 1, p.processStartTime, p.cmdLine, p.processName, p.parentSophosPID, p.sha256, p.sid
FROM sophos_process_journal p
JOIN get_ancestors ON p.sophosPID = get_ancestors.parentSophosPID
ORDER BY 2 ASC
),
ancestor_tree AS (
SELECT DISTINCT
startTime,
printf('%.' || ABS(level) || 'c', '<') || ' ' || procName processBranch,
sophosPID,
cmdLine,
sha256,
sid
FROM get_ancestors
WHERE level < 0
ORDER BY level ASC
),
get_children(sophosPID, level, startTime, cmdLine, procName, targetsophosPID, sha256, sid) AS (
SELECT p.sophosPID, 0, p.processStartTime, p.cmdLine, p.processName,spa.targetsophosPID, p.sha256, p.sid
FROM sophos_process_journal p
LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process'
WHERE p.sophosPID = '$$sophosPID$$'
UNION ALL
SELECT p.sophosPID, get_children.level + 1, p.processStartTime, p.cmdLine, p.processName, spa.targetsophosPID, p.sha256, p.sid
FROM sophos_process_journal p
JOIN get_children ON p.sophosPID = get_children.targetsophosPID LEFT JOIN sophos_process_activity spa ON spa.sophosPID = p.sophosPID AND subject = 'Process'
ORDER BY 2 DESC
),
child_tree AS (
SELECT
DISTINCT startTime,
CASE
WHEN level = 0 THEN procName
ELSE printf('%.' || level || 'c', '>') || ' ' || procName
END as processBranch,
sophosPID,
cmdLine,
sha256,
sid
FROM get_children
),
Full_Tree AS (
SELECT * FROM ancestor_tree
UNION ALL
SELECT * FROM child_tree
),
-- Get map from GIT LOCATION
mitre_map_file(Line, str) AS (
SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/MITRE_ID_MAP.CSV') ||char(10)
UNION ALL
SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_map_file WHERE str!=''
),
-- Create Table for Mitre_MAP
mitre_map (Tid, tactic, id, technique, subid, subtechnique) AS (
SELECT SPLIT(Line,',',0) Tid, SPLIT(Line,',',1) tactic, SPLIT(Line,',',2) id, SPLIT(Line,',',3) technique, SPLIT(Line,',',4) subid, SPLIT(Line,',',5) subtechnique
FROM mitre_map_file WHERE Line != ''
),
-- Get detection rules from GIT LOCATION
mitre_rule_file(Line, str) AS (
SELECT '', (SELECT result from curl where url = 'https://raw.githubusercontent.com/karlrackerman/Query_Library/main/TTP_Rules.CSV') ||char(10)
UNION ALL
SELECT substr(str, 0, instr(str, char(10) )), substr(str, instr(str, char(10) )+1) FROM mitre_rule_file WHERE str!=''
),
-- Create Table for Detection_Rules
mitre_detection_rules (method, noise_level, id,subid,process,indicator) AS (
SELECT SPLIT(Line,',',0) method, SPLIT(Line,',',1) noise_level, SPLIT(Line,',',2) id, SPLIT(Line,',',3) subid, SPLIT(Line,',',4) process, SPLIT(Line,',',5) indicator
FROM mitre_rule_file WHERE Line != ''
),
-- Map the methods to the description
Rule_Map AS ( SELECT mm.id||'.'||mm.subid||' -- '||mm.tactic||':: '||mm.technique||'- '||mm.subtechnique||CHAR(10)||'. Method: '||mdr.method||CHAR(10)||'. Process LIKE: '||mdr.process||CHAR(10)||'. Indicator LIKE: '||mdr.indicator info, mdr.process, mdr.indicator
FROM mitre_map mm
JOIN mitre_detection_rules mdr ON CAST(mm.id AS TEXT) = CAST(mdr.id AS TEXT) AND CAST(mm.subid AS TEXT) = CAST(mdr.subid AS TEXT)
)
-- USED TO DEBUG
-- SELECT * FROM mitre_map
-- SELECT CAST(method AS TEXT) Method, CAST(noise_level AS TEXT) Noise_Level, CAST(id AS TEXT) ID, CAST(subid AS TEXT) SubID, CAST(process AS TEXT) Process, CAST(indicator AS TEXT) Indicator FROM mitre_detection_rules
-- SELECT * FROM Rule_Map ORDER BY info
-- SELECT * FROM mitre_map UNION ALL SELECT * FROM mitre_detection_rules
-- Display the Tree and enrich the CMDLINE
SELECT
DateTime(starttime,'unixepoch') Date_Time,
processBranch,
cmdline,
-- NOTE GROUP_CONCAT incase a cmdline has multiple TTP hits
CAST((SELECT '('||COUNT(info)||') '||CHAR(10)||GROUP_CONCAT(info, CHAR(10)) FROM Rule_Map WHERE processBranch LIKE '%'||process AND Cmdline LIKE indicator) AS TEXT) Mitre_Info,
(SELECT username FROM users WHERE uuid = sid) username,
sophosPID,
(SELECT pathname FROM sophos_process_journal WHERE sophosPID = Full_Tree.sophosPID AND eventtype = 0) pathname,
CAST(sha256 AS TEXT) sha256,
sid,
ROW_NUMBER() OVER() Row_number
FROM Full_Tree
Oh and if you find a particular process you really want to examine, run the extended process tree query on it
-- Extended Process Tree for a SophosPID
-- VARIABLE $$SophosPID$$ sophosPID
-- NOTE THE PROCESS OR ANCESTORS MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min
WITH RECURSIVE
-- GET A LIST OF ALL ANCESTORS OF A SOPHOS PID
Ancestors(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS (
-- Define the SEED Row as the information on the SophosPID provided
SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time,
CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0
THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0)
ELSE strftime('%s','now','+10 minutes') END
FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0
UNION ALL
-- Recursvly identify all decendents
SELECT spj.SophosPID, Level - 1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time,
CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0
THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0)
ELSE strftime('%s','now','+10 minutes') END
FROM Ancestors JOIN Sophos_Process_Journal spj ON spj.SophosPID = Ancestors.parent AND CAST(spj.endtime AS INT) = 0
-- Perform a Depth First Search ASC would perform a Breadth First Search
ORDER BY 2 DESC
),
-- Add Row Numbers to the Ancestor List so we order the tree corretly
-- EXCLUDE the line for the specified SophosPID so that when we show the tree we do not have a duplicate row
Orderd_Ancestors AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, -1 * ROW_Number() OVER () Row FROM Ancestors WHERE SophosPID NOT IN ('$$SophosPID$$') ),
-- GET A LIST OF ALL DECENDENTS OF A SOPHOS PID
-- NOTE THE PROCESS OR CHILDREN MAY STILL BE RUNNING SO HANDLE ENDTIME CORRECTLY by changing endtime for running processes to now + 10 min
Children(SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end) AS (
-- Define the SEED Row as the information on the SophosPID provided
SELECT sophosPID, 0, ParentSophosPID, processname, pathname, cmdline, sha256, sid, time,
CASE WHEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0) > 0
THEN (SELECT endtime FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) > 0)
ELSE strftime('%s','now','+10 minutes') END
FROM sophos_process_Journal WHERE sophos_process_journal.SophosPID = '$$SophosPID$$' AND CAST(endtime AS INT) = 0
UNION ALL
-- Recursvly identify all decendents
SELECT spj.SophosPID, Level +1, spj.ParentSophosPID, spj.processname, spj.pathname, spj.cmdline, spj.sha256, spj.sid, spj.time,
CASE WHEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0) > 0
THEN (SELECT spj2.endtime FROM sophos_process_Journal spj2 WHERE spj2.SophosPID = spj.SophosPID AND CAST(endtime AS INT) > 0)
ELSE strftime('%s','now','+10 minutes') END
FROM Children JOIN Sophos_Process_Journal spj ON spj.ParentSophosPID = Children.SophosPID AND CAST(spj.endtime AS INT) = 0 AND spj.time > Children.start -5 and spj.time < Children.end +3600
-- Perform a Depth First Search ASC would perform a Breadth First Search
ORDER BY 2 DESC
),
-- Add Row Numbers to the Decendent List so we order the tree corretly
Orderd_Descendants AS (SELECT SophosPID, Level, parent, processname, pathname, cmdline, sha256, sid, start, end, ROW_Number() OVER () Row FROM Children),
-- Now collect the activity for all descendents and the selected sophosPID using a UNION to list the decendent then the file activity it had
File_Activity AS (
-- FOR ANCESTORS WE WILL ONLY SHOW THE PROCESS TREE INFO (No activity will be collected)
SELECT
REPLACE(DATETIME(A.start,'unixepoch'), ' ','T') Date_Time,
CASE A.SophosPID
WHEN '$$SophosPID$$' THEN A.ProcessName
ELSE substr('< < < < < < < < < < < < < < < < < < < < ', 1, A.Level * -2) || A.processName
END Process_Tree,
'-----------' Subject,
'-----------' Action,
'-----------' Object,
CAST(A.cmdline AS TEXT) Cmd_Line,
A.SophosPID SophosPID,
A.pathname Process_Pathname,
A.sha256 Process_SHA256,
A.SID Process_SID,
A.Level Level,
A.Row Row,
0 Sub_Row,
a.start time
FROM Orderd_Ancestors A
UNION ALL
-- SHOW THE PROCESS TREE INFO FOR DESCENDENTS
SELECT
REPLACE(DATETIME(D.start,'unixepoch'), ' ','T') Date_Time,
CASE D.SophosPID
WHEN '$$SophosPID$$' THEN D.ProcessName
ELSE substr('> > > > > > > > > > > > > > > > > > > > > > ', 1, D.Level * 2) || D.processName
END Process_Tree,
'-----------' Subject,
'-----------' Action,
'-----------' Object,
CAST(D.cmdline AS TEXT) Cmd_Line,
D.SophosPID SophosPID,
D.pathname Process_Pathname,
D.sha256 Process_SHA256,
D.SID Process_SID,
D.Level Level,
D.Row Row,
0 Sub_Row,
D.start time
FROM Orderd_Descendants D
UNION ALL
-- ADD THE PROCESS ACTIVITY FOR EACH DESCENDENT
SELECT
REPLACE(DATETIME(MIN(spa.time),'unixepoch'),' ','T') Date_Time,
CASE D.SophosPID
WHEN '$$SophosPID$$' THEN '( '||D.processname||' ) ACTIVITY'
ELSE substr('~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~ ~', 1, D.Level * 2) || '( '||D.processname||' ) ACTIVITY'
END Process_Tree,
spa.subject Subject,
CAST(GROUP_CONCAT(DISTINCT spa.action)||' ('||COUNT(spa.action)||')' AS TEXT) Action,
spa.object,
CAST(D.cmdline AS TEXT) Cmd_Line,
D.SophosPID SophosPID,
D.pathname Process_Pathname,
D.sha256 Process_SHA256,
D.SID Process_SID,
'' Level,
D.Row Row,
1 Sub_Row,
spa.time time
-- NOTE The details for each process does not include 'FileDataReads', 'FileOtherReads', 'FileBinaryReads', 'Image', 'Thread'
FROM Orderd_Descendants D LEFT JOIN Sophos_Process_Activity spa ON -- spa.subject IN ('Thread','DirectoryChanges','Dns','FileBinaryChanges','FileDataChanges','FileOtherChanges','Http','Ip','Network','Url','Registry','Process')
-- AND
spa.SophosPID = D.SophosPID
AND spa.time > D.start-1
AND spa.time < D.end+1
WHERE spa.subject > ''
GROUP BY spa.subject, spa.action, spa.object, spa.SophosPID, D.processname, D.SophosPID, D.pathname, D.sha256, D.SID
)
-- Now that we have all activity for each descendent, we need to provide the pretty list showing the Process Tree and File activity for each process in the tree
SELECT Date_Time, Process_Tree, Subject, Action, Object, Cmd_Line, SophosPID, Process_Pathname, Process_SHA256, Process_SID, row, sub_row, time,ROW_NUMBER() OVER( ORDER BY Row, Sub_Row, Time) SORT_ORDER_FOR_EXCEL
FROM File_Activity
ORDER By SORT_ORDER_FOR_EXCEL