Client represents a higher-level interface to the Datalake API.
abeja.datalake.Client(organization_id: Optional[str] = None, credential: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, max_retry_count: Optional[int] = None)

A high-level client for the Datalake API.
from abeja.datalake import Client
client = Client()
channels

Get a Channels object.

channels = client.channels

Returns: Channels object
get_channel(channel_id) → abeja.datalake.channel.Channel

Get the channel for a specific channel_id.

channel = client.get_channel(channel_id='1111111111111')

Params:
channel_id (str): channel id

Returns: Channel object
abeja.datalake.channel.Channel(api: abeja.datalake.api.client.APIClient, organization_id: str, channel_id: str, name: Optional[str] = None, description: Optional[str] = None, display_name: Optional[str] = None, storage_type: Optional[str] = None, created_at: Optional[str] = None, updated_at: Optional[str] = None, archived: bool = False)

A model class for a channel.

Properties:
organization_id (str)
channel_id (str)
name (str)
display_name (str)
description (str)
archived (bool)
created_at (datetime)
updated_at (datetime)
add_datasource()

files

Get a datalake Files object.
channel = client.get_channel(channel_id='1230000000000')
channel.files
Returns: Files object
get_file(file_id: str) → abeja.datalake.file.DatalakeFile

Get a datalake file in the channel.

file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)

Params:
file_id (str): file identifier

Returns: DatalakeFile object
list_datasources()

list_files(start: Optional[str] = None, end: Optional[str] = None, timezone: Optional[str] = None, sort: Optional[str] = None, next_page_token: Optional[str] = None, limit: Optional[int] = None, prefetch: bool = False, query: Optional[str] = None) → abeja.datalake.file.FileIterator

Get datalake files in the channel.
for f in channel.list_files():
pass
Params:
start (str): start date of target uploaded files
end (str): end date of target uploaded files
timezone (str): timezone of the specified start and end dates
query (str): query to search. You can filter files that contain a specific value with a condition such as "x-abeja-meta-filename:filename".
sort (str): the order of the file list. Multiple items can be specified, separated by commas (,). Prefix an item with a hyphen (-) to sort it in descending order. By default, the list is sorted by uploaded_at in ascending order.

Returns: FileIterator object
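The filter and ordering parameters above can be combined. The values below are illustrative only (in particular, the date-string format and metadata key are assumptions, not formats confirmed by this reference):

```python
# Illustrative argument values for list_files; nothing here is a
# required format.
list_kwargs = {
    'start': '20180101',
    'end': '20180131',
    'query': 'x-abeja-meta-filename:report.csv',
    'sort': '-uploaded_at,file_id',
}
# The call itself would then be:
# for f in channel.list_files(**list_kwargs):
#     print(f.file_id)
print(list_kwargs['sort'])
```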
remove_datasource()

upload(file_obj: _io.BytesIO, content_type: str, metadata: Optional[dict] = None, lifetime: Optional[str] = None, conflict_target: Optional[str] = None) → abeja.datalake.file.DatalakeFile

Upload content to a channel with a file-like object.
content_type = 'text/csv'
metadata = {
    'label': 'example'
}
with open('example.csv', 'rb') as f:
    response = channel.upload(f, content_type, metadata=metadata)
Params:
file_obj (a file-like object): a file-like object to upload. It must implement a read method that returns bytes.
content_type (str): MIME type of the content.
metadata (dict): [optional] metadata to be added to the uploaded file. Keys and values must be strings; objects cannot be used.
lifetime (str): [optional] the file will be deleted after the specified time.
conflict_target (str): [optional] return 409 Conflict when the same value of the specified key already exists in the channel.
Returns: DatalakeFile object representing the uploaded file
upload_dir(dir_path: str, metadata: Optional[dict] = None, content_type: Optional[str] = None, lifetime: Optional[str] = None, conflict_target: Optional[str] = None, recursive: bool = False, use_thread: bool = True) → Iterable[abeja.datalake.file.DatalakeFile]

Upload the files in a directory to a channel. This method infers the content_type of each file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.

Note: this method returns a list (not a generator) to make sure the upload process completes before returning.
metadata = {
    'label': 'example'
}
response = channel.upload_dir('./source_dir', metadata)
Params:
dir_path (str): path to a directory containing the files to upload
metadata (dict): [optional] metadata to be added to the uploaded files
content_type (str): [optional] MIME type of the content. The Content-Type is inferred from the file extension if not specified.
lifetime (str): [optional] the files will be deleted after the specified time.
conflict_target (str): [optional] return 409 Conflict when the same value of the specified key already exists in the channel.
Returns: a list of DatalakeFile objects that were successfully uploaded
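A sketch of the per-file metadata upload_dir attaches, assuming it simply merges the base filename into the user-supplied dict (build_metadata is a hypothetical helper, not an SDK function):

```python
import os
from typing import Optional

def build_metadata(file_path: str, metadata: Optional[dict] = None) -> dict:
    # Hypothetical helper: merge user metadata with the filename entry
    # that upload_dir is documented to set as x-abeja-meta-filename.
    merged = dict(metadata or {})
    merged['x-abeja-meta-filename'] = os.path.basename(file_path)
    return merged

print(build_metadata('./source_dir/example.csv', {'label': 'example'}))
# {'label': 'example', 'x-abeja-meta-filename': 'example.csv'}
```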
upload_file(file_path: str, metadata: Optional[dict] = None, content_type: Optional[str] = None, lifetime: Optional[str] = None, conflict_target: Optional[str] = None) → abeja.datalake.file.DatalakeFile

Upload a file to a channel. This method infers the content_type of the given file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.
metadata = {
    'label': 'example'
}
response = channel.upload_file('~/example.txt', metadata=metadata)
Params:
file_path (str): path to a file
metadata (dict): [optional] metadata to be added to the uploaded file
content_type (str): [optional] MIME type of the content. The Content-Type is inferred from the file extension if not specified.
lifetime (str): [optional] the file will be deleted after the specified time.
conflict_target (str): [optional] return 409 Conflict when the same value of the specified key already exists in the channel.
Returns: DatalakeFile object representing the uploaded file
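The extension-based inference described above can be sketched with the standard library's mimetypes module (this mirrors the documented behaviour; the SDK's actual inference and its fallback type are assumptions here):

```python
import mimetypes

def guess_content_type(file_path: str) -> str:
    # Fall back to a generic binary type when the extension is unknown;
    # the fallback choice is an assumption, not necessarily what the SDK does.
    content_type, _ = mimetypes.guess_type(file_path)
    return content_type or 'application/octet-stream'

print(guess_content_type('example.txt'))  # text/plain
print(guess_content_type('photo.jpg'))    # image/jpeg
```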
abeja.datalake.channel.Channels(api: abeja.datalake.api.client.APIClient, organization_id: str)

A class for handling channels.
create(name: str, description: str, storage_type: str) → abeja.datalake.channel.Channel

Create a channel.

API reference: POST /organizations/<organization_id>/channels/
params = {
    "name": "test-channel",
    "description": "test channel",
    "storage_type": "datalake"
}
channel = channels.create(**params)
Params:
name (str): channel name
description (str): channel description
storage_type (str): type of storage, either datalake or file

Returns: Channel object
get(channel_id: str) → abeja.datalake.channel.Channel

Get a channel.

API reference: GET /organizations/<organization_id>/channels/<channel_id>
channel_id = '1234567890123'
channel = channels.get(channel_id=channel_id)
Params:
channel_id (str): identifier of the channel

Returns: Channel object
list(limit: Optional[int] = None, offset: Optional[int] = None) → Iterable[abeja.datalake.channel.Channel]

List channels.

API reference: GET /organizations/<organization_id>/channels/

channels_iter = channels.list()

Returns: generator of Channel objects
patch(channel_id: str, name: Optional[str] = None, description: Optional[str] = None) → abeja.datalake.channel.Channel

Patch a channel.

API reference: PATCH /organizations/<organization_id>/channels/<channel_id>
params = {
    "channel_id": "1234567890123",
    "name": "updated_name",
    "description": "updated description"
}
channel = channels.patch(**params)
Params:
channel_id (str): identifier of the channel
name (str): channel name
description (str): channel description

Returns: Channel object
abeja.datalake.file.DatalakeFile(api: abeja.datalake.api.client.APIClient, organization_id: Optional[str] = None, channel_id: Optional[str] = None, file_id: Optional[str] = None, uri: Optional[str] = None, type: Optional[str] = None, upload_url: Optional[str] = None, download_uri: Optional[str] = None, content_type: Optional[str] = None, url_expires_on: Optional[str] = None, metadata: Optional[dict] = None, uploaded_at: Optional[str] = None, lifetime: Optional[str] = None, **kwargs)

A model class for a datalake channel file.

If the file exists locally, its data is read from the local copy; otherwise the data is fetched from the remote channel and saved locally.
The file is saved in ./{channel_id}/{file_id} by default.
You can change the location by setting the ABEJA_STORAGE_DIR_PATH environment variable; the file will then be saved in ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}.

Properties:
organization_id (str)
channel_id (str)
file_id (str)
uri (str)
type (str)
upload_url (str)
download_uri (str)
content_type (str)
metadata (dict)
url_expires_on (str)
uploaded_at (datetime)
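The local cache location described above resolves along these lines (a sketch of the documented path layout, not the SDK's implementation; cache_path is a hypothetical helper):

```python
import os

def cache_path(channel_id: str, file_id: str) -> str:
    # Default base is the current directory; ABEJA_STORAGE_DIR_PATH
    # overrides it, per the documented behaviour.
    base = os.environ.get('ABEJA_STORAGE_DIR_PATH', '.')
    return os.path.join(base, channel_id, file_id)

print(cache_path('1230000000000',
                 '20180101T000000-00000000-1111-2222-3333-999999999999'))
```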
LIFETIME = ('1day', '1week', '1month', '6months', '1year')

commit() → bool

Reflect instance info into the remote state. Only metadata and lifetime are editable for now.

Returns: Optional[bool]: True if the update succeeded
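LIFETIME above enumerates the accepted lifetime values, so a client-side pre-check before assigning lifetime and calling commit() could look like this (is_valid_lifetime is a hypothetical helper):

```python
# The accepted values, copied from the LIFETIME constant above.
LIFETIME = ('1day', '1week', '1month', '6months', '1year')

def is_valid_lifetime(value: str) -> bool:
    # Reject anything outside the documented tuple before hitting the API.
    return value in LIFETIME

print(is_valid_lifetime('1week'))  # True
print(is_valid_lifetime('2days'))  # False
```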
get_content(cache: bool = True) → bytes

Get content from a binary file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_content()
Params:
cache (bool): if True, read the file saved under [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise the downloaded content will be saved there. True by default.

Returns: bytes
get_file_info() → dict

Get information of a file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_file_info()
Returns: dict

Response syntax:
{
    "url_expires_on": "2017-12-20T17:08:26+00:00",
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "metadata": {
        "x-abeja-meta-filename": "test.jpg"
    },
    "file_id": "20171218T053947-821bd0a3-3992-4320-bc1c-1ee8d0a0ad6b",
    "download_uri": "...",
    "content_type": "image/jpeg"
}
get_iter_content(cache: bool = True, chunk_size: int = 1048576) → Generator[bytes, None, None]

Get content iteratively from a binary file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_content()
Params:
cache (bool): if True, read the file saved under [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise the downloaded content will be saved there. True by default.
chunk_size (int): the number of bytes to read into memory per chunk. Default value: 1,048,576 (= 1MB).

Returns: generator
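The chunking behaviour of get_iter_content can be simulated with an in-memory buffer (iter_chunks below is a stand-in for the real method, not SDK code):

```python
import io

def iter_chunks(stream, chunk_size: int = 1048576):
    # Yield fixed-size chunks until the stream is exhausted, the way
    # get_iter_content yields chunk_size bytes at a time.
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk

buf = io.BytesIO(b'x' * 2500)
sizes = [len(c) for c in iter_chunks(buf, chunk_size=1024)]
print(sizes)  # [1024, 1024, 452]
```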
get_iter_lines(cache: bool = True) → Generator[str, None, None]

Get lines iteratively from a text file.

If the text file exists locally, content is read from the local copy; otherwise it is fetched from the remote channel and saved locally.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_lines()
Params:
cache (bool): if True, read the file saved under [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise the downloaded content will be saved there. True by default.

Returns: generator
get_json() → dict

Get JSON from a file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_json()
Returns: dict

Raises: json.decoder.JSONDecodeError
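The raised exception is the standard library's; get_json presumably behaves like json.loads applied to the file body, which raises the same error on malformed input:

```python
import json

# json.loads raises json.decoder.JSONDecodeError on malformed content,
# the same exception documented for get_json.
try:
    json.loads('{not valid json}')
except json.decoder.JSONDecodeError as exc:
    error_kind = type(exc).__name__
print(error_kind)  # JSONDecodeError
```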
get_text(cache: bool = True, encoding: Optional[str] = None) → str

Get content from a text file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_text()
Params:
cache (bool): if True, read the file saved under [ABEJA_STORAGE_DIR_PATH]/[channel_id]/[file_id] if it exists; otherwise the downloaded content will be saved there. True by default.
encoding (str): [optional] specify the encoding when the text is encoded in something other than ISO-8859-1.

Returns: str
lifetime

to_source_data() → Dict[str, str]

Convert to the source data format.

Returns: dict