# Source listing metadata: 663 lines, 28 KiB, Python.
from flask import Flask, render_template, redirect, url_for, request, flash, session, abort
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_caching import Cache
|
|
from http import HTTPStatus
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user, UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import StringField, PasswordField, BooleanField, SubmitField
|
|
from wtforms.validators import ValidationError, DataRequired, Email, EqualTo, Length, Regexp
|
|
import matplotlib
|
|
import matplotlib.pyplot as plt
|
|
import random
|
|
import qrcode
|
|
import io
|
|
from base64 import b64encode
|
|
import uuid
|
|
from enum import Enum
|
|
import os
|
|
|
|
from groq import Groq
|
|
from custom_fields import CreditCardField
|
|
|
|
# --- Application, cache and database wiring ---------------------------------
app = Flask(__name__)

# The cache backend is configured before Cache(app) is constructed so the
# extension sees the setting during initialisation.
app.config['CACHE_TYPE'] = 'null'
cache = Cache(app)

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////app/bike_rental.db'

# Session cookies are signed with SECRET_KEY. A key generated at import time
# changes on every restart (and differs between worker processes), which
# silently logs every user out. Prefer a stable key from the environment and
# keep the random key only as a fallback for ad-hoc local runs.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or str(uuid.uuid4())

# NOTE(review): modification tracking is enabled here although nothing in the
# visible code subscribes to the related signals - confirm it is needed.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

matplotlib.use('Agg')  # Use a non-interactive backend
|
|
|
|
def get_utc_time():
    """Return the current moment as a timezone-aware UTC ``datetime``.

    Uses ``datetime.now(tz)`` rather than ``datetime.utcnow()``: the latter
    builds a naive value that has to be patched with ``replace`` and is
    deprecated in recent Python releases. The returned value is the same kind
    of object as before (aware, ``tzinfo`` set to ``pytz.utc``).
    """
    return datetime.now(pytz.utc)
|
|
|
|
@app.route('/offline')
def offline():
    """Serve the static ``offline.html`` page."""
    offline_page = render_template('offline.html')
    return offline_page
