Feat/codebase cleanup #10

Merged
merged 13 commits on Aug 13, 2023
5 changes: 4 additions & 1 deletion .gitignore
@@ -12,4 +12,7 @@ __pycache__/
# Generated system files (from archives, etc)
.DS_Store
opt/
media
media

# Editor configs
/.vscode
52 changes: 17 additions & 35 deletions README.md
@@ -1,54 +1,38 @@
# Simon
A pipeline which allows for the ingestion, storage, and processing of a large body of textual information with LLMs.
# #!/Simon
Hello! Welcome to Simon. Simon is an open-source pipeline which allows for the ingestion, storage, and processing of a large body of textual information with LLMs.

Demos of salient use cases, as well as a hosted instance of Simon, are coming soon. This document serves as a minimal quickstart.

## Quickstart to run the code

**Create a Python virtual environment for dependencies, etc.**
(Only need to do this once per computer)

```
python3 -m venv venv
```
### Elastic
You need an ElasticSearch instance running somewhere to support Simon. One easy way to get it set up on your local machine is to [download a copy of Elastic here](https://www.elastic.co/downloads/elasticsearch) and follow the instructions there to get it running. There are also hosted options available online.
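
Once your instance is up, you can sanity-check connectivity with the official `elasticsearch` Python client. A minimal sketch, assuming a local instance at `http://localhost:9200` with security disabled (adjust the URL and auth to match your deployment):

```
# Minimal connectivity check (assumes a local, security-disabled instance
# at http://localhost:9200; adjust the URL/auth for your deployment).
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
print(es.info())  # prints cluster name and version if the instance is reachable
```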

**Ensure virtual environment is activated.**
(Do this each time you start a new terminal session)
You will also need OpenAI credentials. These can come either from the OpenAI public API or from Azure OpenAI Services.

```
source venv/bin/activate
```
Get the Elastic connection credentials and OpenAI credentials, and keep them for the steps below.

**Install Python dependencies in virtual environment.**
(Do this whenever dependencies change)
### Requirements Setup
Begin by using the Python package management tool of your choice to install the requirements:

```
pip install -r requirements.txt
```

**Make sure all environment variables are set.**
(Do this whenever you want to run code)
All dependency versions were resolved with `Python 3.11`; any Python version `>3.9` should be supported.

```
python environment.py
```

If you don't have a required environment variable set, you'll get a message like this:

```
Not all required environment variables present, some code might not function properly.
Missing vars: OPENAI_KEY
```

Set those variables through the `.env` file (copy `.env.example` and set values after the = sign) or through simple `export` in your shell, e.g.:
### Set Environment Variables
Collect your credentials from the steps above, and set them either in a `.env` file (copy `.env.example` and set values after the `=` sign) or with a simple `export` in your shell, e.g.:

```
export OPENAI_KEY=sk-some-api-key
```

An example of all the environment variables needed is in the .env.example file?
An example of all the environment variables needed is in the .env.example file.

Values set in your shell will override those in the `.env` file.
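
A minimal sketch of this precedence, assuming the project loads `.env` via `python-dotenv` (an assumption; check `simon/environment.py` for the actual mechanism). By default, `load_dotenv()` does not overwrite variables already set in the process environment, so shell values win:

```
# Sketch of the override behaviour described above (assumes python-dotenv).
# load_dotenv() leaves already-set shell variables untouched by default,
# so `export OPENAI_KEY=...` in your shell wins over the value in .env.
import os
from dotenv import load_dotenv

load_dotenv()  # override=False is the default
print(os.environ.get("OPENAI_KEY"))
```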

**Provision your Elastic**
### Provision your Elastic

You need to manually seed the ElasticSearch schema when you're first setting up. To do this, create an ElasticSearch API instance, and run the following helper script once per **new elastic instance**:

@@ -62,11 +46,9 @@ If you find that you want to delete the existing ElasticSearch schema and start
python setup_es.py --nuke
```

**Run the code!**
### Run the code!

```
python playground.py
```
You are now ready to ~~rock~~ Simon! Follow the usage examples in `tutorial.py` to get a full overview of the API.

## Misc notes

2 changes: 1 addition & 1 deletion api.py
@@ -22,7 +22,7 @@
from uuid import uuid4

# environment variables
from environment import get_env_vars
from simon.environment import get_env_vars
env_vars = get_env_vars()
KEY = env_vars.get("OPENAI_KEY")
ES_CONFIG = env_vars.get('ES_CONFIG')
199 changes: 199 additions & 0 deletions ingest_files.py
@@ -0,0 +1,199 @@
import argparse
import logging
import multiprocessing
import os
import signal
import threading

from simon.components import aws
from simon.ingestion import TextFileIngester

def _setup_log_file(log_file):
# Make intermediate directories if necessary
if os.path.dirname(log_file):
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Clear any previous log contents
open(log_file, 'w').close()


def _configure_logger(debug=False, log_file=None):
log_format = '[%(asctime)s] [%(name)s] [%(processName)s] [%(levelname)s] %(message)s'
if debug:
logging.basicConfig(format=log_format, level=logging.DEBUG)
else:
logging.basicConfig(format=log_format, level=logging.INFO)

# Suppress chatty request logging from elasticsearch library
logging.getLogger('elastic_transport.transport').setLevel(logging.WARNING)

if log_file:
_setup_log_file(log_file)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)

logging.info('Logger initialized.')


# Drains LogRecords that worker processes push onto the shared queue and
# re-emits them through the main process's handlers (a hand-rolled analogue
# of logging.handlers.QueueListener).
def _make_logger_thread(logger_queue):
def thread_target():
while True:
record = logger_queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)

return threading.Thread(target=thread_target)


def _find_files_to_ingest(file_paths):
to_ingest = []
for path in file_paths:
if aws.is_s3_uri(path):
logging.info(
f'Finding all files at S3 URI {path} to be ingested...')
s3_uris = aws.get_files_at_s3_uri(path)
logging.debug(f'{len(s3_uris)} files found: {s3_uris}')
to_ingest.extend(s3_uris)
elif os.path.isdir(path):
logging.info(
f'Finding all files in directory {path} to be ingested...')
dir_files = []
for root, _, files in os.walk(path):
for file in files:
dir_files.append(os.path.join(root, file))
logging.debug(f'{len(dir_files)} files found: {dir_files}')
to_ingest.extend(dir_files)
else:
to_ingest.append(path)

logging.info(f'{len(to_ingest)} files found for ingestion.')
return to_ingest


def _configure_worker_logger(logger_queue=None, debug=False):
if not logger_queue:
raise Exception(
'Must provide a Queue to route worker logs through.')

# Clear any inherited log handlers so all logging will go through queue
logging.getLogger().handlers.clear()

from logging.handlers import QueueHandler
log_handler = QueueHandler(logger_queue)
logging.getLogger().addHandler(log_handler)

log_level = logging.DEBUG if debug else logging.INFO
logging.getLogger().setLevel(log_level)

# Suppress chatty request logging from elasticsearch library
logging.getLogger('elastic_transport.transport').setLevel(logging.WARNING)

logging.info('Worker logger initialized.')


# Builds a worker process that routes its logs through the shared queue and
# ingests its assigned slice of files with a dedicated TextFileIngester.
def _make_ingestion_worker(files=[], ingester_args={}, logger_args={}):
def process_target():
_configure_worker_logger(**logger_args)
ingester = TextFileIngester(**ingester_args)
ingester.ingest_all(files)

return multiprocessing.Process(target=process_target)


# Handle keyboard interrupts (ctrl+C from console); without this, workers will not be terminated
def _handle_keyboard_interrupt(*args):
# Get the current process ID
current_process_id = os.getpid()

# Terminate all child processes
for process in multiprocessing.active_children():
if process.pid != current_process_id:
process.terminate()

# Exit the main process (if needed)
raise SystemExit(f"KeyboardInterrupt (PID: {current_process_id})")


signal.signal(signal.SIGINT, _handle_keyboard_interrupt)


def main(args):
_configure_logger(args.debug, args.log_file)

logger_queue = multiprocessing.Queue()
logger_thread = _make_logger_thread(logger_queue)
logger_thread.start()

to_ingest = _find_files_to_ingest(args.files)

worker_processes = []
    # Split files roughly evenly across workers; guard against a zero chunk
    # size (and a zero range step) when there are fewer files than workers.
    files_per_worker = max(1, len(to_ingest) // args.num_workers)
    file_segments = [to_ingest[i:i + files_per_worker]
                     for i in range(0, len(to_ingest), files_per_worker)]
for segment in file_segments:
worker = _make_ingestion_worker(
files=segment,
ingester_args={
'uid': args.uid,
'source_prefix': args.source_prefix
},
logger_args={
'logger_queue': logger_queue,
'debug': args.debug,
}
)
worker_processes.append(worker)
worker.start()

for worker in worker_processes:
worker.join()

logging.info('All ingestion across all workers complete.')

# Tell the logger thread to stop
logger_queue.put(None)
logger_thread.join()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Ingest files into ElasticSearch for use with Simon.')

parser.add_argument(
'--files',
nargs='+',
help='One or more paths to files or folders to be ingested into ElasticSearch. Can be local paths or S3 URIs.'
)
parser.add_argument(
'--uid',
default='ingest_files',
help='UID to be associated with ingested files.'
)
parser.add_argument(
'--source_prefix',
default=None,
help='Prefix to be prepended to file names when setting `source` attribute for local documents.'
)
parser.add_argument(
'--num_workers',
type=int,
default=1,
help='Number of worker processes to use for ingestion.'
)
parser.add_argument(
'--debug',
action='store_true',
default=False,
help='Enable debug logging.'
)
parser.add_argument(
'--log_file',
default=None,
help='Mirror logs to a file in addition to stdout.'
)

args = parser.parse_args()

main(args)
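
The script above fans ingestion out across worker processes; for a quick single-process run you can drive the same component directly. A minimal sketch, using only the constructor arguments and method that this script itself passes to `TextFileIngester` (the file path is hypothetical):

```
# Single-process sketch mirroring the worker setup above.
# Assumes the same environment variables (OpenAI + Elastic) are configured.
from simon.ingestion import TextFileIngester

ingester = TextFileIngester(uid="ingest_files", source_prefix=None)
ingester.ingest_all(["./docs/notes.txt"])  # hypothetical local file path
```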