asg_sentinel_policy/asg-tags.sentinel

101 lines
3.2 KiB
Plaintext

# This policy uses the Sentinel tfplan/v2 import to require that
# all aws_autoscaling_group instances have specified tag keys/values
# Import common-functions/tfplan-functions/tfplan-functions.sentinel
# with alias "plan"
import "tfplan/v2" as tfplan
#import "tfplan-functions" as plan
param actions default [
["no-op"],
["create"],
["update"],
]
# Mandatory tags. This should probably be a map that contains a list of acceptable values.
# e.g., mandatory_tags = { "Environments: [prod, dev, qa]"}
# at least in the context of asgs it would make more sense if there's more than one mandatory tag
mandatory_tags = ["Environments"]
# Allowed Environments
# Include "null" to allow missing or computed values
allowed_environments = ["tfe-monitoring"]
# Get asgs
allAwsAutoscalingGroups = filter tfplan.resource_changes as _, rc {
rc.provider_name matches "(.*)aws$" and
rc.type is "aws_autoscaling_group" and
rc.mode is "managed" and
rc.change.actions in actions
}
#verify that each element of a list is present in map m
#contains looks at keys when used with maps
contains_all_list_items = func(l, m) {
all_present = true
for l as i {
if m not contains i {
all_present = false
}
}
return all_present
}
# iterate over required keys (rk) that exist in a given map (m), and ensure that the value is in a provided list
# this won't actually work correctly for more than one required tag
tag_values_compliant = func(rk, m, l) {
values_compliant = true
for rk as key {
if l not contains m[key] else "ok" {
values_compliant = false
}
}
return values_compliant
}
# Merge tags on an aws_autoscaling_group resource. List of maps is tough to work with
merge_tags = func(lm) {
merged_tag = {}
for lm as l {
merged_tag[l.key] = l.value
}
return merged_tag
}
# Iterate over a set of resources, and filter to those without required keys (k) and values (v)
iterOverResources = func(resources, k, v, prtmsg) {
violators = {}
messages = {}
for resources as a, r {
merged_tags = merge_tags(r.change.after.tag)
all_required_keys_present = contains_all_list_items(k, merged_tags)
all_tag_values_compliant = tag_values_compliant(mandatory_tags, merged_tags, v)
if not all_required_keys_present {
message = string(a) + " is missing a required key from list: " + string(k) + " from tags " + string(merged_tags)
violators[a] = r
if prtmsg {
print("Not all required keys are present.")
print("Required keys:", k)
print("Resource keys:", keys(merged_tags))
}
}
if not all_tag_values_compliant {
violators[a] = r
if prtmsg {
print("tag values not compliant")
print("compliant key values: ", k)
print(a, " has a noncompliant tag value. Compliant values are:", v)
print("Resource tags:")
print(merged_tags)
}
}
}
return {"resources":violators, "messages":messages}
}
violations = length(iterOverResources(allAwsAutoscalingGroups, mandatory_tags, allowed_environments, true)["resources"])
# Main rule
main = rule {
violations is 0
}