mirror of
				https://git.collinwebdesigns.de/oscar.krause/fastapi-dls.git
				synced 2025-11-04 06:50:22 +01:00 
			
		
		
		
	main.py - added endpoint to release single lease
This commit is contained in:
		
							
								
								
									
										25
									
								
								app/main.py
									
									
									
									
									
								
							
							
						
						
									
										25
									
								
								app/main.py
									
									
									
									
									
								
							@@ -422,6 +422,31 @@ async def leasing_v1_lease_renew(request: Request, lease_ref: str):
 | 
			
		||||
    return JSONResponse(response)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.delete('/leasing/v1/lease/{lease_ref}', description='release (return) a lease')
 | 
			
		||||
async def leasing_v1_lease_delete(request: Request, lease_ref: str):
 | 
			
		||||
    token, cur_time = get_token(request), datetime.utcnow()
 | 
			
		||||
 | 
			
		||||
    origin_ref = token.get('origin_ref')
 | 
			
		||||
    logging.info(f'> [  return  ]: {origin_ref}: return {lease_ref}')
 | 
			
		||||
 | 
			
		||||
    entity = Lease.find_by_lease_ref(db, lease_ref)
 | 
			
		||||
    if entity.origin_ref != origin_ref:
 | 
			
		||||
        raise HTTPException(status_code=403, detail='access or operation forbidden')
 | 
			
		||||
    if entity is None:
 | 
			
		||||
        raise HTTPException(status_code=404, detail='requested lease not available')
 | 
			
		||||
 | 
			
		||||
    if Lease.delete(db, lease_ref) == 0:
 | 
			
		||||
        raise HTTPException(status_code=404, detail='lease not found')
 | 
			
		||||
 | 
			
		||||
    response = {
 | 
			
		||||
        "lease_ref": lease_ref,
 | 
			
		||||
        "prompts": None,
 | 
			
		||||
        "sync_timestamp": cur_time.isoformat(),
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return JSONResponse(response)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.delete('/leasing/v1/lessor/leases', description='release all leases')
 | 
			
		||||
async def leasing_v1_lessor_lease_remove(request: Request):
 | 
			
		||||
    token, cur_time = get_token(request), datetime.utcnow()
 | 
			
		||||
 
 | 
			
		||||
@@ -115,6 +115,13 @@ class Lease(Base):
 | 
			
		||||
        session.close()
 | 
			
		||||
        return entities
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def find_by_lease_ref(engine: Engine, lease_ref: str) -> "Lease":
 | 
			
		||||
        session = sessionmaker(bind=engine)()
 | 
			
		||||
        entity = session.query(Lease).filter(Lease.lease_ref == lease_ref).first()
 | 
			
		||||
        session.close()
 | 
			
		||||
        return entity
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def find_by_origin_ref_and_lease_ref(engine: Engine, origin_ref: str, lease_ref: str) -> "Lease":
 | 
			
		||||
        session = sessionmaker(bind=engine)()
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										11
									
								
								test/main.py
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								test/main.py
									
									
									
									
									
								
							@@ -205,7 +205,18 @@ def test_leasing_v1_lease_renew():
 | 
			
		||||
    assert response.json()['lease_ref'] == LEASE_REF
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_leasing_v1_lease_delete():
 | 
			
		||||
    bearer_token = jwt.encode({"origin_ref": ORIGIN_REF}, key=jwt_encode_key, algorithm=ALGORITHMS.RS256)
 | 
			
		||||
    bearer_token = f'Bearer {bearer_token}'
 | 
			
		||||
    response = client.delete(f'/leasing/v1/lease/{LEASE_REF}', headers={'authorization': bearer_token})
 | 
			
		||||
    assert response.status_code == 200
 | 
			
		||||
 | 
			
		||||
    assert response.json()['lease_ref'] == LEASE_REF
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_leasing_v1_lessor_lease_remove():
 | 
			
		||||
    test_leasing_v1_lessor()
 | 
			
		||||
    
 | 
			
		||||
    bearer_token = jwt.encode({"origin_ref": ORIGIN_REF}, key=jwt_encode_key, algorithm=ALGORITHMS.RS256)
 | 
			
		||||
    bearer_token = f'Bearer {bearer_token}'
 | 
			
		||||
    response = client.delete('/leasing/v1/lessor/leases', headers={'authorization': bearer_token})
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user