|
|
|
|
# Flask-Login manager bound to this application; the ``user_loader`` callback
# and the ``login_required`` decorators in this file rely on it.
# NOTE(review): no ``login_view`` is configured in the visible code, so
# unauthenticated requests presumably get Flask-Login's default response
# instead of a redirect to the login page - confirm this is intended.
login_manager = LoginManager(app)
|
|
|
|
class User(db.Model, UserMixin):
    """Account for a customer or a station administrator.

    Customers are identified by an 8-character phone number. Administrators
    have ``is_admin`` set and are attached to a station through
    ``station_id``. Members (``is_member``) have payment card details stored
    on the record.
    """

    id = db.Column(db.Integer, primary_key=True)
    phone_number = db.Column(db.String(8), unique=True, nullable=False)
    # NOTE(review): confirm 128 characters is long enough for the hash format
    # produced by the installed Werkzeug version.
    password_hash = db.Column(db.String(128), nullable=False)
    is_member = db.Column(db.Boolean, default=False)
    is_admin = db.Column(db.Boolean, default=False)
    # Station an administrator operates from; read via current_user.station_id
    # when an admin starts or completes a rental.
    station_id = db.Column(db.Integer, db.ForeignKey('station.id'))
    # SECURITY NOTE(review): card number, expiry and CVV are persisted as
    # plain strings. Storing the CVV at all is generally not acceptable for
    # card data; this should be replaced by a payment-provider token.
    credit_card_number = db.Column(db.String(19), nullable=True)
    credit_card_expiry = db.Column(db.String(5), nullable=True)
    credit_card_cvv = db.Column(db.String(4), nullable=True)

    def set_password(self, password):
        """Hash ``password`` and store the hash on this user."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def get_id(self):
        """Return the identifier Flask-Login stores in the session.

        Flask-Login expects a string here; the previous implementation
        returned the raw integer primary key. ``load_user`` converts the value
        back with ``int()``, so sessions created before this change keep
        working.
        """
        return str(self.id)
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user id stored in the session to a ``User``.

    Returns ``None`` when the id is not a valid integer or no such user
    exists, which tells Flask-Login to treat the request as anonymous. The
    previous version let ``int()`` raise on a malformed id.

    Uses ``db.session.get`` like the rest of this file instead of the legacy
    ``Query.get``.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return db.session.get(User, primary_key)
|
|
|
|
class Bike(db.Model):
    """A rentable bike type with its hourly price and availability level."""

    id = db.Column(db.Integer, primary_key=True)
    bike_type = db.Column(db.String(50), nullable=False)
    # Hourly rate; calculate_rental_price multiplies it by the billed hours.
    price_per_hour = db.Column(db.Float, nullable=False)
    # Integer availability level. Other code in this file displays
    # 4/3/2/1 as "A Lot"/"Some"/"A Little"/"None" and only starts new rentals
    # for levels 4, 3 and 2.
    availability = db.Column(db.Integer, nullable=False)
|
|
|
|
class Station(db.Model):
    """A rental station; rentals reference it as start and end station."""

    id = db.Column(db.Integer, primary_key=True)
    # Display name shown in rental listings.
    name = db.Column(db.String(100), nullable=False)
    # Free-text location, up to 200 characters.
    location = db.Column(db.String(200), nullable=False)
|
|
|
|
class Rental(db.Model):
    """One rental of a bike by a user, from pick-up to (optional) return.

    A rental is "active" while ``end_time`` is NULL; ``end_station``,
    ``end_time`` and ``total_price`` are filled in when the bike is returned.
    """

    id = db.Column(db.Integer, primary_key=True)
    bike_id = db.Column(db.Integer, db.ForeignKey('bike.id'), nullable=False)
    bike = db.relationship('Bike', backref='rentals')
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Deleting a user also deletes that user's rentals (delete-orphan cascade).
    user = db.relationship('User', backref=db.backref('rentals', cascade='all, delete-orphan'), foreign_keys=[user_id])
    start_station_id = db.Column(db.Integer, db.ForeignKey('station.id'), nullable=False)
    start_station = db.relationship('Station', foreign_keys=[start_station_id], backref='start_rentals')
    # The end station is unknown until the bike is returned: new rentals are
    # created without it and the history view shows 'Not Returned' when it is
    # missing. The column therefore has to accept NULL; with nullable=False
    # the INSERT of a newly started rental violates the constraint.
    end_station_id = db.Column(db.Integer, db.ForeignKey('station.id'), nullable=True)
    end_station = db.relationship('Station', foreign_keys=[end_station_id], backref='end_rentals')
    # Defaults to the current UTC time when the row is created.
    start_time = db.Column(db.DateTime, nullable=False, default=get_utc_time)
    end_time = db.Column(db.DateTime)
    total_price = db.Column(db.Float)
    num_bikes = db.Column(db.Integer, default=1)
|
|
|
|
class RegistrationForm(FlaskForm):
    """Sign-up form: phone number, password (with confirmation) and optional card details."""

    # Exactly 8 characters are required; the validators do not restrict the
    # value to digits.
    phone_number = StringField('Phone Number', validators=[DataRequired(), Length(min=8, max=8)])
    password = PasswordField('Password', validators=[DataRequired(), Length(min=8)])
    confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password', message='Passwords must match.')])
    # The card fields carry no validators here; any validation lives in the
    # project's CreditCardField (defined in custom_fields, not visible here).
    # The register view treats the user as a member only when all three are
    # non-empty.
    credit_card_number = CreditCardField('Credit Card Number')
    credit_card_expiry = CreditCardField('Expiry Date (MM/YY)')
    credit_card_cvv = CreditCardField('CVV')
    submit = SubmitField('Register')
|
|
|
|
class LoginForm(FlaskForm):
    """Login form: 8-character phone number, password and a remember-me flag."""

    phone_number = StringField('Phone Number', validators=[DataRequired(), Length(min=8, max=8)])
    password = PasswordField('Password', validators=[DataRequired()])
    # Passed to login_user(remember=...) by the login view.
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Login')
|
|
|
|
class ReserveForm(FlaskForm):
    """Form with a required, validated email address and a 'Reserve' button.

    NOTE(review): no route in the visible part of this file uses this form.
    """

    email = StringField('Email', validators=[DataRequired(), Email()])
    submit = SubmitField('Reserve')
|
|
|
|
class ReturnForm(FlaskForm):
    """Form consisting only of a 'Return' submit button.

    NOTE(review): no route in the visible part of this file uses this form.
    """

    submit = SubmitField('Return')
|
|
|
|
# User-Friendly Web/Mobile Interface
|
|
@app.route('/')
def index():
    """Serve the landing page (``index.html``)."""
    landing_page = render_template('index.html')
    return landing_page
|
|
|
|
@app.route('/about')
def about():
    """Serve the ``about.html`` page."""
    about_page = render_template('about.html')
    return about_page
|
|
|
|
@app.route('/membership_info')
def membership_info():
    """Serve the ``membership_info.html`` page."""
    info_page = render_template('membership_info.html')
    return info_page
|
|
|
|
@app.route('/rent/<bike_id>', methods=['GET'])
@login_required
def rent_bike(bike_id):
    """Show the rental page for one bike to a logged-in customer.

    Administrators get 403. An unknown ``bike_id`` now yields 404 instead of
    rendering the template with ``bike=None``.
    """
    # Reject admins before touching the database.
    if current_user.is_admin:
        abort(403)

    bike = db.session.get(Bike, bike_id)
    if bike is None:
        abort(404)

    return render_template('rent_bike.html', bike=bike)
|
|
|
|
@app.route('/rent/<bike_id>/qr_code', methods=['GET'])
@login_required
def rent_bike_qr_code(bike_id):
    """Render a QR code a station admin can scan to start or finish a rental.

    The QR code encodes the absolute URL of ``complete_rental`` for this bike
    and the current user. An unknown ``bike_id`` now yields 404 instead of
    producing a QR code that points at a bike that does not exist.
    """
    # NOTE(review): other customer-only views answer admins with 403; the 401
    # used here is kept unchanged so existing clients are not affected.
    if current_user.is_admin:
        abort(401)

    bike = db.session.get(Bike, bike_id)
    if bike is None:
        abort(404)

    # Generate QR code with the complete_rental URL and user ID
    complete_url = url_for('complete_rental', bike_id=bike_id, user_id=current_user.id, _external=True)
    qr_image = qrcode.make(complete_url)
    png_buffer = io.BytesIO()
    qr_image.save(png_buffer, 'PNG')
    # Base64 text that is handed to the template.
    qr_code_base64 = b64encode(png_buffer.getvalue()).decode('utf-8')

    return render_template('qr_code.html', bike=bike, qr_code_base64=qr_code_base64)
|
|
|
|
@app.route('/rent/<bike_id>/<user_id>/complete', methods=['GET', 'POST'])
@login_required
def complete_rental(bike_id, user_id):
    """Admin view that starts a new rental or closes the user's active one.

    Behaviour depends on whether the user has an unreturned rental of this
    bike:

    * Active rental: GET shows duration and current price; POST with
      ``action=return`` and ``end_station`` closes it and stores the price.
    * No active rental: GET shows the bike that would be rented; POST with
      ``action=rent`` creates the rental at the admin's station.

    Changes from the previous version:

    * unknown bike or user ids yield 404 instead of an ``AttributeError``;
    * the user's membership is passed to ``calculate_rental_price`` (it was
      never passed, so the member discount was not applied on real returns);
    * an unknown ``end_station`` is rejected instead of being stored as NULL;
    * the ``Rental`` row is only created for the POST that actually rents,
      not on every GET;
    * an unreachable ``end_time`` branch was removed (the query already
      filters on ``end_time=None``).
    """
    if not current_user.is_admin:
        abort(403)

    bike = db.session.get(Bike, bike_id)
    user = db.session.get(User, user_id)
    if bike is None or user is None:
        abort(404)

    stations = Station.query.all()  # Retrieve all stations
    admin_station = db.session.get(Station, current_user.station_id)
    is_member = bool(user.is_member)

    active_rental = Rental.query.filter_by(bike_id=bike.id, user_id=user.id, end_time=None).first()

    if active_rental:
        # start_time is tagged as UTC here so it can be compared with the
        # aware "now" below.
        start_time = active_rental.start_time.replace(tzinfo=pytz.utc)
        current_time = datetime.now(pytz.timezone('UTC'))
        rental_duration = round((current_time - start_time).total_seconds() / 3600, 2)
        start_station = db.session.get(Station, active_rental.start_station_id)

        if request.method == 'POST' and request.form['action'] == 'return':
            end_station = db.session.get(Station, request.form['end_station'])
            if end_station is None:
                flash('Invalid end station.', 'danger')
                return redirect(url_for('complete_rental', bike_id=bike_id, user_id=user_id))

            active_rental.end_time = get_utc_time()
            active_rental.end_station = end_station
            active_rental.total_price = calculate_rental_price(
                start_time,
                current_time,
                active_rental.bike_id,
                active_rental.num_bikes,
                user,
                start_station,
                end_station,
                is_member=is_member,
            )
            db.session.commit()
            flash('Bike(s) returned successfully.', 'success')
            return redirect(url_for('admin_dashboard'))

        # Price so far: passing None as end time makes the calculation use
        # the current time; no end station is known yet.
        total_rental_price = calculate_rental_price(
            start_time,
            None,
            active_rental.bike_id,
            active_rental.num_bikes,
            user,
            start_station,
            None,
            is_member=is_member,
        )
        tz = pytz.timezone('Asia/Hong_Kong')
        return render_template('complete_rental.html', bike=bike, user=user, active_rental=active_rental, rental_duration=rental_duration, total_rental_price=total_rental_price, stations=stations, tz=tz, admin_station=admin_station)

    # --- No active rental: start a new one at the admin's station ----------
    start_station = admin_station
    if start_station is None:
        flash('You must be an admin to complete new rentals.', 'danger')
        return redirect(url_for('index'))

    # Bikes of the requested type with availability level 4, 3 or 2.
    same_type_bikes = Bike.query.filter(Bike.bike_type == bike.bike_type, Bike.availability.in_([4, 3, 2])).all()
    if not same_type_bikes:
        flash('No bikes of this type are available for rental.', 'danger')
        return redirect(url_for('index'))
    chosen_bike = same_type_bikes[0]

    if request.method == 'POST' and request.form['action'] == 'rent':
        rental = Rental(bike_id=chosen_bike.id, user_id=user.id, start_time=get_utc_time(), start_station_id=start_station.id)
        db.session.add(rental)
        db.session.commit()
        return redirect(url_for('admin_dashboard'))

    availability = {4: "A Lot", 3: "Some", 2: "A Little"}.get(chosen_bike.availability, "None")
    return render_template('complete_rental.html', bike=chosen_bike, user=user, availability=availability, stations=stations)
|
|
|
|
# Real-Time Bike Availability
|
|
@app.route('/stations')
def stations():
    """List every station on ``stations.html``."""
    all_stations = Station.query.all()
    return render_template('stations.html', stations=all_stations)
|
|
|
|
@app.route('/station/<station_id>')
def station_bikes(station_id):
    """Show the availability of every bike type for one station.

    An unknown ``station_id`` now yields 404 instead of rendering with
    ``station=None``. Availability levels outside 1-4 are shown as "None"
    instead of raising ``KeyError``.

    NOTE(review): bikes are not filtered by station - ``Bike`` has no station
    column in this file - so every station shows the same list.
    """
    station = db.session.get(Station, station_id)
    if station is None:
        abort(404)

    availability_levels = {
        4: "A Lot",
        3: "Some",
        2: "A Little",
        1: "None",
    }
    # Keyed by bike type: if several rows share a type, the last one wins
    # (same as the original loop).
    bike_availability = {
        bike.bike_type: {
            "availability": availability_levels.get(bike.availability, "None"),
            "id": bike.id,
            "price_per_hour": bike.price_per_hour,
        }
        for bike in Bike.query.all()
    }
    return render_template('station_bikes.html', station=station, bike_availability=bike_availability)
|
|
|
|
|
|
# Rental History and Reporting
|
|
@app.route('/rentals', methods=['GET', 'POST'])
@login_required
def rentals():
    """Rental history of the current customer.

    GET lists all rentals of the logged-in user. POST with ``rental_id``
    shows the details of one rental. Administrators get 403.

    Changes from the previous version:

    * the detail view only serves rentals that belong to the current user
      (any ``rental_id`` was accepted, exposing other users' rentals) and
      answers 404 for unknown ids instead of crashing;
    * timestamps without tzinfo are treated as UTC before being converted to
      Hong Kong time - rentals are written with UTC times, while
      ``astimezone`` on a naive value assumes the server's local zone;
    * lookups use ``db.session.get`` like the rest of the file.
    """
    if current_user.is_admin:
        abort(403)

    hk_tz = pytz.timezone('Asia/Hong_Kong')

    def to_hk(moment):
        """Convert a stored timestamp to Hong Kong time (naive means UTC)."""
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=pytz.utc)
        return moment.astimezone(hk_tz)

    def duration_of(rental):
        """Return the rental length in hours (2 decimals) or 'Ongoing'."""
        if not rental.end_time:
            return 'Ongoing'
        elapsed = to_hk(rental.end_time) - to_hk(rental.start_time)
        return round(elapsed.total_seconds() / 3600, 2)

    def end_station_of(rental):
        """Return the end station, or None while the bike is not returned."""
        if rental.end_station_id is None:
            return None
        return db.session.get(Station, rental.end_station_id)

    if request.method == 'POST':
        rental = db.session.get(Rental, request.form['rental_id'])
        # 404 for foreign rentals as well, so ids of other users' rentals
        # cannot be probed.
        if rental is None or rental.user_id != current_user.id:
            abort(404)

        return render_template(
            'rental_details.html',
            rental=rental,
            bike=db.session.get(Bike, rental.bike_id),
            start_station=db.session.get(Station, rental.start_station_id),
            end_station=end_station_of(rental),
            rental_duration=duration_of(rental),
            total_rental_price=rental.total_price if rental.total_price else 'N/A',
            datetime=datetime,
            pytz=pytz,
        )

    rental_data = []
    for rental in Rental.query.filter_by(user_id=current_user.id).all():
        bike = db.session.get(Bike, rental.bike_id)
        start_station = db.session.get(Station, rental.start_station_id)
        end_station = end_station_of(rental)
        rental_data.append({
            'bike_type': bike.bike_type,
            'bike_quantity': rental.num_bikes,
            'start_station': start_station.name,
            'end_station': end_station.name if end_station else 'Not Returned',
            'start_time': to_hk(rental.start_time),
            'end_time': to_hk(rental.end_time) if rental.end_time else None,
            'rental_duration': duration_of(rental),
            'total_price': rental.total_price if rental.total_price else 'N/A',
            # NOTE(review): the key says bike_id but the value is the rental
            # id; kept because the template depends on this key.
            'bike_id': rental.id,
        })

    return render_template('rentals.html', rental_data=rental_data, datetime=datetime, pytz=pytz)
|
|
|
|
@app.route('/rental_history_charts')
@login_required
def rental_history_charts():
    """Render two bar charts of the current customer's rental history.

    Chart 1 shows the duration in hours of every finished rental, chart 2 the
    price of every rental that has one. Both are embedded in the page as
    base64-encoded PNGs. Administrators get 403.

    Changes from the previous version:

    * durations are computed directly from the stored timestamps; converting
      each naive value with ``astimezone`` first made the result depend on
      the server's local time zone;
    * figures are closed even when drawing or saving fails;
    * the duplicated chart code is shared by one local helper.
    """
    # Check if the current user is an admin
    if current_user.is_admin:
        abort(403)  # Forbidden, admins are not allowed to access this route

    def bar_chart_base64(values, title, xlabel):
        """Draw one horizontal bar per value and return the PNG as base64 text."""
        fig, ax = plt.subplots(figsize=(12, 6))
        try:
            positions = range(len(values))
            bar_colors = [f'#{"%06x" % random.randint(0, 0xFFFFFF)}' for _ in positions]
            ax.barh(positions, values, color=bar_colors)
            ax.set_yticks(positions)
            ax.set_yticklabels([f'Rental {i+1}' for i in positions])
            ax.set_title(title, fontsize=18)
            ax.set_xlabel(xlabel, fontsize=14)
            ax.set_ylabel('Rental', fontsize=14)
            ax.tick_params(axis='both', labelsize=12)

            png_buffer = io.BytesIO()
            fig.savefig(png_buffer, format='png')
            return b64encode(png_buffer.getvalue()).decode('utf-8')
        finally:
            # pyplot keeps figures alive until they are closed explicitly.
            plt.close(fig)

    # Retrieve the user's rental history
    user_rentals = Rental.query.filter_by(user_id=current_user.id).all()

    # Generate chart 1: Rental duration distribution
    rental_durations = [
        (rental.end_time - rental.start_time).total_seconds() / 3600
        for rental in user_rentals
        if rental.end_time
    ]
    chart1_base64 = bar_chart_base64(rental_durations, 'Rental Duration Distribution', 'Duration (hours)')

    # Generate chart 2: Rental cost distribution
    rental_costs = [rental.total_price for rental in user_rentals if rental.total_price]
    chart2_base64 = bar_chart_base64(rental_costs, 'Rental Cost Distribution', 'Cost ($)')

    return render_template('rental_history_charts.html', chart1_base64=chart1_base64, chart2_base64=chart2_base64)
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new user account from the registration form.

    Re-renders the form when it was not submitted or failed validation,
    answers 409 when the phone number is already taken, and otherwise stores
    the user and redirects to the login page. A user who supplies all three
    card fields is registered as a member.
    """
    form = RegistrationForm()
    if not form.validate_on_submit():
        return render_template('register.html', form=form)

    phone = form.phone_number.data
    card_number = form.credit_card_number.data or ''
    card_expiry = form.credit_card_expiry.data or ''
    card_cvv = form.credit_card_cvv.data or ''

    already_registered = User.query.filter_by(phone_number=phone).first()
    if already_registered:
        flash('Phone number already registered. Please log in.', 'info')
        return render_template('register.html', form=form), HTTPStatus.CONFLICT

    new_user = User(phone_number=phone)
    new_user.set_password(form.password.data)
    # Membership requires a complete set of card details.
    new_user.is_member = bool(card_number and card_expiry and card_cvv)
    new_user.credit_card_number = card_number
    new_user.credit_card_expiry = card_expiry
    new_user.credit_card_cvv = card_cvv

    db.session.add(new_user)
    db.session.commit()

    flash('You have successfully registered. Please log in.', 'success')
    return redirect(url_for('login'))
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate a user by phone number and password.

    Re-renders the form when it was not submitted or failed validation.
    Unknown phone numbers and wrong passwords each flash their own message
    and answer 401. On success the user is logged in, ``session['is_admin']``
    is set, and the browser is redirected to the index page.
    """
    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login.html', form=form)

    account = User.query.filter_by(phone_number=form.phone_number.data).first()
    if not account:
        flash('Phone number not registered. Please register first.', 'danger')
        return render_template('login.html', form=form), HTTPStatus.UNAUTHORIZED

    if not account.check_password(form.password.data):
        flash('Invalid password.', 'danger')
        return render_template('login.html', form=form), HTTPStatus.UNAUTHORIZED

    login_user(account, remember=form.remember_me.data)
    flash('Logged in successfully.', 'success')
    # Store a plain boolean in the session.
    session['is_admin'] = True if account.is_admin else False
    return redirect(url_for('index'))
|
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the index page.

    Also drops the ``is_admin`` flag that the login view writes into the
    session; ``logout_user`` does not know about that key, so it would
    otherwise outlive the login it belongs to.
    """
    logout_user()
    session.pop('is_admin', None)
    flash('You have been logged out.', 'success')
    return redirect(url_for('index'))
|
|
|
|
@app.route('/my_account', methods=['GET', 'POST'])
@login_required
def my_account():
    """Account page: manage membership and change the password.

    GET renders the page. POST dispatches on the ``action`` form field
    (``become_member``, ``stop_membership``, ``change_password``) and then
    redirects back to this page. Administrators get 403.

    Changes from the previous version:

    * ``become_member`` requires all three card fields to be non-empty, the
      same rule registration uses to decide membership (blank values used to
      create a member without payment details);
    * ``change_password`` enforces the 8-character minimum that the
      registration form enforces (an empty new password used to be accepted).
    """
    if current_user.is_admin:
        abort(403)

    if request.method == 'POST':
        action = request.form['action']

        if action == 'become_member':
            card_number = request.form.get('credit_card_number', '').strip()
            card_expiry = request.form.get('credit_card_expiry', '').strip()
            card_cvv = request.form.get('credit_card_cvv', '').strip()
            if not (card_number and card_expiry and card_cvv):
                flash('Credit card number, expiry date and CVV are all required to become a member.', 'danger')
            else:
                current_user.is_member = True
                current_user.credit_card_number = card_number
                current_user.credit_card_expiry = card_expiry
                current_user.credit_card_cvv = card_cvv
                db.session.commit()
                flash('You have successfully become a member.', 'success')

        elif action == 'stop_membership':
            # Ending the membership also removes the stored card details.
            current_user.is_member = False
            current_user.credit_card_number = None
            current_user.credit_card_expiry = None
            current_user.credit_card_cvv = None
            db.session.commit()
            flash('You have successfully stopped your membership.', 'success')

        elif action == 'change_password':
            current_password = request.form['current_password']
            new_password = request.form['new_password']
            confirm_password = request.form['confirm_password']
            if not current_user.check_password(current_password):
                flash('Current password is incorrect.', 'danger')
            elif new_password != confirm_password:
                flash('New password and confirm password do not match.', 'danger')
            elif len(new_password) < 8:
                flash('New password must be at least 8 characters long.', 'danger')
            else:
                current_user.set_password(new_password)
                db.session.commit()
                flash('Password changed successfully.', 'success')

        # Redirect after POST so a page reload does not resubmit the form.
        return redirect(url_for('my_account'))

    return render_template('my_account.html', user=current_user)
|
|
|
|
@app.route('/delete_account', methods=['POST'])
@login_required
def delete_account():
    """Delete the current user's account after re-checking the password.

    A wrong password flashes an error and returns to the account page;
    otherwise the user row is deleted, the user is logged out and the
    browser is redirected to the index page.
    """
    supplied_password = request.form['delete_password']

    if not current_user.check_password(supplied_password):
        flash('Incorrect password. Account deletion failed.', 'danger')
        return redirect(url_for('my_account'))

    db.session.delete(current_user)
    db.session.commit()
    logout_user()
    flash('Your account has been deleted.', 'success')
    return redirect(url_for('index'))
|
|
|
|
@app.route('/bike_recommendation', methods=['GET', 'POST'])
async def bike_recommendation():
    """Ask the Groq helper for a bike recommendation based on a user prompt.

    GET renders the form with the distinct bike types. POST sends
    ``user_prompt`` to the helper and renders its response; the helper's
    "malformed request" reply is returned with status 502, as before.

    Changes from the previous version:

    * a failing helper call is logged and answered with status 500 (it used
      to be returned as an "Error: ..." body with status 200);
    * only the helper call is guarded by ``try``, so unrelated errors such as
      template bugs are no longer reported as helper errors.
    """
    bike_types = [row[0] for row in Bike.query.with_entities(Bike.bike_type).distinct().all()]

    if request.method != 'POST':
        return render_template('bike_recommendation.html', bike_types=bike_types)

    user_prompt = request.form['user_prompt']
    # NOTE(review): GROQ_API_KEY may be unset, in which case None is passed
    # on - confirm how the Groq helper handles that.
    recommender = Groq(os.environ.get('GROQ_API_KEY'), bike_types)

    try:
        response = await recommender.process_request(user_prompt)
    except Exception as e:
        # Boundary with an external service: log the traceback, keep the
        # existing message format, and signal the failure in the status.
        app.logger.exception('Bike recommendation request failed')
        return f"Error: {e}", HTTPStatus.INTERNAL_SERVER_ERROR

    if response == "Malformed request, please make sure you requested correctly.":
        return render_template('bike_recommendation.html', response=response, bike_types=bike_types), HTTPStatus.BAD_GATEWAY
    return render_template('bike_recommendation.html', response=response, bike_types=bike_types)
|
|
|
|
# Admin Dashboard and Analytics
|
|
@app.route('/admin')
@login_required
def admin_dashboard():
    """Admin overview: rental count, revenue and member/non-member split.

    Non-admins get 401. Revenue is the sum of ``total_price`` over all
    rentals that have one.
    """
    if not current_user.is_admin:
        abort(401)

    rental_count = Rental.query.count()
    revenue = sum(
        rental.total_price
        for rental in Rental.query.all()
        if rental.total_price is not None
    )
    # Number of rentals whose user currently is / is not a member.
    rentals_by_membership = {
        flag: Rental.query.join(User).filter(User.is_member == flag).count()
        for flag in (True, False)
    }

    return render_template(
        'admin_dashboard.html',
        total_rentals=rental_count,
        total_revenue=revenue,
        member_rentals=rentals_by_membership[True],
        non_member_rentals=rentals_by_membership[False],
    )
|
|
|
|
@app.route('/admin/analytics')
@login_required
def admin_analytics():
    """Render four analytics charts for administrators.

    Charts (each embedded as a base64-encoded PNG): rental durations, rental
    costs, availability level per bike type, and member versus non-member
    rentals. Non-admins get 401.

    Changes from the previous version:

    * the unused ``User.query.all()`` was removed (it loaded every user);
    * figures are closed even when drawing or saving fails;
    * a pie chart whose values sum to zero shows a "No data" placeholder
      instead of asking matplotlib to normalise by zero;
    * the repeated chart code is shared by local helpers.
    """
    if not current_user.is_admin:
        abort(401)

    def random_colors(count):
        """Return ``count`` random '#rrggbb' colour strings."""
        return [f'#{"%06x" % random.randint(0, 0xFFFFFF)}' for _ in range(count)]

    def render_chart(draw, *draw_args):
        """Create a figure, let ``draw`` fill its axes, return base64 PNG text."""
        fig, ax = plt.subplots(figsize=(12, 6))
        try:
            draw(ax, *draw_args)
            png_buffer = io.BytesIO()
            fig.savefig(png_buffer, format='png')
            return b64encode(png_buffer.getvalue()).decode('utf-8')
        finally:
            # pyplot keeps figures alive until they are closed explicitly.
            plt.close(fig)

    def draw_bars(ax, values, title, xlabel):
        """Horizontal bar chart with one bar per rental."""
        positions = range(len(values))
        ax.barh(positions, values, color=random_colors(len(values)))
        ax.set_yticks(positions)
        ax.set_yticklabels([f'Rental {i+1}' for i in positions])
        ax.set_title(title, fontsize=18)
        ax.set_xlabel(xlabel, fontsize=14)
        ax.set_ylabel('Rental', fontsize=14)
        ax.tick_params(axis='both', labelsize=12)

    def draw_pie(ax, values, labels, title, colors):
        """Pie chart, or a placeholder when there is nothing to show."""
        if sum(values) > 0:
            ax.pie(values, labels=labels, autopct='%1.1f%%', colors=colors)
        else:
            ax.text(0.5, 0.5, 'No data', ha='center', va='center', fontsize=14, transform=ax.transAxes)
            ax.axis('off')
        ax.set_title(title, fontsize=18)

    # Retrieve data for the charts
    all_rentals = Rental.query.all()
    bikes = Bike.query.all()

    # Chart 1: Rental Duration Distribution
    rental_durations = [
        (rental.end_time - rental.start_time).total_seconds() / 3600
        for rental in all_rentals
        if rental.end_time
    ]
    chart1_base64 = render_chart(draw_bars, rental_durations, 'Rental Duration Distribution', 'Duration (hours)')

    # Chart 2: Rental Cost Distribution
    rental_costs = [rental.total_price for rental in all_rentals if rental.total_price]
    chart2_base64 = render_chart(draw_bars, rental_costs, 'Rental Cost Distribution', 'Cost ($)')

    # Chart 3: Bike Availability
    bike_availability = {bike.bike_type: bike.availability for bike in bikes}
    chart3_base64 = render_chart(
        draw_pie,
        list(bike_availability.values()),
        list(bike_availability.keys()),
        'Bike Availability',
        random_colors(len(bike_availability)),
    )

    # Chart 4: Member vs Non-Member Rentals
    member_rentals = Rental.query.join(User).filter(User.is_member == True).count()
    non_member_rentals = Rental.query.join(User).filter(User.is_member == False).count()
    chart4_base64 = render_chart(
        draw_pie,
        [member_rentals, non_member_rentals],
        ['Member Rentals', 'Non-Member Rentals'],
        'Member vs Non-Member Rentals',
        ['#ff9999', '#99ff99'],
    )

    return render_template(
        'admin_analytics.html',
        chart1_base64=chart1_base64,
        chart2_base64=chart2_base64,
        chart3_base64=chart3_base64,
        chart4_base64=chart4_base64,
    )
|
|
|
|
@app.route('/estimate_rental_price', methods=['GET', 'POST'])
def estimate_rental_price():
    """Estimate the price of a hypothetical rental.

    GET renders the form with all stations and bikes. POST validates the
    submitted stations, bike and duration and renders the estimate; every
    validation failure flashes a message and redirects back to the form.

    Changes from the previous version:

    * durations that are not finite or too large for ``timedelta`` /
      ``datetime`` (for example ``inf`` or ``1e12``) are reported as invalid
      input; they used to raise an uncaught ``OverflowError``;
    * the ``try`` block covers only the parsing and date arithmetic that can
      actually raise.
    """
    if request.method != 'POST':
        stations = Station.query.all()
        bikes = Bike.query.all()
        return render_template('estimate_rental_price.html', stations=stations, bikes=bikes, show_result=False)

    def reject(message):
        """Flash ``message`` and send the user back to the empty form."""
        flash(message, 'danger')
        return redirect(url_for('estimate_rental_price'))

    start_station_id = request.form['start_station']
    end_station_id = request.form['end_station']
    raw_duration = request.form['duration']
    bike_id = request.form['bike']
    is_member = 'is_member' in request.form

    current_time = get_utc_time()
    try:
        duration_hours = float(raw_duration)
        # timedelta rejects NaN (ValueError) and infinite or huge values
        # (OverflowError); adding it can also leave datetime's year range.
        estimated_end = current_time + timedelta(hours=duration_hours)
    except (ValueError, OverflowError):
        return reject('Invalid input. Please check your entries.')

    if duration_hours <= 0:
        return reject('Duration must be a positive number.')

    start_station = db.session.get(Station, start_station_id)
    if not start_station:
        return reject('Invalid start station.')

    end_station = db.session.get(Station, end_station_id)
    if not end_station:
        return reject('Invalid end station.')

    bike = db.session.get(Bike, bike_id)
    if not bike:
        return reject('Invalid bike.')

    # Availability level 4 is the highest level used in this file.
    if bike.availability < 4:
        availability_text = f"Estimating with a {bike.bike_type} bike, which is currently not fully available."
    else:
        availability_text = f"Estimating with a {bike.bike_type} bike, which is currently fully available."

    total_price = calculate_rental_price(
        current_time,
        estimated_end,
        bike_id,
        1,
        current_user if current_user.is_authenticated else None,
        start_station,
        end_station,
        'hourly',
        is_member
    )

    return render_template(
        'estimate_rental_price.html',
        start_station=start_station,
        end_station=end_station,
        bike=bike,
        duration_hours=duration_hours,
        total_price=total_price,
        availability_text=availability_text,
        is_member=is_member,
        show_result=True,
    )
|
|
|
|
def calculate_rental_price(start_time, end_time, bike_id, num_bikes, user=None, start_station=None, end_station=None, rent_option='hourly', is_member=False):
    """Compute the price of a rental, rounded to two decimals.

    Args:
        start_time: Timezone-aware start of the rental.
        end_time: Timezone-aware end of the rental, or ``None`` to price the
            rental up to the current time.
        bike_id: Primary key of the ``Bike`` whose hourly rate applies.
        num_bikes: Number of bikes rented; multiplies the base price.
        user: Accepted for API compatibility; not used by the calculation.
            Membership is taken from ``is_member`` only.
        start_station: Station where the rental started (optional).
        end_station: Station where the bike was returned (optional).
        rent_option: ``'hourly'``, or any other value for the daily formula
            (full days at 24 times the hourly rate plus remaining hours).
        is_member: Apply the member pricing rules.

    Returns:
        The total price as a float.

    Pricing rules: every started hour is billed in full; members get 20% off
    the base price; returning at a different station adds a flat fee of 10
    for members and 30 for non-members.

    Changes from the previous version: the leftover debug ``print`` is gone,
    rounding up uses ``math.ceil`` instead of inspecting the float's integer
    ratio, and a negative duration (end before start) is billed as zero hours
    instead of producing a negative price.
    """
    import math  # local import: the module's import block does not provide math

    if end_time is None:
        end_time = get_utc_time().astimezone(pytz.timezone('Asia/Hong_Kong'))

    bike = db.session.get(Bike, bike_id)
    price_per_hour = bike.price_per_hour

    elapsed_hours = (end_time - start_time).total_seconds() / 3600  # Convert to hours
    # Bill every started hour; never bill a negative amount of time.
    billable_hours = max(0, math.ceil(elapsed_hours))

    if rent_option == 'hourly':
        base_price = billable_hours * price_per_hour * num_bikes
    else:  # Daily
        full_days, remaining_hours = divmod(billable_hours, 24)
        daily_price = price_per_hour * 24  # Assuming daily price is 24 times the hourly rate
        base_price = full_days * daily_price * num_bikes
        base_price += remaining_hours * price_per_hour * num_bikes

    total_price = base_price
    if is_member:
        total_price *= 0.8  # 20% discount for members

    if start_station and end_station and start_station != end_station:
        # Flat fee for returning the bike at a different station.
        total_price += 10 if is_member else 30

    return round(total_price, 2)
|
|
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be enabled unconditionally: it lets anyone who can reach the port
    # execute code. Debug mode is now opt-in through FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host="0.0.0.0", port=8080, debug=debug_enabled)
|