Add new file operations mkdir, rename, delete and upload to drive service. (#291)
* Add new file operations mkdir, rename, upload and delete to drive service. The drive service only supports the bare minimum right now, improve this situation. Also support upload of new files to the iCloud Drive. * Apply suggestions from code review Co-authored-by: Quentame <polletquentin74@me.com> * Minor fix, return the right json part when calling mkdir and rename * Remove more %s indirections... * Run Black. Again... Co-authored-by: Quentame <polletquentin74@me.com>
This commit is contained in:
parent
852151ef5f
commit
d87ab69a4a
2 changed files with 164 additions and 3 deletions
14
README.rst
14
README.rst
|
@ -274,6 +274,20 @@ The ``open`` method will return a response object from which you can read the fi
|
|||
>>> with open(drive_file.name, 'wb') as file_out:
|
||||
>>> copyfileobj(response.raw, file_out)
|
||||
|
||||
To interact with files and directions the ``mkdir``, ``rename`` and ``delete`` functions are available
|
||||
for a file or folder:
|
||||
|
||||
>>> api.drive['Holiday Photos'].mkdir('2020')
|
||||
>>> api.drive['Holiday Photos']['2020'].rename('2020_copy')
|
||||
>>> api.drive['Holiday Photos']['2020_copy'].delete()
|
||||
|
||||
The ``upload`` method can be used to send a file-like object to the iCloud Drive:
|
||||
|
||||
>>> with open('Vacation.jpeg', 'rb') as file_in:
|
||||
>>>> api.drive['Holiday Photos'].upload(file_in)
|
||||
|
||||
It is strongly suggested to open file handles as binary rather than text to prevent decoding errors
|
||||
further down the line.
|
||||
|
||||
Photo Library
|
||||
=======================
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
"""Drive service."""
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import time
|
||||
from re import search
|
||||
from six import PY2
|
||||
|
||||
|
@ -17,11 +20,11 @@ class DriveService(object):
|
|||
|
||||
def _get_token_from_cookie(self):
|
||||
for cookie in self.session.cookies:
|
||||
if cookie.name == "X-APPLE-WEBAUTH-TOKEN":
|
||||
if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
|
||||
match = search(r"\bt=([^:]+)", cookie.value)
|
||||
if not match:
|
||||
if match is None:
|
||||
raise Exception("Can't extract token from %r" % cookie.value)
|
||||
self.params.update({"token": match.group(1)})
|
||||
return {"token": match.group(1)}
|
||||
raise Exception("Token cookie not found")
|
||||
|
||||
def get_node_data(self, node_id):
|
||||
|
@ -53,6 +56,130 @@ class DriveService(object):
|
|||
url = response.json()["data_token"]["url"]
|
||||
return self.session.get(url, params=self.params, **kwargs)
|
||||
|
||||
def _get_upload_contentws_url(self, file_object):
|
||||
"""Get the contentWS endpoint URL to add a new file."""
|
||||
content_type = mimetypes.guess_type(file_object.name)[0]
|
||||
if content_type is None:
|
||||
content_type = ""
|
||||
|
||||
# Get filesize from file object
|
||||
orig_pos = file_object.tell()
|
||||
file_object.seek(0, os.SEEK_END)
|
||||
file_size = file_object.tell()
|
||||
file_object.seek(orig_pos, os.SEEK_SET)
|
||||
|
||||
file_params = self.params
|
||||
file_params.update(self._get_token_from_cookie())
|
||||
|
||||
request = self.session.post(
|
||||
self._document_root + "/ws/com.apple.CloudDocs/upload/web",
|
||||
params=file_params,
|
||||
headers={"Content-Type": "text/plain"},
|
||||
data=json.dumps(
|
||||
{
|
||||
"filename": file_object.name,
|
||||
"type": "FILE",
|
||||
"content_type": content_type,
|
||||
"size": file_size,
|
||||
}
|
||||
),
|
||||
)
|
||||
if not request.ok:
|
||||
return None
|
||||
return (request.json()[0]["document_id"], request.json()[0]["url"])
|
||||
|
||||
def _update_contentws(self, folder_id, sf_info, document_id, file_object):
|
||||
request = self.session.post(
|
||||
self._document_root + "/ws/com.apple.CloudDocs/update/documents",
|
||||
params=self.params,
|
||||
headers={"Content-Type": "text/plain"},
|
||||
data=json.dumps(
|
||||
{
|
||||
"data": {
|
||||
"signature": sf_info["fileChecksum"],
|
||||
"wrapping_key": sf_info["wrappingKey"],
|
||||
"reference_signature": sf_info["referenceChecksum"],
|
||||
"receipt": sf_info["receipt"],
|
||||
"size": sf_info["size"],
|
||||
},
|
||||
"command": "add_file",
|
||||
"create_short_guid": True,
|
||||
"document_id": document_id,
|
||||
"path": {
|
||||
"starting_document_id": folder_id,
|
||||
"path": file_object.name,
|
||||
},
|
||||
"allow_conflict": True,
|
||||
"file_flags": {
|
||||
"is_writable": True,
|
||||
"is_executable": False,
|
||||
"is_hidden": False,
|
||||
},
|
||||
"mtime": int(time.time()),
|
||||
"btime": int(time.time()),
|
||||
}
|
||||
),
|
||||
)
|
||||
if not request.ok:
|
||||
return None
|
||||
return request.json()
|
||||
|
||||
def send_file(self, folder_id, file_object):
|
||||
"""Send new file to iCloud Drive."""
|
||||
document_id, content_url = self._get_upload_contentws_url(file_object)
|
||||
|
||||
request = self.session.post(content_url, files={file_object.name: file_object})
|
||||
if not request.ok:
|
||||
return None
|
||||
content_response = request.json()["singleFile"]
|
||||
|
||||
self._update_contentws(folder_id, content_response, document_id, file_object)
|
||||
|
||||
def create_folders(self, parent, name):
|
||||
"""Creates a new iCloud Drive folder"""
|
||||
request = self.session.post(
|
||||
self._service_root + "/createFolders",
|
||||
params=self.params,
|
||||
headers={"Content-Type": "text/plain"},
|
||||
data=json.dumps(
|
||||
{
|
||||
"destinationDrivewsId": parent,
|
||||
"folders": [{"clientId": self.params["clientId"], "name": name,}],
|
||||
}
|
||||
),
|
||||
)
|
||||
return request.json()
|
||||
|
||||
def rename_items(self, node_id, etag, name):
|
||||
"""Renames an iCloud Drive node"""
|
||||
request = self.session.post(
|
||||
self._service_root + "/renameItems",
|
||||
params=self.params,
|
||||
data=json.dumps(
|
||||
{"items": [{"drivewsid": node_id, "etag": etag, "name": name,}],}
|
||||
),
|
||||
)
|
||||
return request.json()
|
||||
|
||||
def move_items_to_trash(self, node_id, etag):
|
||||
"""Moves an iCloud Drive node to the trash bin"""
|
||||
request = self.session.post(
|
||||
self._service_root + "/moveItemsToTrash",
|
||||
params=self.params,
|
||||
data=json.dumps(
|
||||
{
|
||||
"items": [
|
||||
{
|
||||
"drivewsid": node_id,
|
||||
"etag": etag,
|
||||
"clientId": self.params["clientId"],
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
)
|
||||
return request.json()
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
"""Returns the root node."""
|
||||
|
@ -128,12 +255,32 @@ class DriveNode(object):
|
|||
"""Gets the node file."""
|
||||
return self.connection.get_file(self.data["docwsid"], **kwargs)
|
||||
|
||||
def upload(self, file_object, **kwargs):
|
||||
""""Upload a new file."""
|
||||
return self.connection.send_file(self.data["docwsid"], file_object, **kwargs)
|
||||
|
||||
def dir(self):
|
||||
"""Gets the node list of directories."""
|
||||
if self.type == "file":
|
||||
return None
|
||||
return [child.name for child in self.get_children()]
|
||||
|
||||
def mkdir(self, folder):
|
||||
"""Create a new directory directory."""
|
||||
return self.connection.create_folders(self.data["drivewsid"], folder)
|
||||
|
||||
def rename(self, name):
|
||||
"""Rename an iCloud Drive item."""
|
||||
return self.connection.rename_items(
|
||||
self.data["drivewsid"], self.data["etag"], name
|
||||
)
|
||||
|
||||
def delete(self):
|
||||
"""Delete an iCloud Drive item."""
|
||||
return self.connection.move_items_to_trash(
|
||||
self.data["drivewsid"], self.data["etag"]
|
||||
)
|
||||
|
||||
def get(self, name):
|
||||
"""Gets the node child."""
|
||||
if self.type == "file":
|
||||
|
|
Loading…
Reference in a new issue