Hi Everyone
I have been trying to work pagination to return more than 100 results per query.
As per the api doc then each get_results() query returns a nextPage key which is a base64 string and I have been able to retrive this.
This should then be pased to the results request in the url as below:
/xdr-query/v1/queries/runs/{run id}/results?nextKey={page key}
However sending the base64 string did not work and I get the error below
{
"error": "InvalidOperationException",
"correlationId": "something",
"requestId": "something",
"createdAt": "2020-10-26T14:06:24.975Z",
"code": 400,
"message": "Failed to get QueryResults Bad Request"
}Using the query builder in the docs I can see that the page get gets URL encoded. however doing this also did not work, same error.
Finally I have resorted to tring to use burp suite to base64 decode and then URL encode and I still get the same error.
Anyone got any ideas? or know if this feature is not fully implemented yet?
My Custome API query file:
# Copyright 2020 Sophos Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import json
import logging
import requests
from retry import retry
MAX_RESULTS = 1000
class ApiError(Exception):
pass
class XDRQueryAPI:
def __init__(self):
self.query_success = 201
self.executions_route = 'xdr-query/v1/queries/runs'
self.content_type = 'application/json'
self.query_template = '''{"tenantIds": ["%s"],"deviceIds":[],"queryFormat":"sql","queryText":%s}'''
self.environment_urls = {'whoamiURL': 'https://api.central.sophos.com/whoami/v1',
'tokenURL': 'https://id.sophos.com/api/v2/oauth2/token'}
self.json_config = ''
def string_to_urls(self, env: str):
if env == '':
return self.environment_urls
if env not in self.json_config:
raise ApiError('Environment not in config')
return self.json_config[env]
def service_request_no_client_certs(self, method, url, payload, timeout, headers):
try:
with requests.Session() as session:
data = payload if payload is not None else None
req = requests.Request(method, url, data=data, headers=headers)
prepped = session.prepare_request(req)
resp = session.send(prepped, timeout=timeout, verify=True)
except requests.RequestException as e:
raise ApiError(e.strerror)
data = resp.content
status = resp.status_code
return data, status, dict(resp.headers)
def start_query(self, query, url, headers):
logging.debug('Running query: ' + query)
headers['Content-Type'] = self.content_type
executions_url = url + '/' + self.executions_route
logging.info('Querying reporting api using ' + executions_url)
data, status, _ = self.service_request_no_client_certs('POST', executions_url, query, 10, headers)
if status != self.query_success:
logging.error('Query report status was %d data: %s', status, data)
raise ApiError('Failed to run query')
response_json = json.loads(data)
logging.info('Query report response: ' + str(response_json))
return response_json['id']
@retry(tries=60, delay=1, logger=None)
def wait_complete_reporting_status(self, execution_id, url, headers):
status_url = url + '/' + self.executions_route + '/' + execution_id
logging.debug('Checking query status using ' + status_url)
data, _, _ = self.service_request_no_client_certs('GET', status_url, None, 10, headers)
response_json = json.loads(data)
logging.debug('Response json: ' + str(response_json))
if response_json['status'].lower() != 'finished':
raise ApiError(response_json['status'] + ' is not finished')
logging.info('Query status at ' + status_url + ' complete')
logging.info('Query status response: ' + str(response_json))
return response_json['result'].lower() == 'succeeded'
def get_results(self, execution_id, url, headers, next_page = None):
if (next_page == None):
result_url = url + '/' + self.executions_route + '/' + execution_id + '/results'
else:
result_url = url + '/' + self.executions_route + '/' + execution_id + '/results?nextKey='+next_page
#print(result_url)
#print(headers)
logging.info('Checking query results using ' + result_url)
data, status, _ = self.service_request_no_client_certs('GET', result_url, None, 10, headers)
logging.debug('Reporting results: ' + str(status))
if not status == 200:
logging.error('Reporting results failed: ' + str(status))
logging.error('Raw data: ' + str(data))
error = 'Get result failed error code: ' + str(status)
try:
data = json.loads(data)
if 'message' in data:
error = error + ', message: ' + data['message']
except json.JSONDecodeError:
pass
raise ApiError(error)
return data
def read_query_file(self, file):
with open(file, 'r') as f:
query = f.read()
return query
def run_query(self, query_text, tenant_id, url: str, authorization: str, tabulate_result=True):
headers = {
'Authorization': 'Bearer ' + authorization,
'X-Tenant-Id': tenant_id,
}
templated_query = self.query_template % (tenant_id, json.dumps(query_text))
execution_id = self.start_query(templated_query, url, headers)
status = self.wait_complete_reporting_status(execution_id, url, headers)
if not status:
logging.error('Query failed')
else:
logging.info('Query run successfully')
# read first page of results
results_json = json.loads(self.get_results(execution_id, url, headers))
# spilt into meta data and itmes
results = results_json['items']
# get next page key
next_page = results_json['pages']['nextKey']
#num_pages = results_json['pages']['total']
print(str(next_page))
# Read first page (100 results)
results_full = results
results_size = len(results)
# loop reading of rest of pages
while len(results) > 0 and results_size < MAX_RESULTS and next_page != None:
# read page
results_json = json.loads(self.get_results(execution_id, url, headers, str(next_page)))
# get next page key
next_page = results_json['pages']['nextKey']
results_size += len(results)
results_full += results_json['items']
return results_full
def generate_token(self, client_id, client_secret, env=''):
url = self.string_to_urls(env)['tokenURL']
if not url:
raise ApiError('No valid url found for env')
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
body = f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&scope=token'
data, status, _ = self.service_request_no_client_certs('POST', url, body, 10, headers)
if not status == 200:
raise ApiError('Get Token failed with error: ' + str(status))
data = json.loads(data)
if 'access_token' not in data:
raise ApiError('Response does not contain access token')
return data['access_token']
def get_whoami(self, authorization: str, env=''):
url = self.string_to_urls(env)['whoamiURL']
if not url:
raise ApiError('No valid url found for env')
headers = {
'Authorization': 'Bearer ' + authorization,
}
data, status, _ = self.service_request_no_client_certs('GET', url, None, 10, headers)
if not status == 200:
raise ApiError('Who ami failed with error: ' + str(status))
data = json.loads(data)
if 'apiHosts' not in data:
raise ApiError('Could not get api hosts')
if 'dataRegion' not in data['apiHosts']:
raise ApiError('Could not get data regions')
if 'id' not in data:
raise ApiError('Could not get id')
if 'idType' not in data:
raise ApiError('Could not get id type')
return data
def validate_config(self, config):
if config == '':
raise ApiError('Config loaded is empty')
for environment in config:
if 'whoamiURL' not in config[environment]:
raise ApiError('whoamiURL not found in ' + environment)
if 'tokenURL' not in config[environment]:
raise ApiError('tokenURL not found in ' + environment)
def load_config(self, filename):
loaded_config = ''
with open(filename, 'r') as f:
loaded_config = json.loads(f.read())
self.validate_config(loaded_config)
self.json_config = loaded_config
def create_logger(level):
numeric_level = logging.INFO
if level:
parsed_level = getattr(logging, level.upper(), None)
if isinstance(parsed_level, int):
numeric_level = parsed_level
logging.basicConfig(format='%(asctime)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=numeric_level)
if not isinstance(parsed_level, int):
logging.warning('Invalid log level argument: ' + level)
def parse_args():
parser = argparse.ArgumentParser(description='Argument Parser for sending queries to the reporting api')
parser.add_argument('-f', '--query_file', type=str.lower, help='The file containing the query json', required=True)
parser.add_argument('-t', '--tenant_id', type=str.lower, help='The tenant id', required=False)
parser.add_argument('-l', '--log_level', type=str.lower, help='Log level: debug ,info, warning, error',
required=False, default='info')
parser.add_argument('-o', '--output_file', type=str.lower, help='The output file to write the result to',
required=False)
parser.add_argument('-c', '--config', type=str.lower, help='The config file')
parser.add_argument('-e', '--environment', type=str.lower, help='The environment', default='')
parser.add_argument('-id', '--client_id', type=str.lower, help='The client id', required=True)
parser.add_argument('-s', '--client_secret', type=str.lower, help='The client secret', required=True)
return parser.parse_args()
def main():
args = parse_args()
query_api = XDRQueryAPI()
create_logger(args.log_level)
tenant_id = args.tenant_id
query = query_api.read_query_file(args.query_file)
output_file = args.output_file
try:
if args.config:
logging.info('Loading config file...')
query_api.load_config(args.config)
env = args.environment
logging.info('Getting authorization token...')
token = query_api.generate_token(args.client_id, args.client_secret, env)
logging.debug('Token: %s', token)
logging.info('Getting whoami...')
whoami = query_api.get_whoami(token, env)
url = whoami['apiHosts']['dataRegion']
logging.debug('Url: %s', url)
if whoami['idType'] == 'tenant':
if tenant_id and tenant_id != whoami['id']:
logging.error('Provided tenant ID does not match whoami response')
return
tenant_id = whoami['id']
elif not tenant_id:
logging.error('Provided tenant ID does not match whoami response')
return
logging.debug('Tenant ID: %s', tenant_id)
# get results
results = query_api.run_query(query, tenant_id, url, token)
# get json string of results
json_out = json.dumps(results)
#logging.info('Results:\n' + str(json_out))
if output_file:
with open(output_file, 'wb') as f:
# write json to file as bytes.
f.write(json_out.encode())
except (ApiError, FileNotFoundError) as e:
logging.error(str(e))
if __name__ == '__main__':
main()