Caching
Panther's real-time analysis engine examines events one-by-one and sometimes it's helpful to keep state across invocations. To accommodate stateful checks, rules can cache values by using built-in helper functions.

Importing the Helpers

The first step is to import the open source helper library:
1
import panther_oss_helpers
Copied!
You may import specific functions:
1
from panther_oss_helpers import increment_counter
Copied!

Counters

To implement a counter-based rule, use one or more of the following functions:
  • get_counter: Get the latest counter value
  • increment_counter: Add to the counter (default of 1)
  • reset_counter: Reset the counter to 0
  • set_key_expiration: Set the lifetime of the counter
The rule below provides a demonstration of using counters.
1
from panther_oss_helpers import increment_counter, set_key_expiration, reset_counter
2
3
4
def rule(event):
5
# Filter to only analyze AccessDenied calls
6
if event.get('errorCode') != 'AccessDenied':
7
return False
8
9
# Create our counter key, which should be fairly unique
10
key = '{}-AccessDeniedCounter'.format(event['userIdentity'].get('arn'))
11
12
# Increment the counter, and then check the current value
13
hourly_error_count = increment_counter(key)
14
if hourly_error_count == 1:
15
set_key_expiration(time.time() + 3600)
16
elif failure_hourly_count >= 10:
17
# If it exceeds our threshold, reset and then return an alert
18
reset_counter(key)
19
return True
20
return False
Copied!

String Sets

To keep track of sets of strings, use the following functions:
  • get_string_set: Get the string set's current value
  • put_string_set: Overwrite a string set
  • add_to_string_set: Add one or more strings to a set
  • remove_from_string_set: Remove one or more strings from a set
  • reset_string_set: Empty the set
1
from panther_oss_helpers import add_to_string_set
2
3
4
def rule(event):
5
if event['eventName'] != 'AssumeRole':
6
return False
7
8
role_arn = event['requestParameters'].get('roleArn')
9
if not role_arn:
10
return False
11
12
role_arn_key = '{}-UniqueSourceIPs'.format(role_arn)
13
ip_addr = event['sourceIPAddress']
14
15
previously_seen_ips = get_string_set(role_arn_key)
16
17
# If this the only value, trust on first use
18
if len(previously_seen_ips) == 0:
19
add_to_string_set(role_arn_key, ip_addr)
20
return False
21
22
if ip_addr not in previously_seen_ips:
23
return True
24
25
return False
Copied!

Testing

Currently, CLI testing does not support mocking function calls.
Last modified 9mo ago