Client represents a higher-level interface to the Datalake API.

abeja.datalake.Client(organization_id: Optional[str] = None, credential: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, max_retry_count: Optional[int] = None) → None
    A high-level client for the Datalake API.
from abeja.datalake import Client
client = Client()
abeja.datalake.channel.Channel(api: abeja.datalake.api.client.APIClient, organization_id: str, channel_id: str, name: str = None, description: str = None, display_name: str = None, storage_type: str = None, created_at: str = None, updated_at: str = None, archived: bool = False) → None
    A model class for a channel.

add_datasource()

files
    Get a datalake Files object.
channel = client.get_channel(channel_id='1230000000000')
channel.files
Return type: Files object

get_file(file_id: str) → abeja.datalake.file.DatalakeFile
    Get a datalake file in the channel.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
Return type: DatalakeFile object

list_datasources()

list_files(start: str = None, end: str = None, timezone: str = None, sort: str = None, next_page_token: str = None, limit: int = None, prefetch: bool = False, query: str = None) → abeja.datalake.file.FileIterator
    Get datalake files in the channel.
for f in channel.list_files():
    pass
Return type: FileIterator object

remove_datasource()

upload(file_obj: _io.BytesIO, content_type: str, metadata: dict = None, lifetime: str = None, conflict_target: str = None) → abeja.datalake.file.DatalakeFile
    Upload content to a channel from a file-like object.
content_type = 'text/csv'  # match the content type to the file being uploaded
metadata = {
    'label': 'example'
}
with open('example.csv', 'rb') as f:  # upload expects a binary file-like object
    response = channel.upload(f, content_type, metadata=metadata)
Return type: DatalakeFile object

upload_dir(dir_path: str, metadata: dict = None, content_type: str = None, lifetime: str = None, conflict_target: str = None, recursive: bool = False, use_thread: bool = True) → typing.Iterable[abeja.datalake.file.DatalakeFile]
    Upload the files in a directory to a channel. This method infers the content_type of a given file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.

    Note: this method returns a list (not a generator) to make sure the upload process completes here.
metadata = {
    'label': 'example'
}
response = channel.upload_dir('./source_dir', metadata)
Return type: DatalakeFile object

upload_file(file_path: str, metadata: dict = None, content_type: str = None, lifetime: str = None, conflict_target: str = None) → abeja.datalake.file.DatalakeFile
    Upload a file to a channel. This method infers the content_type of the given file if content_type is not specified, and sets the filename as x-abeja-meta-filename in metadata.
metadata = {
    'label': 'example'
}
response = channel.upload_file('~/example.txt', metadata=metadata)
Return type: DatalakeFile object

abeja.datalake.channel.Channels(api: abeja.datalake.api.client.APIClient, organization_id: str) → None
    A class for handling channels.
create(name: str, description: str, storage_type: str) → abeja.datalake.channel.Channel
    Create a channel.

    API reference: POST /organizations/<organization_id>/channels/
params = {
    "name": "test-channel",
    "description": "test channel",
    "storage_type": "datalake"
}
channel = channels.create(**params)
Return type: Channel object

get(channel_id: str) → abeja.datalake.channel.Channel
    Get a channel.

    API reference: GET /organizations/<organization_id>/channels/<channel_id>
channel_id = '1234567890123'
channel = channels.get(channel_id=channel_id)
Return type: Channel object

list(limit: int = None, offset: int = None) → typing.Iterable[abeja.datalake.channel.Channel]
    List channels.

    API reference: GET /organizations/<organization_id>/channels/
channels_iter = channels.list()
Return type: Channel objects

patch(channel_id: str, name: str = None, description: str = None) → abeja.datalake.channel.Channel
    Patch a channel.

    API reference: PATCH /organizations/<organization_id>/channels/<channel_id>
params = {
    "channel_id": "1234567890123",
    "name": "updated_name",
    "description": "updated description"
}
channel = channels.patch(**params)
Return type: Channel object

abeja.datalake.file.DatalakeFile(api: abeja.datalake.api.client.APIClient, organization_id: str = None, channel_id: str = None, file_id: str = None, uri: str = None, type: str = None, upload_url: str = None, download_uri: str = None, content_type: str = None, url_expires_on: str = None, metadata: dict = None, uploaded_at: str = None, lifetime: str = None, **kwargs) → None
    A model class for a datalake channel file.

    If the file exists locally, data is read from that file; otherwise it is fetched from remote and saved locally. The file is saved to ./{channel_id}/{file_id} by default. You can change the location by setting the ABEJA_STORAGE_DIR_PATH environment variable; the file will then be saved to ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}.
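The cache layout described above can be computed directly with the standard library; a minimal sketch (the cache root here is a hypothetical example, not an SDK default):

```python
import os

# Hypothetical cache root; by default the SDK caches under the current directory.
os.environ['ABEJA_STORAGE_DIR_PATH'] = '/tmp/abeja_cache'

channel_id = '1230000000000'
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'

# Cached files live at ${ABEJA_STORAGE_DIR_PATH}/{channel_id}/{file_id}
cache_path = os.path.join(os.environ['ABEJA_STORAGE_DIR_PATH'], channel_id, file_id)
print(cache_path)
```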
LIFETIME = ('1day', '1week', '1month', '6months')

commit() → bool
    Reflect instance state into the remote state. Only metadata and lifetime are editable for now.
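Since only the lifetimes listed in LIFETIME are valid, it can help to check a value before assigning it and calling commit(); a minimal sketch (validate_lifetime is a hypothetical helper, not part of the SDK):

```python
LIFETIME = ('1day', '1week', '1month', '6months')

def validate_lifetime(value: str) -> str:
    # Reject values the API would not accept before any remote call is made
    if value not in LIFETIME:
        raise ValueError(f"lifetime must be one of {LIFETIME}, got {value!r}")
    return value

print(validate_lifetime('1month'))  # '1month'
```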
get_content(cache: bool = True) → bytes
    Get content from a binary file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_content()
get_file_info() → dict
    Get information about a file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_file_info()
Response syntax:

{
    "url_expires_on": "2017-12-20T17:08:26+00:00",
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "metadata": {
        "x-abeja-meta-filename": "test.jpg"
    },
    "file_id": "20171218T053947-821bd0a3-3992-4320-bc1c-1ee8d0a0ad6b",
    "download_uri": "...",
    "content_type": "image/jpeg"
}
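The response fields are plain strings, so they can be post-processed with the standard library alone; a sketch of pulling out the original filename and the expiry timestamp (the dict literal mirrors the response syntax above):

```python
from datetime import datetime

file_info = {
    "url_expires_on": "2017-12-20T17:08:26+00:00",
    "uploaded_at": "2017-12-18T05:39:47+00:00",
    "metadata": {"x-abeja-meta-filename": "test.jpg"},
    "content_type": "image/jpeg",
}

# x-abeja-meta-filename is set automatically by upload_file/upload_dir
filename = file_info["metadata"].get("x-abeja-meta-filename")

# the timestamps are ISO 8601 with a UTC offset, parseable since Python 3.7
expires_on = datetime.fromisoformat(file_info["url_expires_on"])
print(filename, expires_on.year)
```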
get_iter_content(cache: bool = True, chunk_size: int = 1048576) → typing.Generator[bytes, None, None]
    Get content iteratively from a binary file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_content()
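get_iter_content yields successive chunks of chunk_size bytes until the file is exhausted; the chunking behavior can be sketched with an in-memory stream standing in for a real DatalakeFile:

```python
import io

def iter_chunks(stream, chunk_size=1048576):
    # Mimics get_iter_content: yield chunk_size bytes at a time,
    # with a final short chunk when the data does not divide evenly.
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk

data = io.BytesIO(b'x' * 3000)
sizes = [len(c) for c in iter_chunks(data, chunk_size=1024)]
print(sizes)  # [1024, 1024, 952]
```

In practice each chunk would typically be written straight to a local file opened in binary mode, so the whole download never has to fit in memory.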
get_iter_lines(cache: bool = True) → typing.Generator[str, None, None]
    Get lines iteratively from a text file.

    If the text file exists locally, content is read from that file; otherwise it is fetched from remote and saved locally.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_iter_lines()
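get_iter_lines is convenient for large text files such as CSVs because the whole file never has to be held in memory; a sketch with an in-memory stream standing in for the remote file:

```python
import io

# Stand-in for a large text file stored in the channel
text = io.StringIO('id,label\n1,cat\n2,dog\n')

rows = []
for line in text:  # get_iter_lines() would be consumed the same way
    rows.append(line.rstrip('\n'))
print(rows)  # ['id,label', '1,cat', '2,dog']
```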
get_json() → dict
    Get JSON from a file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_json()
get_text(cache: bool = True, encoding: Optional[str] = None) → str
    Get content from a text file.
file_id = '20180101T000000-00000000-1111-2222-3333-999999999999'
datalake_file = channel.get_file(file_id=file_id)
content = datalake_file.get_text()
lifetime

to_source_data() → typing.Dict[str, str]
    Convert to the source data format.