Source code for googleapiclienthelpers.iam

import tenacity

from .exceptions import is_retryable_exception

__all__ = [
    'add_binding',
    'get_policy',
    'get_role_bindings',
    'remove_binding',
]


[docs]def get_role_bindings(policy, role): '''Find the given role's binding from the given IAM policy. Given an IAM policy and a desired role, return the binding associated with that role. If the role is not present in the policy, return None. Args: policy: list, the result of any getIamPolicy method. role: string, the role to pull from the policy. Returns: A list of dictionaries, or None if the role is not present. ''' for binding in policy.get('bindings', ()): if binding['role'] == role: return binding return None
def _build_request_body(client, policy): ''' Most APIs expect a `SetIAMPolicyRequest` body, however storage.buckets.setIamPolicy expects just the `Policy` This checks the request schema and tries to build the appropriate body Args: client: Resource, any googleapiclient resource or subresource. policy: Policy, a google Policy object Returns: dict, request body required for setIamPolicy call to client ''' def _nested_get(mydict, query, default=None): res = mydict for key in query.split('.'): if isinstance(res, dict): res = res.get(key, None) else: return default return res request_body_schema = _nested_get( client._resourceDesc, 'methods.setIamPolicy.parameters.body.$ref' ) if request_body_schema == 'Policy': return policy # Default to the SetIAMPolicyRequest format return {'policy': policy} def _api_requires_empty_body(client): '''Does the given client's getIamPolicy require an empty body param? This function examines the given client to determine if its getIamPolicy method must be called with {} as the body. Args: client: Resource, any googleapiclient resource or subresource. Returns: bool, True if client.getIamPolicy requires {} in the body. ''' desc = client._resourceDesc.get('methods', {}) getIamPolicy = desc.get('getIamPolicy') if not getIamPolicy: raise ValueError('The provided client has no getIamPolicy method.') # collect names of any required params required = [k for k, v in getIamPolicy.get('parameters', {}).items() if v.get('required')] return 'body' in required
[docs]def get_policy(client, **kargs): '''Retrieve a policy with GET or POST, as required Some getIamPolicy methods are called with GET, and must have an empty body. Other getIamPolicy methods are called with POST, and must have a non-empty body equal to {}. This function provides a common interface to both. Args: client: Resource, any googleapiclient resource or subresource. **kargs: passed onto client.getIamPolicy Returns: The result of client.getIamPolicy(**kargs).execute() ''' if _api_requires_empty_body(client) and 'body' not in kargs: kargs['body'] = {} return client.getIamPolicy(**kargs).execute()
[docs]@tenacity.retry( retry=tenacity.retry_if_result(is_retryable_exception), wait=tenacity.wait_random_exponential(multiplier=0.2, max=10), stop=tenacity.stop_after_attempt(3), ) def add_binding(client, role, member, **kargs): '''Generic function to add an IAM binding to any GCP resource. This function provides a simple, generic way to add a member to a role for any resource. The caller must supply a client that provides getIamPolicy and setIamPolicy methods. It preserves the policy's etag to prevent concurrent updates. Args: client: Resource, any googleapiclient resource or subresource. role: string, name of the role to modify (e.g., roles/viewer). member: string, the member to add (e.g., user:email@domain.com, serviceAccount:wooo@yea.iam.gserviceaccount.com). **kargs: passed onto client's getIamPolicy and setIamPolicy. This must include parameter that identifies the resource whose policy will be modified. The parameter varies; check the docs on client.getIamPolicy for details. Returns: the result of client.setIamPolicy ''' policy = get_policy(client, **kargs) # if the policy is empty, create the bindings list if 'bindings' not in policy: policy['bindings'] = [] # get the bindings for the role and create/add as required bindings = get_role_bindings(policy, role) if bindings: if member in bindings['members']: return # already in the desired state bindings['members'].append(member) else: policy['bindings'].append({ 'role': role, 'members': [member], }) return client.setIamPolicy( body=_build_request_body(client, policy), **kargs ).execute()
[docs]@tenacity.retry( retry=tenacity.retry_if_result(is_retryable_exception), wait=tenacity.wait_random_exponential(multiplier=0.2, max=10), stop=tenacity.stop_after_attempt(3), ) def remove_binding(client, role, member, **kargs): '''Generic function to remove an IAM binding from any GCP resource. This function provides a simple, generic way to remove a member from a role for any resource. The caller must supply a client that provides getIamPolicy and setIamPolicy methods. It preserves the policy's etag to prevent concurrent updates. Args: client: Resource, any googleapiclient resource or subresource. role: string, name of the role to modify (e.g., roles/viewer). member: string, the member to add (e.g., user:email@domain.com, serviceAccount:wooo@yea.iam.gserviceaccount.com). **kargs: passed onto client's getIamPolicy and setIamPolicy. This must include parameter that identifies the resource whose policy will be modified. The parameter varies; check the docs on client.getIamPolicy for details. Returns: the result of client.setIamPolicy ''' policy = get_policy(client, **kargs) binding = get_role_bindings(policy, role) # if no binding was found, we're done if not binding: return try: binding.get('members', []).remove(member) # If there are no more members for this role binding, remove # binding from policy if len(binding.get('members')) == 0: policy['bindings'].remove(binding) except ValueError: return # no member to remove, we're done return client.setIamPolicy( body=_build_request_body(client, policy), **kargs ).execute()