001 /*
002 * SPDX-License-Identifier: Apache-2.0
003 *
004 * Copyright 2020-2022 Agorapulse.
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * https://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package com.agorapulse.micronaut.bigquery.mock;
019
020 import com.agorapulse.micronaut.bigquery.BigQueryService;
021 import com.agorapulse.micronaut.bigquery.impl.DefaultBigQueryService;
022 import com.agorapulse.micronaut.bigquery.RowResult;
023 import com.axiomalaska.jdbc.NamedParameterPreparedStatement;
024 import io.micronaut.context.annotation.Replaces;
025 import io.reactivex.Flowable;
026
027 import javax.inject.Singleton;
028 import javax.sql.DataSource;
029 import java.sql.Connection;
030 import java.sql.PreparedStatement;
031 import java.sql.ResultSet;
032 import java.sql.SQLException;
033 import java.sql.Timestamp;
034 import java.time.Instant;
035 import java.util.Map;
036 import java.util.function.Function;
037
038 @Singleton
039 @Replaces(DefaultBigQueryService.class)
040 public class SqlBigQueryService implements BigQueryService {
041
042 private static class Database {
043 private final Connection connection;
044 private final PreparedStatement statement;
045 private final ResultSet resultSet;
046
047 public Database(Connection connection, PreparedStatement statement, ResultSet resultSet) {
048 this.connection = connection;
049 this.statement = statement;
050 this.resultSet = resultSet;
051 }
052 }
053
054 private final DataSource dataSource;
055
056 public SqlBigQueryService(DataSource dataSource) {
057 this.dataSource = dataSource;
058 }
059
060 @Override
061 public <T> Flowable<T> query(Map<String, ?> namedParameters, String sqlString, final Function<RowResult, T> builder) {
062
063 return Flowable.generate(
064 () -> {
065 Connection connection = dataSource.getConnection();
066 String sql = fixPlaceholders(sqlString, namedParameters);
067 NamedParameterPreparedStatement stmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, sql);
068 fillNamedParameters(namedParameters, stmt);
069 return new Database(connection, stmt, stmt.executeQuery());
070 },
071 (database, emitter) -> {
072 try {
073 SqlRowResult rowResult = new SqlRowResult(database.resultSet);
074 if (database.resultSet.next()) {
075 emitter.onNext(builder.apply(rowResult));
076 } else {
077 emitter.onComplete();
078 }
079 } catch (Exception e) {
080 emitter.onError(e);
081 }
082 return database;
083 },
084 database -> {
085 database.resultSet.close();
086 database.statement.close();
087 database.connection.close();
088 }
089 );
090 }
091
092 @Override
093 public void execute(Map<String, ?> namedParameters, String sqlString) {
094 String sql = fixPlaceholders(sqlString, namedParameters);
095 try (
096 Connection connection = dataSource.getConnection();
097 NamedParameterPreparedStatement stmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, sql)
098 ) {
099 fillNamedParameters(namedParameters, stmt);
100 stmt.execute();
101 } catch (SQLException e) {
102 throw new IllegalArgumentException("Cannot execute " + sqlString, e);
103 }
104 }
105
106 private void fillNamedParameters(Map<String, ?> namedParameters, NamedParameterPreparedStatement stmt) {
107 namedParameters.forEach((parameter, x) -> {
108 try {
109 stmt.setObject(parameter, convertIfNecessary(x));
110 } catch (SQLException throwables) {
111 throw new IllegalStateException("Cannot set named parameter " + parameter + " with value " + x, throwables);
112 }
113 });
114 }
115
116 @Override
117 public Object convertIfNecessary(Object object) {
118 if (object instanceof Instant) {
119 return new Timestamp(((Instant) object).toEpochMilli());
120 }
121 return BigQueryService.super.convertIfNecessary(object);
122 }
123
124 private static String fixPlaceholders(String sqlString, Map<String, ?> namedParameters) {
125 String result = sqlString;
126 for (String key : namedParameters.keySet()) {
127 result = result.replace("@" + key, ":" + key);
128 }
129 return result;
130 }
131 }
|