From d87ab69a4ae007225046e7cea711fe59b0423fcc Mon Sep 17 00:00:00 2001 From: Andreas Thienemann Date: Mon, 10 Aug 2020 19:09:48 +0200 Subject: [PATCH] Add new file operations mkdir, rename, delete and upload to drive service. (#291) * Add new file operations mkdir, rename, upload and delete to drive service. The drive service only supports the bare minimum right now, improve this situation. Also support upload of new files to the iCloud Drive. * Apply suggestions from code review Co-authored-by: Quentame * Minor fix, return the right json part when calling mkdir and rename * Remove more %s indirections... * Run Black. Again... Co-authored-by: Quentame --- README.rst | 14 ++++ pyicloud/services/drive.py | 153 ++++++++++++++++++++++++++++++++++++- 2 files changed, 164 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index e8f2f91..65e0075 100644 --- a/README.rst +++ b/README.rst @@ -274,6 +274,20 @@ The ``open`` method will return a response object from which you can read the fi >>> with open(drive_file.name, 'wb') as file_out: >>> copyfileobj(response.raw, file_out) +To interact with files and directions the ``mkdir``, ``rename`` and ``delete`` functions are available +for a file or folder: + +>>> api.drive['Holiday Photos'].mkdir('2020') +>>> api.drive['Holiday Photos']['2020'].rename('2020_copy') +>>> api.drive['Holiday Photos']['2020_copy'].delete() + +The ``upload`` method can be used to send a file-like object to the iCloud Drive: + +>>> with open('Vacation.jpeg', 'rb') as file_in: +>>>> api.drive['Holiday Photos'].upload(file_in) + +It is strongly suggested to open file handles as binary rather than text to prevent decoding errors +further down the line. Photo Library ======================= diff --git a/pyicloud/services/drive.py b/pyicloud/services/drive.py index 543a319..6b081c2 100644 --- a/pyicloud/services/drive.py +++ b/pyicloud/services/drive.py @@ -1,6 +1,9 @@ """Drive service.""" from datetime import datetime, timedelta import json +import mimetypes +import os +import time from re import search from six import PY2 @@ -17,11 +20,11 @@ class DriveService(object): def _get_token_from_cookie(self): for cookie in self.session.cookies: - if cookie.name == "X-APPLE-WEBAUTH-TOKEN": + if cookie.name == "X-APPLE-WEBAUTH-VALIDATE": match = search(r"\bt=([^:]+)", cookie.value) - if not match: + if match is None: raise Exception("Can't extract token from %r" % cookie.value) - self.params.update({"token": match.group(1)}) + return {"token": match.group(1)} raise Exception("Token cookie not found") def get_node_data(self, node_id): @@ -53,6 +56,130 @@ class DriveService(object): url = response.json()["data_token"]["url"] return self.session.get(url, params=self.params, **kwargs) + def _get_upload_contentws_url(self, file_object): + """Get the contentWS endpoint URL to add a new file.""" + content_type = mimetypes.guess_type(file_object.name)[0] + if content_type is None: + content_type = "" + + # Get filesize from file object + orig_pos = file_object.tell() + file_object.seek(0, os.SEEK_END) + file_size = file_object.tell() + file_object.seek(orig_pos, os.SEEK_SET) + + file_params = self.params + file_params.update(self._get_token_from_cookie()) + + request = self.session.post( + self._document_root + "/ws/com.apple.CloudDocs/upload/web", + params=file_params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "filename": file_object.name, + "type": "FILE", + "content_type": content_type, + "size": file_size, + } + ), + ) + if not request.ok: + return None + return (request.json()[0]["document_id"], request.json()[0]["url"]) + + def _update_contentws(self, folder_id, sf_info, document_id, file_object): + request = self.session.post( + self._document_root + "/ws/com.apple.CloudDocs/update/documents", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "data": { + "signature": sf_info["fileChecksum"], + "wrapping_key": sf_info["wrappingKey"], + "reference_signature": sf_info["referenceChecksum"], + "receipt": sf_info["receipt"], + "size": sf_info["size"], + }, + "command": "add_file", + "create_short_guid": True, + "document_id": document_id, + "path": { + "starting_document_id": folder_id, + "path": file_object.name, + }, + "allow_conflict": True, + "file_flags": { + "is_writable": True, + "is_executable": False, + "is_hidden": False, + }, + "mtime": int(time.time()), + "btime": int(time.time()), + } + ), + ) + if not request.ok: + return None + return request.json() + + def send_file(self, folder_id, file_object): + """Send new file to iCloud Drive.""" + document_id, content_url = self._get_upload_contentws_url(file_object) + + request = self.session.post(content_url, files={file_object.name: file_object}) + if not request.ok: + return None + content_response = request.json()["singleFile"] + + self._update_contentws(folder_id, content_response, document_id, file_object) + + def create_folders(self, parent, name): + """Creates a new iCloud Drive folder""" + request = self.session.post( + self._service_root + "/createFolders", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "destinationDrivewsId": parent, + "folders": [{"clientId": self.params["clientId"], "name": name,}], + } + ), + ) + return request.json() + + def rename_items(self, node_id, etag, name): + """Renames an iCloud Drive node""" + request = self.session.post( + self._service_root + "/renameItems", + params=self.params, + data=json.dumps( + {"items": [{"drivewsid": node_id, "etag": etag, "name": name,}],} + ), + ) + return request.json() + + def move_items_to_trash(self, node_id, etag): + """Moves an iCloud Drive node to the trash bin""" + request = self.session.post( + self._service_root + "/moveItemsToTrash", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag, + "clientId": self.params["clientId"], + } + ], + } + ), + ) + return request.json() + @property def root(self): """Returns the root node.""" @@ -128,12 +255,32 @@ class DriveNode(object): """Gets the node file.""" return self.connection.get_file(self.data["docwsid"], **kwargs) + def upload(self, file_object, **kwargs): + """"Upload a new file.""" + return self.connection.send_file(self.data["docwsid"], file_object, **kwargs) + def dir(self): """Gets the node list of directories.""" if self.type == "file": return None return [child.name for child in self.get_children()] + def mkdir(self, folder): + """Create a new directory directory.""" + return self.connection.create_folders(self.data["drivewsid"], folder) + + def rename(self, name): + """Rename an iCloud Drive item.""" + return self.connection.rename_items( + self.data["drivewsid"], self.data["etag"], name + ) + + def delete(self): + """Delete an iCloud Drive item.""" + return self.connection.move_items_to_trash( + self.data["drivewsid"], self.data["etag"] + ) + def get(self, name): """Gets the node child.""" if self.type == "file":