How to count files in users' drives using the Microsoft Graph SDK for Python

Intro

It sounds complicated, but it's actually easy because you only need to call three endpoints.

First you need to get all users in the organization:

GET /v1.0/users

Then all drives of each user:

GET /v1.0/users/{id}/drives

Finally, the items in each drive:

GET /v1.0/drives/{drive-id}/items/{item-id}/children
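To make the relationship between the three calls concrete, here is a minimal sketch that only builds the request URLs and sends nothing. The helper names and the BASE_URL constant are illustrative and not part of the SDK.

```python
from urllib.parse import quote

# Assumption: all three calls target the v1.0 endpoint of Microsoft Graph.
BASE_URL = "https://graph.microsoft.com/v1.0"


def users_url() -> str:
    """URL that lists the users in the organization."""
    return f"{BASE_URL}/users"


def user_drives_url(user_id: str) -> str:
    """URL that lists the drives of one user."""
    return f"{BASE_URL}/users/{quote(user_id, safe='')}/drives"


def children_url(drive_id: str, item_id: str = "root") -> str:
    """URL that lists the direct children of a folder in a drive."""
    drive = quote(drive_id, safe="")
    item = quote(item_id, safe="")
    return f"{BASE_URL}/drives/{drive}/items/{item}/children"
```

Each call feeds the next one: user ids come from the first URL, drive ids from the second, and folder item ids from the third.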

Prerequisites

Python modules

You need the msgraph-sdk and azure-identity packages. Both can be installed with pip:

pip install msgraph-sdk
pip install azure-identity

Entra app registration

Sign in to the Microsoft Entra admin center and register a new application. Add the Microsoft Graph application permissions Files.Read.All and User.Read.All and grant admin consent for them.

In the last step, create a new client secret for your application. Don't forget to copy the client secret. You will need it for authorization.

Implementation

Initialize the Graph service client with a client secret credential:

import asyncio
from typing import List
from msgraph import GraphServiceClient
from msgraph.generated.drives.item.items.item.children.children_request_builder import ChildrenRequestBuilder
from msgraph.generated.users.users_request_builder import UsersRequestBuilder
from msgraph.generated.models.user import User
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
from kiota_abstractions.base_request_configuration import RequestConfiguration
from azure.identity import ClientSecretCredential

tenant_id = '<tenant_id>'
client_id = '<app_id>'
client_secret = '<client_secret>'

scopes = ['https://graph.microsoft.com/.default']

credential = ClientSecretCredential(tenant_id, client_id, client_secret)
graph_client = GraphServiceClient(credential, scopes)
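Hard-coding the client secret is fine for a quick test, but you may prefer to read it from the environment. The following is a minimal sketch of that idea. The variable names GRAPH_TENANT_ID, GRAPH_CLIENT_ID and GRAPH_CLIENT_SECRET, as well as the GraphSettings and load_settings helpers, are hypothetical and not something the SDK defines.

```python
import os
from typing import Mapping, NamedTuple, Optional


class GraphSettings(NamedTuple):
    tenant_id: str
    client_id: str
    client_secret: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> GraphSettings:
    """Read the three values from environment variables (hypothetical names)."""
    source = os.environ if env is None else env
    names = ("GRAPH_TENANT_ID", "GRAPH_CLIENT_ID", "GRAPH_CLIENT_SECRET")
    # Fail early with a clear message instead of passing empty values to the credential.
    missing = [name for name in names if not source.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return GraphSettings(*(source[name] for name in names))
```

The returned values can then be passed to ClientSecretCredential in place of the literals above.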

Get all users

The first step is to retrieve all users in the organization. It's better to filter out guest users because the app can't access drives of guest users from another tenant.

async def get_all_users() -> List[User]:
    users: List[User] = []

    # return id and UPN, filter only members
    query_params = UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
        select = ["id","userPrincipalName"],
        filter = "userType eq 'member'"
    )
    request_configuration = RequestConfiguration(
        query_parameters = query_params,
    )

    # get the first page
    users_response = await graph_client.users.get(request_configuration)
    if users_response and users_response.value:
        users.extend(users_response.value)

    # follow @odata.nextLink until there are no more pages
    while users_response is not None and users_response.odata_next_link is not None:
        users_response = await graph_client.users.with_url(users_response.odata_next_link).get(request_configuration)
        if users_response and users_response.value:
            users.extend(users_response.value)

    return users

The Python SDK doesn't have any paging component, so it's up to you to check @odata.nextLink and iterate through all pages to get all users.
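Because the same "read a page, follow the next link" loop is needed for both users and drive items, it can be factored into one helper. This is a minimal sketch under the assumption that every page object exposes value and odata_next_link, as the SDK responses used in this article do. collect_all_pages and get_page_by_url are illustrative names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def collect_all_pages(
    first_page: Optional[Any],
    get_page_by_url: Callable[[str], Awaitable[Optional[Any]]],
) -> List[Any]:
    """Gather the items of the first page and of every page linked from it."""
    items: List[Any] = []
    page = first_page
    while page is not None:
        # A page may come back without a value list, so fall back to an empty one.
        items.extend(page.value or [])
        next_link = page.odata_next_link
        # Stop when the response carries no next link.
        page = await get_page_by_url(next_link) if next_link else None
    return items


# Possible usage with the client from this article (not executed here):
#   first = await graph_client.users.get(request_configuration)
#   users = await collect_all_pages(
#       first, lambda url: graph_client.users.with_url(url).get()
#   )
```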

Get each user's drives

In the next step, you need to get all drives of each user returned by get_all_users:

drives = await graph_client.users.by_user_id(user.id).drives.get()

Count the items in a drive

The last step is to count the items in a user's drive. The function will count the items in a specific folder and it will be reused to count the items in all subfolders recursively.

async def get_child_items_count(drive_id: str, drive_item_id: str) -> int:
    items_count = 0
    all_drive_item_folders_ids: List[str] = []
    children = graph_client.drives.by_drive_id(drive_id).items.by_drive_item_id(drive_item_id).children

    # get the first page
    drive_items_response = await children.get()

    while drive_items_response is not None:
        page_items = drive_items_response.value or []
        items_count += len(page_items)
        for child_item in page_items:
            # remember non-empty folders so they can be counted recursively
            if child_item.folder is not None and (child_item.folder.child_count or 0) > 0:
                all_drive_item_folders_ids.append(child_item.id)

        # follow @odata.nextLink; stop when there are no more pages
        if drive_items_response.odata_next_link is None:
            break
        drive_items_response = await children.with_url(drive_items_response.odata_next_link).get()

    # count items in subfolders
    for folder_id in all_drive_item_folders_ids:
        items_count += await get_child_items_count(drive_id, folder_id)
    return items_count

The function takes two parameters: drive_id, the id of the drive, and drive_item_id, the id of the drive item that represents a folder. all_drive_item_folders_ids collects the ids of the child folders of the current folder.

Similar to fetching all users, when reading child items in a folder you must iterate through all pages by checking @odata.nextLink.

items_count is incremented by the number of items on each page, so folders are counted as items too. If an item is a non-empty folder, its id is added to the all_drive_item_folders_ids list.

The last step is to iterate through all folder ids and call get_child_items_count for each of them.
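The recursion can be tried out without calling Graph at all. The sketch below models a drive as an in-memory dictionary and mirrors the counting logic described above. The Item type, the SAMPLE_DRIVE layout and the count_files variant are made up for illustration.

```python
from typing import Dict, List, NamedTuple


class Item(NamedTuple):
    id: str
    is_folder: bool


# Hypothetical drive layout: folder id -> direct children.
SAMPLE_DRIVE: Dict[str, List[Item]] = {
    "root": [Item("docs", True), Item("a.txt", False)],
    "docs": [Item("b.txt", False), Item("empty", True)],
    "empty": [],
}


def count_items(tree: Dict[str, List[Item]], folder_id: str) -> int:
    """Count every child, folders included, then recurse into subfolders."""
    children = tree.get(folder_id, [])
    count = len(children)
    for child in children:
        if child.is_folder:
            count += count_items(tree, child.id)
    return count


def count_files(tree: Dict[str, List[Item]], folder_id: str) -> int:
    """Variant that skips folder entries and counts files only."""
    total = 0
    for child in tree.get(folder_id, []):
        if child.is_folder:
            total += count_files(tree, child.id)
        else:
            total += 1
    return total
```

For the sample layout, count_items returns 4 (two folders and two files) while count_files returns 2. If you only want files, the same filter could be applied in get_child_items_count by skipping items whose folder property is set.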

Call all functions

We are almost done. The final step is to call all functions and print the results:

async def get_users_drives_files_count():
    users = await get_all_users()
    for user in users:
        try:
            drives = await graph_client.users.by_user_id(user.id).drives.get()
            if drives and drives.value:
                for drive in drives.value:
                    # 'root' addresses the top-level folder of the drive
                    items_count = await get_child_items_count(drive.id, 'root')
                    print(f"{user.user_principal_name}: drive '{drive.name}' has {items_count} item(s)")
        except ODataError as e:
            # one failing user must not stop the whole run
            print(f"Failed for user {user.user_principal_name} ({user.id}). {e.primary_message}")

As you can see, get_child_items_count is called for each drive with the root folder, which is the top-level folder in the drive.

When a Graph API call fails, an ODataError exception is raised.
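If you would rather collect the results than print them, the same per-user error handling can return two dictionaries. This is only a sketch: FakeGraphError stands in for ODataError so the example runs without the SDK, and count_per_user and count_for_user are illustrative names.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Tuple


class FakeGraphError(Exception):
    """Stand-in for ODataError so the sketch runs without the SDK."""


async def count_per_user(
    user_ids: Iterable[str],
    count_for_user: Callable[[str], Awaitable[int]],
) -> Tuple[Dict[str, int], Dict[str, str]]:
    """Return item counts per user plus the error message for each failed user."""
    counts: Dict[str, int] = {}
    failures: Dict[str, str] = {}
    for user_id in user_ids:
        try:
            counts[user_id] = await count_for_user(user_id)
        except FakeGraphError as error:
            # Record the failure and continue with the next user.
            failures[user_id] = str(error)
    return counts, failures
```

With the real client you would catch ODataError instead and store e.primary_message.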

Use asyncio.run to call the get_users_drives_files_count function:

asyncio.run(get_users_drives_files_count())

The output will look like this:

JohnDoe@contoso.onmicrosoft.com: drive 'OneDrive' has 24 item(s)
PattiF@contoso.onmicrosoft.com: drive 'OneDrive' has 100 item(s)
LeeG@contoso.onmicrosoft.com: drive 'OneDrive' has 256 item(s)
MiriamG@contoso.onmicrosoft.com: drive 'OneDrive' has 0 item(s)
AlexW@contoso.onmicrosoft.com: drive 'OneDrive' has 3 item(s)
AdeleV@contoso.onmicrosoft.com: drive 'OneDrive' has 10000 item(s)
Failed for Room_5.01@contoso.onmicrosoft.com. User's mysite not found.
Failed for Vacation@contoso.onmicrosoft.com. User's mysite not found.

The whole code is available in the GitHub repository

Conclusion

Reading users, drives and drive items is pretty straightforward. The only thing you need to remember is to iterate through all pages when reading users and drive items. The Python SDK doesn't have any paging component, so you need to check @odata.nextLink and call the next page manually.
