Intro
Counting the items in every user's drive sounds complicated, but it's actually easy because you only need to call three endpoints.
First you need to get all users in the organization:
GET /v1.0/users
Then all drives of each user:
GET /v1.0/users/{id}/drives
Finally, the items in each drive:
GET /v1.0/drives/{drive-id}/items/{item-id}/children
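To make the call sequence concrete, here is a small sketch that builds the three request URLs. The helper names are made up for illustration and are not part of any SDK; the default item id `root` refers to the drive's top-level folder.

```python
# Illustrative helpers (hypothetical names): build the three Graph v1.0 URLs
# that correspond to the endpoints listed above.
GRAPH_BASE_URL = "https://graph.microsoft.com/v1.0"


def users_url() -> str:
    """URL that lists the users in the organization."""
    return f"{GRAPH_BASE_URL}/users"


def user_drives_url(user_id: str) -> str:
    """URL that lists the drives of one user."""
    return f"{GRAPH_BASE_URL}/users/{user_id}/drives"


def drive_item_children_url(drive_id: str, item_id: str = "root") -> str:
    """URL that lists the children of a folder in a drive."""
    return f"{GRAPH_BASE_URL}/drives/{drive_id}/items/{item_id}/children"
```

The rest of the article uses the SDK's request builders instead of raw URLs, but the requests they send follow this shape.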
Prerequisites
Python modules
You need the msgraph-sdk and azure-identity modules. They can be installed with pip:
pip install msgraph-sdk
pip install azure-identity
Entra app registration
Log in to the Microsoft Entra admin center and create a new application. Add the Microsoft Graph application permissions Files.Read.All and User.Read.All, and grant admin consent for them.
In the last step, create a new client secret for your application. Don't forget to copy the client secret. You will need it for authorization.
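Because the client secret is sensitive, you may prefer not to hard-code it in the script. The following is a minimal sketch that reads the three values from environment variables instead. The variable names (`GRAPH_TENANT_ID`, `GRAPH_CLIENT_ID`, `GRAPH_CLIENT_SECRET`) and the helper itself are assumptions made for this example, not something the SDK requires.

```python
import os
from typing import Mapping, NamedTuple


class AppCredentials(NamedTuple):
    tenant_id: str
    client_id: str
    client_secret: str


# Hypothetical variable names; pick whatever fits your environment.
ENV_NAMES = ("GRAPH_TENANT_ID", "GRAPH_CLIENT_ID", "GRAPH_CLIENT_SECRET")


def load_app_credentials(env: Mapping[str, str] = os.environ) -> AppCredentials:
    """Read the app registration values from environment variables."""
    missing = [name for name in ENV_NAMES if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return AppCredentials(*(env[name] for name in ENV_NAMES))
```

The returned values can then be passed to `ClientSecretCredential` in place of the string literals used in the implementation.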
Implementation
Initialize the Graph service client with the client secret credential:
import asyncio
from typing import List
from msgraph import GraphServiceClient
from msgraph.generated.drives.item.items.item.children.children_request_builder import ChildrenRequestBuilder
from msgraph.generated.users.users_request_builder import UsersRequestBuilder
from msgraph.generated.models.user import User
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
from kiota_abstractions.base_request_configuration import RequestConfiguration
from azure.identity import ClientSecretCredential
tenant_id = '<tenant_id>'
client_id = '<app_id>'
client_secret = '<client_secret>'
scopes = ['https://graph.microsoft.com/.default']
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
graph_client = GraphServiceClient(credential, scopes)
Get all users
The first step is to retrieve all users in the organization. It's better to filter out guest users because the app can't access drives of guest users from another tenant.
async def get_all_users() -> List[User]:
    users: List[User] = []
    # return only id and UPN, and keep only members (no guests)
    query_params = UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
        select=["id", "userPrincipalName"],
        filter="userType eq 'member'",
    )
    request_configuration = RequestConfiguration(
        query_parameters=query_params,
    )
    # get first page
    users_response = await graph_client.users.get(request_configuration)
    if users_response and users_response.value:
        users.extend(users_response.value)
    # get next pages by following the next link until there is none
    while users_response is not None and users_response.odata_next_link is not None:
        users_response = await graph_client.users.with_url(users_response.odata_next_link).get(request_configuration)
        if users_response and users_response.value:
            users.extend(users_response.value)
    return users
The Python SDK doesn't have any paging component, so it's up to you to check @odata.nextLink (exposed by the SDK as odata_next_link) and iterate through all pages to get all users.
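Since the same paging loop is needed again for drive items, it can be factored into a small helper. The sketch below is one possible shape, not part of the SDK: `collect_all_pages` and its two callback parameters are names invented here, and it only assumes that each page object has `value` and `odata_next_link` attributes, as the SDK responses used above do. `FakePage` and the fake callbacks exist only so the example runs without a tenant.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def collect_all_pages(
    get_first_page: Callable[[], Awaitable[Any]],
    get_page_by_url: Callable[[str], Awaitable[Any]],
) -> List[Any]:
    """Follow odata_next_link from page to page and return all items."""
    items: List[Any] = []
    page = await get_first_page()
    while page is not None:
        items.extend(page.value or [])
        if not page.odata_next_link:
            break  # last page reached
        page = await get_page_by_url(page.odata_next_link)
    return items


class FakePage:
    """Stand-in for an SDK collection response (illustration only)."""

    def __init__(self, value, odata_next_link=None):
        self.value = value
        self.odata_next_link = odata_next_link


FAKE_PAGES = {
    "page-2": FakePage(["c", "d"], "page-3"),
    "page-3": FakePage(["e"]),
}


async def fake_first_page():
    return FakePage(["a", "b"], "page-2")


async def fake_page_by_url(url: str):
    return FAKE_PAGES[url]


def run_paging_demo() -> List[Any]:
    return asyncio.run(collect_all_pages(fake_first_page, fake_page_by_url))
```

With the real client, the callbacks could be `lambda: graph_client.users.get(request_configuration)` and `lambda url: graph_client.users.with_url(url).get()`.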
Get all drives of a user
In the next step, you need to get all drives of each user returned by get_all_users:
drives = await graph_client.users.by_user_id(user.id).drives.get()
Count the drive's items
The last step is to count the items in a user's drive. The function will count the items in a specific folder and it will be reused to count the items in all subfolders recursively.
async def get_child_items_count(drive_id: str, drive_item_id: str) -> int:
    items_count = 0
    all_drive_item_folders_ids: List[str] = []
    # get first page
    drive_items_response = await graph_client.drives.by_drive_id(drive_id).items.by_drive_item_id(drive_item_id).children.get()
    if drive_items_response and drive_items_response.value:
        items_count += len(drive_items_response.value)
        for child_item in drive_items_response.value:
            # remember non-empty folders; child_count may be missing, so treat None as 0
            if child_item.folder is not None and (child_item.folder.child_count or 0) > 0:
                all_drive_item_folders_ids.append(child_item.id)
    # get next pages (the next link already carries the query, so no request configuration is needed)
    while drive_items_response is not None and drive_items_response.odata_next_link is not None:
        drive_items_response = await graph_client.drives.by_drive_id(drive_id).items.by_drive_item_id(drive_item_id).children.with_url(drive_items_response.odata_next_link).get()
        if drive_items_response and drive_items_response.value:
            items_count += len(drive_items_response.value)
            for child_item in drive_items_response.value:
                if child_item.folder is not None and (child_item.folder.child_count or 0) > 0:
                    all_drive_item_folders_ids.append(child_item.id)
    # count items in subfolders
    for folder_id in all_drive_item_folders_ids:
        items_count += await get_child_items_count(drive_id, folder_id)
    return items_count
The function has two parameters: drive_id, which is the id of the drive, and drive_item_id, which is the id of the drive item that represents a folder. all_drive_item_folders_ids contains the ids of the current folder's non-empty child folders.
As with fetching all users, when reading the child items of a folder you must iterate through all pages by checking @odata.nextLink.
items_count is incremented by the number of items in each page, so folders are counted as items too. If an item is a non-empty folder, its id is added to the all_drive_item_folders_ids list.
The last step is to iterate through all folder ids and call get_child_items_count for each of them.
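If a drive is nested very deeply, the recursion can be replaced by an explicit list of pending folders. The sketch below shows only that traversal idea on a hypothetical in-memory folder tree (`FAKE_DRIVE`), so it runs without calling Graph; `count_items_iteratively` is a name invented for this example. As in the function above, folders themselves are counted as items.

```python
from typing import Dict, List, Tuple

# Hypothetical folder tree: folder id -> list of (child id, is_folder).
FAKE_DRIVE: Dict[str, List[Tuple[str, bool]]] = {
    "root": [("docs", True), ("a.txt", False)],
    "docs": [("b.txt", False), ("old", True)],
    "old": [("c.txt", False), ("d.txt", False)],
}


def count_items_iteratively(
    children_by_folder: Dict[str, List[Tuple[str, bool]]],
    start_folder_id: str = "root",
) -> int:
    """Count all items below start_folder_id without recursion."""
    items_count = 0
    pending_folder_ids = [start_folder_id]  # folders still to be read
    while pending_folder_ids:
        folder_id = pending_folder_ids.pop()
        for child_id, is_folder in children_by_folder.get(folder_id, []):
            items_count += 1
            if is_folder:
                pending_folder_ids.append(child_id)
    return items_count
```

In a real implementation, the dictionary lookup would be replaced by the paged children request for each folder.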
Call all functions
We are almost done. The final step is to call all functions and print the results:
async def get_users_drives_files_count():
    users = await get_all_users()
    for user in users:
        try:
            drives = await graph_client.users.by_user_id(user.id).drives.get()
            if drives and drives.value:
                for drive in drives.value:
                    # 'root' is the top-level folder of the drive
                    items_count = await get_child_items_count(drive.id, 'root')
                    print(f"{user.user_principal_name}: drive '{drive.name}' has {items_count} item(s)")
        except ODataError as e:
            # report the failure and continue with the next user
            print(f"Failed for user {user.user_principal_name} ({user.id}). {e.primary_message}")
As you can see, get_child_items_count is called for each drive with the root folder, which is the top-level folder in the drive.
When a Graph API call fails, an ODataError exception is raised. It is caught per user, so one failing account doesn't stop the whole run.
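If you would rather collect the results and failures than print them as you go, the per-user error handling can be sketched like this. To keep the example runnable without the SDK, `FakeGraphError` stands in for ODataError and `fake_count_for_user` stands in for the real drive counting; both, along with `count_per_user`, are hypothetical names used only here.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple


class FakeGraphError(Exception):
    """Stand-in for ODataError so the sketch runs without the SDK."""


async def count_per_user(
    user_names: List[str],
    count_for_user: Callable[[str], Awaitable[int]],
) -> Tuple[Dict[str, int], List[Tuple[str, str]]]:
    """Run count_for_user for each user; a failure affects only that user."""
    counts: Dict[str, int] = {}
    failures: List[Tuple[str, str]] = []
    for name in user_names:
        try:
            counts[name] = await count_for_user(name)
        except FakeGraphError as e:
            failures.append((name, str(e)))
    return counts, failures


async def fake_count_for_user(name: str) -> int:
    # Pretend that room mailboxes have no drive.
    if name.startswith("Room_"):
        raise FakeGraphError("User's mysite not found.")
    return len(name)


def run_error_demo() -> Tuple[Dict[str, int], List[Tuple[str, str]]]:
    return asyncio.run(count_per_user(["AlexW", "Room_5.01"], fake_count_for_user))
```

Returning the failures separately makes it easy to report accounts without a drive, such as room or shared mailboxes, at the end of the run.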
Use asyncio.run to call the get_users_drives_files_count function:
asyncio.run(get_users_drives_files_count())
The output will look like this:
JohnDoe@contoso.onmicrosoft.com: drive 'OneDrive' has 24 item(s)
PattiF@contoso.onmicrosoft.com: drive 'OneDrive' has 100 item(s)
LeeG@contoso.onmicrosoft.com: drive 'OneDrive' has 256 item(s)
MiriamG@contoso.onmicrosoft.com: drive 'OneDrive' has 0 item(s)
AlexW@contoso.onmicrosoft.com: drive 'OneDrive' has 3 item(s)
AdeleV@contoso.onmicrosoft.com: drive 'OneDrive' has 10000 item(s)
Failed for user Room_5.01@contoso.onmicrosoft.com (<user-id>). User's mysite not found.
Failed for user Vacation@contoso.onmicrosoft.com (<user-id>). User's mysite not found.
The whole code is available in the GitHub repository
Conclusion
Reading users, drives and drive items is pretty straightforward. The only thing you need to remember is to iterate through all pages when reading users and drive items. The Python SDK doesn't have any paging component, so you need to check @odata.nextLink and request the next page manually.